In [96]:
import pandas as pd 
from athena_to_pd import athena_query
import sqlite3
#By Book
from pathlib import Path
import os
import awswrangler as wr 
import boto3
from datetime import datetime

In [2]:
# --- Notebook configuration ---------------------------------------------------
# AWS profile and Athena database handed to the Athena queries in this notebook.
PROFILE = "vej-book"
DATABASE = "vtnlake_dev_curated_finance"

# Local SQLite database file that is compared against Athena.
# Other locations that have been used:
#   "../vtnlake_onedrive.db"
#   r'E:\Work\TradeSquare\SQlite Test\29102024\vtnlake_onedrive.db'  (Book's PC)
SQLITE_PATH = r'D:\Work\SQLite\vtnlake_onedrive.db'  # Book's laptop

# Working directory at the time this cell runs; the SQL templates and the
# report file are addressed relative to it.
pwd = os.getcwd()

In [3]:
# Earliest and latest `_file_updated_date` found in the Athena `onward` table.
# Each timestamp is parsed from its ISO 'yyyy-MM-ddTHH:mm:ssZ' text and
# re-rendered with the '%Y-%m-%d %H:%i:%s' pattern before min/max is taken.
DATE_RANGE_SQL = """
    select 
        min(date_format(parse_datetime(_file_updated_date,'yyyy-MM-dd''T''HH:mm:ss''Z'),'%Y-%m-%d %H:%i:%s')) AS min_date,
        max(date_format(parse_datetime(_file_updated_date,'yyyy-MM-dd''T''HH:mm:ss''Z'),'%Y-%m-%d %H:%i:%s')) AS max_date
        
    from onward
"""

date_filter = athena_query(PROFILE, DATABASE, DATE_RANGE_SQL)

In [4]:
# The query yields a single row: column 0 is min_date, column 1 is max_date.
df_date_filter = pd.DataFrame(date_filter, columns=date_filter.columns)
min_date, max_date = (df_date_filter.iloc[0, position] for position in (0, 1))

# Echo the window so the reader can see which dates drive the queries below.
for label, value in (("Min Date Filter: ", min_date), ("Max Date Filter: ", max_date)):
    print(label, value)

Min Date Filter:  2024-11-04 09:09:05
Max Date Filter:  2024-11-04 09:09:05


In [None]:
# --- Athena: SELECT DISTINCT script (primary-key columns only) ----------------

# Load the SQL template stored under <working dir>/sql_template/onward/.
with open(rf"{pwd}/sql_template/onward/select_dis_onward_athena.sql", "r") as template_file:
    query_template = template_file.read()

# Fill the template's min_date / max_date placeholders with the date window.
query_onward = query_template.format(min_date=min_date, max_date=max_date)

# Run the query through awswrangler, using a boto3 session bound to the
# configured profile in the ap-southeast-1 region.
athena_session = boto3.session.Session(profile_name=PROFILE, region_name='ap-southeast-1')
select_onward_athena = wr.athena.read_sql_query(
    sql=query_onward,
    database=DATABASE,
    boto3_session=athena_session,
)

##---------------------------------##
# Global variables for SQLite
TABLE = "onward_full"

# --- SQLite: SELECT DISTINCT script (primary-key columns only) ----------------

# Read the SQL template file
with open(rf"{pwd}/sql_template/onward/select_dis_onward_sqlite.sql", "r") as file:
    query_template = file.read()

# Substitute placeholders with actual values
SQLITE_QUERY = query_template.format(
    TABLE=TABLE,
    min_date=min_date,
    max_date=max_date,
)

# sqlite3.connect() silently creates a new, empty database when the file does
# not exist, which would surface later as a confusing "no such table" error
# and leave a stray file behind. SQLITE_PATH is machine-specific, so fail early
# with a clear message instead.
if not os.path.isfile(SQLITE_PATH):
    raise FileNotFoundError(f"SQLite database not found: {SQLITE_PATH}")

# Execute the query. try/finally guarantees the connection is closed even when
# the query itself raises, so the database file is never left open.
conn = sqlite3.connect(SQLITE_PATH)
try:
    # Every column gets a '_sqlite' suffix so the frame can sit next to the
    # Athena frame without name clashes.
    df_select_sqlite = pd.read_sql_query(SQLITE_QUERY, conn).add_suffix('_sqlite')
