In [None]:
!pip install --user pydbtools==5.6.4

In [None]:
pip freeze

In [None]:
# Show the installed pydbtools version by filtering the `pip freeze` output
!pip freeze | grep pydbtools
# Version numbers noted below - presumably the versions seen on earlier runs (TODO confirm)
#5.3.0
#5.6.3
#5.6.4

In [None]:
#Restart the kernel at this point so that the pydbtools version installed above is the one that gets imported

In [None]:
import pandas as pd  # a module which provides the data structures and functions to store and manipulate tables in dataframes
import pydbtools as pydb  # A module which allows SQL queries to be run on the Analytical Platform from Python, see https://github.com/moj-analytical-services/pydbtools
import boto3  # allows you to directly create, update, and delete AWS resources from Python scripts

# Widen pandas' display limits so that wide query results are easier to read
for option_name, option_value in {
    "display.max_columns": 100,
    "display.width": 900,
    "display.max_colwidth": 200,
}.items():
    pd.set_option(option_name, option_value)

In [None]:
#Variables to be used in this notebook

#Name of the Athena database the output tables are stored in
#NOTE(review): the SQL in the visible cells hard-codes `fcsq` rather than reading this variable
fcsq_database = "fcsq"

#Handle on the S3 bucket that the table data and the CSV output are written to
s3 = boto3.resource("s3")
bucket = s3.Bucket("alpha-family-data")

#Last full year to be published - including this publication
annual_year = 2023

#Current publication variables (year and quarter)
current_year = 2024
current_quarter = 2

#Next publication variables; this (year, quarter) pair is filtered out when the
#fcsq.divorce_fcsq_csv summary table is built
next_quarter_year = 2024 
next_quarter = 3

In [None]:
#imports SDP extract data from S3 bucket into a temporary table
#sdp_divorce_data_table = pd.read_csv("s3://alpha-family-data/fcsq_processing/Divorce/sdp_process/sdp_divorce_data_raw_2024q3.csv", low_memory=False)
#Converting selected columns to datetime       
#sdp_divorce_data_table['case_recpt_date'] = pd.to_datetime(sdp_divorce_data_table['case_recpt_date'],  errors = 'coerce', format = '%Y-%m-%d %H:%M:%S')
#sdp_divorce_data_table['dn_proncd_date'] = pd.to_datetime(sdp_divorce_data_table['dn_proncd_date'],  errors = 'coerce', format = '%Y-%m-%d %H:%M:%S')
#sdp_divorce_data_table['da_grntd_date'] = pd.to_datetime(sdp_divorce_data_table['da_grntd_date'],  errors = 'coerce', format = '%Y-%m-%d %H:%M:%S')

#pydb.dataframe_to_temp_table(sdp_divorce_data_table, "sdp_divorce_data");


In [None]:
#csv_location = "s3://alpha-family-data/fcsq_processing/Divorce/sdp_process/sdp_divorce_data_raw_2024q3.csv"



#import awswrangler as wr
#wr.catalog.create_csv_table(
#    database='fcsq',
#    table='temp_divorce_extract_202422',
#    path=csv_location,
    #columns_types={'col0': 'bigint', 'col1': 'double'},
    #partitions_types={'col2': 'date'},
    #compression='gzip',
    #description='My own table!',
    #parameters={'source': 'postgresql'},
    #columns_comments={'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}
#)
 

In [None]:
#test = pydb.read_sql_query("SELECT * from __temp__.sdp_divorce_data limit 10")
#test

In [None]:
#record count check in extracted data
#sdp_divorce_data_count = pydb.read_sql_query("SELECT count(*) as count from __temp__.sdp_divorce_data")
#sdp_divorce_data_count

In [None]:
#pydb.delete_table_and_data(database="__temp__", table="sdp_divorce_data")

In [None]:
#Imports location information from S3 bucket into a temporary table
#create_div_court_dfj_region_lookup_table = pd.read_csv("s3://alpha-family-data/fcsq_processing/lookups/div_court_dfj_region_lookup.csv", low_memory=False)

