## KPI Mock data creation and SQL Logic v4 #6 (6.1, 6.2, 6.3)

### Create Mock Data

In [2]:
# Mock data for Batch #7 - Post Out Requests

# Required fields:
#   file_id
#   post_out_id
#   timestamp


import pandas as pd
import numpy as np
from datetime import datetime


### Create Batch 7
# Set conditions
np.random.seed(0)
batch7_output_filename = 'sps_post_out_requests.parquet'
num_samples = 1000

file_ids = ['FID' + f'{i:08}' for i in range(1, num_samples + 1)]
post_out_ids = ['POID' + f'{i:08}' for i in range(1, num_samples + 1)]

start_date = pd.to_datetime('2024-01-01')
end_date = pd.to_datetime(datetime.now()) 
timestamps = pd.to_datetime(np.random.randint(start_date.value, end_date.value, num_samples), unit='ns')

df = pd.DataFrame({
    'file_id': file_ids,
    'post_out_id': post_out_ids,
    'timestamp': timestamps
})

df['timestamp'] = pd.to_datetime(df['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')).dt.round('s')
df['timestamp'] = df['timestamp'].astype(str)
df.to_parquet(batch7_output_filename, index=False)

print(df.head())
print(f"Data saved to '{batch7_output_filename}'.")
print(df.dtypes)
print('------------------------')



### Create Batch 24
# Set conditions
batch24_output_filename = 'ppa_post_out_bulk_requests.parquet'
post_out_types_list = ['Prize Warrant', 'daily_files', 'Scheduled Statements']

length_min_days = 1
length_max_days = 14


df['post_out_type'] = np.random.choice(post_out_types_list, size=len(df))

random_days = pd.to_timedelta(np.random.randint(length_min_days, length_max_days, size=len(df)), unit='D')
df['file_received_timestamp'] = pd.to_datetime(df['timestamp']) - random_days
df['file_received_timestamp'] = pd.to_datetime(df['file_received_timestamp']).astype(str)

df = df.drop(columns=['timestamp','post_out_id'])

df.to_parquet(batch24_output_filename, index=False)

print(df.head())
print(f"Data saved to '{batch24_output_filename}'.")
print(df.dtypes)




       file_id   post_out_id            timestamp
0  FID00000001  POID00000001  2024-03-14 18:57:54
1  FID00000002  POID00000002  2024-02-17 21:22:33
2  FID00000003  POID00000003  2024-03-23 11:27:06
3  FID00000004  POID00000004  2024-01-19 14:52:21
4  FID00000005  POID00000005  2024-02-06 14:28:16
Data saved to 'sps_post_out_requests.parquet'.
file_id        object
post_out_id    object
timestamp      object
dtype: object
------------------------
       file_id  post_out_type file_received_timestamp
0  FID00000001  Prize Warrant     2024-03-12 18:57:54
1  FID00000002    daily_files     2024-02-12 21:22:33
2  FID00000003  Prize Warrant     2024-03-22 11:27:06
3  FID00000004  Prize Warrant     2024-01-07 14:52:21
4  FID00000005  Prize Warrant     2024-02-03 14:28:16
Data saved to 'ppa_post_out_bulk_requests.parquet'.
file_id                    object
post_out_type              object
file_received_timestamp    object
dtype: object


### SQL Logic for KPI 6.1
for today-30 to today \
s1. b24 where post_out_type = 'Prize Warrant' \
s2. join b24 b7 on file_id \
s3. time_to_send = b7.timestamp - b24.file_recieved_timestamp \
s4. count s2 \
s5.count s3 where time_to_send <= 7 days \
s6. s5/s4*100 


In [44]:
import pandasql as ps
import pandas as pd

batch7_df = pd.read_parquet(batch7_output_filename)
batch24_df = pd.read_parquet(batch24_output_filename)

sql_query = """
WITH selected_df AS (
    SELECT *
    ,julianday(timestamp) - julianday(file_received_timestamp) AS time_to_send
    FROM (
        SELECT *
        FROM batch24_df
        WHERE post_out_type = 'Prize Warrant'
        AND (file_received_timestamp BETWEEN DATETIME('NOW','-30 days') AND DATETIME('NOW'))
    ) b24
    LEFT JOIN batch7_df b7
    ON b24.file_id = b7.file_id
)
,
agg_results AS (
SELECT COUNT(*) AS Total
,SUM(CASE WHEN time_to_send <= 7 THEN 1 ELSE 0 END) AS Time_to_send_less_than_7_days
FROM selected_df
)

SELECT (Time_to_send_less_than_7_days * 1.0 / Total) * 100 AS PC_LTE_7_days
FROM agg_results



"""

