In [0]:
# Prints a fixed marker string; presumably a liveness check so the run output
# shows that the notebook started executing.
print("Healthy")

In [0]:
from re import search

import duckdb
import mysql.connector
import numpy as np
import pandas as pd
from sqlalchemy import create_engine, text

# Show every column when a DataFrame is displayed.
pd.set_option('display.max_columns', None)

import warnings
# NOTE(review): this silences *all* warnings for the session, including
# deprecation notices raised by pandas / SQLAlchemy.
warnings.filterwarnings('ignore')

import json
# Connection settings for each data source are read from one JSON file,
# keyed by source name ('Dumpdb', 'mifos', 'redshift').
with open('/Workspace/Credentials/db_data.json', 'r') as fp:
    data = json.load(fp)

# MySQL connection built from the 'Dumpdb' settings.
Dumpdb = mysql.connector.connect(
  host=data['Dumpdb']['host'],
  user=data['Dumpdb']['user'],
  passwd=data['Dumpdb']['passwd'],
  database = data['Dumpdb']['database']
)

# MySQL connection built from the 'mifos' settings (the ETL source).
Mifosdb = mysql.connector.connect(
  host=data['mifos']['host'],
  user=data['mifos']['user'],
  passwd=data['mifos']['passwd'],
  database = data['mifos']['database']
)

# Redshift settings are kept as module-level names because later cells
# interpolate them into their own engine URLs.
host = data['redshift']['host']
user = data['redshift']['user']
passwd = data['redshift']['passwd']
database = data['redshift']['database']

conn = create_engine(f"postgresql+psycopg2://{user}:{passwd}@{host}:5439/{database}")

# Render floats with two decimals in displayed frames.
pd.set_option('display.float_format', lambda x: '%.2f' % x)

from datetime import datetime, timedelta
# Read the clock once so `today` and `yesterday` are always exactly one day
# apart, even if this cell happens to run across midnight.
_now = datetime.today()
today = _now.strftime('%Y-%m-%d')
yesterday = (_now - timedelta(days = 1)).strftime('%Y-%m-%d')
print(today)
print(yesterday)

In [0]:
# Full refresh: remove the previous copy of public.dwh_account_balances before
# the account-balance ETL appends fresh batches to it.
engine = create_engine(f"postgresql+psycopg2://{user}:{passwd}@{host}:5439/{database}")
sql = '''
DROP TABLE IF EXISTS public.dwh_account_balances;
'''

# Wrap the statement in text(): SQLAlchemy 2.0 no longer accepts a plain str
# in Connection.execute(), while text() works on 1.x and 2.x alike.
# engine.begin() commits on success and rolls back on error.
with engine.begin() as con:
    con.execute(text(sql))