#pydb.dataframe_to_temp_table(create_div_court_dfj_region_lookup_table, "div_court_dfj_region_lookup");


In [None]:
#import awswrangler as wr

#create_div_court_dfj_region_lookup_table = pd.read_csv("s3://alpha-family-data/fcsq_processing/lookups/div_court_dfj_region_lookup.csv", low_memory=False)

#wr.s3.to_csv(
#    df=create_div_court_dfj_region_lookup_table,
#    path="s3://alpha-family-data/fcsq_processing/lookups/div_court_dfj_region_lookup.csv",
#    index=False,
#    dataset=True,
#    database="fcsq",
#    table="div_court_dfj_region_lookup",
#    mode="overwrite",
    #boto3_session=session,
#)

In [None]:
# Sanity check: display ten rows of the court / DFJ / region lookup table
lookup_preview_sql = "SELECT * from fcsq.div_court_dfj_region_lookup LIMIT 10"
test = pydb.read_sql_query(lookup_preview_sql)
test

In [None]:
#pydb.delete_table_and_data(database="__temp__", table="div_court_dfj_region_lookup")

In [None]:
#Creating a table holding petition / application information.

In [None]:
# Start from a clean slate: remove the Athena table definition, then the data files
# behind it, so the CREATE TABLE AS in the following cell writes to an empty location.
drop_divorce_fct_petitions = "DROP TABLE IF EXISTS fcsq.divorce_fct_petitions"
pydb.start_query_execution_and_wait(drop_divorce_fct_petitions)

# The trailing "/" limits the delete to objects inside this table's folder. Without it,
# the prefix would also match any sibling key that merely starts with the same text
# (for example ".../divorce_fct_petitions_backup/...").
bucket.objects.filter(Prefix="fcsq_processing/Divorce/divorce_fct_petitions/").delete();


