# * ETL-Actual MKS : Broadband
    (P, G, H, HH, Z) level

## Parameter

In [1]:
import configparser
import datetime as dt
import pandas as pd
import numpy as np
import oracledb
import re
import FN_Actual_MKS_Broadband as fn

config = configparser.ConfigParser()
config.read('../../../my_config.ini')
config.sections()

TDMDBPR_user = config['TDMDBPR']['username']
TDMDBPR_pwd = config['TDMDBPR']['password']
TDMDBPR_db = config['TDMDBPR']['db']
TDMDBPR_host = config['TDMDBPR']['host']
TDMDBPR_port = config['TDMDBPR']['port']

AKPIPRD_user = config['AKPIPRD']['username']
AKPIPRD_pwd = config['AKPIPRD']['password']
AKPIPRD_db = config['AKPIPRD']['db']
AKPIPRD_host = config['AKPIPRD']['host']
AKPIPRD_port = config['AKPIPRD']['port']

curr_dt = dt.datetime.now().date()
next_week_dt = curr_dt + dt.timedelta(days=7)
str_curr_dt = curr_dt.strftime('%Y%m%d')
str_next_week_dt = next_week_dt.strftime('%Y%m%d')
yyyymm_curr = curr_dt.strftime('%Y%m')
yyyymm_next_week = next_week_dt.strftime('%Y%m')

### Lasted Source Summary

In [2]:
# Connect : TDMDBPR
src_dsn = f'{TDMDBPR_user}/{TDMDBPR_pwd}@{TDMDBPR_host}:{TDMDBPR_port}/{TDMDBPR_db}'
src_conn = oracledb.connect(src_dsn)
# print(f'\n{TDMDBPR_db} : Connected')
src_cur = src_conn.cursor()


try:
    src_cur.execute("""
        SELECT TM_KEY_MTH
            , SUM(SUBS) TOTAL
            , SUM(CASE WHEN ISP = 'TOL' THEN SUBS END) TOL
            , SUM(CASE WHEN ISP = '3BB' THEN SUBS END) "3BB"
            , SUM(CASE WHEN ISP = 'AIS' THEN SUBS END) AIS
            , SUM(CASE WHEN ISP IN ('CAT', 'TOT') THEN SUBS END) NT
        FROM CORPNSBOX.FCT_BB_SHARE_SUBS_CCAATT A
        WHERE TM_KEY_MTH >= 202401
        --WHERE TM_KEY_MTH = (SELECT MAX(TM_KEY_MTH) FROM CORPNSBOX.FCT_BB_SHARE_SUBS_CCAATT NOLOCK)
        AND EXISTS (SELECT 1 FROM CDSAPPO.DIM_MOOC_AREA O
			        WHERE O.REMARK <> 'Dummy'
			        AND O.CCAATT = A.CCAATT)
        GROUP BY TM_KEY_MTH
        ORDER BY 1
    """)
    rows = src_cur.fetchall()
    # print(f'\nCurrent Source Summary...')
    chk_src_df = pd.DataFrame.from_records(rows, columns=[x[0] for x in src_cur.description])
    # chk_src_df['DATA'] = 'Source'
    mod_col_list = chk_src_df.iloc[:, 1:6].columns.tolist()
    # for col in mod_col_list:
    #     chk_src_df[col] = chk_src_df[col].apply(lambda x: format(x, ',.0f'))
    print(f'\n{chk_src_df}')


except oracledb.DatabaseError as e:
    print(f'\nError with Oracle : {e}')


finally:
    src_conn.close()
    # print(f'\n{TDMDBPR_db} : Disconnected')


   TM_KEY_MTH         TOTAL      TOL           3BB           AIS            NT
0      202401  8.392163e+06  3062361  2.206741e+06  1.703305e+06  1.419756e+06
1      202402  8.399105e+06  3065282  2.197017e+06  1.716028e+06  1.420779e+06
2      202403  8.414203e+06  3072991  2.187215e+06  1.732168e+06  1.421829e+06
3      202404  8.422602e+06  3073168  2.177393e+06  1.749183e+06  1.422858e+06
4      202405  8.439795e+06  3082458  2.167565e+06  1.765884e+06  1.423888e+06