print(ps.sqldf(sql_query,locals()))

   PC_LTE_7_days
0      63.380282


In [None]:
# SQL logic used in ETL_KPI_data_KPI6.2 in AWS Glue
# Note, some functions have been changed to accomdate the change from SQLite to Postgres


WITH current_date_id AS (
    SELECT myDimDate.pk_id
    FROM myDimDate
    WHERE myDimDate.date = CURRENT_DATE
)
,
current_kpi_id AS (
    SELECT pk_id
    FROM myDimKPI
    WHERE kpi_reference = '6.1'
)
,
selected_df AS (
    SELECT *
    ,EXTRACT(DAY FROM (timestamp - file_received_timestamp)) AS days_to_process
    FROM (
        SELECT *
        FROM df
        WHERE post_out_type = 'Prize Warrant'
        AND (file_received_timestamp BETWEEN (CURRENT_DATE-31) AND (CURRENT_DATE-1))
    )
)
,
agg_results AS (
SELECT COUNT(*) AS Total
,SUM(CASE WHEN days_to_process <= 7 THEN 1 ELSE 0 END) AS time_to_process_lte_7_days
FROM selected_df
)

SELECT (SELECT pk_id FROM current_date_id) AS fk_date_id
,(SELECT pk_id FROM current_kpi_id) AS fk_kpi_id
,(time_to_process_lte_7_days * 100.0 / Total) AS Value
FROM agg_results

### SQL Logic for KPI 6.2

for today - 30 to today \
for post_out_type = 'daily_files' \
s1. B24.Day =  reset a day to today - 13hrs for start and today @ 13hrs for end and select date for today \
s2. B7.Processed_Day = extract the date the file is posted to SFTP \
s3. join s1 & s2 based on File_id where Day = Processed_Day then Success = 1 else 0 \
s4. count s3 where Success = 1/count s3*100


In [54]:
import pandasql as ps
import pandas as pd

batch7_df = pd.read_parquet(batch7_output_filename)
batch24_df = pd.read_parquet(batch24_output_filename)

# WIP
sql_query = """
WITH selected_df AS (
    SELECT *
    FROM (
        SELECT *
        FROM batch24_df
        WHERE post_out_type = 'daily_files'
        AND (file_received_timestamp BETWEEN DATETIME('NOW','-30 days') AND DATETIME('NOW'))
    ) b24
    LEFT JOIN batch7_df b7
    ON b24.file_id = b7.file_id
)
,
adjusted_dates_df AS (
SELECT *
,CASE
    WHEN strftime('%H', file_received_timestamp) < '13' THEN DATE(file_received_timestamp)
    ELSE DATE(file_received_timestamp, '+1 day')
END AS recv_adjusted_date
,DATE(timestamp) AS processed_adjusted_date
FROM selected_df
)
,
agg_results AS (
SELECT *
,CASE
    WHEN DATE(recv_adjusted_date) = DATE(processed_adjusted_date) THEN 1
    ELSE 0
END AS processed_same_day
FROM adjusted_dates_df
)

SELECT (SUM(processed_same_day) * 1.0 / COUNT(*)) * 100 AS PC_Processed_Same_Day
FROM agg_results


"""

print(ps.sqldf(sql_query,locals()))

   PC_Processed_Same_Day
0              10.447761


In [None]:
# SQL logic used in ETL_KPI_data_KPI6.2 in AWS Glue
# Note, some functions have been changed to accomdate the change from SQLite to Postgres