In [None]:
# Build fcsq.divorce_fct_petitions as Parquet: one row per record of the SDP divorce
# extract flagged pet_ind = 1, with columns cast to explicit types and the court name,
# DFJ area and region attached through a LEFT JOIN on the location lookup.
#
# Notes on the SQL:
#   * Columns compared with the literal string 'NULL' are mapped to SQL NULL before casting.
#   * case_issd_date gets the same 'NULL' guard that the nisi / absolute queries apply to
#     their nullable date columns, so a petition without an issue date yields NULL rather
#     than failing DATE_PARSE.
#   * Gender values outside female / male / other / notGiven are labelled 'check'.
#   * Every source column is qualified with its table alias so the join stays unambiguous.
#
# A plain string (not an f-string) is used because nothing is interpolated.
create_divorce_fct_petitions_table = """
CREATE TABLE IF NOT EXISTS fcsq.divorce_fct_petitions
WITH (format = 'PARQUET', external_location = 's3://alpha-family-data/fcsq_processing/Divorce/divorce_fct_petitions') AS
SELECT  'Petition' As stage,
        CAST(a.pet_year AS INT) AS year,
        EXTRACT(QUARTER FROM date_parse(a.case_recpt_date, '%Y-%m-%d %H:%i:%s.%f')) AS quarter,
        EXTRACT(MONTH FROM date_parse(a.case_recpt_date, '%Y-%m-%d %H:%i:%s.%f')) AS month,
        CAST (a.fm_case_cid AS VARCHAR) AS fm_case_cid,
        CAST(a.legal_case_id AS BIGINT) AS legal_case_id,
        CAST(a.digital_paper AS VARCHAR) AS digital_paper,
        CASE WHEN a.petnr_reprsntd_ind = 'NULL' THEN NULL
             ELSE CAST(a.petnr_reprsntd_ind AS INT)
        END AS petnr_reprsntd_ind,
        CASE WHEN a.respndnt_reprsntd_ind = 'NULL' THEN NULL
             ELSE CAST(a.respndnt_reprsntd_ind AS INT)
        END AS respndnt_reprsntd_ind,
        DATE_PARSE(a.case_recpt_date, '%Y-%m-%d %H:%i:%s.%f') AS event_date,
        CASE WHEN a.case_issd_date = 'NULL' THEN NULL
             ELSE DATE_PARSE(a.case_issd_date, '%Y-%m-%d %H:%i:%s.%f')
        END AS case_issd_date,
        CAST(a.case_type_descriptor AS VARCHAR) AS reason,
        CAST(a.proceeding_type AS VARCHAR) AS proceeding_type,
        CAST(a.proceeding_type_code AS VARCHAR) AS proceeding_type_code,
        CASE    WHEN a.pettnr_gender_type_name = 'female' THEN 'Female'
                WHEN a.pettnr_gender_type_name = 'male' THEN 'Male'
                WHEN a.pettnr_gender_type_name = 'other' THEN 'Other/unknown'
                WHEN a.pettnr_gender_type_name = 'notGiven' THEN 'Other/unknown'
                ELSE 'check'
        END AS Petitioners_gender,
        CASE   WHEN a.respndnt_gender_type_name = 'female' THEN 'Female'
               WHEN a.respndnt_gender_type_name = 'male' THEN 'Male'
               WHEN a.respndnt_gender_type_name = 'other' THEN 'Other/unknown'
               WHEN a.respndnt_gender_type_name = 'notGiven' THEN 'Other/unknown'
               ELSE 'check'
        END AS respondents_gender,
        CAST(a.pet_location_key AS DOUBLE) as location_key,
        CASE WHEN a.first_fr_hrng_ind = 'NULL' THEN NULL
             ELSE CAST(a.first_fr_hrng_ind AS INT)
        END AS first_fr_hrng_ind,
        CASE WHEN a.first_divorce_hrng_ind = 'NULL' THEN NULL
             ELSE CAST(a.first_divorce_hrng_ind AS INT)
        END AS first_divorce_hrng_ind,
        CAST(a.law AS VARCHAR) AS law,
        CAST(a.nfd_app_type AS VARCHAR) AS nfd_app_type,
        CAST(a.new_case_type AS VARCHAR) AS new_case_type,
        CAST(b.location_name AS VARCHAR) AS court,
        CAST(b.dfj_lookup AS VARCHAR) AS dfj_area,
        CAST(b.region_lookup AS VARCHAR) AS region

FROM data_eng_uploader_prod_family_sdp_data.divorce_test AS a
  LEFT JOIN fcsq.div_court_dfj_region_lookup AS b
    ON cast(a.pet_location_key as DOUBLE) = b.location_key

WHERE CAST(a.pet_ind as INT)  = 1;
"""
pydb.start_query_execution_and_wait(create_divorce_fct_petitions_table)

In [None]:
# Sanity check: display ten rows of the newly built petitions table
petitions_preview_sql = "SELECT * from fcsq.divorce_fct_petitions LIMIT 10"
test = pydb.read_sql_query(petitions_preview_sql)
test

In [None]:
#pydb.delete_table_and_data(database="__temp__", table="divorce_fct_petitions")

In [None]:
#Creating a table holding nisi / conditional order information.

In [None]:
# Start from a clean slate: remove the Athena table definition, then the data files
# behind it, so the CREATE TABLE AS in the following cell writes to an empty location.
drop_divorce_fct_nisi = "DROP TABLE IF EXISTS fcsq.divorce_fct_nisi"
pydb.start_query_execution_and_wait(drop_divorce_fct_nisi)

# The trailing "/" limits the delete to objects inside this table's folder. Without it,
# the prefix would also match any sibling key that merely starts with the same text.
bucket.objects.filter(Prefix="fcsq_processing/Divorce/divorce_fct_nisi/").delete();