### Lasted Fact Summary

In [3]:
# Connect : AKPIPRD
tgt_dsn = f'{AKPIPRD_user}/{AKPIPRD_pwd}@{AKPIPRD_host}:{AKPIPRD_port}/{AKPIPRD_db}'
tgt_conn = oracledb.connect(tgt_dsn)
# print(f'\n{AKPIPRD_db} : Connected')
tgt_cur = tgt_conn.cursor()


try:
    tgt_cur.execute("""
        SELECT TM_KEY_MTH
            , SUM(CASE WHEN METRIC_CD IN ('VIN00025', 'VIN00026', 'VIN00027', 'VIN00028') THEN METRIC_VALUE END) TOTAL
            , SUM(CASE WHEN METRIC_CD = 'VIN00025' THEN METRIC_VALUE END) TOL
            , SUM(CASE WHEN METRIC_CD = 'VIN00026' THEN METRIC_VALUE END) "3BB"
            , SUM(CASE WHEN METRIC_CD = 'VIN00027' THEN METRIC_VALUE END) AIS
            , SUM(CASE WHEN METRIC_CD = 'VIN00028' THEN METRIC_VALUE END) NT
            --, MAX(LOAD_DATE) LOAD_DATE
        FROM AUTOKPI.FCT_BROADBAND_MKS NOLOCK
        WHERE TM_KEY_MTH >= 202401 --AND TM_KEY_MTH <= 202404
        --WHERE TM_KEY_MTH = 202403
        --WHERE TM_KEY_MTH = (SELECT MAX(TM_KEY_MTH) FROM AUTOKPI.FCT_BROADBAND_MKS NOLOCK WHERE REMARK IS NULL)
        AND TM_KEY_DAY LIKE '%01'
        AND AREA_TYPE = 'P'
        AND REMARK IS NULL
        GROUP BY TM_KEY_MTH
        ORDER BY 1
    """)
    rows = tgt_cur.fetchall()
    # print(f'\nCurrent Fact Summary...')
    chk_tgt_df = pd.DataFrame.from_records(rows, columns=[x[0] for x in tgt_cur.description])
    # chk_tgt_df['DATA'] = 'Fact'
    mod_col_list = chk_tgt_df.iloc[:, 1:6].columns.tolist()
    # for col in mod_col_list:
    #     chk_tgt_df[col] = chk_tgt_df[col].apply(lambda x: format(x, ',.0f'))
    print(f'\n{chk_tgt_df}')


except oracledb.DatabaseError as e:
    print(f'\nError with Oracle : {e}')


finally:
    tgt_conn.close()
    # print(f'\n{AKPIPRD_db} : Disconnected')


   TM_KEY_MTH         TOTAL      TOL           3BB           AIS            NT
0      202401  8.392163e+06  3062361  2.206741e+06  1.703305e+06  1.419756e+06
1      202402  8.399105e+06  3065282  2.197017e+06  1.716028e+06  1.420779e+06
2      202403  8.414203e+06  3072991  2.187215e+06  1.732168e+06  1.421829e+06
3      202404  8.422602e+06  3073168  2.177393e+06  1.749183e+06  1.422858e+06
4      202405  8.439795e+06  3082458  2.167565e+06  1.765884e+06  1.423888e+06


### Check Diff Summary

In [4]:
chk_diff_df = chk_src_df.iloc[-1].compare(chk_tgt_df.iloc[-1])
chk_diff_df['diff'] = chk_diff_df['self'] - chk_diff_df['other']
mod_col_list = chk_diff_df.columns.tolist()
for col in mod_col_list:
    chk_diff_df[col] = chk_diff_df[col].apply(lambda x: format(x, ',.0f'))

chk_diff_df

Unnamed: 0,self,other,diff