def load_table(schema_name, table_name, dataframe, chunk_size=10000):
    """Append ``dataframe`` to ``schema_name.table_name`` in consecutive row slices.

    Each slice of at most ``chunk_size`` rows is written with
    ``DataFrame.to_sql(if_exists='append')``, so this function never truncates
    or replaces the target table. An empty frame produces no writes at all.

    Parameters
    ----------
    schema_name : str
        Target schema.
    table_name : str
        Target table name.
    dataframe : pandas.DataFrame
        Rows to append; the index is not written.
    chunk_size : int, default 10000
        Number of rows handed to each ``to_sql`` call.

    Raises
    ------
    ValueError
        If ``chunk_size`` is not a positive integer.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")

    num_rows = len(dataframe)

    # A dedicated engine per call; it is disposed in the finally block so its
    # pooled connections are released even when a write fails.
    engine = create_engine(f"postgresql+psycopg2://{user}:{passwd}@{host}:5439/{database}")
    try:
        # Stepping the range by chunk_size yields exactly ceil(num_rows / chunk_size)
        # slices; iloc clips the final slice at the end of the frame.
        for start_idx in range(0, num_rows, chunk_size):
            chunk_df = dataframe.iloc[start_idx:start_idx + chunk_size]
            chunk_df.to_sql(
                schema=schema_name,
                name=table_name,
                con=engine,
                index=False,
                if_exists='append',
                chunksize=5000,
                method='multi',
            )
    finally:
        engine.dispose()

    print("Data loaded successfully.")
    

### Account balance ETL

In [0]:
%%time

# Open a fresh Mifos connection for this extract.
Mifosdb = mysql.connector.connect(
  host=data['mifos']['host'],
  user=data['mifos']['user'],
  passwd=data['mifos']['passwd'],
  database = data['mifos']['database']
)

# Highest savings-account id; it is the inclusive upper bound of the extract.
df_max = pd.read_sql_query("""
select
MAX(id) as last
from m_savings_account 
""", Mifosdb)
last = int(df_max['last'][0])
print(last, " Max ID")

# Accounts are extracted by id window [start, end], batch_size ids at a time.
start = 6000
end = last

batch_size = 300000

# NOTE(review): the alias `request_sbumittedby_user_fullname` in the query is
# misspelled, but it becomes the warehouse column name, so it is left as-is.

# The range stop is end + 1 because `end` itself must be extracted: with a stop
# of `end`, the account with the maximum id is skipped whenever
# (end - start) is an exact multiple of batch_size.
for batch_start in range(start, end + 1, batch_size):
    # BETWEEN is inclusive on both sides, hence the -1 and the clamp to `end`.
    batch_end = min(batch_start + batch_size - 1, end)

    aBal = pd.read_sql_query(f"""
    select
    ifnull(sa.client_id, sa.group_id) as client_id,
    sa.id as account_id,
    sa.account_no,
    sa.account_balance_derived as balance,
    sa.product_id,
    sp.name as product_type,
    ifnull(sa.submittedon_userid, 0) as request_submittedby_userid,
    mpus.username as request_submittedby_username,
    concat(mpus.firstname,' ', mpus.lastname) as request_sbumittedby_user_fullname,

    ifnull(sa.activatedon_userid, 0) as activatedby_userid,
    map.username as activatedby_username,
    concat(map.firstname,' ', map.lastname) as activatedby_user_fullname,

    ifnull(sa.approvedon_userid, 0) as approvedby_userid,
    mapu.username as approvedby_username,
    concat(mapu.firstname,' ', mapu.lastname) as approvedby_user_fullname,

    sa.nominal_annual_interest_rate,
    sa.nominal_annual_interest_rate_overdraft,
    sa.approvedon_date,
    sa.closedon_date
    from m_savings_account sa
    left join m_appuser mpus on mpus.id = sa.submittedon_userid
    left join m_appuser mapu on mapu.id = sa.approvedon_userid
    left join m_appuser map on map.id = sa.activatedon_userid

    join m_savings_product sp on sp.id = sa.product_id
    where (sa.account_no != '' and !isnull(sa.account_no))
    AND sa.id BETWEEN {batch_start} AND {batch_end}

    """, Mifosdb)
    # The query already excludes empty / NULL account numbers; the cast still
    # raises if a remaining value is not numeric.
    aBal['account_no'] = aBal['account_no'].astype('int64')
    print(f"Batch from {batch_start} to {batch_end}:")

    load_table('public', 'dwh_account_balances', aBal)

### Fixed Term Deposits ETL

In [0]:
%%time

## term deposits ETL
# Open a fresh Mifos connection for the term-deposit extract.
Mifosdb = mysql.connector.connect(
    **{key: data['mifos'][key] for key in ('host', 'user', 'passwd', 'database')}
)

# One row per m_deposit_account_term_and_preclosure record, joined to its
# savings account for the account number, owner and approval date.
ftd = pd.read_sql(
    sql="""
            select 
            mdat.id,
            sa.account_no,
            ifnull(sa.client_id, sa.group_id) as client_id,
            savings_account_id as account_id,
            sa.approvedon_date as start_date,
            min_deposit_term,
            max_deposit_term,
            min_deposit_term_type_enum,
            max_deposit_term_type_enum,
            in_multiples_of_deposit_term,
            pre_closure_penal_applicable,
            pre_closure_penal_interest,
            deposit_period,
            deposit_period_frequency_enum,
            deposit_amount,
            maturity_amount,
            maturity_date,
            on_account_closure_enum,
            expected_firstdepositon_date,
            transfer_interest_to_linked_account,
            interest_carried_forward_on_top_up,
            target_amount,
            target_maturity_amount,
            pre_closure_charge_applicable,
            auto_rollover,
            transfer_to_savings_account_id
            from m_deposit_account_term_and_preclosure mdat
            join m_savings_account sa on sa.id = mdat.savings_account_id
            """,
    con=Mifosdb,
)

# Preview the first rows as the cell output.
ftd.head()

In [0]:
# (rows, columns) of the term-deposit extract.
ftd.shape

In [0]:
%%time

# Full refresh of public.dwh_term_deposit: drop the old table, then append the
# extract in chunks through load_table.
engine = create_engine(f"postgresql+psycopg2://{user}:{passwd}@{host}:5439/{database}")
sql = '''
DROP TABLE IF EXISTS public.dwh_term_deposit;
'''

# Wrap the statement in text(): SQLAlchemy 2.0 no longer accepts a plain str
# in Connection.execute(), while text() works on 1.x and 2.x alike.
with engine.begin() as con:
    con.execute(text(sql))

load_table('public', 'dwh_term_deposit', ftd)

In [0]:
# Drop the reference to the term-deposit frame; no later cell reads it.
del ftd

### Term Deposit Maturity ETL

In [0]:
%%time

# Open a fresh Mifos connection for the maturity extract.
Mifosdb = mysql.connector.connect(
    **{key: data['mifos'][key] for key in ('host', 'user', 'passwd', 'database')}
)

# Term-deposit records with their savings-account state; `status` is the
# r_enum_value message for the account's status_enum.
fd = pd.read_sql(
    sql='''
SELECT
mda.id,
sa.id as account_id,
sa.account_no,
a.enum_message_property as status,
sa.status_enum,
sa.closedon_date,
min_deposit_term,
max_deposit_term,
deposit_period,
deposit_amount,
account_balance_derived as current_balance,
maturity_amount,
maturity_date
from m_deposit_account_term_and_preclosure mda
left join m_savings_account sa on sa.id = mda.savings_account_id
left join (select enum_name, enum_id, enum_message_property from r_enum_value where enum_name = 'status_enum') a on a.enum_id = sa.status_enum
''',
    con=Mifosdb,
)

print(list(fd.columns), "\n")

# Keep only rows whose status is exactly 'Active' and renumber the index.
fd = fd.loc[fd['status'].eq('Active')].reset_index(drop=True)
fd.head()

In [0]:
# (rows, columns) of the maturity frame after the 'Active' filter.
fd.shape

In [0]:
from datetime import date, datetime

# Whole days from today until each deposit's maturity date (negative once the
# date has passed). Values stay strings, as before, so the column type written
# to the warehouse does not change.
# The day count is read from timedelta.days rather than parsed out of the
# timedelta's string form, and a missing maturity_date yields None instead of
# crashing in strptime.
today_date = date.today()
fd['days_to_maturity'] = [
    None if pd.isna(maturity)
    else str((datetime.strptime(str(maturity), "%Y-%m-%d").date() - today_date).days)
    for maturity in fd['maturity_date']
]


In [0]:
# Re-select the maturity columns through DuckDB; `from fd` in the query refers
# to the DataFrame named `fd` in this notebook, and the result replaces it.
fd = duckdb.query("""
select
id,
account_id,
account_no,
status,
status_enum,
closedon_date,
min_deposit_term,
max_deposit_term,
deposit_period,
deposit_amount,
current_balance,
maturity_amount,
maturity_date,
days_to_maturity
from fd
""").df()

# Inert string literal: draft SQL for week / month maturity flags that is not
# part of the query above and is never executed.
"""
,
case when maturity_date >= DATE_TRUNC('WEEK', now())
    AND maturity_date < DATE_TRUNC('WEEK', now()) + INTERVAL '1 WEEK' then 'Yes' else 'No' end as maturing_within_the_week,