In [None]:
# Build fcsq.divorce_fct_nisi as Parquet: one row per record of the SDP divorce extract
# flagged nisi_ind = 1, with typed columns and the court / DFJ area / region attached
# through a LEFT JOIN on the nisi location key.
#
# Notes on the SQL:
#   * stage is 'Decree Nisi' for proceeding type codes D and N, 'Judicial Separations
#     Granted' for J, and NULL for any other code (the CASE has no ELSE).
#   * Columns compared with the literal string 'NULL' are mapped to SQL NULL before casting.
#   * pet_rep / resp_rep / both_rep / neither_rep are 0/1 flags derived from the two
#     representation indicators: 1 counts as represented, 0 or -1 as not represented.
#     A NULL indicator makes all four flags 0.
#   * app_to_nisi_weeks is pet_to_nisi divided by 7.
#   * Every source column is qualified with its table alias so the join stays unambiguous.
#
# A plain string (not an f-string) is used because nothing is interpolated.
create_divorce_fct_nisi_table = """
CREATE TABLE IF NOT EXISTS fcsq.divorce_fct_nisi
WITH (format = 'PARQUET', external_location = 's3://alpha-family-data/fcsq_processing/Divorce/divorce_fct_nisi') AS
SELECT  CASE    WHEN a.proceeding_type_code IN ('D','N') THEN 'Decree Nisi'
                WHEN a.proceeding_type_code = 'J' THEN 'Judicial Separations Granted'
        END As stage,
        CASE WHEN a.nisi_year = 'NULL' THEN NULL
             ELSE CAST(a.nisi_year AS INT)
        END AS year,
        CASE WHEN a.dn_proncd_date = 'NULL' THEN NULL
             ELSE EXTRACT(QUARTER FROM date_parse(a.dn_proncd_date, '%Y-%m-%d %H:%i:%s.%f'))
        END AS quarter,
        CASE WHEN a.dn_proncd_date = 'NULL' THEN NULL
             ELSE EXTRACT(MONTH FROM date_parse(a.dn_proncd_date, '%Y-%m-%d %H:%i:%s.%f'))
        END AS month,
        CAST (a.fm_case_cid AS VARCHAR) AS fm_case_cid,
        CAST(a.legal_case_id AS BIGINT) AS legal_case_id,
        CAST(a.digital_paper AS VARCHAR) AS digital_paper,
        CASE WHEN a.dn_proncd_date = 'NULL' THEN NULL
             ELSE DATE_PARSE(a.dn_proncd_date,'%Y-%m-%d %H:%i:%s.%f')
        END AS event_date,
        CASE WHEN a.pet_to_nisi = 'NULL' THEN NULL
             ELSE CAST(a.pet_to_nisi AS DOUBLE)
        END AS pet_to_nisi,
        CAST(a.proceeding_type AS VARCHAR) AS proceeding_type,
        CAST(a.proceeding_type_code AS VARCHAR) AS proceeding_type_code,
        CAST(a.contested AS VARCHAR) AS contested,
        CASE WHEN a.nisi_location_key = 'NULL' THEN NULL
             ELSE CAST(a.nisi_location_key AS DOUBLE)
        END AS location_key,
        CASE WHEN a.first_fr_hrng_ind = 'NULL' THEN NULL
             ELSE CAST(a.first_fr_hrng_ind AS INT)
        END AS first_fr_hrng_ind,
        CASE WHEN a.first_divorce_hrng_ind = 'NULL' THEN NULL
             ELSE CAST(a.first_divorce_hrng_ind AS INT)
        END AS first_divorce_hrng_ind,
        CAST(a.law AS VARCHAR) AS law,
        CAST(a.nfd_app_type AS VARCHAR) AS nfd_app_type,
        CAST(a.new_case_type AS VARCHAR) AS new_case_type,
        CAST(b.location_name AS VARCHAR) AS court,
        CAST(b.dfj_lookup AS VARCHAR) AS dfj_area,
        CAST(b.region_lookup AS VARCHAR) AS region,
        CASE WHEN a.petnr_reprsntd_ind = 'NULL' THEN NULL
             ELSE CAST (a.petnr_reprsntd_ind AS INT)
        END AS petnr_reprsntd_ind,
        CASE WHEN a.respndnt_reprsntd_ind = 'NULL' THEN NULL
             ELSE CAST (a.respndnt_reprsntd_ind AS INT)
        END AS respndnt_reprsntd_ind,
        CASE WHEN (CASE WHEN a.petnr_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.petnr_reprsntd_ind AS INT) END) = 1 AND
                  (CASE WHEN a.respndnt_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.respndnt_reprsntd_ind AS INT) END) IN (0, -1) THEN 1
             ELSE 0
        END AS pet_rep,
        CASE WHEN (CASE WHEN a.petnr_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.petnr_reprsntd_ind AS INT) END) IN (0, -1)  AND
                  (CASE WHEN a.respndnt_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.respndnt_reprsntd_ind AS INT) END) = 1  THEN 1
             ELSE 0
        END AS resp_rep,
        CASE WHEN (CASE WHEN a.petnr_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.petnr_reprsntd_ind AS INT) END) = 1 AND
                  (CASE WHEN a.respndnt_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.respndnt_reprsntd_ind AS INT) END) = 1 THEN 1
             ELSE 0
        END AS both_rep,
        CASE WHEN (CASE WHEN a.petnr_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.petnr_reprsntd_ind AS INT) END) IN (0, -1) AND
                  (CASE WHEN a.respndnt_reprsntd_ind = 'NULL' THEN NULL ELSE CAST (a.respndnt_reprsntd_ind AS INT) END) IN (0, -1) THEN 1
             ELSE 0
        END AS neither_rep,
        CAST(a.PETTNR_REPRSNTD_CIND AS VARCHAR) AS pettnr_reprsntd_cind,
        CAST(a.RESPNDNT_REPRSNTD_CIND AS VARCHAR) AS respndnt_reprsntd_cind,
        CASE WHEN a.pet_to_nisi = 'NULL' THEN NULL
             ELSE CAST(a.pet_to_nisi AS DOUBLE)/7.0
        END AS app_to_nisi_weeks

FROM data_eng_uploader_prod_family_sdp_data.divorce_test AS a
   LEFT JOIN fcsq.div_court_dfj_region_lookup AS b
     ON (CASE WHEN a.nisi_location_key = 'NULL' THEN NULL ELSE CAST(a.nisi_location_key AS DOUBLE) END)= b.location_key

WHERE CAST(a.nisi_ind AS INT) = 1;
"""
pydb.start_query_execution_and_wait(create_divorce_fct_nisi_table)