### Input Parameter

In [6]:
''' Input Parameter '''

# v_update_flag = 'Y' if chk_diff_df.size > 0 else 'N'
v_last_mth_fct = chk_tgt_df['TM_KEY_MTH'].max().astype(float)
v_last_mth_src = chk_src_df['TM_KEY_MTH'].max().astype(float)

# Create Param
v_param = dict(last_mth_fct=v_last_mth_fct, last_mth_src=v_last_mth_src)
# v_param = dict(mth_start = 202405, mth_end = 202405, str_period = v_str_period)
v_target_schema = 'AUTOKPI'
v_target_table = 'FCT_BROADBAND_MKS'

# Create Query
query_delete = f"""
    DELETE {v_target_schema}.{v_target_table} 
    WHERE TM_KEY_MTH > {v_param['last_mth_fct']}
"""

print(f'\nv_update_flag: {v_update_flag}')
print(f'yyyymm_curr: {yyyymm_curr}')
print(f'yyyymm_next_week: {yyyymm_next_week}')
print(f"\nParameter input...\n\n   -> last_mth_fct: {v_param['last_mth_fct']}\n   -> last_mth_src: {v_param['last_mth_src']}")
# print(f"\nParameter input...\n\n   -> v_update_flag: {v_update_flag}\n   -> mth_start: {v_param['mth_start']}\n   -> mth_end: {v_param['mth_end']}\n   -> str_period: {v_param['str_period']}")
print(f'\nquery_delete...\n{query_delete}')


v_update_flag: Y
yyyymm_curr: 202406
yyyymm_next_week: 202406

Parameter input...

   -> last_mth_fct: 202405.0
   -> last_mth_src: 202405.0

query_delete...

    DELETE AUTOKPI.FCT_BROADBAND_MKS 
    WHERE TM_KEY_MTH > 202405.0



In [6]:
''' Input Parameter '''

v_update_flag = 'Y' # Test

# v_update_flag = 'Y' if chk_diff_df.size > 0 else 'N'
v_last_mth_fct = chk_tgt_df['TM_KEY_MTH'].max().astype(float)
v_last_mth_src = chk_src_df['TM_KEY_MTH'].max().astype(float)

# Create Param
# v_param = dict(last_mth_fct=v_last_mth_fct, last_mth_src=v_last_mth_src)
# v_param = dict(mth_start = 202405, mth_end = 202405, str_period = v_str_period)
v_target_schema = 'AUTOKPI'
v_target_table = 'FCT_BROADBAND_MKS'
v_sql_upd_next_mth = 'Import-FCT_BROADBAND_MKS.sql'
v_sql_upd_curr_mth = '?.sql'
# v_param_src_upd_to_next_mth = dict(last_mth_fct=v_last_mth_fct, last_mth_src=v_last_mth_src)

job_start_datetime = dt.datetime.now().strftime('%Y-%m-%d, %H:%M:%S')
print(f'\nJob Start... {job_start_datetime}')

# Process condition
if yyyymm_next_week > str(v_last_mth_fct):
    if v_update_flag == 'Y':
        print(f'\n*** Source update to next month Fact ***')
        fn.src_upd_to_next_mth(v_last_mth_fct, v_last_mth_src, v_target_schema, v_target_table, v_sql_upd_next_mth)
    else:
        print(f'\n*** Last Fact mockup to next month ***')

else:
    if v_update_flag == 'Y':
        print(f'\n*** Source update to current month Fact ***')
        fn.src_upd_to_next_mth(v_last_mth_fct, v_last_mth_src, v_target_schema, v_target_table, v_sql_upd_curr_mth)
    else:
        print(f'\n*** Not update ***')


Job Start... 2024-06-23, 18:35:01

*** Source update to next month Fact ***

TDMDBPR : Connected

AKPIPRD : Connected

Processing...

TDMDBPR : Disconnected

AKPIPRD : Disconnected

Job Done !!!

