Notebook downloads JudComm's law part code(LPC) related files and saves processed data as zipped .csv for import into the downstream system

@Theepan Thevathasan

**Change Log** </br>
1.0 | 20240722 | Initial stable release </br>
1.1 | 20240723 | Additional tests, remove redundant functions, typos
1.2 | 20240724 | execution timer
1.3 | 20240724 | vectorised lookup for penalty comparison operator
</br>

🤔 **How to**
1. Check the Law Part Codes are entered correctly under _Enter the Law Part Codes_ (duplicates will be cleaned up)
1. Click `Runtime > Run All` to download and process the Law Part codes. This will take 2-5 mins
1. Download the processed files from the _File Explorer_ by navigating the left menu. This will be in `export/lawcodes_processed_subset_ddmmyyyy.zip` Hover over the file and download for downstream applications


## 🤓 Technical details


🌈 **Features**
- Stateless (doesnt persist data) processing
- Dynamically accesses the latest dataset at the tine of processig
- Exports subset LPCs of interest to the user



⚙️ **Under the Bonnet**
- Full load of the JudComm data is downloaded as a ZIP file.
- The .zip file is extracted to access the .txt fixed-width files. These are saved in the location defined in `JUDCOMM_DEST_FOLDER` (e.g., `raw`).
- Fixed-width files are parsed as per the [definition](https://lawcodes.judcom.nsw.gov.au/assets/lawcodes_design_and_export_format.html) from JudComm. Only relevant files are parsed as defined in `JUDCOMM_FILE_DEF`. Upon parsing these files, they are saved in `JUDCOMM_STAGING_FOLDER` (e.g., `staging`).
- Apply data relationship logic:
  - Find the law part code in `05.txt — law_part_jurisdiction.law_part_code` (DataFrame => `DF_JUDCOMM[df_lp_juro]`).

  - Link the record with `22.txt — law_part_jurisdiction_penalty` (DataFrame => `DF_JUDCOMM[df_lp_juro_penalty]`).

  - Find the statutory penalty codes through either of the following mutually exclusive methods:

    1. `22.txt — law_part_jurisdiction_penalty.statutory_penalty_code` where `22.txt — law_part_jurisdiction_penalty.reference_law_part_code` is empty _OR_

    2. If `22.txt — law_part_jurisdiction_penalty.reference_law_part_code` is not empty, look up the corresponding `05.txt — law_part_jurisdiction.law_part_code` that matches the `reference_law_part_code` until 1. is true.

  - Find matching statutory penalty codes from the above in `20.txt — penalty_compound.statutory_penalty_code`.

  - Match `20.txt — penalty_compound.penalty_component_code` with `19.txt — penalty_component.code` and augment with additional penalty-related metadata from:
  
    - `16.txt — unit_of_measure`
    - `17.txt — penalty_type`
    - `18.txt — penalty_role`
    - `23.txt — comparison_operator`
  
  - Augment penalty with Additional Data Items (ADI) descriptor. ADIs give additional context to apply the LPC, e.g., long weekend vs non-long weekend implications. This is done by linking `22.txt — law_part_jurisdiction_penalty.adi_code` with:

    - `28.txt — adi_type`
    - `29.txt — adi_role`
    - `30.txt — adi_component`
    - `31.txt — adi_group`

- Once the relationship between various files (DB tables of JudComm) is established, data is exported for specific downstream applications. Refer to functions such as `prep_data_for_salesforce()`, `prep_data_with_references()`.

- Export-ready content is converted to .csv along with an optional README.TXT and zipped as per `RESULT_EXPORT_URL_TEMPLATE`.



❓**FAQ**
1. Add/remove law part codes
</br> _Change the law part codes in global constant `LPCs` [here](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=3d_4AO94kLss&line=5&uniqifier=1)_

1. Change the README.TXT in the exported .zip file
<br>_Edit `SF_README_CONTENT` (or equivalent in another function that generates export data). Refer to [here](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=hbZwm57obUn2&line=53&uniqifier=1) for example._

1. Remove the README.TXT in the exported .zip file
</br> _Call function with readme_content = None_

  ```
  zip_csv_files(exported_data= exp_data ,
        zip_file_name = zip_file_name,
        readme_content = None)
  ```
1. Add new source files from JudComm
</br> _Define a new dictionary in `JUDCOMM_FILE_DEF`. Refer to the docstring for specification of defining a source file._
1. Add new data or reference files to the export
</br>_Change `EXPORT_DEF` of the `prep_data_for_salesforce()` function._

1. Change the JudComm URL
</br>_Change `JUDCOMM_URL_TEMPLATE`. Please note this template requires a 'date' component which is dynamically injected by other utility functions._

1. How to use this Jupyter Notebook outside Google Colab
</br>_Google Colab is a free platform to host this codebase as a Notebook. There is no dependency on the hosting platform. It can be downloaded by **File > Download** and run locally or hosted elsewhere as a `.ipynb` file. Refer to [this](https://jupyter.org/install) for additional methods to run this Notebook._

1. Speed up the processing time
</br>_Current benchmark is 2:30 to process 240 law part codes. Programmatic improvements need to be identified to further improve the speed of the process._



🧪 **Tests**

These are the use cases the utility has been tested to pass.

Refer to [this](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=QQs3X8mL86mB&line=10&uniqifier=1) section for test code

1. One active law part code
1. Multiple active  law part codes
1. Active law part code at a point in time
1. Inactive  law part code at a point in time
1. Current & historical penalties of a law part code

# Python libraries

In [155]:
from typing import List, Dict, Any, Optional, Tuple, Callable, Union
import os
import re
import shutil
import logging
import zipfile
from io import BytesIO
from datetime import datetime, timedelta
import time
from contextlib import contextmanager
import pytz
import pandas as pd
import numpy as np
import requests
from functools import partial, lru_cache
from concurrent.futures import ThreadPoolExecutor, as_completed

#Google Colab specific DF Formatter #https://colab.research.google.com/notebooks/data_table.ipynb#scrollTo=jcQEX_3vHOUz
try:

    import google.colab
    from google.colab import data_table
    data_table.enable_dataframe_formatter()
    print("DataFrame formatter enabled for Google Colab.")
except ImportError:
    print("Not running in Google Colab. DataFrame formatter not enabled.")

DataFrame formatter enabled for Google Colab.


# 👉🏾 Settings (Global config / Law Part Codes)
Define the list of LPC codes to extract and process data from the JudComm data source

In [156]:
# GLOBAL CONSTANTS
JUDCOMM_URL_TEMPLATE = 'https://lawcodes.judcom.nsw.gov.au/updates/lawcodes-full-{date}.zip'
RESULT_EXPORT_URL_TEMPLATE = './export/lawcodes_processed_subset_{date}.zip'
JUDCOMM_DEST_FOLDER = 'raw' # OS location to download .txt files to
JUDCOMM_STAGING_FOLDER = 'staging' # OS location to save .csv files
DELETE_JUDCOMM_RAW_FILES  = True # Delete raw .txt files after ETL
DELETE_JUDCOMM_STAGING_FILES = True # Delete staging .csv files after ETL

#_______________________________________________________________________________

# Related to the test suite
# DO NOT CHANGE THE TEST PARAMETER - THis will break the tests
IS_TEST_MODE = False # Set to True to run unit tests

"""
  Active LPCs with historical penalties
  79314 - https://lawcodes.judcom.nsw.gov.au/lawcodes/law-part/view/bol_code/3409/law_code/63593/law_part_code/79314
  80123 - https://lawcodes.judcom.nsw.gov.au/lawcodes/law-part/view/bol_code/3430/law_code/64246/law_part_code/80123
  Expired LPC
  74744 - https://lawcodes.judcom.nsw.gov.au/lawcodes/law-part/view/bol_code/2917/law_code/50691/law_part_code/74744
"""
TEST_LPCs= [79314,80123, 74744]
TEST_AS_AT_DATE = datetime(2015, 6, 15)

#_______________________________________________________________________________

# List of Law Part Codes of interest
LPCs = [
    # Prosections - Active
    41599, 92800, 78973, 78974, 75962, 90304, 79101, 79360, 90188, 74744,
    90189, 79075, 74738, 90317, 83063, 82957, 82950, 83019, 83052, 82995,
    83039, 82960, 82946, 83093, 83090, 83162, 83051, 83220, 83154, 83160,
    83196, 82927, 83038, 83008, 83104, 83725, 83200, 83229, 83206, 83025,
    83180, 83207, 82925, 83057, 83149, 82975, 80254, 80256, 95826, 82560,
    95828, 91894, 83708, 95827, 91893, 82626, 82709, 79947, 86118, 79814,
    79815, 80084, 79880, 82792, 83466, 79009, 79010, 79015, 79016, 79221,
    79021, 79023, 79033, 93349, 93350, 93351, 93384, 93385, 93386, 93387,
    93388, 93390, 93389, 79931, 80388, 65527, 80369, 80244, 80041, 80214,
    80225, 80215, 80219, 80216, 92819, 80250, 80158, 83379, 94072, 80153,
    79003, 85103, 79462, 80405, 80395, 80406, 75963, 93376, 79316, 90119,
    90130, 90272, 96610, 79240, 80417, 90694, 93809, 80419, 79322, 79324,
    79325, 96031, 96032, 96030, 79326, 79327, 83730, 83729, 7481, 90279,
    50513, 79012, 78991, 79018, 80255, 80249, 80441, 50511, 79220, 79007,
    79008, 80247, 90135, 85021, 85017, 85016, 90150, 80368, 82885, 82827,
    82824, 82825, 79014, 79037, 79020, 79356, 83491, 83492, 80422, 58545,
    90178, 79358, 96432, 33337, 86999, 87003, 83406, 83408, 93379, 80245,
    80257, 78989, 79314, 21743, 80144, 80147, 80145, 80146, 80124, 80126,
    80127, 80125, 80138, 80139, 80137, 80136, 80123, 80120, 80122, 80121,
    79228, 82817, 79862, 79865, 83443, 83563, 90643, 90607, 80141, 80132,
    80134, 80150, 80148, 80128, 80130, 79066, 78985, 78986, 78987, 90328,
    90329, 92431, 90184, 79099, 79946, 79932, 79844, 95489, 95486, 79843,
    80740, 80738, 87130, 74912, 74737, 79056, 79053, 79057, 79059, 79788,
    79789, 90298, 79359, 79858, 78059, 90465, 83406, 102355, 102360, 90191,

    # Prosections - Inactive
    74744, 74701, 74703, 74739,

    # CA
    78987, 79001, 79314, 82824, 82825, 83725, 83726, 83729, 83730,

    # Based on legacy system usage
    79059, 79221, 79314, 82825, 83729, 83730, 90329
]


# JudCom Files


*   [Source file definition ](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=KQfSqpnFvVdC&line=5&uniqifier=1)
*   [Judcom to python data type mapping](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=IYrg07-w6_c3&line=15&uniqifier=1)



In [157]:

JUDCOMM_FILE_DEF : Dict[str, Any] = {} # Refer below

DF_JUDCOMM : Dict[str, pd.DataFrame] = {} # Dictionary of Dataframes after ETL of JudComm .txt files


# Define the format of the JudComm files to process
# Source : https://lawcodes.judcom.nsw.gov.au/assets/lawcodes_design_and_export_format.html
"""
         {
            'df_name': 'data frame name to saved the processed data',
            'col_prefix*': 'Prefix added to all columns for traceability E.g JP.',
            'name': 'table_name',
            'description': 'Free text description of the table',
            'drop_cols*' :['Columns to be dropped for memory efficiency'],
            'index_cols*' :['Columns to index the DF with for speed of lookup'],
            'fields': [
                {'order': 1,
                  'name': 'colum_name',
                  'data_type': 'SQL DB data typpe',
                  'width': integer,
                  'purpose': 'Primary key/Foreign key'},

            ]
        }

        * => Optional
"""

JUDCOMM_FILE_DEF = {
    '22.txt': {
        'df_name': 'df_lp_juro_penalty',
        'col_prefix': 'JP.',
        'drop_cols' :['export_flag'],
        'name': 'law_part_jurisdiction_penalty',
        'description': 'This table links a statutory penalty or a reference to statutory penalties with the law part jurisdiction and offender type the penalty applies to.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'reference_law_part_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the law_part table, used to reference statutory penalties'},
            {'order': 3, 'name': 'law_part_jurisdiction_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the law_part_jurisdiction table, used to link the penalty to the law part'},
            {'order': 4, 'name': 'statutory_penalty_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the statutory_penalty table'},
            {'order': 5, 'name': 'valid_from_date', 'data_type': 'date', 'width': 8, 'purpose': 'The date from which this penalty first applied to this offence'},
            {'order': 6, 'name': 'valid_to_date', 'data_type': 'date', 'width': 8, 'purpose': 'The last date which this penalty applied for this offence'},
            {'order': 7, 'name': 'offender_type_abbreviation', 'data_type': 'string', 'width': 2, 'purpose': 'Foreign key to the offender_type table'},
            {'order': 8, 'name': 'adi_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the adi table'},
            {'order': 9, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 10, 'name': 'amending_bol_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the amending_bol table, specifies the legislative basis for penalty changes'}
        ]
    },
    '05.txt': {
        'df_name': 'df_lp_juro',
        'col_prefix': 'J.',
        'drop_cols' :['export_flag'],
        'name': 'law_part_jurisdiction',
        'description': 'This table contains information specifying the different jurisdictions in which a determination can be made for each law part.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 3, 'name': 'jurisdiction_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the jurisdiction table'},
            {'order': 4, 'name': 'law_part_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the law_part table'}
        ]
    },
    '16.txt': {
        'df_name': 'df_unit_of_measure',
        'col_prefix': 'UOM.',
        'index_cols' :['abbreviation'],
        'drop_cols' :['export_flag'],
        'name': 'unit_of_measure',
        'description': 'This table records units of measure for use in penalties (e.g., years, dollars, penalty units, etc.).',
        'fields': [
            {'order': 1, 'name': 'abbreviation', 'data_type': 'string', 'width': 3, 'purpose': 'Primary key and an abbreviation for a unit of measure'},
            {'order': 2, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description for a unit of measure'},
            {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
        ]
    },
    '17.txt': {
        'df_name': 'df_penalty_type',
        'col_prefix': 'PT.',
        'index_cols' :['abbreviation'],
        'drop_cols' :['export_flag'],
        'name': 'penalty_type',
        'description': 'This table records types of penalties (e.g., fine, imprisonment, etc.).',
        'fields': [
            {'order': 1, 'name': 'abbreviation', 'data_type': 'string', 'width': 3, 'purpose': 'Primary key and an abbreviation for a penalty type'},
            {'order': 2, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description for a penalty type'},
            {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
        ]
    },
    '18.txt': {
        'df_name': 'df_penalty_role',
        'col_prefix': 'PR.',
        'index_cols' :['abbreviation'],
        'drop_cols' :['export_flag'],
        'name': 'penalty_role',
        'description': 'This table records roles of penalties for use in a penalty component (e.g., maximum, minimum, etc.).',
        'fields': [
            {'order': 1, 'name': 'abbreviation', 'data_type': 'string', 'width': 3, 'purpose': 'Primary key and an abbreviation for a penalty role'},
            {'order': 2, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description for a penalty role'},
            {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
        ]
    },
    '19.txt': {
        'df_name': 'df_penalty_component',
        'col_prefix': 'P_CMT.',
        'drop_cols' :['export_flag'],
        'name': 'penalty_component',
        'description': 'This table combines a penalty type, quantity, unit of measure, and penalty role to make a penalty component (e.g., maximum 2 years imprisonment).',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'penalty_quantity', 'data_type': 'float', 'width': 8, 'purpose': 'The value or quantity of the penalty'}, # manually changed type from number(Int64) to float(Float64) as data contains decimals
            {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 4, 'name': 'unit_of_measure_code', 'data_type': 'string', 'width': 3, 'purpose': 'Foreign key to abbreviation field in unit_of_measure table'},
            {'order': 5, 'name': 'penalty_role_code', 'data_type': 'string', 'width': 3, 'purpose': 'Foreign key to abbreviation field in penalty_role table'},
            {'order': 6, 'name': 'penalty_type_code', 'data_type': 'string', 'width': 3, 'purpose': 'Foreign key to abbreviation field in penalty_type table'}
        ]
    },
    '20.txt': {
        'df_name': 'df_penalty_compound',
        'col_prefix': 'P_CMD.',
        'name': 'penalty_compound',
        'drop_cols' :['export_flag'],
        'description': 'This table builds penalty components into one part of a fully qualified penalty by combining a penalty component and comparison operator.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'sequence_number', 'data_type': 'number', 'width': 4, 'purpose': 'Sort order for components in a statutory penalty'},
            {'order': 3, 'name': 'penalty_component_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the penalty_component table'},
            {'order': 4, 'name': 'comparison_operator_code', 'data_type': 'string', 'width': 3, 'purpose': 'Foreign key to abbreviation field in comparison_operator table'},
            {'order': 5, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 6, 'name': 'statutory_penalty_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the statutory_penalty table'}
        ]
    },
    '23.txt': {
        'df_name': 'df_comp_op',
        'col_prefix': 'CO.',
        'index_cols' :['abbreviation'],
        'drop_cols' :['export_flag'],
        'name': 'comparison_operator',
        'description': 'This table is used by the penalty_compound and adi_group tables to show the relationship between two items (e.g., and, or, etc.).',
        'fields': [
            {'order': 1, 'name': 'abbreviation', 'data_type': 'string', 'width': 3, 'purpose': 'Primary key and indicates how two items are to be joined'},
            {'order': 2, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description of the comparison operator'},
            {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
        ]
    },
    '24.txt': {
      'df_name': 'df_offender_type',
      'col_prefix': 'OT.',
      'index_cols' :['abbreviation'],
      'drop_cols': ['export_flag'],
      'name': 'offender_type',
      'description': 'This table is used by the law_part_jurisdiction_penalty table to classify penalties on the basis of whether they apply to natural persons, incorporated bodies, or both.',
      'fields': [
          {'order': 1, 'name': 'abbreviation', 'data_type': 'string', 'width': 1, 'purpose': 'Primary key and abbreviation of the offender type'},
          {'order': 2, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description of the offender type'},
          {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
      ]
   },
    '04.txt': {
        'df_name': 'df_law_part',
        'col_prefix': 'LP.',
        'drop_cols' :['export_flag', 'sort', 'approving_officer', 'approval_date', 'drug_flag'],
        'name': 'law_part',
        'description': 'This table contains information concerning laws which have been divided into law parts. This division occurs when the legislation specifies different elements of an offence under a single law.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'The official law part code (also the primary key)'},
            {'order': 2, 'name': 'short_description', 'data_type': 'string', 'width': 60, 'purpose': 'An abbreviated description of the law part'},
            {'order': 3, 'name': 'long_description', 'data_type': 'string', 'width': 255, 'purpose': 'A full/complete description of the law part'},
            {'order': 4, 'name': 'valid_from_date', 'data_type': 'date', 'width': 8, 'purpose': 'The first operational date of the law part'},
            {'order': 5, 'name': 'valid_to_date', 'data_type': 'date', 'width': 8, 'purpose': 'The last operational date of the law part'},
            {'order': 6, 'name': 'approving_officer', 'data_type': 'string', 'width': 20, 'purpose': 'Legacy field, no longer used'},
            {'order': 7, 'name': 'approval_date', 'data_type': 'date', 'width': 8, 'purpose': 'Legacy field, no longer used'},
            {'order': 8, 'name': 'sort', 'data_type': 'string', 'width': 3, 'purpose': 'Legacy field, no longer used'},
            {'order': 9, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 10, 'name': 'law_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the law table'},
            {'order': 11, 'name': 'drug_flag', 'data_type': 'char', 'width': 1, 'purpose': 'Indicates whether the law part refers to a drug offence'}
        ]
    },
    # Jurisdiction Level
    '12.txt': {
      'df_name': 'df_jurisdiction',
      'col_prefix': 'JUR.',
      'drop_cols': ['export_flag'],
      'name': 'jurisdiction',
      'description': 'This table combines a jurisdiction type and a jurisdiction level to create a jurisdiction, eg Local Court — Criminal.',
      'fields': [
          {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
          {'order': 2, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
          {'order': 3, 'name': 'jurisdiction_type_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the jurisdiction_type table'},
          {'order': 4, 'name': 'jurisdiction_level_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the jurisdiction_level table'}
      ]
    },
    '11.txt': {
      'df_name': 'df_jurisdiction_level',
      'col_prefix': 'JUR_L.',
      'drop_cols': ['export_flag'],
      'name': 'jurisdiction_level',
      'description': 'This table records the authority level of the body dealing with laws recorded, eg Local Court, District Court, etc.',
      'fields': [
          {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
          {'order': 2, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
          {'order': 3, 'name': 'short_description', 'data_type': 'string', 'width': 6, 'purpose': 'An abbreviated description for the jurisdiction level'},
          {'order': 4, 'name': 'long_description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description for the jurisdiction level'}
      ]
    },

    # Related to Legislation
    '03.txt': {
        'df_name': 'df_law',
        'col_prefix': 'L.',
        'drop_cols' :['export_flag','sort','find','seins_flag', 'valid_to_gazette_page', 'valid_to_gazette_ref', 'valid_to_gazette_year', 'valid_from_gazette_year'],
        'name': 'law',
        'description': 'This table records information concerning law references within a body of law.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'reference', 'data_type': 'string', 'width': 30, 'purpose': 'This is the law reference as specified in legislation'},
            {'order': 3, 'name': 'law_type_code', 'data_type': 'char', 'width': 1, 'purpose': 'Foreign key to abbreviation field in law_type table'},
            {'order': 4, 'name': 'law_mode_code', 'data_type': 'char', 'width': 1, 'purpose': 'Foreign key to abbreviation field in law_mode table'},
            {'order': 5, 'name': 'valid_from_date', 'data_type': 'date', 'width': 8, 'purpose': 'The date the law first became operational'},
            {'order': 6, 'name': 'valid_from_time', 'data_type': 'time', 'width': 19, 'purpose': 'The time the law first became operational, recorded as midnight when unknown'},
            {'order': 7, 'name': 'valid_from_gazette_ref', 'data_type': 'string', 'width': 5, 'purpose': 'The gazette reference that recorded the commencement date for this law'},
            {'order': 8, 'name': 'valid_from_gazette_year', 'data_type': 'number', 'width': 4, 'purpose': 'The year of gazettal that recorded the commencement date for this law'},
            {'order': 9, 'name': 'valid_from_gazette_page', 'data_type': 'number', 'width': 6, 'purpose': 'The gazette page where the commencement date for this law is recorded'},
            {'order': 10, 'name': 'valid_to_date', 'data_type': 'date', 'width': 8, 'purpose': 'The date the law was last operational'},
            {'order': 11, 'name': 'valid_to_time', 'data_type': 'time', 'width': 19, 'purpose': 'The time the law was last operational, recorded as 23:59 of the day prior to the valid_to_date when unknown'},
            {'order': 12, 'name': 'valid_to_gazette_ref', 'data_type': 'string', 'width': 5, 'purpose': 'The gazette reference that recorded the repeal date for this law'},
            {'order': 13, 'name': 'valid_to_gazette_year', 'data_type': 'number', 'width': 4, 'purpose': 'The year of gazettal that recorded the repeal date for this law'},
            {'order': 14, 'name': 'valid_to_gazette_page', 'data_type': 'number', 'width': 6, 'purpose': 'The gazette page where the repeal date for this law is recorded'},
            {'order': 15, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 16, 'name': 'bol_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the bol table — the parent body of law'},
            {'order': 17, 'name': 'repealing_bol_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the amending_bol table — the body of law that repealed this law'},
            {'order': 18, 'name': 'amending_bol_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the amending_bol table — the body of law that amended this law'},
            {'order': 19, 'name': 'logical_deletion_date', 'data_type': 'date', 'width': 8, 'purpose': 'Legacy field, no longer used'},
            {'order': 20, 'name': 'seins_flag', 'data_type': 'char', 'width': 1, 'purpose': 'Self Enforcement Infringement Notices flag'},
            {'order': 21, 'name': 'sort', 'data_type': 'string', 'width': 60, 'purpose': 'Used for sorting laws in the correct order'},
            {'order': 22, 'name': 'find', 'data_type': 'string', 'width': 40, 'purpose': 'Short description of the law — combination of the parent body of law abbreviation and the law reference'}
        ]
    },
  #     '01.txt': {
  #       'df_name': 'df_legislature',
  #       'col_prefix': 'LEG.',
  #       'drop_cols' :['export_flag'],
  #       'name': 'legislature',
  #       'description': 'This table contains information concerning the constitutionally endorsed law making bodies of the Commonwealth of Australia (e.g., NSW, Cth, etc.).',
  #       'fields': [
  #           {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
  #           {'order': 2, 'name': 'short_description', 'data_type': 'string', 'width': 6, 'purpose': 'An abbreviated description for a type of legislature'},
  #           {'order': 3, 'name': 'long_description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description for a type of legislature'},
  #           {'order': 4, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
  #       ]
  # },
      '02.txt': {
        'name': 'bol',
        'col_prefix': 'BOL.',
        'df_name': 'df_bol',
        'drop_cols' :['export_flag','logical_deletion_date','approving_officer','approval_date', 'valid_to_gazette_ref', 'valid_from_gazette_ref', 'valid_to_gazette_year', 'valid_from_gazette_year', 'valid_to_gazette_page', 'valid_from_gazette_page'],
        'description': 'This table contains information concerning bodies of law (e.g., Acts, Codes, Regulations etc) under which laws exist.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'abbreviation', 'data_type': 'string', 'width': 10, 'purpose': 'A unique abbreviation for the body of law'},
            {'order': 3, 'name': 'number', 'data_type': 'string', 'width': 4, 'purpose': 'The number assigned by the legislative body to the piece of legislation; for common law, and when no number is allocated, a value of zero is used'},
            {'order': 4, 'name': 'year', 'data_type': 'number', 'width': 4, 'purpose': 'The year when the body of law came under consideration'},
            {'order': 5, 'name': 'bol_type_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the bol_type table'},
            {'order': 6, 'name': 'short_description', 'data_type': 'string', 'width': 200, 'purpose': 'The legal short title for the body of law'},
            {'order': 7, 'name': 'jurisdiction_type_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the jurisdiction_type table'},
            {'order': 8, 'name': 'valid_from_date', 'data_type': 'date', 'width': 8, 'purpose': 'The date the body of law first became operational'},
            {'order': 9, 'name': 'valid_from_time', 'data_type': 'time', 'width': 19, 'purpose': 'The time the body of law first became operational, recorded as midnight when unknown'},
            {'order': 10, 'name': 'valid_from_gazette_ref', 'data_type': 'string', 'width': 5, 'purpose': 'The gazette reference that recorded the commencement date for this body of law'},
            {'order': 11, 'name': 'valid_from_gazette_year', 'data_type': 'number', 'width': 4, 'purpose': 'The year of gazettal that recorded the commencement date for this body of law'},
            {'order': 12, 'name': 'valid_from_gazette_page', 'data_type': 'number', 'width': 6, 'purpose': 'The gazette page where the commencement date for this body of law is recorded'},
            {'order': 13, 'name': 'valid_to_date', 'data_type': 'date', 'width': 8, 'purpose': 'The date the body of law was last operational'},
            {'order': 14, 'name': 'valid_to_time', 'data_type': 'time', 'width': 19, 'purpose': 'The time the body of law was last operational, recorded as 23:59 of the day prior to the valid_to_date when unknown'},
            {'order': 15, 'name': 'valid_to_gazette_ref', 'data_type': 'string', 'width': 5, 'purpose': 'The gazette reference that recorded the repeal date for this body of law'},
            {'order': 16, 'name': 'valid_to_gazette_year', 'data_type': 'number', 'width': 4, 'purpose': 'The year of gazettal that recorded the repeal date for this body of law'},
            {'order': 17, 'name': 'valid_to_gazette_page', 'data_type': 'number', 'width': 6, 'purpose': 'The gazette page where the repeal date for this body of law is recorded'},
            {'order': 18, 'name': 'parent', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the bol table — for subordinate legislation to link to its enforcing legislation'},
            {'order': 19, 'name': 'logical_deletion_date', 'data_type': 'date', 'width': 8, 'purpose': 'Legacy field, no longer used'},
            {'order': 20, 'name': 'approving_officer', 'data_type': 'string', 'width': 20, 'purpose': 'Legacy field, no longer used'},
            {'order': 21, 'name': 'approval_date', 'data_type': 'date', 'width': 8, 'purpose': 'Legacy field, no longer used'},
            {'order': 22, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 23, 'name': 'legislature_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the legislature table'}
        ]
  },
    # Additional Data Items (ADI)

    # '32.txt':{
    #     'df_name': 'df_adi',
    #     'col_prefix': 'ADI.',
    #     'drop_cols' :['export_flag'],
    #     'name': 'adi',
    #     'description': 'This table creates a unique identifier for each set of fully specified additional data item groups.',
    #     'fields': [
    #         {'order': 1, 'name': 'code', 'data_type': 'integer', 'width': 8, 'purpose': 'Primary key'},
    #         {'order': 2, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
    #     ]
    #   },
    '28.txt': {
      'df_name': 'df_adi_type',
      'col_prefix': 'ADI_T.',
      'drop_cols': ['export_flag'],
      'name': 'adi_type',
      'description': 'This table classifies additional data items, eg assault, drug, etc. This is one of many tables used to construct the description of an adi.',
      'fields': [
          {'order': 1, 'name': 'abbreviation', 'data_type': 'string', 'width': 10, 'purpose': 'Primary key and an abbreviation for an additional data item type'},
          {'order': 2, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description for an additional data item type'},
          {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
      ]
      },
    '29.txt': {
        'df_name': 'df_adi_role',
        'col_prefix': 'ADI_R.',
        'drop_cols': ['export_flag'],
        'name': 'adi_role',
        'description': 'This table is used to indicate the usage of an additional data item value, eg greater than, less than, equal to, etc. This is one of many tables used to construct the description of an adi.',
        'fields': [
            {'order': 1, 'name': 'abbreviation', 'data_type': 'string', 'width': 3, 'purpose': 'Primary key and an abbreviation for an additional data item role'},
            {'order': 2, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'A full/complete description for an additional data item role'},
            {'order': 3, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'}
        ]
    },
    '30.txt': {
        'df_name': 'df_adi_component',
        'col_prefix': 'ADI_C.',
        'drop_cols': ['export_flag', 'description'],
        'name': 'adi_component',
        'description': 'This table uses an additional data item type, role and value and combines them into a meaningful component of an additional data item, eg drug quantity less than commercial quantity, property value greater than $5000, etc. This is one of many tables used to construct the description of an adi.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'adi_value', 'data_type': 'string', 'width': 60, 'purpose': 'The value of the additional data item'},
            {'order': 3, 'name': 'description', 'data_type': 'string', 'width': 60, 'purpose': 'Legacy field, no longer used'},
            {'order': 4, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 5, 'name': 'adi_type_code', 'data_type': 'string', 'width': 10, 'purpose': 'Foreign key to abbreviation field in adi_type table'},
            {'order': 6, 'name': 'adi_role_code', 'data_type': 'string', 'width': 3, 'purpose': 'Foreign key to abbreviation field in adi_role table'}
        ]
    },
    '31.txt': {
        'df_name': 'df_adi_group',
        'col_prefix': 'ADI_G.',
        'drop_cols': ['export_flag'],
        'name': 'adi_group',
        'description': 'This table creates one part of a completely specified additional data item by combining an additional data item component and comparison operator. This is one of many tables used to construct the description of an adi.',
        'fields': [
            {'order': 1, 'name': 'code', 'data_type': 'number', 'width': 8, 'purpose': 'Primary key'},
            {'order': 2, 'name': 'sequence_number', 'data_type': 'number', 'width': 4, 'purpose': 'Used for sorting groups in the correct order'},
            {'order': 3, 'name': 'adi_component_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the adi_component table'},
            {'order': 4, 'name': 'comparison_operator_code', 'data_type': 'string', 'width': 3, 'purpose': 'Foreign key to abbreviation field in comparison_operator table'},
            {'order': 5, 'name': 'export_flag', 'data_type': 'string', 'width': 2, 'purpose': 'Status of record'},
            {'order': 6, 'name': 'adi_code', 'data_type': 'number', 'width': 8, 'purpose': 'Foreign key to the adi table'}
        ]
    }

}



In [158]:
# Define a dictionary for data type mapping and conversion functions
DATA_TYPE_MAPPING = {
    'string': {
        'dtype': 'str',
        'convert': lambda x: x
    },
    'char': {
        'dtype': 'str',
        'convert': lambda x: x
    },
    'number': {
        'dtype': 'Int64',
         'convert': lambda x: pd.to_numeric(x, errors='coerce').replace([np.nan, np.inf, -np.inf], np.nan)
    },
    'integer': {
        'dtype': 'Int64',
        'convert': lambda x: pd.to_numeric(x, errors='coerce').replace([np.nan, np.inf, -np.inf], np.nan)
    },
    'float': {
        'dtype': 'Float64',
        'convert': lambda x: pd.to_numeric(x, errors='coerce').replace([np.nan, np.inf, -np.inf], np.nan)
    },
    'boolean': {
        'dtype': 'bool',
        'convert': lambda x: x.map({'True': True, 'False': False, '': pd.NA})
    },
    'time': {
        'dtype': 'datetime64[ns]',
        #'convert': lambda x: (x.replace(r'^\s+|\s+$', '', regex=True))
        #'convert': lambda x: pd.to_datetime(re.sub(r'^\s+|\s+$|30121899\s', '', x), format='%H%M%S', errors='coerce').time()
        # 'convert' : lambda x: pd.to_datetime(x.str.replace(r'^\s+|\s+$|30121899\s', '', regex=True), format='%H%M%S', errors='coerce').dt.time
        'convert': lambda x: pd.to_datetime(x.str.replace(r'^\s+|\s+$|30121899\s', '', regex=True), format='%H%M%S', errors='coerce')
        #'convert': lambda x: x

    },
    'date': {
        'dtype': 'datetime64[ns]',
        'convert': lambda x: pd.to_datetime(x, format='%d%m%Y', errors='coerce')


    }
}

# Utility Functions


1. [JudComm Source Data URL Constructor](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=0C4tv_f3sZUT&line=4&uniqifier=1)
1. [Download and unzip raw data](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=OEmVqjQlFmhD&line=3&uniqifier=1)
1. [Zip CSV of Processed data ](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=9mxkMMPNvJXA&line=23&uniqifier=1)
1. [Fixed width flat file parser](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=cHcLt2nfzHHW&line=4&uniqifier=1)
1.   [Get Statutory Penalty Code for a given LPC](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=pRauuymzujjG&line=13&uniqifier=1)
1.   [Law Part Code Details Finder for a given LPC](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=FhRJpvVWjCDI&line=8&uniqifier=1)
1.   [Penalty Finder for a given LPC](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=hHrzUtPV99JU&line=8&uniqifier=1)
1. [Find Additional Data items for a given LPC](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=u78Zl0ia-CHw&line=32&uniqifier=1)
1. [Find the Penalty Comparison Operator](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=1qHExPAcaq8i&line=5&uniqifier=1)
1. [Utility to prepare export friendly data for each dataset](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=jDBEPQHflI3S&line=20&uniqifier=1)
1. [Utility to export data for the target downstream consumption](https://colab.research.google.com/drive/1qPPDMzVQPM4nMydQpXtfhnknqammJmwm#scrollTo=AdyCMScLbCkx&line=104&uniqifier=1)

In [159]:
# Utility functions to construct the URL based on data refresh on Wednesdays AEST
# There is additional fail safe retrieval previously avaialble data in case there are issues with data freshness
def get_wednesday(start_date: datetime = None) -> datetime:
    """
    Function to get the latest Wednesday
    """
    tz = pytz.timezone('Australia/Sydney')
    today = start_date if start_date else datetime.now(tz)
    days_to_subtract = (today.weekday() - 2) % 7
    latest_wednesday = today - timedelta(days=days_to_subtract)
    return latest_wednesday


def url_exists(url: str) -> bool:
    """
    Function to check if URL exists
    """
    try:
        response = requests.head(url)
        return response.status_code == 200
    except requests.RequestException:
        return False

def construct_url(url_template: str,
                  date_function: Callable[[datetime], datetime],
                  date_format: str = '%Y%m%d',
                  retry_max_weeks: int = 1,
                  retry_url_function : Optional[Callable[[str], bool]] = None
                  ) -> Tuple[str, datetime]:
    """
    Function to construct URL
    """
    tz = pytz.timezone('Australia/Sydney')
    start_date = datetime.now(tz)

    for week in range(retry_max_weeks):
        latest_wednesday = date_function(start_date)
        formatted_date = latest_wednesday.strftime(date_format)
        url = url_template.format(date=formatted_date)
        print(f"{week} - {url}")

        # not retry URL Passed
        if not retry_url_function:
          return url, latest_wednesday

        # Contructed URL is legal and available
        if retry_url_function(url):
            return url, latest_wednesday

        # Move to the previous Wednesday
        start_date -= timedelta(weeks=1)

    raise FileNotFoundError(f"No valid URL found for the past {retry_max_weeks} Wednesdays.")


# Partial functions to
just_construct_url= partial(construct_url, retry_url_function=None)
construct_valid_url= partial(construct_url, retry_url_function=url_exists, date_function= get_wednesday, retry_max_weeks=20)

In [160]:

def download_and_unzip(url: str, extract_to: str = '.', download_def: List[str] = [], download_def_only: bool = False):
    # Send a GET request to the URL
    response = requests.get(url)

    # Ensure the request was successful
    if response.status_code == 200:
        # Create a BytesIO object from the response content
        zip_file = BytesIO(response.content)

        # Open the ZIP file
        with zipfile.ZipFile(zip_file, 'r') as zip_ref:
            # Get the list of all files in the ZIP archive
            all_files = zip_ref.namelist()

            # Determine files not in download_def
            if download_def_only and download_def:
                files_to_extract = [file for file in download_def if file in all_files]
                files_not_in_def = [file for file in all_files if file not in download_def]
                if files_not_in_def:
                    print("Additional JudComm files that won't be downloaded:", files_not_in_def)
            else:
                files_to_extract = all_files

            # Extract the specified files
            for file in files_to_extract:
                if file in all_files:
                    # print(f"Extracting {file}...")
                    zip_ref.extract(file, path=extract_to)
                else:
                    print(f"{file} not found in the archive")
    else:
        print(f"Failed to download file: {response.status_code}")

# Sample usage

# DOWNLOAD_DEF = {
#     '22.txt': {
#         'df_name': 'df_lp_juro_penalty'}
# }

# # Extract the relevant file names from DOWNLOAD_DEF
# file_names_to_extract = list(DOWNLOAD_DEF.keys())

# # Call the function with the extracted file names
# download_and_unzip(
#     url='https://lawcodes.judcom.nsw.gov.au/updates/lawcodes-full-20240626.zip',
#     extract_to='dw_folder',
#     download_def=file_names_to_extract,
#     download_def_only=True
# )


In [161]:
def zip_csv_files(exported_data: List[Dict[str, Any]], zip_file_name: str, readme_content: str = None) -> None:
    """
    Exports DataFrames as csv files and zips them together into a single file.
    Optionally adds a README.TXT with instructions to the recipient.

    Parameters:
    `exported_data` (List[Dict[str, Any]]): List of dictionaries containing file export information.
      {
        'export_file_name': 'xxx.csv',
        'sep' : ',',
        'df': `df`,
      }

    `zip_file_name` (str): The name of the output zip file.

    `readme_content` (str): Optional multiline text to be included in a README.TXT file.
    """
    folder_path = os.path.dirname(zip_file_name)
    os.makedirs(folder_path, exist_ok=True)

    with zipfile.ZipFile(zip_file_name, 'w') as zipf:
        for data in exported_data:
            df = data['df']
            export_file_name = data['export_file_name']
            sep = data['sep']

            # Save the DataFrame to a CSV file
            df.to_csv(export_file_name, index=False, sep=sep)

            # Add the CSV file to the zip file
            zipf.write(export_file_name, os.path.basename(export_file_name))

            # Remove the temporary CSV file
            os.remove(export_file_name)

        if readme_content:
            readme_file = 'README.TXT'
            with open(readme_file, 'w') as f:
                f.write(readme_content)
            zipf.write(readme_file, readme_file)
            os.remove(readme_file)

# Sample Usage
# exported_data = [
#     {
#         'export_file_name': 'example1.csv',
#         'sep': ',',
#         'df': df1
#     },
#     {
#         'export_file_name': 'example2.csv',
#         'sep': ',',
#         'df': df2
#     }
# ]
# zip_file_name = '/export/lawcodes-processed-subset-20240703.zip'
# readme_content = """
# This archive contains the following files:
# - example1.csv: Description of the first CSV file.
# - example2.csv: Description of the second CSV file.
#
# Instructions:
# Please extract the files and use them as per the requirements.
# """
# zip_csv_files(exported_data=exported_data, zip_file_name=zip_file_name, readme_content=readme_content)


In [162]:
@contextmanager
def measure_exec_time():
    """
      Measure time taken to execute a cell
    """
    start_time = time.time()
    yield
    end_time = time.time()
    elapsed_time = end_time - start_time
    print(f"Total execution time: {elapsed_time:.2f} seconds")


In [163]:
def read_fixed_length_file(file :str , metadata:List[Dict[str, Any]]):
    """
      INPUT_PARAMETERS
      file_ : folder/file.ext
      metadata =
        {
            'name': 'table_name',
            'description': 'Free text description of the table',
            'df_name': 'name of the df in the collection',
            'drop_cols': ['col1', 'col1'], # List of columns to drop in the dataframe
            'col_prefix': 'X.', # Prefix to add to all columns for traceability purposes downstream
            'fields': [
                {'order': 1,
                  'name': 'colum_name',
                  'data_type': 'SQL DB data typpe',
                  'width': integer,
                  'purpose': 'Primary key/Foreign key'},

            ]
        }

    """
    fields = metadata['fields']
    colspecs = [(sum(field['width'] for field in fields[:i]), sum(field['width'] for field in fields[:i+1])) for i in range(len(fields))]
    names = [field['name'] for field in fields]


    # Read the fixed-width file with all columns as string to handle NA values
    df = pd.read_fwf(file, colspecs=colspecs, names=names, dtype=str, na_values=['NA','', '<NA>'], na_filter=True)


    # print(f"Before conversion - {field_name}: {df[field_name].head()}")
    # Convert fields to appropriate types using the conversion functions in DATA_TYPE_MAPPING
    for field in fields:
        field_name = field['name']
        data_type = field['data_type']

        if data_type in DATA_TYPE_MAPPING:
            # Apply conversion function
            conversion_func = DATA_TYPE_MAPPING[data_type]['convert']
            df[field_name] = conversion_func(df[field_name])


            # Convert to specified dtype
            df[field_name] = df[field_name].astype(DATA_TYPE_MAPPING[data_type]['dtype'])

    # Drop specified columns if 'drop_cols' is in metadata
    if 'drop_cols' in metadata:
        df.drop(columns=metadata['drop_cols'], inplace=True)

    # Reset index
    if 'index_cols' in metadata:
        df.set_index(metadata['index_cols'], inplace=True)

    # Add prefix to all columns if 'col_prefix' is in metadata
    if 'col_prefix' in metadata:
        # Rename the columns
        df.columns = [metadata['col_prefix'] + col for col in df.columns]
        # Rename the index if it's named
        if df.index.name:
            df.index.name = metadata['col_prefix'] + df.index.name
        # If the index is a MultiIndex, rename all its levels
        elif isinstance(df.index, pd.MultiIndex):
            df.index.names = [metadata['col_prefix'] + (name if name else '') for name in df.index.names]

    return df





In [164]:
def get_statutory_codes_for_lpc(lpc: int,
                                as_at_date: Optional[datetime] = None) -> List[int]:
    """
    Finds statutory penalty codes for a given law part code (LPC) and an optional date.

    Statutory Penalty Codes are either:
      1. Directly in JP.statutory_penalty_code OR
      2. Referenced by JP.reference_law_part_code

    1 & 2 are mutually exclusive.

    In case of 2, the `recurse()` function is used to find
      `JP.statutory_penalty_code`s where `J.law_part_code` == `JP.reference_law_part_code`.

    Parameters:
    lpc (int): The law part code to search for.
    as_at_date (Optional[datetime]): The date to filter the records. If not provided, all records are considered.

    Returns:
    List[int]: A list of JP.statutory_penalty_code

    Notes:
    - The function merges two DataFrames (`df_lp_juro_penalty` and `df_lp_juro`) on `JP.law_part_jurisdiction_code` and `J.code`.
    - The recursion collects statutory penalty codes and reference law part codes.
    - If `as_at_date` is provided, the records are filtered based on `JP.valid_from_date` and `JP.valid_to_date`.

    """

    def recurse(df_j: pd.DataFrame,
                lpc: int, as_at_date: Optional[datetime],
                collected_statutory_codes: List[Dict[str, int]]) -> None:

        if as_at_date:
            df_j_filtered = df_j[
                (df_j['J.law_part_code'] == lpc) &
                (df_j['JP.valid_from_date'] <= as_at_date) &
                ((df_j['JP.valid_to_date'] >= as_at_date) | df_j['JP.valid_to_date'].isna())
            ]
        else:
            df_j_filtered = df_j[
                (df_j['J.law_part_code'] == lpc)
            ]

        # Collect statutory penalty codes as a list of dictionaries
        new_statutory_penalty_codes = df_j_filtered[['JP.code', 'JP.statutory_penalty_code', 'J.law_part_code']].dropna().apply(
            lambda row: {'juro_penalty_code': row['JP.code'], 'statutory_penalty_code': row['JP.statutory_penalty_code'], 'law_part_code': row['J.law_part_code']},
            axis=1
        ).tolist()

        collected_statutory_codes.extend(new_statutory_penalty_codes)

        # Collect reference law part codes and recurse
        reference_law_part_codes = df_j_filtered['JP.reference_law_part_code'].dropna().unique().tolist()

        for ref_lpc in reference_law_part_codes:
            recurse(df_j, ref_lpc, as_at_date, collected_statutory_codes)

    # Initialize list to store results
    statutory_penalty_codes: List[Dict[str, int]] = []

    # Add prefixes to DataFrame columns to avoid confusion
    df_lp_juro_penalty = DF_JUDCOMM['df_lp_juro_penalty']
    df_lp_juro = DF_JUDCOMM['df_lp_juro']

    # Merge the two DataFrames on the appropriate columns
    df_j = pd.merge(df_lp_juro_penalty, df_lp_juro, left_on='JP.law_part_jurisdiction_code', right_on='J.code')

    # Start the recursion
    recurse(df_j, lpc, as_at_date, statutory_penalty_codes)

    return statutory_penalty_codes

# Example Usage
# print(get_statutory_codes_for_lpc(lpc=83284, as_at_date=datetime(2024, 7, 30)))
# print(get_statutory_codes_for_lpc(lpc=97117, as_at_date=datetime(2024, 7, 30)))

# Without `as_at_date`
# print(get_statutory_codes_for_lpc(lpc=74739))
# print(get_statutory_codes_for_lpc(lpc=74744))


In [165]:
def get_law_part_details(lpc:int, as_at_date : Optional[datetime] = None):
  """
    Returns details of the Law for a given LPC by combining details from Law Part Code, Law and Body of Law
  """
  _df_lp = DF_JUDCOMM['df_law_part'][DF_JUDCOMM['df_law_part']['LP.code']==lpc]

  df_lp_l = pd.merge(_df_lp, DF_JUDCOMM['df_law'],  left_on='LP.law_code', right_on='L.code')
  df_lp_bol = pd.merge(df_lp_l, DF_JUDCOMM['df_bol'],  left_on='L.bol_code', right_on='BOL.code')

  if as_at_date:
      df_final = df_lp_bol[
          (df_lp_bol['BOL.valid_from_date'] <= as_at_date) &
          ((df_lp_bol['BOL.valid_to_date'] >= as_at_date) | df_lp_bol['BOL.valid_to_date'].isna())
      ]
  else:
      df_final = df_lp_bol

  return_cols = ['BOL.code','BOL.short_description', 'L.reference',
                 'L.code','LP.code', 'LP.short_description', 'LP.long_description', 'LP.valid_from_date', 'LP.valid_to_date']



  return df_final[return_cols]

# Example usage
# get_law_part_details(74744)

In [166]:
def find_penalties(lpc:int, as_at_date: Optional[datetime]=None):
  """
  1. Get the `JP.statutory_penalty_code`s associated with the LPC
  2. Merge P_CMD.statutory_penalty_code == JP.statutory_penalty_code to get the penalty components of interest
  3. Agument data with reference data
    - UOM.description
    - PR.description
    - PT.description
    - JP.offender_type_abbreviation
    - OT.description
    - CO.description

  """
  # Access the individual dataframes
  #.. related to Penalty component
  df_penalty_compound = DF_JUDCOMM['df_penalty_compound']
  df_penalty_component = DF_JUDCOMM['df_penalty_component']
  df_unit_of_measure = DF_JUDCOMM['df_unit_of_measure']
  df_penalty_role = DF_JUDCOMM['df_penalty_role']
  df_penalty_type = DF_JUDCOMM['df_penalty_type']
  df_comp_op = DF_JUDCOMM['df_comp_op']
  #.. related to Jurisdiction Penalty
  df_lp_juro_penalty = DF_JUDCOMM['df_lp_juro_penalty']
  df_lp_juro = DF_JUDCOMM['df_lp_juro']

  #.. related to Offender Type
  df_offender_type = DF_JUDCOMM['df_offender_type']


  # List for results from `get_statutory_codes_for_lpc()`
  stat_penalty_codes : List[int] = None
  jp_codes : List[int] =  None #List of J.Code from `stat_penalty_codes`
  jp_statutory_codes : List[int] =  None #List of JP.statutory_penalty_code from `stat_penalty_codes`

  # Collect all directy and indirect `JP.statutory_penalty_codes`
  stat_penalty_codes = get_statutory_codes_for_lpc(lpc = lpc, as_at_date = as_at_date)

  if not stat_penalty_codes:
    print(lpc)


  # Ensure that the items contain the required keys before unpacking
  valid_items = [
      (item['juro_penalty_code'], item['statutory_penalty_code'])
      for item in stat_penalty_codes
      if 'juro_penalty_code' in item and 'statutory_penalty_code' in item
  ]

  if valid_items:
      jp_codes, jp_statutory_codes = zip(*valid_items)
  else:
      jp_codes, jp_statutory_codes = [], []

  # Inject the Initial LPC looked up
  df_penalty_component['searched_lpc'] = lpc

  # Stuff Unit of Measure
  df_penalty_component['UOM.description'] = df_penalty_component['P_CMT.unit_of_measure_code'].map(df_unit_of_measure['UOM.description'])

  # Stuff Penalty Role
  df_penalty_component['PR.description'] = df_penalty_component['P_CMT.penalty_role_code'].map(df_penalty_role['PR.description'])

  # Stuff Penalty Type
  df_penalty_component['PT.description'] = df_penalty_component['P_CMT.penalty_type_code'].map(df_penalty_type['PT.description'])


  # Link Penalty Compound with Penalty Component and  Comparison Operator
  _df_pcpd_pc = pd.merge(df_penalty_component, df_penalty_compound,
                      left_on='P_CMT.code', right_on='P_CMD.penalty_component_code',
                      how='left')


  _df_pcpd_pc = _df_pcpd_pc[_df_pcpd_pc['P_CMD.statutory_penalty_code'].isin(jp_statutory_codes)]

  # Get Jurisdiction Penalty records for the matched Statutory Penalty Codes
  df_j = pd.merge(df_lp_juro_penalty, df_lp_juro,  left_on='JP.law_part_jurisdiction_code', right_on='J.code')
  df_j = df_j[df_j['JP.code'].isin(jp_codes)]

  # Link Penalty Compound with Jurisdiction Penalty and Jurisdiction
  df_stat_code = pd.merge(_df_pcpd_pc, df_j, left_on='P_CMD.statutory_penalty_code', right_on='JP.statutory_penalty_code')

  # Link the Offender type
  df_stat_code['OT.description'] = df_stat_code['JP.offender_type_abbreviation'].map(df_offender_type['OT.description'])

  #Link Jurisdiction level data with the penalty
  df_juro_level = pd.merge(DF_JUDCOMM['df_jurisdiction'], DF_JUDCOMM['df_jurisdiction_level'], left_on = 'JUR.jurisdiction_level_code', right_on = 'JUR_L.code', how ='inner')
  df_juro_level= df_juro_level[['JUR.code', 'JUR_L.short_description', 'JUR_L.long_description']]
  df_final= pd.merge(df_stat_code, df_juro_level, left_on = 'J.jurisdiction_code', right_on = 'JUR.code', how ='left')




  # Stuff Penalty Comparison Operator
  # Using custom prefix, _PCO as Comparison Operator is not specific to Penalty
  # df_final['P_CMD.comparison_operator_code'] = df_final.apply(lambda row: get_penalty_comparison_operator(row, df_final), axis=1) # Row based operation
  df_final  = fill_values_within_groups(df = df_final, group_col= 'P_CMD.statutory_penalty_code', match_col= 'P_CMD.comparison_operator_code')

  df_final['_PCO.description'] = df_final['P_CMD.comparison_operator_code'].map(df_comp_op['CO.description'])

  # print(df_stat_code)
  # df_stat_code['searched_lpc'] = lpc

  return_cols = ['searched_lpc', 'P_CMT.penalty_quantity','P_CMD.code',
                 'P_CMD.statutory_penalty_code',
                 'JUR_L.short_description', 'JUR_L.long_description',
                 'JP.offender_type_abbreviation','OT.description',
                 'JP.valid_from_date', 'JP.valid_to_date','JP.adi_code',
                 'P_CMT.penalty_type_code', 'PT.description',
                 'P_CMT.unit_of_measure_code', 'UOM.description',
                 'P_CMT.penalty_role_code','PR.description',
                 '_PCO.description',
                 'P_CMD.comparison_operator_code']

  return df_final[return_cols]


#Sample usage
# find_penalties(lpc=16947, as_at_date=datetime(2024, 7, 30 ))

# find_penalties(lpc=79314)

In [167]:
def get_lpc_details(lpcs: List[int], as_at_date: Optional[datetime] = None) -> pd.DataFrame:
    """
    Combines the Law Part Codes with Penalty Details.

    Parameters:
    ----------
    lpcs : List[int]
        List of law part codes to be processed.
    as_at_date : Optional[datetime]
        The date at which the data is considered for retrieval.

    Returns:
    -------
    pd.DataFrame
        Combined DataFrame with law part codes and penalty details.
    """

    def process_lpc(lpc):
        penalties = find_penalties(lpc, as_at_date)
        lpc_details = get_law_part_details(lpc, as_at_date)
        return penalties, lpc_details

    with ThreadPoolExecutor() as executor:
        results = list(executor.map(process_lpc, lpcs))

    combined_penalties = [result[0] for result in results]
    combined_lpc_details = [result[1] for result in results]

    # Concatenate results from all LPCs
    df_penalties_combined = pd.concat(combined_penalties, ignore_index=True)
    df_lpc_details_combined = pd.concat(combined_lpc_details, ignore_index=True)

    # Combine on 'LPC.code' from get_law_part_details and 'searched_lpc' from find_penalties
    df_combined = pd.merge(df_penalties_combined, df_lpc_details_combined, left_on='searched_lpc', right_on='LP.code')

    # Inject ADI details
    df_combined['_ADI.description'] = df_combined['JP.adi_code'].apply(get_additional_data_items)
    # df_combined['_ADI.description'] =pd.NA

    column_names = [
        # LPC related
        'searched_lpc', 'LP.code', 'LP.short_description', 'LP.long_description', 'LP.valid_from_date', 'LP.valid_to_date',
        # Law related
        'BOL.code', 'L.code', 'BOL.short_description', 'L.reference',
        # Penalty related
        'P_CMD.code', 'JUR_L.short_description', 'JUR_L.long_description', 'P_CMD.statutory_penalty_code',
        'JP.valid_from_date', 'JP.valid_to_date', 'P_CMT.penalty_quantity', 'P_CMT.unit_of_measure_code',
        'UOM.description', 'P_CMT.penalty_type_code', 'PT.description', 'P_CMT.penalty_role_code', 'PR.description',
        'JP.offender_type_abbreviation', 'OT.description', 'P_CMD.comparison_operator_code', '_PCO.description',
        'JP.adi_code', '_ADI.description'
    ]

    df_combined = df_combined[column_names]
    return df_combined

# Sample usage
# lpcs = [79314] # 17118
# active_data = datetime(2024, 7, 30)
# _df = get_lpc_details(lpcs, None)
# _df


In [168]:
# TODO : Implement a partial function using `filter_series_from_df()`
@lru_cache(maxsize=None)
def get_additional_data_items( adi_code: int) -> Optional[str]:
    """
    Constructs a dynamic string describing the concept of additional data items (adi)
    JudComm reference https://lawcodes.judcom.nsw.gov.au/assets/lawcodes_design_and_export_format.html#32_adi

    The adi decription is contructed by combinging the data in
      28.txt — adi_type  => DF_JUDCOMM['df_adi_type']
      29.txt — adi_role => DF_JUDCOMM['df_adi_role']
      30.txt — adi_component => DF_JUDCOMM['df_adi_component']
      31.txt — adi_group  => DF_JUDCOMM['df_adi_group']

    Parameters:
    adi_code (int): The adi_code to filter the DataFrame.

    Returns:
    str: A dynamically constructed string.
      E.g. Drug quantity < Commercial quantity and Drug type is not Cannabis plant or leaf
    """

    df_adi_type = DF_JUDCOMM['df_adi_type']
    df_adi_role = DF_JUDCOMM['df_adi_role']
    df_adi_component = DF_JUDCOMM['df_adi_component']
    df_adi_group = DF_JUDCOMM['df_adi_group']


    # Merge df_adi_component with df_adi_type
    merged_adi_component_type = pd.merge(
      df_adi_component,
      df_adi_type,
      left_on='ADI_C.adi_type_code',
      right_on='ADI_T.abbreviation'
    )

    # Merge the result with df_adi_role
    merged_adi_component_type_role = pd.merge(
      merged_adi_component_type,
      df_adi_role,
      left_on='ADI_C.adi_role_code',
      right_on='ADI_R.abbreviation'
    )

    # Merge the result with df_adi_group
    merged_adi_all = pd.merge(
      merged_adi_component_type_role,
      df_adi_group,
      left_on='ADI_C.code',
      right_on='ADI_G.adi_component_code'
    )

    # Select relevant columns and remove duplicates
    final_columns = [
      'ADI_G.code',
      'ADI_C.code', 'ADI_C.adi_value', 'ADI_C.adi_type_code',
      'ADI_C.adi_type_code', 'ADI_T.description',
      'ADI_C.adi_role_code', 'ADI_R.description',
      'ADI_G.code','ADI_G.sequence_number', 'ADI_G.adi_component_code', 'ADI_G.comparison_operator_code', 'ADI_G.adi_code'
    ]

    df = merged_adi_all[final_columns].drop_duplicates()

    # Filter the DataFrame for the given adi_code
    df_filtered = df[df['ADI_G.adi_code'] == adi_code].sort_values(by=['ADI_G.sequence_number'])

    # Initialize the parts of the string
    conditions = []
    logical_connectors = [] #QUESTION : Just on logical connector?

    for index, row in df_filtered.iterrows():
        condition = f"{row['ADI_T.description']} {row['ADI_R.description'].lower()} {row['ADI_C.adi_value']}"
        conditions.append(condition)


        # Collect logical connectors if they are not NaN
        if pd.notna(row['ADI_G.comparison_operator_code']) and row['ADI_G.comparison_operator_code'].lower() != 'nan':
            logical_connectors.append(row['ADI_G.comparison_operator_code'].lower())


    # Construct the final string
    if not conditions:
        return None

    result_string = conditions[0]
    for i in range(1, len(conditions)):
        connector = logical_connectors[i-1] if i-1 < len(logical_connectors) else '???'  # Default to '???' if no connector for error diagnosis
        result_string += f" {connector} {conditions[i]}"

    return result_string

# Example usage
#adi_code = 37
#dynamic_string = construct_dynamic_string(final_df, adi_code)
#print(dynamic_string)

# Assuming DF_JUDCOMM contains the DataFrames



"""
Sample output
Drug quantity < Commercial quantity and Drug type is not Cannabis plant or leaf
"""

#Sample usage
# print(construct_dynamic_string(37))
# print(get_additional_data_items(36))



'\nSample output\nDrug quantity < Commercial quantity and Drug type is not Cannabis plant or leaf\n'

In [169]:
def filter_series_from_df(
    row: pd.Series,
    df: pd.DataFrame,
    filter_conditions: dict,
    return_func: Optional[Callable[[pd.DataFrame], Union[pd.Series, list, str]]] = None
      ) -> Optional[Union[pd.Series, list, str]]:
    """
    A generic function to filter a DataFrame based on conditions from a row and apply a custom return function.

    Parameters:
    row (pd.Series): The row containing values to filter the DataFrame.
    df (pd.DataFrame): The DataFrame to be filtered.
    filter_conditions (dict): A dictionary of filter conditions with column names as keys and operators as values.
    return_func (Optional[Callable[[pd.DataFrame], Union[pd.Series, list, str]]]): A custom function to process the filtered DataFrame.

    Returns:
    Union[pd.Series, list, str, pd.NA]: The processed result based on the custom function or filtered DataFrame.
    """
    try:
        # Build filter query
        query = pd.Series([True] * len(df))
        for col, condition in filter_conditions.items():
            if isinstance(condition, Callable):
                query &= condition(df[col], row[col])
                # try:
                #   # Print the condition for debugging
                #   print(f"Applying callable condition on column '{col}' with row value '{row[col]}': {condition(df[col], row[col])}")
                #   query &= condition(df[col], row[col])
                # except Exception as e:
                #     print(f"Error applying callable condition on column '{col}' with row value '{row[col]}': {e}")
            else:
                op = condition
                value = row[col]

                # print(f"Applying condition '{op}' on column '{col}' with value '{value}'")

                match op:
                    case 'eq':
                        query &= (df[col] == value)
                    case 'ne':
                        query &= (df[col] != value)
                    case 'lt':
                        query &= (df[col] < value)
                    case 'le':
                        query &= (df[col] <= value)
                    case 'gt':
                        query &= (df[col] > value)
                    case 'ge':
                        query &= (df[col] >= value)
                    case 'isna':
                        query &= df[col].isna()
                    case 'notna':
                        query &= df[col].notna()
                    case _:
                        raise ValueError(f"Unsupported operator: {op}")

        # Filter the DataFrame based on the provided conditions
        _df_comp_ops = df[query]

        if return_func:
            # Apply the return function to the filtered DataFrame
            ret_val = return_func(_df_comp_ops)
            return ret_val
        else:
            return _df_comp_ops

    except KeyError as ke:
        print(f"KeyError: {str(ke)} - Ensure all columns exist in the DataFrame.")
        return pd.NA
    except ValueError as ve:
        print(f"ValueError: {str(ve)} - Check the filter conditions and operators.")
        return pd.NA
    except Exception as e:
        print(f"Error: {str(e)}")
        return pd.NA


# PARTIAL FUNCTIONS FOR SPECIFIC CASES
# ------------------------------------
# PENALTY COMPARISON OPERATOR

# Define filter conditions with operators (values will be taken from the row)
pco_conditions = {
    'searched_lpc': 'eq',
    'P_CMD.statutory_penalty_code': 'eq',
    'JP.valid_from_date': 'eq',
    'JP.valid_to_date': lambda df_val, row_val: (df_val.isna() & pd.isna(row_val)) | (df_val == row_val)
  }

# Get a specific column
pco_custom_return_func = lambda df: ''.join(
        [val for val in df['P_CMD.comparison_operator_code'].dropna().unique().tolist() if val.lower() != 'nan']
    )
# Create a partial function with predefined arguments
get_penalty_comparison_operator = partial(filter_series_from_df, filter_conditions=pco_conditions, return_func=pco_custom_return_func)



# EXAMPLE USAGE
# Example DataFrame and row for testing
# df_example = pd.DataFrame({
#     'searched_lpc': [1, 2, 3],
#     'P_CMD.statutory_penalty_code': [100, 200, 300],
#     'JP.valid_from_date': [pd.Timestamp('2020-01-01'), pd.Timestamp('2020-01-01'), pd.Timestamp('2020-01-01')],
#     'JP.valid_to_date': [pd.NaT, pd.Timestamp('2021-01-01'), pd.NaT],
#     'P_CMD.comparison_operator_code': ['A', 'B', 'nan']
# })

# row_example = pd.Series({
#     'searched_lpc': 1,
#     'P_CMD.statutory_penalty_code': 100,
#     'JP.valid_from_date': pd.Timestamp('2020-01-01'),
#     'JP.valid_to_date': pd.NaT
# })

# # Test the function
# result = get_penalty_comparison_operator(row_example, df_example)
# print(result)


In [170]:
def process_and_export(df: pd.DataFrame,
                       exported_cols: List[str],
                       export_file_name: str,
                       sep: str = ',',
                       sort_cols: Optional[str] = None,
                       rename_cols: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """
    Process the DataFrame by keeping specified columns, optionally renaming columns,
    and preparing it for export.

    Parameters:
    - df: pd.DataFrame - The input DataFrame.
    - exported_cols: List[str] - List of columns to retain.
    - export_file_name: str - Name of the file to export.
    - sep: str - Separator for the export file (default is ',').
    - sort_col: Optional[str] - Column to sort the DataFrame by (default is None).
    - rename_cols: Optional[Dict[str, str]] - Dictionary to rename columns (default is None).

    Returns:
    - Dict[str, Any] - A dictionary containing export metadata and processed DataFrame.
    """
    df_copy = df.copy()

    df_copy= df_copy.reset_index()

    if sort_cols:
        df_copy = df_copy.sort_values(by=sort_cols)

    if exported_cols:
      df_copy = df_copy[exported_cols]
      df_copy = df_copy.drop_duplicates(subset=exported_cols)

    if rename_cols:
        df_copy = df_copy.rename(columns=rename_cols)

    return {
        'export_file_name': export_file_name,
        'sep': sep,
        'df': df_copy
    }





In [171]:
# Function to export the data plus reference data
# Alternative to `prep_data_for_salesforce()`
DEFAULT_COLUMN_NAME_MAP = {#LPC related
                  'LP.code':'lpc',
                  'LP.short_description' : 'lpc_short_description',
                  'LP.long_description': 'lpc_long_description',
                  'LP.valid_from_date': 'lpc_valid_from',
                  'LP.valid_to_date':'lpc_valid_to',

                  # Law related
                  'BOL.code':'bol_code',
                  'L.code' : 'law_code',
                  'BOL.short_description': 'law_short_description',
                  'L.reference':'law_section',

                  #Penalty related
                  'P_CMD.code' : 'penalty_code',
                  'JUR_L.short_description' :'jurisdiction_level_code',
                  'JUR_L.long_description': 'jurisdiction_level_description',
                  'P_CMD.statutory_penalty_code':'statutory_penalty_code',
                  'JP.valid_from_date' :'penalty_valid_from',
                  'JP.valid_to_date' :'penalty_valid_to',
                  'P_CMT.penalty_quantity' :'penalty_amount',

                  'P_CMT.unit_of_measure_code' :'penalty_uom_code',
                  'UOM.description':'penalty_uom',
                  'UOM.abbreviation':'penalty_uom_code',

                  'P_CMT.penalty_type_code':'penalty_type_code',
                  'PT.abbreviation': 'penalty_type_code',
                  'PT.description':'penlaty_type',

                  'P_CMT.penalty_role_code':'penalty_role_code',
                  'PR.abbreviation':'penalty_role',
                  'PR.description':'penalty_role',

                  'JP.offender_type_abbreviation':'offender_type_code',
                  'OT.abbreviation':'offender_type_code',
                  'OT.description':'offender_type',

                  'P_CMD.comparison_operator_code' :'penalty_comparison_operator_code',
                  'CO.abbreviation': 'penalty_comparison_operator_code',
                  'CO.description': 'penalty_comparison_operator',
                  '_PCO.description':'penalty_comparison_operator',

                  'JP.adi_code':'adi_code',
                  '_ADI.description':'adi_description'


                }



def prep_data_with_references(df_denorm_results: pd.DataFrame,
                             default_column_mapping: Optional[Dict[str, str]] = DEFAULT_COLUMN_NAME_MAP,
                             include_denormalised_data: bool = True,
                             readme_content:str = None) -> List[pd.DataFrame]:
    """
    Normalizes the Lookup data along with associated reference data
    """


    """
      {
        'export_file_name': 'xxx.csv',
        'sep' : ',',
        df: `df`,
      }
    """
    _df_juro_level = pd.merge(DF_JUDCOMM['df_jurisdiction'],DF_JUDCOMM['df_jurisdiction_level'],
            left_on='JUR.jurisdiction_level_code', right_on='JUR_L.code', how='inner')

    EXPORT_DEF = [
          #UOM
        {   'data_df': DF_JUDCOMM['df_unit_of_measure'],
            'exported_cols': ['UOM.abbreviation','UOM.description'],
            'export_file_name': '10_ref_uom.csv',
            'sort_cols': ['UOM.abbreviation'],
            'rename_cols': None
        }, #PR
        {
            'data_df': DF_JUDCOMM['df_penalty_role'],
            'exported_cols': ['PR.abbreviation', 'PR.description'],
            'export_file_name': '20_ref_penalty_role.csv',
            'sort_cols': ['PR.abbreviation'],
            'rename_cols': {'PR.abbreviation': 'penalty_role_code', 'PR.description': 'penalty_role'}
        }, #PT
        {
            'data_df': DF_JUDCOMM['df_penalty_type'],
            'exported_cols': ['PT.abbreviation', 'PT.description'],
            'export_file_name': '30_ref_penalty_type.csv',
            'sort_cols': ['PT.abbreviation'],
            'rename_cols': None
        },#Juro Level
        {
            'data_df': _df_juro_level,
            'exported_cols': ['JUR_L.short_description', 'JUR_L.long_description'],
            'export_file_name': '50_ref_jurisdicton_level.csv',
            'sort_cols': ['JUR_L.short_description'],
            'rename_cols': {'JUR_L.short_description': 'jurisdiction_level_code', 'JUR_L.long_description': 'jurisdiction_level_description'}
        },
          # Comp Operator
        {
            'data_df': DF_JUDCOMM['df_comp_op'],
            'exported_cols': ['CO.abbreviation', 'CO.description'],
            'export_file_name': '60_ref_Comp_Operator.csv',
            'sort_cols': ['CO.abbreviation'],
            'rename_cols': None
        },
        #   #OT
        {
            'data_df': DF_JUDCOMM['df_offender_type'],
            'exported_cols': ['OT.abbreviation', 'OT.description'],
            'export_file_name': '40_ref_offender_type.csv',
            'sort_cols': ['OT.abbreviation'],
            'rename_cols': None
        },
        #Laws
        {
            'data_df': df_denorm_results,
            'exported_cols': ['BOL.code','LP.code', 'BOL.short_description'],
            'export_file_name': '70_data_law.csv',
            'sort_cols': ['BOL.code'],
            'rename_cols': None
        },#LPC
        {   'data_df': df_denorm_results,
            'exported_cols': [
                      'LP.code','LP.short_description','LP.long_description',
                      'LP.valid_from_date', 'LP.valid_to_date'],
            'export_file_name': '80_data_lpc.csv',
            'sort_cols': ['LP.code', 'LP.valid_from_date', 'LP.valid_to_date'],
            'rename_cols': None

        },
        #     #Penalty
        {   'data_df': df_denorm_results,
            'exported_cols': [
                      'BOL.code','LP.code','P_CMD.code',
                      'JUR_L.short_description','JUR_L.long_description','P_CMD.statutory_penalty_code',
                      'JP.valid_from_date','JP.valid_to_date','P_CMT.penalty_quantity','P_CMT.unit_of_measure_code',
                      'UOM.description','P_CMT.penalty_type_code','PT.description','P_CMT.penalty_role_code',
                      'PR.description','JP.offender_type_abbreviation','OT.description','P_CMD.comparison_operator_code',
                      '_PCO.description','JP.adi_code','_ADI.description'],
            'export_file_name': '90_data_penalty.csv',
            'sort_cols': ['LP.code', 'JUR_L.short_description','P_CMD.statutory_penalty_code','JP.valid_from_date','JP.valid_to_date'],
            'rename_cols': None
        }
    ]

    #Add denormalised data
    if include_denormalised_data:
      EXPORT_DEF.append(
        #Denormalised Data
        {   'data_df': df_denorm_results,
            'exported_cols': [
                      'BOL.code', 'BOL.short_description',

                      'LP.code','LP.short_description','LP.long_description',
                      'LP.valid_from_date', 'LP.valid_to_date','BOL.code',

                      'JUR_L.short_description','JUR_L.long_description','P_CMD.statutory_penalty_code',
                      'JP.valid_from_date','JP.valid_to_date','P_CMT.penalty_quantity','P_CMT.unit_of_measure_code',
                      'UOM.description','P_CMT.penalty_type_code','PT.description','P_CMT.penalty_role_code',
                      'PR.description','JP.offender_type_abbreviation','OT.description','P_CMD.comparison_operator_code',
                      '_PCO.description','JP.adi_code','_ADI.description'],
            'export_file_name': 'denormalised_data.csv',
            'sort_cols': ['LP.code', 'JUR_L.short_description','P_CMD.statutory_penalty_code','JP.valid_from_date','JP.valid_to_date'],
            'rename_cols': None
        }
      )

    exported_data = []

    # Process each export operation
    for definition in EXPORT_DEF:
        processed_data = process_and_export(
            df=definition['data_df'],
            exported_cols=definition['exported_cols'],
            export_file_name=definition['export_file_name'],
            sep=',',
            sort_cols=definition['sort_cols'],
            rename_cols=definition['rename_cols'] if definition['rename_cols'] else default_column_mapping
        )
        exported_data.append(processed_data)
    return exported_data, readme_content



In [172]:
# Function to export the data for later import into Salesforce. Content honours the extablished data contract for the donwstream system

DEFAULT_COLUMN_NAME_MAP = {#LPC related
                  'LP.code':'lpc',
                  'LP.short_description' : 'lpc_short_description',
                  'LP.long_description': 'lpc_long_description',
                  'LP.valid_from_date': 'lpc_valid_from',
                  'LP.valid_to_date':'lpc_valid_to',

                  # Law related
                  'BOL.code':'bol_code',
                  'L.code' : 'law_code',
                  'BOL.short_description': 'law_short_description',
                  'L.reference':'law_section',

                  #Penalty related
                  'P_CMD.code' : 'penalty_code',
                  'JUR_L.short_description' :'jurisdiction_level_code',
                  'JUR_L.long_description': 'jurisdiction_level_description',
                  'P_CMD.statutory_penalty_code':'statutory_penalty_code',
                  'JP.valid_from_date' :'penalty_valid_from',
                  'JP.valid_to_date' :'penalty_valid_to',
                  'P_CMT.penalty_quantity' :'penalty_amount',

                  'P_CMT.unit_of_measure_code' :'penalty_uom_code',
                  'UOM.description':'penalty_uom',
                  'UOM.abbreviation':'penalty_uom_code',

                  'P_CMT.penalty_type_code':'penalty_type_code',
                  'PT.abbreviation': 'penalty_type_code',
                  'PT.description':'penlaty_type',

                  'P_CMT.penalty_role_code':'penalty_role_code',
                  'PR.abbreviation':'penalty_role',
                  'PR.description':'penalty_role',

                  'JP.offender_type_abbreviation':'offender_type_code',
                  'OT.abbreviation':'offender_type_code',
                  'OT.description':'offender_type',

                  'P_CMD.comparison_operator_code' :'penalty_comparison_operator_code',
                  'CO.abbreviation': 'penalty_comparison_operator_code',
                  'CO.description': 'penalty_comparison_operator',
                  '_PCO.description':'penalty_comparison_operator',

                  'JP.adi_code':'adi_code',
                  '_ADI.description':'adi_description'


                }

# Instructions for the user
SF_README_CONTENT  = """
README

**Author**: Theepan Thevathasan

Version
=======

- **Version**: 1.0
- **Release Date**: 20/07/2024

Description
===========

LAW PART CODE EXTRACTOR
=======================

This archive contains the following CSV files with detailed information regarding laws, law part codes, and penalties.
The number prefix in each file name indicates the recommended order of processing to ensure referential integrity.

1. 70_data_law.csv
    - **Description**: Contains basic information about laws.
    - **Columns**:
        - `bol_code`: Judcomm's identifier for Body of Law
        - `law_short_description`: Short description of the law.
    - **Sorting Criteria**: The data is sorted by `bol_code`.

2. 80_data_lpc.csv
    - **Description**: Contains detailed information about law part codes.
    - **Columns**:
        - `lpc`: Law part code. Judcomm's identifier for Law Part Code.
        - `lpc_short_description`: Short description of the law part.
        - `lpc_long_description`: Long description of the law part.
        - `lpc_valid_from`: The date from which the law part is valid.
        - `lpc_valid_to`: The date until which the law part is valid (if applicable). Empty == Current record
    - **Sorting Criteria**: The data is sorted by `lpc`, `lpc_valid_from`, and `lpc_valid_to`.

3. 90_data_penalty.csv
    - **Description**: Contains comprehensive information about penalties associated with laws.
    - **Columns**:
        - `bol_code`: Ref to xx_data_law.csv
        - `lpc`: Ref to xx_data_lpc.csv
        - `penalty_code`: Penalty command code.
        - `jurisdiction_level_code`: Short description of the jurisdiction level.
        - `jurisdiction_level_description`: Long description of the jurisdiction level.
        - `statutory_penalty_code`: Statutory penalty code.
        - `penalty_valid_from`: The date from which the penalty is valid.
        - `penalty_valid_to`: The date until which the penalty is valid (if applicable). Empty == Current record
        - `penalty_amount`: Quantity of the penalty. Used in conjunction with penalty_uom_code
        - `penalty_uom_code`: Code of the unit of measure.
        - `penalty_uom`: Description of the unit of measure.
        - `penalty_type_code`: Code of the penalty type.
        - `penlaty_type`: Description of the penalty type.
        - `penalty_role_code`: Code of the penalty role.
        - `penalty_role`: Description of the penalty role.
        - `offender_type_code`: Abbreviation of the offender type.
        - `offender_type`: Description of the offender type.
        - `penalty_comparison_operator_code`: Code of the comparison operator.
        - `penalty_comparison_operator`: Description of the comparison operator.
        - `adi_code`: ADI code.
        - `adi_description`: Description of the ADI.
    - **Sorting Criteria**: The data is sorted by `lpc`, `jurisdiction_level_code`, `statutory_penalty_code`, `penalty_valid_from`, and `penalty_valid_to`.


4. denormalised_data.csv
    - **Columns**:
        - `bol_code`
        - `law_short_description`
        - `lpc`
        - `lpc_short_description`
        - `lpc_long_description`
        - `lpc_valid_from`
        - `lpc_valid_to`
        - `jurisdiction_level_code`
        - `jurisdiction_level_description`
        - `statutory_penalty_code`
        - `penalty_valid_from`
        - `penalty_valid_to`
        - `penalty_amount`
        - `penalty_uom_code`
        - `penalty_uom`
        - `penalty_type_code`
        - `penlaty_type`
        - `penalty_role_code`
        - `penalty_role`
        - `offender_type_code`
        - `offender_type`
        - `penalty_comparison_operator_code`
        - `penalty_comparison_operator`
        - `adi_code`
        - `adi_description`
    - **Sorting Criteria**: The data is sorted by `lpc`, `jurisdiction_level_code`, `statutory_penalty_code`, `penalty_valid_from`, and `penalty_valid_to`.


Data Source
===========

The data in this archive is extracted from the official file available at https://lawcodes.judcom.nsw.gov.au/updates/lawcodes-full-20240717.zip, which is updated weekly. The data included here is from the latest available update as of last Wednesday. The data represents a snapshot from the full data download at a specific point in time.

For a detailed definition of the data source and its structure, please refer to https://lawcodes.judcom.nsw.gov.au/assets/lawcodes_design_and_export_format.html.

Referential Integrity
=====================

To maintain referential integrity, it is crucial to process the files in the order indicated by their number prefixes. This ensures that all necessary reference data is loaded before any dependent data is processed.

1. 70_data_law.csv: Start by processing this file to load the basic information about laws.
2. 80_data_lpc.csv: Next, process this file to load detailed information about law part codes, which may reference the laws loaded in the previous step.
3. 90_data_penalty.csv: Finally, process this file to load comprehensive penalty information, which references both laws and law part codes.

Denormalized Data
=================

`denormalised_data.csv` is only present if the export specifically requests it at the time of processing

Instructions
============

1. Extract the files: Use any standard zip extraction tool to extract the files from the archive.
2. Use the CSV files: The CSV files can be opened using spreadsheet software (e.g., Microsoft Excel, Google Sheets) or imported into a database or data analysis tool for further processing.
3. Contact Information:
    Processed data : Author of the utility
    Source data : lawcodes@judcom.nsw.gov.au

Thank you for using the utility!
"""


def prep_data_for_salesforce(df_denorm_results: pd.DataFrame,
                             default_column_mapping: Optional[Dict[str, str]] = DEFAULT_COLUMN_NAME_MAP,
                             include_denormalised_data: bool = True,
                             readme_content:str = SF_README_CONTENT) -> List[pd.DataFrame]:
    """
    Prepares data for export of
    - 70_data_law.csv
    - 80_data_lpc.csv
    - 90_data_penalty.csv
    - denormalised_data.csv (Optional)

    Parameters:
    ----------
    df_denorm_results : pd.DataFrame
        The DataFrame containing the denormalized data results to be prepared for export.

    include_denormalised_data : bool, optional (default=False)
        If True, includes the denormalized data in the export definition.

    Returns:
    -------
       List[Dict[str, Any]]
        A list of dictionaries where each dictionary contains export information for a specific data component.
        Each dictionary contains the following keys:
            - 'export_file_name' (str): The name of the CSV file to be created.
            - 'sep' (str): The separator to be used in the CSV file.
            - 'df' (pd.DataFrame): The DataFrame to be exported.

    """

    EXPORT_DEF = [

        #Laws
        {
            'data_df': df_denorm_results,
            'exported_cols': ['BOL.code', 'BOL.short_description'],
            'export_file_name': '70_data_law.csv',
            'sort_cols': ['BOL.code'],
            'rename_cols': None
        },#LPC
        {   'data_df': df_denorm_results,
            'exported_cols': [
                      'LP.code','LP.short_description','LP.long_description',
                      'LP.valid_from_date', 'LP.valid_to_date'],
            'export_file_name': '80_data_lpc.csv',
            'sort_cols': ['LP.code', 'LP.valid_from_date', 'LP.valid_to_date'],
            'rename_cols': None

        },
        #     #Penalty
        {   'data_df': df_denorm_results,
            'exported_cols': [
                      'BOL.code','LP.code','P_CMD.code',
                      'JUR_L.short_description','JUR_L.long_description','P_CMD.statutory_penalty_code',
                      'JP.valid_from_date','JP.valid_to_date','P_CMT.penalty_quantity','P_CMT.unit_of_measure_code',
                      'UOM.description','P_CMT.penalty_type_code','PT.description','P_CMT.penalty_role_code',
                      'PR.description','JP.offender_type_abbreviation','OT.description','P_CMD.comparison_operator_code',
                      '_PCO.description','JP.adi_code','_ADI.description'],
            'export_file_name': '90_data_penalty.csv',
            'sort_cols': ['LP.code', 'JUR_L.short_description','P_CMD.statutory_penalty_code','JP.valid_from_date','JP.valid_to_date'],
            'rename_cols': None
        }
    ]

    #Add denormalised data
    if include_denormalised_data:
      EXPORT_DEF.append(
        #Denormalised Data
        {   'data_df': df_denorm_results,
            'exported_cols': [
                      'BOL.code', 'BOL.short_description',

                      'LP.code','LP.short_description','LP.long_description',
                      'LP.valid_from_date', 'LP.valid_to_date','BOL.code',

                      'JUR_L.short_description','JUR_L.long_description','P_CMD.statutory_penalty_code',
                      'JP.valid_from_date','JP.valid_to_date','P_CMT.penalty_quantity','P_CMT.unit_of_measure_code',
                      'UOM.description','P_CMT.penalty_type_code','PT.description','P_CMT.penalty_role_code',
                      'PR.description','JP.offender_type_abbreviation','OT.description','P_CMD.comparison_operator_code',
                      '_PCO.description','JP.adi_code','_ADI.description'],
            'export_file_name': 'denormalised_data.csv',
            'sort_cols': ['LP.code', 'JUR_L.short_description','P_CMD.statutory_penalty_code','JP.valid_from_date','JP.valid_to_date'],
            'rename_cols': None
        }
      )

    exported_data = []

    # Process each export operation
    for definition in EXPORT_DEF:
        processed_data = process_and_export(
            df=definition['data_df'],
            exported_cols=definition['exported_cols'],
            export_file_name=definition['export_file_name'],
            sep=',',
            sort_cols=definition['sort_cols'],
            rename_cols=definition['rename_cols'] if definition['rename_cols'] else default_column_mapping
        )
        exported_data.append(processed_data)

    return exported_data, readme_content



# Download JudComm Data files
* Download files in the `raw` folder
* Parse and save .csv files in the `staging` folder

In [173]:
with measure_exec_time():
  print("Start processing JudCom files")

  # Construct URL using the latest WED's file => Data contract
  JUDCOMM_SOURCE_FILE, FILE_BATCH_DATE = construct_valid_url(url_template=JUDCOMM_URL_TEMPLATE)
  print(f"Judcomm Source Data URL: {JUDCOMM_SOURCE_FILE}")

  # Download and extract files
  try:
      download_and_unzip(JUDCOMM_SOURCE_FILE,
                        extract_to=JUDCOMM_DEST_FOLDER,
                        # Don't save files that are not needed
                        download_def = list(JUDCOMM_FILE_DEF.keys()),
                        download_def_only=True)
      print(f"Downloaded file {JUDCOMM_SOURCE_FILE} & extracted to folder '{os.getcwd()}/{JUDCOMM_DEST_FOLDER}'")
  except FileNotFoundError as e:
      print(f"Error downloading file from {JUDCOMM_SOURCE_FILE}\nError: {e}")

  # Process the JudComm files of interest (Defined in `JUDCOMM_FILE_DEF`)
  """
  1. Fetch each .txt file of interest and parse
  2. save to the disk as .csv
  3. Add the DataFrame to the dictionary of DataFrames
  """

  # Function to process each file
  def convert_csv_to_df(file_name: str, file_def: dict):
      try:
          _df = read_fixed_length_file(
              file=f"{JUDCOMM_DEST_FOLDER}/{file_name}",
              metadata=file_def
          )

          os.makedirs(JUDCOMM_STAGING_FOLDER, exist_ok=True)
          dest_uri = f"{JUDCOMM_STAGING_FOLDER}/{file_name.replace('.txt', '.csv')}"
          _df.to_csv(dest_uri, index=False, mode='w')

          DF_JUDCOMM[file_def['df_name']] = _df

          print(f"\n \t Parsed file '{file_name}' into DF_JUDCOMM['{file_def['df_name']}']")
      except Exception as e:
          print(f"Error parsing file '{file_name}'\n: {e}")

  # Process files in parallel
  with ThreadPoolExecutor() as executor:
      future_to_file = {executor.submit(convert_csv_to_df, file_name, file_def): file_name for file_name, file_def in JUDCOMM_FILE_DEF.items()}
      for future in as_completed(future_to_file):
          file_name = future_to_file[future]
          try:
              future.result()
          except Exception as e:
              print(f"Error processing file {file_name}\nError: {e}")

  # Delete raw (.txt) and staging files (.csv)

  if DELETE_JUDCOMM_RAW_FILES:
    shutil.rmtree(JUDCOMM_DEST_FOLDER)
    print(f"Deleted JudComm raw files from folder '{os.getcwd()}/{JUDCOMM_DEST_FOLDER}'")

  if DELETE_JUDCOMM_STAGING_FILES:
    shutil.rmtree(JUDCOMM_STAGING_FOLDER)
    print(f"Deleted JudComm staging files from folder '{os.getcwd()}/{JUDCOMM_STAGING_FOLDER}'")

Start processing JudCom files
0 - https://lawcodes.judcom.nsw.gov.au/updates/lawcodes-full-20240724.zip
Judcomm Source Data URL: https://lawcodes.judcom.nsw.gov.au/updates/lawcodes-full-20240724.zip
Additional JudComm files that won't be downloaded: ['26.txt', '07.txt', '35.txt', '47.txt', '36.txt', '39.txt', '08.txt', '43.txt', '45.txt', '10.txt', '09.txt', '48.txt', '44.txt', '41.txt', '51.txt', '32.txt', '21.txt', '33.txt', '01.txt', '14.txt', '06.txt', '27.txt', '13.txt', '25.txt', '37.txt', '50.txt', '34.txt', '46.txt', '38.txt', '42.txt', '15.txt', '40.txt', '49.txt', 'batch_information.txt', 'order_of_import.txt']
Downloaded file https://lawcodes.judcom.nsw.gov.au/updates/lawcodes-full-20240724.zip & extracted to folder '/content/raw'

 	 Parsed file '16.txt' into DF_JUDCOMM['df_unit_of_measure']

 	 Parsed file '17.txt' into DF_JUDCOMM['df_penalty_type']

 	 Parsed file '18.txt' into DF_JUDCOMM['df_penalty_role']

 	 Parsed file '23.txt' into DF_JUDCOMM['df_comp_op']
 	 Parsed 

# ⚙️ Process and Export Law Part Codes of Interest

In [174]:
"""
Performance Benchmark
Normal : 159.91 seconds
With LRU for ADI 88.62 seconds
No ADI 92.33
Vectorised Penalty Comparison Operator 60.91 seconds *
"""

with measure_exec_time():
  LPCs = set(LPCs) if not IS_TEST_MODE else set(TEST_LPCs)

  active_date=None #Change to datetime.now() or specific date of choice e.g `datetime(2024, 7, 30 ) `
  df = get_lpc_details(lpcs = LPCs, as_at_date= active_date)


  # Normalise Data and export
  """
  Custom Functions specific target output formats
    prep_data_for_salesforce() # for data load into Salesforce
    prep_data_with_references() # data tables, reference tables
  """
  exp_data, readme_content = prep_data_for_salesforce(df_denorm_results=df,include_denormalised_data= True)

  zip_file_name, _ = just_construct_url(url_template = RESULT_EXPORT_URL_TEMPLATE,  date_function = lambda x : FILE_BATCH_DATE)

  zip_csv_files(exported_data= exp_data , zip_file_name = zip_file_name, readme_content = readme_content)
  # exp_data
  # df



0 - ./export/lawcodes_processed_subset_20240724.zip
Total execution time: 62.01 seconds


# Test Suite
Eunsure global variable `IS_TEST_MODE=True` for the test suite is to run. Tests are conducted at the data level and dependent on availability of live data from JudComm.

These are not unit tests

In [175]:
#1. One active law part code
def test_one_active_law_part_code(test_lpcs: List[int] = [TEST_LPCs[0]], test_as_at_date: datetime = None):
    try:
        # Get law part code details
        df = get_lpc_details(lpcs=test_lpcs, as_at_date=test_as_at_date)

        # Check if DataFrame is not empty
        if len(df) == 0:
            assert False, "No law part code details found"

        # Group by LP.code and check the length
        grouped_df = df.groupby('LP.code').size()
        assert len(grouped_df) == 1, f"Expected 1 LPC, but got {len(grouped_df)}"

        # Check if all records have the correct LP.code
        assert (df['LP.code'] == test_lpcs[0]).all(), f"Expected all records to have LP.code == {test_lpcs[0]}"

        # Check if LP.valid_to_date is NaT
        valid_to_date_check = df['LP.valid_to_date'].isna()
        assert valid_to_date_check.all(), f"Expected all records to have blank/NaT for LP.valid_to_date"

        # Check if all records have the correct LP.short_description
        expected_LP_short_description = "Responsible person/custodian not disclose driver's details"
        result_LP_short_description = df['LP.short_description'].unique().tolist()
        assert all([(len(result_LP_short_description) == 1), (result_LP_short_description[0] == expected_LP_short_description)]), \
            f"Expected all records to have LP.short_description == {expected_LP_short_description}"

        print(f'✅ : One active law part code. {test_lpcs=}')
        return df
    except AssertionError as e:
        print(f'❌ : Assertion failed - {e}')
        raise e
    except Exception as e:
        print(f'❌ : An unexpected error occurred - {e}')
        raise e
# _______________________________________________________________________________________

#2. Active multiple law part codes
def test_many_active_law_part_code(test_lpcs: List[int] = [TEST_LPCs[0], TEST_LPCs[1]], test_as_at_date: datetime = None):
    try:
        # Get law part code details
        df = get_lpc_details(lpcs=test_lpcs, as_at_date=test_as_at_date)

        # Check if DataFrame is not empty
        if len(df) == 0:
            assert False, "No law part code details found"

        # Group by LP.code and check the length
        grouped_df = df.groupby('LP.code').size()
        assert len(grouped_df) == 2, f"Expected 2 LPC, but got {len(grouped_df)}"

        # Check if all test_lpcs are in df['LP.code']
        missing_lpcs = [lpc for lpc in test_lpcs if lpc not in df['LP.code'].values]
        assert len(missing_lpcs) == 0, f"Expected all items in {test_lpcs} to be in df['LP.code'], but missing {missing_lpcs}"

        # Check if LP.valid_to_date is NaT
        valid_to_date_check = df['LP.valid_to_date'].isna()
        assert valid_to_date_check.all(), f"Expected all records to have blank/NaT for LP.valid_to_date"

        # Check if all expected short descriptions are in df['LP.short_description']
        expected_LP_short_description = [
            "Responsible person/custodian not disclose driver's details",
            "Solo driver work more than std maximum time - critical risk"
        ]
        missing_short_desc = [desc for desc in expected_LP_short_description if desc not in df['LP.short_description'].values]
        assert len(missing_short_desc) == 0, f"Expected all items in {expected_LP_short_description} to be in df['LP.short_description'], but missing {missing_short_desc}"

        print(f'✅ : Many active law part codes. {test_lpcs=}')
        return df
    except AssertionError as e:
        print(f'❌ : Assertion failed - {e}')
        raise e
    except Exception as e:
        print(f'❌ : An unexpected error occurred - {e}')
        raise e

#3. Active law part code at a point in time
def test_one_as_at_date_law_part_code(test_lpcs: List[int] = [TEST_LPCs[2]], test_as_at_date: datetime =TEST_AS_AT_DATE):
    try:
        # Get law part code details
        df = get_lpc_details(lpcs=test_lpcs, as_at_date=test_as_at_date)

        # Check if DataFrame is not empty
        if len(df) == 0:
            assert False, "No law part code details found"

        # Group by LP.code and check the length
        grouped_df = df.groupby('LP.code').size()
        assert len(grouped_df) == 1, f"Expected 1 LPC, but got {len(grouped_df)}"

        # Check if all records have the correct LP.code
        assert (df['LP.code'] == test_lpcs[0]).all(), f"Expected all records to have LP.code == {test_lpcs[0]}"

        # Expired penalty codes should not have a blank. I.e there should be a definite valid_to date
        valid_to_date_check = ~df['LP.valid_to_date'].isna()
        assert valid_to_date_check.all(), f"Expected all records to have dates for LP.valid_to_date"

        # Check if all expected short descriptions are in df['LP.short_description']
        expected_LP_short_description = ["Cause or permit use of unregistered vehicle on road - Camera"]
        missing_short_desc = [desc for desc in expected_LP_short_description if desc not in df['LP.short_description'].values]
        assert len(missing_short_desc) == 0, \
            f"Expected all items in {expected_LP_short_description} to be in df['LP.short_description'], but missing {missing_short_desc}"

        # Assertions for LP.valid_to_date and LP.valid_from_date
        valid_to_date_check = df['LP.valid_to_date'].isna() | (df['LP.valid_to_date'] >= test_as_at_date)
        assert valid_to_date_check.all(), f"Expected all records to have LP.valid_to_date <= {test_as_at_date} or Blanks (presently active records), but found otherwise"
        assert (df['LP.valid_from_date'] <= test_as_at_date).all(), f"Expected all records to have LP.valid_from_date >= {test_as_at_date}, but found otherwise"

        print(f'✅ : One active law part code at a point in time. {test_lpcs=}')
        return df
    except AssertionError as e:
        print(f'❌ : Assertion failed - {e}')
        raise e
    except Exception as e:
        print(f'❌ : An unexpected error occurred - {e}')
        raise e

#4. Inactive  law part code at a point in time
def test_inactive_one_law_part_code_today(test_lpcs: List[int] = [TEST_LPCs[2]], test_as_at_date: datetime =datetime.now()):
    try:
        # Get law part code details
        df = get_lpc_details(lpcs=test_lpcs, as_at_date=test_as_at_date)

        # Check if DataFrame is not empty
        if len(df) != 0:
            assert False, "No law part code details found"

        print(f'✅ : Inactive law part code at at today - no LPC details should be returned. {test_lpcs=}')
        return df
    except AssertionError as e:
        print(f'❌ : Assertion failed - {e}')
        raise e
    except Exception as e:
        print(f'❌ : An unexpected error occurred - {e}')
        raise e

#4. Current & historical penalties of a law part code
def test_one_active_law_part_code_with_historic_and_current_penalties(test_lpcs: List[int] = [TEST_LPCs[0]], test_as_at_date: datetime = None):
    try:
        # Get law part code details
        df = get_lpc_details(lpcs=test_lpcs, as_at_date=test_as_at_date)

        # Check if DataFrame is not empty
        if len(df) == 0:
            assert False, "No law part code details found when one or more records were expected"

        # Group by LP.code and check the length
        grouped_df = df.groupby('LP.code').size()
        assert len(grouped_df) == 1, f"Expected 1 LPC, but got {len(grouped_df)}"

        # Check if all records have the correct LP.code
        assert (df['LP.code'] == test_lpcs[0]).all(), f"Expected all records to have LP.code == {test_lpcs[0]}"

        # Check the valid_to_date at the LPC and Penalty levels
        lpc_valid_to_date_check = df['LP.valid_to_date'].isna()
        pn_valid_to_date_check = df['JP.valid_to_date'].isna()
        pn_valid_to_date_check_expired = ~df['JP.valid_to_date'].isna()
        assert all([lpc_valid_to_date_check.all(),
                    pn_valid_to_date_check.any()]), \
                f"Expected all records to have blank/NaT for LP.valid_to_date and, \
                some JP.valid_to_date's blank and some ome JP.valid_to_date's non blank"

        # Check if all records have the correct LP.short_description
        expected_LP_short_description = "Responsible person/custodian not disclose driver's details"
        result_LP_short_description = df['LP.short_description'].unique().tolist()
        assert all([(len(result_LP_short_description) == 1), (result_LP_short_description[0] == expected_LP_short_description)]), \
            f"Expected all records to have LP.short_description == {expected_LP_short_description}"


        # Spot check specific validate data points (Ref to Assertion error for specifics)
        # Historic
        pn_INS_FIN_01_10_2023__30_06_2024 = df[(df['JUR_L.short_description'] == 'INS')
                                          & (df['P_CMT.unit_of_measure_code']=='$')
                                          & (df['JP.valid_to_date'] == datetime(2024, 6, 30))
                                          & (df['JP.valid_from_date'] == datetime(2023, 10, 1))
                                          & (df['P_CMT.penalty_type_code']=='FIN')
                                          & (df['P_CMT.penalty_quantity']==987)
                                          ]
        assert len(pn_INS_FIN_01_10_2023__30_06_2024) == 1, "Expected one record of fine of $987 from period between 10/1/2023 and 30/6/2024"

        # Active
        pn_LC_PU_01_07_2013__ = df[(df['JUR_L.short_description'] == 'LC')
                                  & (df['JP.valid_to_date'].isna())
                                  & (df['P_CMT.unit_of_measure_code']=='PU')
                                  & (df['JP.valid_from_date'] == datetime(2013, 7, 1))
                                  & (df['P_CMT.penalty_type_code']=='FIN')
                                  & (df['P_CMT.penalty_quantity']==20)
                                  ]
        assert len(pn_LC_PU_01_07_2013__) == 1, "Expected one record of 30 Penalty Units Fine at a Local Court from period between 01/07/2013 (unexpired)"

        #Active
        pn_SCS_PU_01_07_2013__ = df[(df['JUR_L.short_description'] == 'SCS')
                                  & (df['JP.valid_to_date'].isna())
                                  & (df['P_CMT.unit_of_measure_code']=='PU')
                                  & (df['JP.valid_from_date'] == datetime(2013, 7, 1))
                                  & (df['P_CMT.penalty_type_code']=='FIN')
                                  & (df['P_CMT.penalty_quantity']==20)
                                  ]
        assert len(pn_SCS_PU_01_07_2013__) == 1, "Expected one record of 20 Penalty Units at a Supreme Court from period between 01/07/2013 (unexpired)"

        print(f'✅ : Law part code with Active and Historic Penalties. {test_lpcs=}')
        return df
    except AssertionError as e:
        print(f'❌ : Assertion failed - {e}')
        raise e
    except Exception as e:
        print(f'❌ : An unexpected error occurred - {e}')
        raise e

# Run all tests
if IS_TEST_MODE:
  print('🧪 starting tests')

  test_df = test_one_active_law_part_code()
  test_many_active_law_part_code()
  test_one_as_at_date_law_part_code()
  test_inactive_one_law_part_code_today()
  test_one_active_law_part_code_with_historic_and_current_penalties()

  print('...finishing tests')

  test_df.describe()
# test_df[[col for col in test_df.columns if not col.endswith('code')]]

In [176]:
# Filter for rows where 'P_CMD.comparison_operator_code' is neither NaN nor 'nan'
pc = DF_JUDCOMM['df_penalty_compound']
filtered_pc = pc[(pc['P_CMD.comparison_operator_code'].notna()) & (pc['P_CMD.comparison_operator_code'].str.lower() != 'nan')]

# Get unique records for 'P_CMD.comparison_operator_code' and 'P_CMD.statutory_penalty_code'
unique_records = filtered_pc[['P_CMD.comparison_operator_code', 'P_CMD.statutory_penalty_code']]\
                  .drop_duplicates()\
                  # .set_index('P_CMD.comparison_operator_code')

# unique_records['P_CMD.comparison_operator_code_dupe'] = unique_records['P_CMD.comparison_operator_code']

# Merge unique_records back to pc to assign P_CMD.statutory_penalty_code
pc = pc.merge(unique_records, on='P_CMD.statutory_penalty_code', how='left', suffixes=('', '_unique'))

# print(pc[(pc['P_CMD.comparison_operator_code'].notna()) & (pc['P_CMD.comparison_operator_code']!= 'nan')])
# print(unique_records.columns)
# print(pc.columns)

# Assign the unique statutory_penalty_code to P_CMD._comparison_operator_code
pc['P_CMD._comparison_operator_code'] = pc['P_CMD.comparison_operator_code_unique']

# Drop the temporary merge column
# pc = pc.drop(columns=['P_CMD.statutory_penalty_code_unique'])

# Filter for rows where 'P_CMD._comparison_operator_code' is not NaN
result = pc[(pc['P_CMD._comparison_operator_code'].notna()) & (pc['P_CMD._comparison_operator_code']!= 'nan')]

result = result.sort_values(by=['P_CMD.statutory_penalty_code'])
# Drop the temporary merge column

# Display the result
result[['P_CMD.statutory_penalty_code' ,'P_CMD._comparison_operator_code', 'P_CMD.comparison_operator_code']]


Unnamed: 0,P_CMD.statutory_penalty_code,P_CMD._comparison_operator_code,P_CMD.comparison_operator_code
855,3773,OR,OR
2609,3773,OR,
2610,3774,OR,
856,3774,OR,OR
1357,3852,AND,
...,...,...,...
4806,6198,AO,AO
4807,6198,AND,AO
4811,6198,AND,
4812,6202,AO,AO


In [177]:
def fill_values_within_groups(df: pd.DataFrame, group_col: str, match_col: str) -> pd.DataFrame:
    """
    Fill missing values within groups based on a specified column.

    Parameters:
    df (pd.DataFrame): The input DataFrame containing the data.
    group_col (str): The name of the column used for grouping.
    match_col (str): The name of the column with values to fill.

    Returns:
    pd.DataFrame: The DataFrame with filled values within groups.
    """
    # Define invalid values
    invalid_values = {'', 'nan', '-', 'nan'}

    # Replace invalid values with NaN
    mask = df[match_col].str.strip().str.lower().isin(invalid_values)
    df.loc[mask, match_col] = pd.NA

    # Identify groups that have at least one valid (non-NaN) value
    valid_groups = df.groupby(group_col)[match_col].transform(lambda x: x.notna().any())

    # Fill values within valid groups only, forward and backward fill within groups
    df.loc[valid_groups, match_col] = df.loc[valid_groups].groupby(group_col)[match_col].ffill()
    df.loc[valid_groups, match_col] = df.loc[valid_groups].groupby(group_col)[match_col].bfill()

    return df

# ------------------------------------------------------------------------------
# Unit Test
def test_fill_values_within_groups():
    # Example data
    data = {
        'Group_Col': [1, 1, 2, 3, 3, 3],
        'Matching_Col': ['A', ' nan ', '-', 'Nan', 'b', 'B'],
        'Misc_Col': [10, 20, 30, 40, 50, 60]
    }
    df_input = pd.DataFrame(data)

    # Expected result
    expected_data = {
        'Group_Col': [1, 1, 2, 3, 3, 3],
        'Matching_Col': ['A', 'A', pd.NA, 'b', 'b', 'B'],
        'Misc_Col': [10, 20, 30, 40, 50, 60]
    }
    df_expected = pd.DataFrame(expected_data)

    # Apply the function
    df_result = fill_values_within_groups(df_input, 'Group_Col', 'Matching_Col')

    # Assert the result matches the expected output
    pd.testing.assert_frame_equal(df_result, df_expected)

test_fill_values_within_groups()