In [None]:
# Sanity check: display ten rows of the newly built nisi / conditional order table
nisi_preview_sql = "SELECT * from fcsq.divorce_fct_nisi LIMIT 10"
test = pydb.read_sql_query(nisi_preview_sql)
test

In [None]:
#test1 = pydb.read_sql_query("SELECT new_case_type,stage, count(*) as count from fcsq.divorce_fct_nisi group by new_case_type,stage ")
#test1

In [None]:
#pydb.delete_table_and_data(database="__temp__", table="divorce_fct_nisi")

In [None]:
#Creating a table holding absolute / final order information.

In [None]:
# Start from a clean slate: remove the Athena table definition, then the data files
# behind it, so the CREATE TABLE AS in the following cell writes to an empty location.
drop_divorce_fct_abs = "DROP TABLE IF EXISTS fcsq.divorce_fct_abs"
pydb.start_query_execution_and_wait(drop_divorce_fct_abs)

# The trailing "/" limits the delete to objects inside this table's folder. Without it,
# the prefix would also match any sibling key that merely starts with the same text.
bucket.objects.filter(Prefix="fcsq_processing/Divorce/divorce_fct_abs/").delete();

In [None]:
# Build fcsq.divorce_fct_abs as Parquet: one row per record of the SDP divorce extract
# flagged absolute_ind = 1, with typed columns and the court / DFJ area / region attached
# through a LEFT JOIN on the absolute location key.
#
# Notes on the SQL:
#   * stage is the constant 'Decree Absolute' for every row.
#   * Columns compared with the literal string 'NULL' are mapped to SQL NULL before casting.
#
# A plain string (not an f-string) is used because nothing is interpolated; the SQL text
# itself is unchanged.
create_divorce_fct_abs_table = """
CREATE TABLE IF NOT EXISTS fcsq.divorce_fct_abs
WITH (format = 'PARQUET', external_location = 's3://alpha-family-data/fcsq_processing/Divorce/divorce_fct_abs') AS
SELECT  'Decree Absolute' As stage,
        CASE WHEN a.abs_year = 'NULL' THEN NULL
             ELSE CAST(a.abs_year AS INT)
        END AS year,
        CASE WHEN a.da_grntd_date = 'NULL' THEN NULL
             ELSE EXTRACT(QUARTER FROM date_parse(a.da_grntd_date, '%Y-%m-%d %H:%i:%s.%f'))
        END AS quarter,
        CASE WHEN a.da_grntd_date = 'NULL' THEN NULL
             ELSE EXTRACT(MONTH FROM date_parse(a.da_grntd_date, '%Y-%m-%d %H:%i:%s.%f'))
        END AS month,
        CAST (a.fm_case_cid AS VARCHAR) AS fm_case_cid,
        CAST(a.legal_case_id AS BIGINT) AS legal_case_id,
        CAST(a.digital_paper AS VARCHAR) AS digital_paper,
        CASE WHEN a.da_grntd_date = 'NULL' THEN NULL
             ELSE DATE_PARSE(a.da_grntd_date, '%Y-%m-%d %H:%i:%s.%f')
        END AS event_date,
        CASE WHEN a.pet_to_abs ='NULL' THEN NULL
             ELSE CAST(a.pet_to_abs AS DOUBLE)
        END AS pet_to_abs,
        CAST(a.proceeding_type AS VARCHAR) AS proceeding_type,
        CAST(a.proceeding_type_code AS VARCHAR) AS proceeding_type_code,
        CAST(a.contested AS VARCHAR) AS contested,
        CASE WHEN a.abs_location_key = 'NULL' THEN NULL
             ELSE CAST(a.abs_location_key AS DOUBLE)
        END AS location_key,
        CASE WHEN a.first_fr_hrng_ind = 'NULL' THEN NULL
             ELSE CAST(a.first_fr_hrng_ind AS INT)
        END AS first_fr_hrng_ind,
        CASE WHEN a.first_divorce_hrng_ind = 'NULL' THEN NULL
             ELSE CAST(a.first_divorce_hrng_ind AS INT)
        END AS first_divorce_hrng_ind,
        CAST(a.law AS VARCHAR) AS law,
        CAST(a.nfd_app_type AS VARCHAR) AS nfd_app_type,
        CAST(a.new_case_type AS VARCHAR) AS new_case_type,
        CAST(b.location_name AS VARCHAR) AS court,
        CAST(b.dfj_lookup AS VARCHAR) AS dfj_area,
        CAST(b.region_lookup AS VARCHAR) AS region

FROM data_eng_uploader_prod_family_sdp_data.divorce_test AS a
  LEFT JOIN fcsq.div_court_dfj_region_lookup AS b
    ON (CASE WHEN a.abs_location_key = 'NULL' THEN NULL ELSE CAST(a.abs_location_key AS DOUBLE) END) = b.location_key

WHERE CAST (a.absolute_ind AS INT) = 1;
"""
pydb.start_query_execution_and_wait(create_divorce_fct_abs_table)