Param input...
   -> last_mth_fct
   -> last_mth_src
   -> target_schema
   -> target_table
   -> sql_upd_next_mth


In [8]:
print(yyyymm_next_week)
print(v_last_mth_fct)
print(v_update_flag)

202406
202405.0
Y


In [15]:
# Read : SQL file
with open('SQL/Import-TEST.sql', 'r') as sql_file:
# with open('SQL/Import-FCT_BROADBAND_MKS.sql', 'r') as sql_file:
    queries = sql_file.read().split(';')
    query = queries[0].strip()
    sql_file.close()
# print(f'\n{query}')


# Connect : TDMDBPR
src_dsn = f'{TDMDBPR_user}/{TDMDBPR_pwd}@{TDMDBPR_host}:{TDMDBPR_port}/{TDMDBPR_db}'
src_conn = oracledb.connect(src_dsn)
print(f'\n{TDMDBPR_db} : Connected')
src_cur = src_conn.cursor()


try:
# Create Dataframe
    src_cur.execute(query, v_param)
    rows = src_cur.fetchall()
    print(f'\nCreate Dataframe...')
    src_df = pd.DataFrame.from_records(rows, columns=[x[0] for x in src_cur.description])
    print(f'\n   -> src_df : {src_df.shape[0]} rows, {src_df.shape[1]} columns') 

    print(f'\n{src_df}')

    src_cur.close()


except oracledb.DatabaseError as e:
    print(f'\nError with Oracle : {e}')


finally:
    src_conn.close()
    print(f'\n{TDMDBPR_db} : Disconnected')


TDMDBPR : Connected

Create Dataframe...

   -> src_df : 1 rows, 1 columns

   TM_KEY_MTH
0      202406

TDMDBPR : Disconnected


## ETL Process...

### Query from DB source to DB Target
    Delete -> Insert

    Source : CORPNSBOX.FCT_BB_SHARE_SUBS_CCAATT
             CDSAPPO.DIM_MOOC_AREA
             CDSAPPO.DIM_TIME
    
    Target : AUTOKPI.FCT_BROADBAND_MKS

In [444]:
job_start_datetime = dt.datetime.now().strftime('%Y-%m-%d, %H:%M:%S')
print(f'\nJob Start... {job_start_datetime}')


# Read : SQL file
with open('SQL/Import-FCT_BROADBAND_MKS.sql', 'r') as sql_file:
    queries = sql_file.read().split(';')
    query = queries[0].strip()
    sql_file.close()


# Connect : TDMDBPR
src_dsn = f'{TDMDBPR_user}/{TDMDBPR_pwd}@{TDMDBPR_host}:{TDMDBPR_port}/{TDMDBPR_db}'
src_conn = oracledb.connect(src_dsn)
print(f'\n{TDMDBPR_db} : Connected')
src_cur = src_conn.cursor()


# Connect : AKPIPRD
tgt_dsn = f'{AKPIPRD_user}/{AKPIPRD_pwd}@{AKPIPRD_host}:{AKPIPRD_port}/{AKPIPRD_db}'
tgt_conn = oracledb.connect(tgt_dsn)
print(f'\n{AKPIPRD_db} : Connected')
tgt_cur = tgt_conn.cursor()


try:
    print(f'\nProcessing...')
    
# Create Dataframe
    src_cur.execute(query, v_param)
    rows = src_cur.fetchall()
    print(f'\nCreate Dataframe...')
    src_df = pd.DataFrame.from_records(rows, columns=[x[0] for x in src_cur.description])
    print(f'\n   -> src_df : {src_df.shape[0]} rows, {src_df.shape[1]} columns') 

# Truncate
    # tgt_cur.execute(f'TRUNCATE TABLE {v_target_schema}.{v_target_table}')
    # print(f'\n   -> TRUNCATE : "{v_target_table}" : Done !')

# Delete
    tgt_cur.execute(query_delete)
    print(f'\n   -> DELETE : "{v_target_table}" : Done !')
    
