In [17]:
# Install awswrangler (AWS SDK for pandas), used below for Glue catalog and Athena access.
# NOTE(review): `%pip install awswrangler==<version>` would target the kernel's
# environment and pin the version so the notebook is reproducible.
!pip install awswrangler



In [61]:
import awswrangler as wr
from datetime import date
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import io
import boto3
from sklearn.preprocessing import RobustScaler, StandardScaler
from sklearn.ensemble import RandomForestClassifier
import joblib

In [39]:
def getCols(tableName, database='prod_da'):
    """Return SELECT-list expressions for every column of a Glue/Athena table.

    Columns whose catalog type is exactly ``'timestamp'`` are emitted as
    ``cast(<col> as timestamp) as <col>``; every other column is emitted by name.

    Parameters
    ----------
    tableName : str
        Table name in the Glue catalog.
    database : str, default 'prod_da'
        Glue database holding the table (previously hard-coded).

    Returns
    -------
    list of str
        One expression per column, in the order returned by
        ``wr.catalog.get_table_types``.

    Raises
    ------
    ValueError
        If the catalog lookup returns no schema for the table.
    """
    schema = wr.catalog.get_table_types(database=database, table=tableName)
    if schema is None:
        # get_table_types signals a missing table with None; fail with a clear
        # message instead of an AttributeError on `.items()`.
        raise ValueError(f"table {database}.{tableName} not found in the Glue catalog")
    return [
        f"cast({name} as timestamp) as {name}" if glue_type == 'timestamp' else name
        for name, glue_type in schema.items()
    ]

In [51]:
def getTableData(subTableName):
    """Read every column of ``subTableName`` from Athena into a DataFrame.

    The query runs against the ``prod_da`` database in the ``data_analytics``
    workgroup. The SELECT list comes from ``getCols``, so timestamp columns are
    explicitly cast.
    """
    selectList = ', '.join(getCols(subTableName))
    query = f"select {selectList} from {subTableName}"
    return wr.athena.read_sql_query(sql=query,
                                    database="prod_da",
                                    workgroup='data_analytics',)

In [41]:
def getSubData(mainDf, nonSubsDf):
    """Overwrite snapshot rows of users that also appear in the subscriber table.

    For each ``sam_id`` present in both frames, non-null values from ``mainDf``
    replace the snapshot values. Only columns that ``nonSubsDf`` already has are
    considered. Rows whose ``sam_id`` is absent from ``mainDf`` are unchanged.
    This matches ``DataFrame.update`` semantics.

    Before merging, ``Days_ECO`` for subscriber rows is recomputed as whole days
    from ``first_activity_date`` to ``first_sub_date``.

    The previous version called ``update`` on the temporary returned by
    ``set_index``, so nothing was ever merged. It also reset the index in place,
    which broke alignment with label Series built from the same rows.

    Parameters
    ----------
    mainDf : pandas.DataFrame
        Subscriber table with ``sam_id``, ``first_sub_date`` and
        ``first_activity_date``. Rows with a null ``first_sub_date`` are
        ignored. Not modified.
    nonSubsDf : pandas.DataFrame
        Snapshot table with a ``sam_id`` column. Not modified.

    Returns
    -------
    pandas.DataFrame
        A new frame with exactly ``nonSubsDf``'s columns and index.

    Raises
    ------
    ValueError
        From ``reindex`` if a ``sam_id`` occurs more than once among the
        subscribed rows of ``mainDf``.
    """
    subs = mainDf[mainDf['first_sub_date'].notna()].copy()
    # to_timedelta tolerates object-dtype timedeltas as well as timedelta64.
    subs['Days_ECO'] = pd.to_timedelta(subs['first_sub_date'] - subs['first_activity_date']).dt.days

    lookup = subs.reindex(columns=nonSubsDf.columns).set_index('sam_id')
    # One row per snapshot row (all-NaN where the user is not in mainDf),
    # re-labelled with the snapshot's own index so everything lines up.
    aligned = lookup.reindex(nonSubsDf['sam_id'].to_numpy())
    aligned.index = nonSubsDf.index

    result = nonSubsDf.copy()
    for col in aligned.columns:
        incoming = aligned[col]
        has_value = incoming.notna()
        # Skip columns with nothing to merge so their dtype is left untouched.
        if has_value.any():
            result[col] = incoming.where(has_value, result[col])
    return result

In [42]:
def get_table(table_name, table_date, samIDsr = False, anchor_date = None, mainDf = None, nonSubsDf = None):
    """Build the all-float feature matrix for one snapshot table.

    Rows kept are users whose ``first_activity_date`` is before ``table_date``
    (and after ``anchor_date`` when given) and whose ``first_sub_date`` is null
    or later than ``table_date``.

    Parameters
    ----------
    table_name : str
        Athena table to read when ``nonSubsDf`` is not supplied.
    table_date : date-like
        Snapshot date (anything ``pd.to_datetime`` accepts).
    samIDsr : bool, default False
        If true, also return the ``sam_id`` Series of the kept rows.
    anchor_date : date-like, optional
        Exclusive lower bound on ``first_activity_date``.
    mainDf : pandas.DataFrame, optional
        Subscriber table merged into the kept rows via ``getSubData``.
    nonSubsDf : pandas.DataFrame, optional
        Pre-loaded snapshot rows. When given, the Athena read (and its Glue
        catalog lookup) is skipped. Not modified.

    Returns
    -------
    (features, ysr, convSr) or (features, ysr, convSr, samids)
        features : DataFrame of engineered float features.
        ysr      : 1 where ``first_sub_date`` is set (user subscribed after
                   ``table_date``), else 0.
        convSr   : whole days from ``table_date`` to ``first_sub_date``
                   (NaN when there is none).

    Notes
    -----
    Reads the module-level ``combined_set``, ``to_num``, ``to_Dummy`` and
    ``to_freq``, which are defined in later cells; run those cells first.
    Clipping and scaling statistics are fitted on this table's own rows.
    """
    if nonSubsDf is None:
        cols1 = getCols(table_name)
        nonSubsDf = wr.athena.read_sql_query(sql=f"select {', '.join(cols1)} from {table_name}",
                                             database="prod_da",
                                             workgroup='data_analytics',)

    table_ts = pd.to_datetime(table_date)

    not_yet_subscribed = nonSubsDf['first_sub_date'].isna() | (nonSubsDf['first_sub_date'] > table_ts)
    keep = not_yet_subscribed & (nonSubsDf['first_activity_date'] < table_ts)
    if anchor_date is not None:
        keep &= nonSubsDf['first_activity_date'] > pd.to_datetime(anchor_date)
    # Independent copy: avoids SettingWithCopyWarning and never touches the caller's frame.
    nonSubsDf = nonSubsDf[keep].copy()

    nonSubsDf['Days_ECO'] = pd.to_timedelta(table_ts - nonSubsDf['first_activity_date']).dt.days

    ysr = nonSubsDf['first_sub_date'].notna().astype('int')
    convSr = pd.to_timedelta(nonSubsDf['first_sub_date'] - table_ts).dt.days
    convSr.name = None

    if samIDsr: samids = nonSubsDf['sam_id']

    if mainDf is not None: nonSubsDf = getSubData(mainDf, nonSubsDf)

    # sorted(): iterating a set of strings follows per-process hash
    # randomisation, so list(combined_set) gave a different column order (and
    # a different model feature order) in every Python session.
    nonSubsDf = nonSubsDf[sorted(combined_set)].copy()
    nonSubsDf[to_num] = nonSubsDf[to_num].astype(float)

    def _standardise(series):
        # StandardScaler disregards NaN when fitting and keeps NaN in the output.
        scaled = StandardScaler().fit_transform(series.to_frame())
        return pd.Series(np.asarray(scaled)[:, 0], index=series.index)

    for col in to_num:
        values = nonSubsDf[col].clip(lower=0)                    # negatives -> 0, NaN untouched
        values = values.clip(upper=values.quantile(0.95))        # cap at the 95th percentile
        values = _standardise(values)
        nonSubsDf[col] = values.fillna(values.min() - 1)         # missing -> below the smallest value

    for col in ('last_private', 'last_watchlist_private'):
        nonSubsDf[col] = nonSubsDf[col].apply(lambda x: int(x) if pd.notnull(x) else -1)
    for col in ('phone_filtered_id', 'phone_watchlisted_id'):
        nonSubsDf[col] = nonSubsDf[col].notna().astype('int')

    # One-hot encode the categorical columns, replacing the originals.
    dummies = pd.get_dummies(nonSubsDf[to_Dummy], dtype=int)
    nonSubsDf = nonSubsDf.drop(columns = to_Dummy).join(dummies)

    # Frequency-encode: share of all rows with the same value (NaN -> filled below the minimum).
    for var_name in to_freq:
        freq_col = var_name + '_Freq'
        share = nonSubsDf.groupby(var_name).size() / len(nonSubsDf)
        freq = _standardise(nonSubsDf[var_name].map(share).rename(freq_col))
        nonSubsDf[freq_col] = freq.fillna(freq.min() - 1)

    nonSubsDf = nonSubsDf.drop(columns = to_freq).astype(float)

    if samIDsr:
        return nonSubsDf, ysr, convSr, samids
    return nonSubsDf, ysr, convSr

In [43]:
def trainModel(table_name, table_date, subDf, nonSubDf, s3_path):
    """Build features for one snapshot, fit a random forest and upload it to S3.

    Parameters
    ----------
    table_name : str
        Snapshot table name, forwarded to ``get_table``.
    table_date : date-like
        Snapshot date, forwarded to ``get_table``.
    subDf : pandas.DataFrame
        Subscriber table, forwarded to ``get_table`` as ``mainDf``.
    nonSubDf : pandas.DataFrame
        Pre-loaded snapshot rows, forwarded to ``get_table`` as ``nonSubsDf``.
    s3_path : str
        S3 key the joblib-serialised model is uploaded to.

    Returns
    -------
    RandomForestClassifier
        The fitted model. Previously nothing was returned, and the out-of-bag
        score was computed but never used.
    """
    features, target, _conv = get_table(table_name, table_date, mainDf = subDf, nonSubsDf= nonSubDf)
    print("--Table reading Done--")

    rf = RandomForestClassifier(n_estimators = 1500, min_samples_leaf=500, n_jobs = -1,
                                oob_score = True, random_state = 42, class_weight= "balanced")
    rf.fit(features, target)
    print("--Model Fit Done--")
    # oob_score=True is requested so the fit can be sanity-checked without a
    # hold-out set; report it instead of discarding it.
    print(f"--OOB score: {rf.oob_score_:.4f}--")

    with io.BytesIO() as buffer:
        joblib.dump(rf, buffer)
        buffer.seek(0)
        wr.s3.upload(local_file=buffer, path=s3_path)
    print("--Model Save Done--")
    return rf


In [44]:
def getMetadataTable(tableName):
    """Parse the trailing ``<year>_<month>_<day>`` fields of a table name.

    Returns ``(datetime.date, label)``. The label joins the integer parts
    without zero padding, e.g. ``'..._2024_03_31'`` -> ``'2024_3_31'``.
    """
    pieces = tableName.split('_')
    yearTxt, monthTxt, dayTxt = pieces[-3], pieces[-2], pieces[-1]
    parts = [int(yearTxt), int(monthTxt), int(dayTxt)]
    label = "_".join(str(p) for p in parts)
    return date(*parts), label

In [None]:
import main as main

In [64]:
# Monthly snapshot tables (month-end dates 2024-03-31 .. 2025-02-28);
# the training loop below fits and uploads one model per table.
tableList = ['ds060425_user_master_table_2024_03_31',
'ds060425_user_master_table_2024_04_30',
'ds060425_user_master_table_2024_05_31',
'ds060425_user_master_table_2024_06_30',
'ds060425_user_master_table_2024_07_31',
'ds060425_user_master_table_2024_08_31',
'ds060425_user_master_table_2024_09_30',
'ds060425_user_master_table_2024_10_31',
'ds060425_user_master_table_2024_11_30',
'ds060425_user_master_table_2024_12_31',
'ds060425_user_master_table_2025_01_31',
'ds060425_user_master_table_2025_02_28']

In [45]:
# Table whose rows are loaded into `subDf` and passed to every trainModel call.
subTableName = 'ds_sub_minus1_user_master_table_2025_04_06'
# Unused single-table alternative; the training loop iterates over `tableList` instead:
# nonSubTableName = 'ds060425_user_master_table_2025_03_31'


In [52]:
# Loaded once and reused for every snapshot in the training loop.
subDf = getTableData(subTableName)

In [53]:
# Sanity check: (rows, columns) of the loaded table.
subDf.shape

(191195, 82)

In [None]:
# Train and upload one model per snapshot table.
# NOTE(review): every call here goes through the imported `main` module, not the
# functions defined in this notebook. `main.getMetadataTable` must return three
# values, whereas this notebook's getMetadataTable returns two — confirm which
# version is current.
for nonSubTableName in tableList:
    # `modelPrefix` is unpacked but not used below.
    tableDate, ymd, modelPrefix = main.getMetadataTable(nonSubTableName)
    # "1500_500" in the key presumably mirrors n_estimators / min_samples_leaf
    # in trainModel — TODO confirm, and consider a config-cell constant.
    s3_path = 's3://dl-prod-analytics/sandbox/research/manu-rathi/RF_270624/RF_MainDf_1500_500/'+ ymd + '_060425.joblib'
    nonSubDf = main.getTableData(nonSubTableName)
    print(nonSubDf.shape)
    main.trainModel(nonSubTableName, tableDate, subDf, nonSubDf, s3_path)

(2868097, 82)


In [None]:
def setSameCols(nonSubsDfCols, nonSubsDf_g):
    """Align a feature frame to a reference set of columns.

    Columns of ``nonSubsDf_g`` not in ``nonSubsDfCols`` are dropped. Reference
    columns it lacks are added and filled with -1. The result is ordered like
    ``nonSubsDfCols``: recent scikit-learn versions reject prediction input
    whose feature names are not in the same order as at fit time.

    Parameters
    ----------
    nonSubsDfCols : iterable of str
        Reference columns, e.g. the training frame's ``.columns``.
    nonSubsDf_g : pandas.DataFrame
        Frame to align. Not modified (the previous version added the missing
        columns to the caller's frame in place).

    Returns
    -------
    pandas.DataFrame
        A new frame whose columns equal ``list(nonSubsDfCols)``.
    """
    # fill_value applies only to the newly introduced columns; existing NaNs stay NaN.
    return nonSubsDf_g.reindex(columns=list(nonSubsDfCols), fill_value=-1)

In [None]:
def getFilteredDf(nonSubsDf,ysr):
    """Split features and labels by the sign of ``Days_ECO``.

    Returns ``(X_pos, y_pos, X_neg, y_neg)``:
    - ``X_pos``: rows with ``Days_ECO > 0``;
    - ``X_neg``: rows with ``Days_ECO < 0``;
    - ``y_pos`` / ``y_neg``: the ``ysr`` entries selected by each split's index labels.

    Rows where ``Days_ECO`` is exactly 0 or NaN appear in neither split.
    """
    ecoDays = nonSubsDf['Days_ECO']
    positive = nonSubsDf.loc[ecoDays > 0]
    positiveLabels = ysr.loc[positive.index]
    negative = nonSubsDf.loc[ecoDays < 0]
    negativeLabels = ysr.loc[negative.index]
    return positive, positiveLabels, negative, negativeLabels

In [None]:
def getResultDf(rf, nonSubsDf_g,ysr_g, convSr_g, table_date_g):
    """Summarise model scores by rounded probability bucket.

    Each row is bucketed by ``round(P(class=last) * 10)``, giving values 0..10
    with numpy round-half-to-even. For each bucket the result has these
    columns, each prefixed with ``str(table_date_g)``:
    ``_Base`` (share of all positives), ``_Conv`` (positive rate), ``_Count``,
    ``_Subs`` (positives), ``_EcoAvg`` (mean ``Days_ECO``), and ``_SubsEcoAvg``
    / ``_SubsConvAvg`` (mean ``Days_ECO`` / conversion days over positives only).
    ``ysr_g`` and ``convSr_g`` are aligned to ``nonSubsDf_g`` by index.
    """
    tag = str(table_date_g)
    positiveProb = rf.predict_proba(nonSubsDf_g)[:, -1]
    scored = pd.DataFrame({
        'probability': positiveProb,
        'binary': ysr_g,
        'decile': np.round(positiveProb * 10),
        'Days_ECO': nonSubsDf_g['Days_ECO'],
        'Days_Conv': convSr_g,
    }, index=nonSubsDf_g.index)

    byBucket = scored.groupby('decile')
    subsPerBucket = byBucket['binary'].sum()
    rowsPerBucket = byBucket['binary'].count()
    subscribersOnly = scored[scored['binary'] == 1].groupby('decile')

    summary = (subsPerBucket / ysr_g.sum()).to_frame(tag + '_Base')
    summary[tag + '_Conv'] = subsPerBucket / rowsPerBucket
    summary[tag + '_Count'] = rowsPerBucket
    summary[tag + '_Subs'] = subsPerBucket
    summary[tag + '_EcoAvg'] = byBucket['Days_ECO'].mean()
    summary[tag + '_SubsEcoAvg'] = subscribersOnly['Days_ECO'].mean()
    summary[tag + '_SubsConvAvg'] = subscribersOnly['Days_Conv'].mean()
    return summary

In [None]:
def getResultSamDf(rf, nonSubsDf_g,ysr_g, convSr_g, samid_g, tableName, subs= True):
    """Per-user score table indexed by ``sam``.

    Scores either only the rows where ``ysr_g == 1`` (``subs=True``) or every
    row of ``ysr_g``. Output columns are suffixed with ``tableName``:
    ``probability_`` (last-class probability), ``conv_`` (from ``convSr_g``),
    ``Eco_`` (``Days_ECO``) and ``Sub_`` (the label).
    """
    if not subs:
        rows = ysr_g.index
    else:
        rows = ysr_g[ysr_g == 1].index

    picked = nonSubsDf_g.loc[rows]
    lastClassProb = rf.predict_proba(picked)[:, -1]

    report = pd.DataFrame(index = rows)
    report['sam'] = samid_g.loc[rows]
    report['probability_' + tableName] = lastClassProb
    report['conv_' + tableName] = convSr_g.loc[rows]
    report['Eco_' + tableName] = picked['Days_ECO']
    report['Sub_' + tableName] = ysr_g.loc[rows]
    return report.set_index('sam')

In [None]:
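# Feature-group column lists (plain lists, not enums). get_table reads to_num, to_Dummy, to_freq and combined_set as module-level globals, so run these cells before calling it.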

In [26]:
# Same names get_table converts with int(x), using -1 for missing values.
to_Bin = ['last_private', 'last_watchlist_private']
# Same names get_table replaces with 1 if non-null, else 0.
to_Prsnt = ['phone_filtered_id', 'phone_watchlisted_id']

# One-hot encoded with pd.get_dummies in get_table (originals are dropped).
to_Dummy = ['last_type',
'last_device_type',
'primary_banner_view_first_touch_device_type',
'first_device_type',
'primary_banner_view_last_touch_device_type',
'first_type']

# Frequency-encoded in get_table as '<name>_Freq' (share of rows, standardised);
# the raw columns are dropped afterwards.
to_freq = ['last_accessed_from',
'secondary_banner_view_last_touch_banner_title',
'primary_banner_view_last_touch_banner_title',  
'primary_banner_view_first_touch_banner_title',
'first_accessed_from',             
'secondary_banner_view_first_touch_banner_title',             
'last_scid',
'first_scid', 
'last_watchlist_scid',
'manager_views_last_touch_display_name']

# Numeric features. get_table floors them at 0, caps them at the 95th
# percentile, standardises them, and fills NaN with (min - 1).
# 'Days_ECO' is derived in get_table rather than read from the table.
to_num = ['Days_ECO','count_sc_views_fee_model_based',
 'count_sc_views_fee',
 'last_hours_difference',
 'stocks_invested_amount',
 'smallcap_aum',
 'etf_invested_amount',
 'midcap_aum',
 'primary_banner_view_last_touch_hours_taken_from_plift',
 'total_sid_count',
 'count_sc_views',
 'count_watchlisted_fee',
 'total_sc_buy_invested_amount',
 'count_manager_views',
 'count_sc_views_free',
 'count_sc_views_fee_sector_trackers',
 'manager_views_last_touch_hours_taken_from_plift',
 'count_sc_views_free_etf',
 'count_primary_banner_views_category_track',
 'count_primary_banner_views_category_others',
 'count_primary_banner_views_category_lamf',
 'count_sc_views_accessed_search',
 'secondary_banner_view_last_touch_hours_taken_from_plift',
 'count_primary_banner_views_android',
 'count_secondary_banner_views_category_lamf',
 'count_primary_banner_views',
 'count_primary_banner_views_category_brand_campaign_offer',
 'count_primary_banner_views_category_buy_or_fp',
 'last_filtered_hours_difference',
 'count_manager_views_name_others',
 'count_sc_views_android',
 'count_primary_banner_views_ios',
 'count_sc_views_accessed_explore',
 'count_primary_banner_views_category_subs',
 'last_watchlist_hours_difference',
 'count_primary_banner_views_category_brand_campaign',
 'count_manager_views_accessed_smallcase_profile',
 'count_sc_views_accessed_home',
 'count_sc_views_free_model_based',
 'count_primary_banner_views_category_app_referral',
 'count_filtered',
 'first_filtered_hours_difference',
 'count_primary_banner_views_web',
 'first_hours_difference',
 'count_sc_views_web',
 'count_sc_views_accessed_watchlist',
 'count_sc_views_free_sector_trackers',
 'first_watchlist_hours_difference',
 'count_manager_views_android',
 'count_sc_views_free_awi',
 'count_watchlisted',
 'count_secondary_banner_views',
 'sc_created_last_touch_hours_taken_from_plift']

In [59]:
# One set per feature group; the name numbering is kept as-is (there is no
# set5 because the to_Sub group is disabled).
set1, set2, set3, set4, set6 = map(set, (to_Dummy, to_freq, to_Bin, to_Prsnt, to_num))

# Every raw column get_table selects before encoding.
combined_set = set1 | set2 | set3 | set4 | set6