WITH current_date_id AS (
    SELECT myDimDate.pk_id
    FROM myDimDate
    WHERE myDimDate.date = CURRENT_DATE
)
,
current_kpi_id AS (
    SELECT pk_id
    FROM myDimKPI
    WHERE kpi_reference = '6.2'
)
,
selected_df AS (
    SELECT *
    ,EXTRACT(DAY FROM (timestamp - file_received_timestamp)) AS days_to_process
    FROM (
        SELECT *
        FROM df
        WHERE post_out_type = 'daily_files'
        AND (file_received_timestamp BETWEEN (CURRENT_DATE-31) AND (CURRENT_DATE-1))
    )
)
,
adjusted_dates_df AS (
    SELECT *
    ,CASE
        WHEN EXTRACT(HOUR FROM file_received_timestamp) < 13 THEN DATE(file_received_timestamp)
        ELSE
            CASE
                WHEN EXTRACT(DOW FROM file_received_timestamp) = 5 THEN DATE(file_received_timestamp) + 3 -- For Friday, +3 days
                WHEN EXTRACT(DOW FROM file_received_timestamp) = 6 THEN DATE(file_received_timestamp) + 2 -- For Saturday, +2 days
                ELSE DATE(file_received_timestamp) + 1 -- For Sunday to Thursday, +1 day
            END
    END AS recv_adjusted_date
    ,DATE(timestamp) AS processed_adjusted_date
    FROM selected_df
)
,
agg_results AS (
    SELECT *
    ,CASE
        WHEN DATE(recv_adjusted_date) = DATE(processed_adjusted_date) THEN 1
        ELSE 0
    END AS processed_same_day
    FROM adjusted_dates_df
)

SELECT (SELECT pk_id FROM current_date_id) AS fk_date_id
,(SELECT pk_id FROM current_kpi_id) AS fk_kpi_id
,(SUM(processed_same_day) * 100.0 / COUNT(*)) AS Value
FROM agg_results

In [None]:
#TODO
Change the +1day into working days

### SQL Logic for KPI 6.3

for today-30 to today \
s1. b24 where post_out_type = 'Scheduled Statements' \
s2. join b24 b7 on file_id \
s3. time_to_send = b7.timestamp - b24.file_recieved_timestamp \
s4. count s2 \
s5.count s3 where time_to_send <= 7 days \
s6. s5/s4*100 


In [45]:
import pandasql as ps
import pandas as pd

batch7_df = pd.read_parquet(batch7_output_filename)
batch24_df = pd.read_parquet(batch24_output_filename)

sql_query = """
WITH selected_df AS (
    SELECT *
    ,julianday(timestamp) - julianday(file_received_timestamp) AS time_to_send
    FROM (
        SELECT *
        FROM batch24_df
        WHERE post_out_type = 'Scheduled Statements'
        AND (file_received_timestamp BETWEEN DATETIME('NOW','-30 days') AND DATETIME('NOW'))
    ) b24
    LEFT JOIN batch7_df b7
    ON b24.file_id = b7.file_id
)
,
agg_results AS (
SELECT COUNT(*) AS Total
,SUM(CASE WHEN time_to_send <= 7 THEN 1 ELSE 0 END) AS Time_to_send_less_than_7_days
FROM selected_df
)

SELECT (Time_to_send_less_than_7_days * 1.0 / Total) * 100 AS PC_LTE_7_days
FROM agg_results



"""

print(ps.sqldf(sql_query,locals()))

   PC_LTE_7_days
0      54.054054


In [None]:
# SQL logic used in ETL_KPI_data_KPI6.3 in AWS Glue
# Note, some functions have been changed to accomdate the change from SQLite to Postgres


WITH current_date_id AS (
    SELECT myDimDate.pk_id
    FROM myDimDate
    WHERE myDimDate.date = CURRENT_DATE
)
,
current_kpi_id AS (
    SELECT pk_id
    FROM myDimKPI
    WHERE kpi_reference = '6.3'
)
,
selected_df AS (
    SELECT *
    ,EXTRACT(DAY FROM (timestamp - file_received_timestamp)) AS days_to_process
    FROM (
        SELECT *
        FROM df
        WHERE post_out_type = 'Scheduled Statements'
        AND (file_received_timestamp BETWEEN (CURRENT_DATE-31) AND (CURRENT_DATE-1))
    )
)
,
agg_results AS (
SELECT COUNT(*) AS Total
,SUM(CASE WHEN days_to_process <= 7 THEN 1 ELSE 0 END) AS time_to_process_lte_7_days
FROM selected_df
)

SELECT (SELECT pk_id FROM current_date_id) AS fk_date_id
,(SELECT pk_id FROM current_kpi_id) AS fk_kpi_id
,(time_to_process_lte_7_days * 100.0 / Total) AS Value
FROM agg_results