# Insert
    tgt_cur.executemany(f"""
        INSERT INTO {v_target_schema}.{v_target_table}
        (TM_KEY_YR, TM_KEY_MTH, TRUE_TM_KEY_WK, TM_KEY_DAY, METRIC_CD, METRIC_NAME, COMP_CD, VERSION, AREA_NO, AREA_TYPE, AREA_CD, AREA_NAME, METRIC_VALUE, AGG_TYPE, FREQUENCY, REMARK) 
        VALUES (:1,:2,:3,:4,:5,:6,:7,:8,:9,:10,:11,:12,:13,:14,:15,:16)
        """, rows)
    print(f'\n   -> INSERT : "{v_target_table}" : Done !')

    tgt_cur.close()
    tgt_conn.commit()


except oracledb.DatabaseError as e:
    print(f'\nError with Oracle : {e}')


finally:
    src_conn.close()
    print(f'\n{TDMDBPR_db} : Disconnected')

    tgt_conn.close()
    print(f'\n{AKPIPRD_db} : Disconnected')
    
    print(f'\nJob Done !!!')



Job Start... 2024-06-23, 16:49:53

TDMDBPR : Connected

AKPIPRD : Connected

Processing...

Create Dataframe...

   -> src_df : 0 rows, 16 columns

   -> DELETE : "FCT_BROADBAND_MKS" : Done !

TDMDBPR : Disconnected

AKPIPRD : Disconnected

Job Done !!!


In [46]:
''' Example DataFrame '''

# tmp_df = src_df.groupby(['VERSION', 'COMP_CD', 'METRIC_CD', 'METRIC_NAME']).agg({'METRIC_VALUE': 'mean', 'TM_KEY_MTH': 'nunique', 'AREA_TYPE': 'nunique', 'AREA_CD': 'nunique'}).reset_index()
# tmp_df = src_df.groupby(['VERSION', 'COMP_CD', 'METRIC_CD', 'METRIC_NAME']).agg({'TM_KEY_MTH': ['min','max'], 'METRIC_VALUE': 'mean', 'AREA_TYPE': 'nunique', 'AREA_CD': 'nunique'}).reset_index()
tmp_df = src_df.groupby(['VERSION', 'COMP_CD', 'METRIC_CD', 'METRIC_NAME']).agg({'TM_KEY_DAY': ['min','max'], 'METRIC_VALUE': 'mean', 'AREA_TYPE': 'nunique', 'AREA_CD': 'nunique'}).reset_index()
tmp_df

Unnamed: 0_level_0,VERSION,COMP_CD,METRIC_CD,METRIC_NAME,TM_KEY_DAY,TM_KEY_DAY,METRIC_VALUE,AREA_TYPE,AREA_CD
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,min,max,mean,nunique,nunique
0,A,True,VIN00019,Broadband Subs Share : AIS & 3BB,20240101,20240630,47.133255,5,177
1,A,True,VIN00020,Broadband Subs Share : TOL,20240101,20240630,34.913027,5,177
2,A,True,VIN00021,Broadband Subs Share : 3BB,20240101,20240630,26.142298,5,177
3,A,True,VIN00022,Broadband Subs Share : AIS,20240101,20240630,20.990957,5,177
4,A,True,VIN00023,Broadband Subs Share : NT,20240101,20240630,17.953718,5,177
5,A,True,VIN00024,Broadband Subs Share (Subs) : AIS & 3BB,20240101,20240630,119102.637927,5,177
6,A,True,VIN00025,Broadband Subs Share (Subs) : TOL,20240101,20240630,96090.440585,5,177
7,A,True,VIN00026,Broadband Subs Share (Subs) : 3BB,20240101,20240630,65591.18435,5,177
8,A,True,VIN00027,Broadband Subs Share (Subs) : AIS,20240101,20240630,53511.453577,5,177
9,A,True,VIN00028,Broadband Subs Share (Subs) : NT,20240101,20240630,43148.977497,5,177


In [65]:
# 1 + 8 + 65 + 96
# 5487 * 10 * 12