In [2]:
import json
import pandas as pd
import numpy as np
import datetime
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.mysql.hooks.mysql import MySqlHook
import seaborn as sns
warehouse_hook = PostgresHook(postgres_conn_id='rds_afsg_ds_prod_postgresql_dwh', schema='afsg_ds_prod_postgresql_dwh')
mifos_hook = MySqlHook(mysql_conn_id='mifos_db', schema='mifostenant-safaricom')
airflow_hook = MySqlHook(mysql_conn_id='mysql_airflow', schema='bloom_pipeline')
middleware_hook = PostgresHook(postgres_conn_id='asante_mfs_middleware', schema='asante_datawarehouse')
jubilee_hook = MySqlHook(mysql_conn_id='mifos_db', schema='mifostenant-jubilee')
solv_hook = MySqlHook(mysql_conn_id='solv_ke', schema='solvke_staging')

In [8]:
warehouse_clients = warehouse_hook.get_pandas_df(
    sql="""select mifos_id from bloomlive.client_dimension where bloom_version = '2'"""
)
mifos_clients = mifos_hook.get_pandas_df(
    sql="""select id from `mifostenant-safaricom`.m_client"""
)

[[34m2023-06-13 11:10:46,831[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'rds_afsg_ds_prod_postgresql_dwh' for task execution.[0m


[[34m2023-06-13 11:10:55,628[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'mifos_db' for task execution.[0m


In [9]:
xtra = warehouse_clients[~warehouse_clients['mifos_id'].isin(mifos_clients['id'].tolist())]

In [10]:
warehouse_hook.run(sql="""delete from bloomlive.client_dimension where mifos_id in %(mi)s and bloom_version = '2'""", parameters={'mi': tuple(xtra['mifos_id'].tolist())})

Unnamed: 0,mifos_id
2811,204016
5037,204093
13179,204269
13742,204271
13771,204272
...,...
188148,204119
188149,204106
188152,203925
188154,204121


In [5]:
warehouse_loans = warehouse_hook.get_pandas_df(
    sql="""
        select loan_mifos_id, bloom_version, client_mifos_id from bloomlive.loans_fact_table_materialized_summary_view
    """
)
mifos_loans = pd.concat([
    mifos_hook.get_pandas_df(
        sql="""select id, client_id, 2 as "bloom_version" from `mifostenant-safaricom`.m_loan"""
    ), mifos_hook.get_pandas_df(
        sql="""select id, client_id, 1 as "bloom_version" from `mifostenant-default`.m_loan"""
    )
])
merged = warehouse_loans.merge(mifos_loans,
    left_on=['loan_mifos_id', 'bloom_version'],
    right_on=['id', 'bloom_version'],
    how='left'
)
merged[merged['client_mifos_id'] != merged['client_id']]

[[34m2023-05-26 10:29:07,821[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'rds_afsg_ds_prod_postgresql_dwh' for task execution.[0m


[[34m2023-05-26 10:30:07,609[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'mifos_db' for task execution.[0m


[[34m2023-05-26 10:30:39,335[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'mifos_db' for task execution.[0m


Unnamed: 0,loan_mifos_id,bloom_version,client_mifos_id,id,client_id


In [12]:
merged[merged['client_mifos_id'] != merged['client_id']]

Unnamed: 0,loan_mifos_id,bloom_version,client_mifos_id,id,client_id


In [13]:
merged[merged['client_mifos_id'] == merged['client_id']].shape

(529319, 5)

In [7]:
mifos_loans.shape

(529349, 3)

In [8]:
merged = warehouse_loans.merge(
    mifos_loans,
    left_on=['loan_mifos_id', 'bloom_version'],
    right_on=['id', 'bloom_version'],
    how='left'
)
merged

Unnamed: 0,loan_mifos_id,bloom_version,client_mifos_id,id,client_id
0,57385,1.0,31573,57385,31573
1,57462,1.0,38267,57462,38267
2,57972,1.0,11244,57972,11244
3,59101,1.0,30524,59101,30524
4,60311,1.0,25647,60311,25647
...,...,...,...,...,...
529314,309985,2.0,128302,309985,128302
529315,308804,2.0,21048,308804,21048
529316,308933,2.0,102650,308933,102650
529317,309817,2.0,35928,309817,35928


In [9]:
merged[merged['id'].isna()]

Unnamed: 0,loan_mifos_id,bloom_version,client_mifos_id,id,client_id


In [10]:
warehouse_loans[warehouse_loans.duplicated(subset=['loan_mifos_id', 'bloom_version'])]

Unnamed: 0,loan_mifos_id,bloom_version,client_mifos_id


In [11]:
merged[merged['client_mifos_id'] != merged['client_id']]

Unnamed: 0,loan_mifos_id,bloom_version,client_mifos_id,id,client_id


In [3]:
affected_records = warehouse_hook.get_pandas_df(
    sql="""
        select lft.surrogate_id as lft_surrogate_id, lft.client_key, loan_mifos_id, lftmsv.bloom_version, client_surrogate_id
        from bloomlive.loans_fact_table_materialized_summary_view lftmsv
        inner join (
            SELECT DISTINCT mifos_id, bloom_version
            FROM bloomlive.loans_fact_table
            GROUP BY mifos_id, bloom_version
            HAVING COUNT(DISTINCT client_key) > 1
        ) dups on lftmsv.loan_mifos_id = dups.mifos_id and lftmsv.bloom_version = dups.bloom_version
        inner join bloomlive.loans_fact_table lft on lftmsv.loan_mifos_id = lft.mifos_id and lftmsv.bloom_version = lft.bloom_version
        and lft.client_key != lftmsv.client_surrogate_id
    """
)

[[34m2023-05-29 02:48:01,279[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'rds_afsg_ds_prod_postgresql_dwh' for task execution.[0m


In [4]:
affected_records.head()

Unnamed: 0,lft_surrogate_id,client_key,loan_mifos_id,bloom_version,client_surrogate_id
0,1173636,102092,3700,2.0,114196
1,1168939,102092,3700,2.0,114196
2,1170115,102092,3700,2.0,114196
3,1185791,102092,3700,2.0,114196
4,1174920,102092,3700,2.0,114196


In [5]:
updates = []
for i, r in affected_records.iterrows():
    updates.append(f"update bloomlive.loans_fact_table set client_key = {r['client_surrogate_id']} where surrogate_id = {r['lft_surrogate_id']}")

In [7]:
warehouse_hook.run(sql=updates)

[[34m2023-05-29 02:51:15,977[0m] {[34mbase.py:[0m73} INFO[0m - Using connection ID 'rds_afsg_ds_prod_postgresql_dwh' for task execution.[0m
[[34m2023-05-29 02:51:18,740[0m] {[34msql.py:[0m364} INFO[0m - Running statement: update bloomlive.loans_fact_table set client_key = 114196.0 where surrogate_id = 1173636.0, parameters: None[0m
[[34m2023-05-29 02:51:20,072[0m] {[34msql.py:[0m373} INFO[0m - Rows affected: 1[0m
[[34m2023-05-29 02:51:20,078[0m] {[34msql.py:[0m364} INFO[0m - Running statement: update bloomlive.loans_fact_table set client_key = 114196.0 where surrogate_id = 1168939.0, parameters: None[0m
[[34m2023-05-29 02:51:21,096[0m] {[34msql.py:[0m373} INFO[0m - Rows affected: 1[0m
[[34m2023-05-29 02:51:21,098[0m] {[34msql.py:[0m364} INFO[0m - Running statement: update bloomlive.loans_fact_table set client_key = 114196.0 where surrogate_id = 1170115.0, parameters: None[0m
[[34m2023-05-29 02:51:22,099[0m] {[34msql.py:[0m373} INFO[0m - Rows aff

In [10]:
with open('/home/henrykuria/data/client_key_updates_bloomlive_loans_fact_table.sql', 'w+') as f:
    for update in updates:
        f.write(f'{update}; \n')