In [None]:
# Sanity check: display ten rows of the newly built absolute / final order table
abs_preview_sql = "SELECT * from fcsq.divorce_fct_abs LIMIT 10"
test = pydb.read_sql_query(abs_preview_sql)
test

In [None]:
#pydb.delete_table_and_data(database="__temp__", table="divorce_fct_abs")

In [None]:
#Creating a table holding stages information.

In [None]:
# Stack the three fact tables into one temporary table, __temp__.divorce_fct_stages,
# with a common set of columns:
#   * petitions rows get Type 'Petitions' and empty strings for Order_Type and Contested;
#   * nisi and absolute rows get Type 'Order made', their stage as Order_Type and their
#     own Contested value.
# UNION ALL keeps every row (no de-duplication), so the rows can be counted later.
#
# A plain string (not an f-string) is used because nothing is interpolated; the SQL text
# itself is unchanged.
create_divorce_fct_stages_table = """
SELECT 'Petitions' As Type,
       '' As Order_Type,
        Year,
        Quarter,
        Proceeding_type,
       '' As Contested,
        Law

FROM fcsq.divorce_fct_petitions

UNION ALL

SELECT 'Order made' As Type,
        Stage As Order_Type,
        Year,
        Quarter,
        Proceeding_type,
        Contested,
        Law

FROM fcsq.divorce_fct_nisi

UNION ALL

SELECT 'Order made' As Type,
        Stage As Order_Type,
        Year,
        Quarter,
        Proceeding_type,
        Contested,
        Law

FROM fcsq.divorce_fct_abs;
"""
pydb.create_temp_table(create_divorce_fct_stages_table,'divorce_fct_stages')