finally:
    conn.close()
select_onward_sqlite = df_select_sqlite


In [None]:
# Primary-key columns that are reconciled. The Athena frame uses these names
# as-is; the SQLite frame carries the same names with a '_sqlite' suffix.
KEY_COLUMNS = ['order_item_id', 'order_item_sub_id', 'bill_doc_no', 'package_code']
SQLITE_KEY_COLUMNS = [f'{col}_sqlite' for col in KEY_COLUMNS]

# Sort both sides by the key columns and renumber the rows from 0, so that the
# side-by-side concat below pairs row i of SQLite with row i of Athena.
# The names are rebound (no inplace=True) so re-running this cell is harmless.
#sqlite
select_onward_sqlite = select_onward_sqlite.sort_values(by=SQLITE_KEY_COLUMNS).reset_index(drop=True)
#athena
select_onward_athena = select_onward_athena.sort_values(by=KEY_COLUMNS).reset_index(drop=True)

# Concat dataframes for reconciliation. When one source has more rows than the
# other, the shorter side is padded with missing values for the extra rows.
df_reconcile = pd.concat([select_onward_sqlite, select_onward_athena], axis=1)

# Compare each SQLite column with its Athena counterpart, column-wise.
# A pair is a 'Match' when the two values are equal, or when both are missing.
# A value missing on only one side is a 'Mismatch'. Doing this vectorised
# (instead of a per-row lambda) also avoids the "boolean value of NA is
# ambiguous" error that `a == b` raises inside `if` when either value is pd.NA.
key_matches = {}
for col in KEY_COLUMNS:
    sqlite_values = df_reconcile[f'{col}_sqlite']
    athena_values = df_reconcile[col]
    # .eq() may return a nullable boolean with <NA>; treat "unknown" as not equal.
    equal = sqlite_values.eq(athena_values).fillna(False).astype(bool)
    both_missing = sqlite_values.isna() & athena_values.isna()
    key_matches[col] = equal | both_missing
    df_reconcile[f'{col}_status'] = key_matches[col].map({True: 'Match', False: 'Mismatch'})

# Overall row status: 'Match' only when every key column matches.
all_keys_match = pd.concat(key_matches, axis=1).all(axis=1)
df_reconcile['status'] = all_keys_match.map({True: 'Match', False: 'Mismatch'})

# df_reconcile[df_reconcile['status'] == 'Mismatch']

Unnamed: 0,order_item_id_sqlite,order_item_sub_id_sqlite,bill_doc_no_sqlite,package_code_sqlite
0,12107835,10249,8287676,0
1,12107835,10318,8287676,0
2,12107835,12369,8287676,0
3,12107835,12908,8287676,0
4,12107835,15441,8287676,0
...,...,...,...,...
44057,16176464,32,8824488,7AB1
44058,16176464,33,8824488,7AB1
44059,16176464,4,8824488,0
44060,16176464,5,8824488,0


## Descriptive Statistics

In [88]:
# SQLite statistics

# describe() summary of every SQLite column (numeric and non-numeric alike).
stat_onward_sqlite = select_onward_sqlite.describe(include='all')

# Work on a converted copy: every string cell is replaced by its integer hash
# so that text columns can be aggregated too; other cells pass through as-is.
# NOTE(review): Python salts str hashes per interpreter process (unless
# PYTHONHASHSEED is fixed), so these figures are only comparable with values
# computed in the same kernel session.
hashed_df = select_onward_sqlite.apply(
    lambda column: column.map(lambda cell: hash(cell) if isinstance(cell, str) else cell)
)

# One single-row frame per aggregate (sum, mean, min, max of each hashed
# column), labelled with the row name used in the final report.
hashed_rows = []
for row_label, aggregate in (
    ("sum_hashed", hashed_df.sum),
    ("avg_hashed", hashed_df.mean),
    ("min_hashed", hashed_df.min),
    ("max_hashed", hashed_df.max),
):
    stat_row = aggregate().to_frame().T
    stat_row.index = [row_label]
    hashed_rows.append(stat_row)