case when maturity_date >= DATE_TRUNC('MONTH', now())
    AND maturity_date < DATE_TRUNC('MONTH', now()) + INTERVAL '1 MONTH' then 'Yes' else 'No' end as maturing_within_the_month
"""

# Preview the first rows as the cell output.
fd.head()

In [0]:
%%time

# Full refresh of public.dwh_term_deposits_maturity: drop the old table, then
# append the maturity frame in chunks through load_table.
engine = create_engine(f"postgresql+psycopg2://{user}:{passwd}@{host}:5439/{database}")
sql = '''
DROP TABLE IF EXISTS public.dwh_term_deposits_maturity;
'''

# Wrap the statement in text(): SQLAlchemy 2.0 no longer accepts a plain str
# in Connection.execute(), while text() works on 1.x and 2.x alike.
with engine.begin() as con:
    con.execute(text(sql))

load_table('public', 'dwh_term_deposits_maturity', fd)

In [0]:
# This script was moved here because the report depends on the refreshed dwh_account_balances table.
#exec(open("Shared/PRODUCTION/AUTO-REPORT/Fresh FD Bookings.py").read())  #Downloads/V_bank_tasks/projects/Fresh FD Bookings/Fresh fd bookings daily report.py

## Daily expected vs actual maturity amount

In [0]:
# Per maturity date: total promised maturity amount versus the total balance
# currently held, computed by DuckDB over the `fd` DataFrame.
dly_exptd_maturity = duckdb.query('''
            select
            maturity_date,
            sum(maturity_amount) as expected_maturity_amount,
            sum(current_balance) as actual_maturity_amount
            from fd
            group by 1
            order by 1
            ''').df()

# Display the aggregate as the cell output.
dly_exptd_maturity

In [0]:
%%time

# Full refresh of public.dwh_daily_expected_vs_actual_maturing_deposits: drop
# the old table, then append the daily aggregate through load_table.
engine = create_engine(f"postgresql+psycopg2://{user}:{passwd}@{host}:5439/{database}")
sql = '''
DROP TABLE IF EXISTS public.dwh_daily_expected_vs_actual_maturing_deposits;
'''

# Wrap the statement in text(): SQLAlchemy 2.0 no longer accepts a plain str
# in Connection.execute(), while text() works on 1.x and 2.x alike.
with engine.begin() as con:
    con.execute(text(sql))

load_table('public', 'dwh_daily_expected_vs_actual_maturing_deposits', dly_exptd_maturity)

In [0]:
# Force a garbage-collection pass; the value returned by gc.collect() is shown
# as the cell output.
import gc 
gc.collect()

In [0]:
# Record when this run finished.
# Import the class, not the module: `import datetime` would rebind the name
# `datetime` that earlier cells use as the class (datetime.today(),
# datetime.strptime) and break them when re-run in the same kernel.
from datetime import datetime

last_run_date = datetime.now()
print("Last run date:", last_run_date)