In [None]:
# Sanity check: display ten rows of the combined stages temp table
stages_preview_sql = "SELECT * from __temp__.divorce_fct_stages LIMIT 10"
test = pydb.read_sql_query(stages_preview_sql)
test

In [None]:
#pydb.delete_table_and_data(database="__temp__", table="divorce_fct_stages")

In [None]:
#Creating the summary table DIVORCE_FCSQ_CSV which is used in the publication RAP process.

In [None]:
# Start from a clean slate: remove the Athena table definition, then the data files
# behind it, so the CREATE TABLE AS in the following cell writes to an empty location.
drop_divorce_fcsq_csv = "DROP TABLE IF EXISTS fcsq.divorce_fcsq_csv"
pydb.start_query_execution_and_wait(drop_divorce_fcsq_csv)

# The trailing "/" limits the delete to objects inside this table's folder. Without it,
# the prefix would also match any sibling key that merely starts with the same text.
bucket.objects.filter(Prefix="fcsq_processing/Divorce/divorce_fcsq_csv/").delete();

In [None]:
# Build the summary table fcsq.divorce_fcsq_csv as Parquet: a row count for every
# combination of Type, Order_type, Year, Quarter, Proceeding_type, Law and Contested
# found in __temp__.divorce_fct_stages.
#
# Filters applied:
#   * Year must be greater than 2002 (rows with a NULL Year do not pass this test).
#   * The period (next_quarter_year, next_quarter), interpolated from the notebook
#     variables, is excluded.
# NOTE(review): only that single period is removed; rows dated later than it, if any
# exist in the extract, would still be counted - confirm this is intended.
create_divorce_fcsq_csv_table =f"""
CREATE TABLE IF NOT EXISTS fcsq.divorce_fcsq_csv
WITH (format = 'PARQUET', external_location = 's3://alpha-family-data/fcsq_processing/Divorce/divorce_fcsq_csv') AS
SELECT  Type,
        Order_type,
        Year,
        Quarter,
        Proceeding_type,
        Law,
        Contested,
        count(*) as count
        
FROM __temp__.divorce_fct_stages

WHERE   YEAR > 2002 AND
        NOT (Year = {next_quarter_year} AND Quarter = {next_quarter})
    
GROUP BY TYPE, Order_type, Year, QUARTER, Proceeding_type, Law, Contested

ORDER BY YEAR, QUARTER, TYPE, ORDER_TYPE;
"""
pydb.start_query_execution_and_wait(create_divorce_fcsq_csv_table)

In [None]:
# Read the summary table back and write it to S3 as the CSV for the publication process.
# A SELECT without ORDER BY has no guaranteed row order, so the sort is stated explicitly
# here to keep the CSV rows in the same order from run to run.
df = pydb.read_sql_query(
    "select * from fcsq.divorce_fcsq_csv "
    "order by year, quarter, type, order_type, proceeding_type, law, contested;"
)
df.to_csv(path_or_buf = 's3://alpha-family-data/fcsq_processing/Divorce/sdp_process/divorce_fcsq_csv.csv',index=False)