sum_onward_sqlite, avg_onward_sqlite, min_onward_sqlite, max_onward_sqlite = hashed_rows

# Stack the describe() rows and the four hashed aggregate rows into one table.
stat_select_onward_sqlite = pd.concat([stat_onward_sqlite, *hashed_rows], axis=0)


In [90]:
# Athena statistics

# describe() summary of every Athena column (numeric and non-numeric alike).
stat_onward_athena = select_onward_athena.describe(include='all')

# Work on a converted copy: every string cell is replaced by its integer hash
# so that text columns can be aggregated too; other cells pass through as-is.
# NOTE(review): Python salts str hashes per interpreter process (unless
# PYTHONHASHSEED is fixed), so these figures are only comparable with values
# computed in the same kernel session.
hashed_df = select_onward_athena.apply(
    lambda column: column.map(lambda cell: hash(cell) if isinstance(cell, str) else cell)
)

# One single-row frame per aggregate (sum, mean, min, max of each hashed
# column), labelled with the row name used in the final report.
hashed_rows = []
for row_label, aggregate in (
    ("sum_hashed", hashed_df.sum),
    ("avg_hashed", hashed_df.mean),
    ("min_hashed", hashed_df.min),
    ("max_hashed", hashed_df.max),
):
    stat_row = aggregate().to_frame().T
    stat_row.index = [row_label]
    hashed_rows.append(stat_row)
sum_onward_athena, avg_onward_athena, min_onward_athena, max_onward_athena = hashed_rows

# Stack the describe() rows and the four hashed aggregate rows into one table.
stat_select_onward_athena = pd.concat([stat_onward_athena, *hashed_rows], axis=0)



In [93]:
# Compare statistics between SQLite and Athena: place the two statistics
# tables side by side, aligned on their row labels (the describe() rows plus
# sum_hashed / avg_hashed / min_hashed / max_hashed).
stat_reconclie = pd.concat([stat_select_onward_sqlite, stat_select_onward_athena], axis=1)

# Columns whose statistics are compared (Athena name; SQLite adds '_sqlite').
STAT_COLUMNS = ['order_item_id', 'order_item_sub_id', 'bill_doc_no', 'package_code']

# A statistic is a 'Match' when both sides are equal, or when it is missing on
# both sides (e.g. a describe() row that does not apply to that column type).
# A plain `==` would flag those rows as 'Mismatch' because NaN never equals
# NaN, and would raise inside `if` when either value is pd.NA. A statistic
# missing on only one side stays a 'Mismatch'.
stat_matches = {}
for col in STAT_COLUMNS:
    sqlite_stats = stat_reconclie[f'{col}_sqlite']
    athena_stats = stat_reconclie[col]
    # .eq() may return a nullable boolean with <NA>; treat "unknown" as not equal.
    equal = sqlite_stats.eq(athena_stats).fillna(False).astype(bool)
    both_missing = sqlite_stats.isna() & athena_stats.isna()
    stat_matches[col] = equal | both_missing
    stat_reconclie[f'{col}_status'] = stat_matches[col].map({True: 'Match', False: 'Mismatch'})

# Overall status per statistic: 'Match' only when every column matches.
all_stats_match = pd.concat(stat_matches, axis=1).all(axis=1)
stat_reconclie['status'] = all_stats_match.map({True: 'Match', False: 'Mismatch'})

In [97]:
### Export to an .xlsx workbook with one sheet per type of reconciliation
current_date = datetime.now().strftime('%Y%m%d')
output_path = rf"{pwd}/report/{current_date}_onward_reconcile.xlsx"

# The Excel writer does not create missing folders, so make sure the report
# directory exists before writing (no-op when it is already there).
os.makedirs(os.path.dirname(output_path), exist_ok=True)

with pd.ExcelWriter(output_path) as writer:
    # Row-level key comparison; the positional index carries no information.
    df_reconcile.to_excel(writer, sheet_name='Data Reconcile', index=False)
    # Statistic-level comparison; the index holds the statistic names, keep it.
    stat_reconclie.to_excel(writer, sheet_name='Stat Reconcile', index=True)