In [1]:
#Snowpark lib
from snowflake.snowpark import Session

# Data Science Libs
import numpy as np
import pandas as pd

# create_temp_table warning suppresion
import warnings; warnings.simplefilter('ignore')

#ConfigParser to read ini file
import configparser

import numpy as np

from sklearn.compose import ColumnTransformer
from sklearn.feature_selection import SelectPercentile, chi2
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.cluster import DBSCAN

np.random.seed(0)

config = configparser.ConfigParser()
config.read("/notebooks/notebooks/credentials.ini")

connection_parameters = {
    "user": f'{config["Snowflake"]["user"]}',
    "password": f'{config["Snowflake"]["password"]}',
    "account": f'{config["Snowflake"]["account"]}',
    "WAREHOUSE": f'{config["Snowflake"]["WAREHOUSE"]}',
    "DATABASE": f'{config["Snowflake"]["DATABASE"]}',
    "SCHEMA": f'{config["Snowflake"]["SCHEMA"]}'
}

def snowflake_connector(conn):
    try:
        session = Session.builder.configs(conn).create()
        print("connection successful!")
    except:
        raise ValueError("error while connecting with db")
    return session

session = snowflake_connector(connection_parameters)

connection successful!


In [2]:
df = session.table("MEMBER_FUNDS_ENRICHED_DETAILS").to_pandas()
# df = pd.read_csv("/data/funds.csv")

In [4]:
# df.to_csv("/data/funds.csv", index=False)

NO_OF_CHURN column has almost 50% missing value, it wont be wise to impute these many values so will just drop it.                                     

In [3]:
frame = df[['MEMBER_GENDER', 'MEMBER_STATE', 'MEMBER_CONTACT_VERIFIED','FUND_TOTAL_ASSETS','FUND_RETURN_TARGET_PERCENTAGE',
            'INVESTMENT_RISK_CATEGORY', 'CASH_BENCHMARK_ALLOCATION','FIXED_INCOME_BENCHMARK_ALLOCATION',
            'DOMESTIC_LISTED_EQUITY_BENCHMARK_ALLOCATION','INTERNATIONAL_LISTED_EQUITY_BENCHMARK_ALLOCATION',
            'UNLISTED_EQUITY_BENCHMARK_ALLOCATION', 'EQUITY_BENCHMARK_ALLOCATION','PROPERTY_BENCHMARK_ALLOCATION',
            'INFRA_BENCHMARK_ALLOCATION','COMMODITIES_BENCHMARK_ALLOCATION', 'OTHERS_BENCHMARK_ALLOCATION','FUND_RISK_LEVEL', 
            'FUND_RISK_CATEGORY','NEG_NETRETURN_SINCE_INCEPTION', 'YEAR_1_RETURNS', 'YEAR_3_RETURNS','YEAR_5_RETURNS',
            'YEAR_7_RETURNS','YEAR_10_RETURNS', 'SUPER_FEES','PENSION_FEES',
            'INVESTMENT_AGE_GROUP', 'RETIREMENT_AGE_GROUP', 'TOTAL_FUNDS_INVESTED','CHURN_FLAG']].copy()

In [4]:
frame["CHURN_FLAG"] = frame["CHURN_FLAG"].apply(lambda x: 1 if x =="Y" else 0)

In [8]:
# frame.groupby(["RETIREMENT_AGE_GROUP","CHURN_FLAG"])[["CHURN_FLAG"]].count()

The distribution of CHURN is pretty common bettween the employers and hence wont contribute inn modelling

In [9]:
# multiple = {i for i, j in dict(df["MEMBER_ID"].value_counts()).items() if j > 1 }
# multiple[multiple["value_counts"]>=2].index.to_list()

# MODEL TO PREDICT CHURN

In [5]:
numeric_features = ["FUND_TOTAL_ASSETS", 'FUND_RETURN_TARGET_PERCENTAGE','CASH_BENCHMARK_ALLOCATION',
       'FIXED_INCOME_BENCHMARK_ALLOCATION','DOMESTIC_LISTED_EQUITY_BENCHMARK_ALLOCATION','INTERNATIONAL_LISTED_EQUITY_BENCHMARK_ALLOCATION',
       'UNLISTED_EQUITY_BENCHMARK_ALLOCATION', 'EQUITY_BENCHMARK_ALLOCATION','PROPERTY_BENCHMARK_ALLOCATION', 'INFRA_BENCHMARK_ALLOCATION',
       'COMMODITIES_BENCHMARK_ALLOCATION', 'OTHERS_BENCHMARK_ALLOCATION', 'YEAR_1_RETURNS', 'YEAR_3_RETURNS',
       'YEAR_5_RETURNS', 'YEAR_7_RETURNS', 'YEAR_10_RETURNS', 'SUPER_FEES','PENSION_FEES',]
numeric_transformer = Pipeline(
    steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
)

categorical_features = ["MEMBER_GENDER", "MEMBER_STATE", "MEMBER_CONTACT_VERIFIED","INVESTMENT_RISK_CATEGORY",'FUND_RISK_LEVEL',
                        'FUND_RISK_CATEGORY',"NEG_NETRETURN_SINCE_INCEPTION", 'INVESTMENT_AGE_GROUP', 'RETIREMENT_AGE_GROUP', "TOTAL_FUNDS_INVESTED"]
categorical_transformer = Pipeline(
    steps=[
        ("encoder", OneHotEncoder(handle_unknown="ignore")),
        ("selector", SelectPercentile(chi2, percentile=50)),
    ]
)
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ]
)

In [6]:
clf = Pipeline(
    steps=[("preprocessor", preprocessor), ("classifier", RandomForestClassifier(n_estimators=500, max_depth=8, 
                                                                                 max_features=0.6,
                                                                                 bootstrap=True, max_samples=0.8))]
)



In [7]:
X = frame.drop("CHURN_FLAG", axis=1)
y = frame["CHURN_FLAG"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)


In [8]:
clf.fit(X_train, y_train)
print("model score: %.3f" % clf.score(X_test, y_test))

model score: 0.989


In [9]:
import pickle

# save
with open('churn_model.pkl','wb') as f:  ## use rb while reading the fie
    pickle.dump(clf,f)

In [10]:
clf

In [11]:
frame = clf.predict(frame)
df["CHURN_PREDICTED"] = frame

# MODEL_REASON

In [12]:
df["unique_id"] = [i for i in range(df.shape[0])]

In [13]:
churned = df[df["CHURN_FLAG"] == "Y"]

In [14]:
churned.groupby(["CHURN_REASON"])[["CHURN_REASON"]].count() / churned.shape[0]*100

Unnamed: 0_level_0,CHURN_REASON
CHURN_REASON,Unnamed: 1_level_1
FUND REPUTATION DECLINING,29.070673
HIGH ACCOUNT FEE,16.143163
HIGH TRANSACTION FEE,21.015857
LIFE EVENT,5.547624
NO REASON IDENTIFIED,25.771735
POOR FUND PERFORMANCE,2.450949


In [15]:
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression

In [16]:
# frame_id = churned[["FUND_ID",""]]
frame = churned[['MEMBER_GENDER', 'MEMBER_STATE', 'MEMBER_CONTACT_VERIFIED','FUND_TOTAL_ASSETS','FUND_RETURN_TARGET_PERCENTAGE',
            'INVESTMENT_RISK_CATEGORY', 'CASH_BENCHMARK_ALLOCATION','FIXED_INCOME_BENCHMARK_ALLOCATION',
            'DOMESTIC_LISTED_EQUITY_BENCHMARK_ALLOCATION','INTERNATIONAL_LISTED_EQUITY_BENCHMARK_ALLOCATION',
            'UNLISTED_EQUITY_BENCHMARK_ALLOCATION', 'EQUITY_BENCHMARK_ALLOCATION','PROPERTY_BENCHMARK_ALLOCATION',
            'INFRA_BENCHMARK_ALLOCATION','COMMODITIES_BENCHMARK_ALLOCATION', 'OTHERS_BENCHMARK_ALLOCATION','FUND_RISK_LEVEL', 
            'FUND_RISK_CATEGORY','NEG_NETRETURN_SINCE_INCEPTION', 'YEAR_1_RETURNS', 'YEAR_3_RETURNS','YEAR_5_RETURNS',
            'YEAR_7_RETURNS','YEAR_10_RETURNS', 'SUPER_FEES','PENSION_FEES',
            'INVESTMENT_AGE_GROUP', 'RETIREMENT_AGE_GROUP', 'TOTAL_FUNDS_INVESTED',"CHURN_REASON"]].copy()

In [17]:
churn_reason_dic = {reason:i for i, reason in enumerate(churned["CHURN_REASON"].unique())}
frame["CHURN_REASON"] = frame["CHURN_REASON"].apply(lambda x: churn_reason_dic[x])

In [18]:
numeric_features = ["FUND_TOTAL_ASSETS", 'FUND_RETURN_TARGET_PERCENTAGE','CASH_BENCHMARK_ALLOCATION',
       'FIXED_INCOME_BENCHMARK_ALLOCATION','DOMESTIC_LISTED_EQUITY_BENCHMARK_ALLOCATION','INTERNATIONAL_LISTED_EQUITY_BENCHMARK_ALLOCATION',
       'UNLISTED_EQUITY_BENCHMARK_ALLOCATION', 'EQUITY_BENCHMARK_ALLOCATION','PROPERTY_BENCHMARK_ALLOCATION', 'INFRA_BENCHMARK_ALLOCATION',
       'COMMODITIES_BENCHMARK_ALLOCATION', 'OTHERS_BENCHMARK_ALLOCATION', 'YEAR_1_RETURNS', 'YEAR_3_RETURNS',
       'YEAR_5_RETURNS', 'YEAR_7_RETURNS', 'YEAR_10_RETURNS', 'SUPER_FEES','PENSION_FEES',]
numeric_transformer = Pipeline(
    steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
)

categorical_features = ["MEMBER_GENDER", "MEMBER_STATE", "MEMBER_CONTACT_VERIFIED","INVESTMENT_RISK_CATEGORY",'FUND_RISK_LEVEL',
                        'FUND_RISK_CATEGORY',"NEG_NETRETURN_SINCE_INCEPTION", 'INVESTMENT_AGE_GROUP', 'RETIREMENT_AGE_GROUP', "TOTAL_FUNDS_INVESTED"]
categorical_transformer = Pipeline(
    steps=[
        ("encoder", OneHotEncoder(handle_unknown="ignore")),
        ("selector", SelectPercentile(chi2, percentile=50)),
    ]
)
preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features),
    ]
)

In [19]:
multi_lable_clf = OneVsRestClassifier(RandomForestClassifier(n_estimators=500, max_depth=8, max_features=0.6, bootstrap=True,
                                                             max_samples=0.8))
clf2 = Pipeline(
    steps=[("preprocessor", preprocessor), ("classifier", multi_lable_clf)]
)


X = frame.drop("CHURN_REASON", axis=1)
y = frame["CHURN_REASON"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
print(X_train.shape)

clf2.fit(X_train, y_train)
print("model score: %.3f" % clf2.score(X_test, y_test))

(73963, 29)
model score: 1.000


In [20]:
frame_pred = clf2.predict(frame)
churned["CHURN_REASON_PREDICTED"] = frame_pred

In [21]:
import pickle

# save
with open('reason_model.pkl','wb') as f:  ## use rb while reading the fie
    pickle.dump(clf2,f)

In [22]:
rev_churn_dic = {val:key for key, val in churn_reason_dic.items()}
temp_df = df[["unique_id"]]
temp_churned = churned[["unique_id", "CHURN_REASON_PREDICTED"]]
temp_df = pd.merge(temp_df, temp_churned, on="unique_id", how="left")
    
    
temp_df = temp_df.replace(np.nan, "Not Applicable")
temp_df["CHURN_REASON_PREDICTED"] = temp_df["CHURN_REASON_PREDICTED"].apply(lambda  x: rev_churn_dic[int(x)] if x !=  "Not Applicable" else x)
df["CHURN_REASON_PREDICTED"] = temp_df["CHURN_REASON_PREDICTED"]

# FUND RECOMMENDATION

In [23]:
fund_allocation = session.table("FUND_ALLOCATION_MASTER").to_pandas()
fund_master = session.table("FUND_MASTER").to_pandas()
fund_performance = session.table("FUND_PERFORMANCE_MASTER").to_pandas()

In [24]:
fund = pd.merge(fund_allocation, fund_master, on="FUND_ID")
fund_total = pd.merge(fund, fund_performance, on="FUND_ID")
fund_charge = [float(i) for i in fund_total["FUND_FEES_CHARGES"] if i != 'Not Available']
avg_fund_charge = sum(fund_charge) / len(fund_charge)

fund_total["FUND_FEES_CHARGES"] = fund_total["FUND_FEES_CHARGES"].apply(lambda x: avg_fund_charge if x == 'Not Available' else x)
fund_total["FUND_FEES_CHARGES"] = fund_total["FUND_FEES_CHARGES"].astype(float) 

fund_meta = fund_total[["FUND_ID","FUNDNAME"]]
fund_cluster = fund_total.drop(["FUND_ID","FUNDNAME","FUND_NAME_y","FUND_TRUSTEE","FUND_INCEPTION_DATE"], axis=1)

In [25]:
fund_data = pd.get_dummies(fund_cluster)

fund_data = fund_data.replace(False, 0)
fund_data = fund_data.replace(True, 1)

In [26]:
clustering = DBSCAN(eps=2, min_samples=3).fit(fund_data)

In [27]:
fund_total["cluster"] = clustering.labels_
fund_total["avg_return"] = fund_total[["YEAR_1_RETURNS","YEAR_3_RETURNS",
                                 "YEAR_5_RETURNS","YEAR_7_RETURNS",
                                 "YEAR_10_RETURNS"]].apply(lambda x:
                                                        (x[0] + (0.9*x[1]) + (0.75*x[2]) + (0.6*x[3]) + (0.55*x[4]))/5, axis=1)

In [28]:
def recommendation(fund_id, reason):
    if reason == 'NO REASON IDENTIFIED' or reason == "NOT APPLICABLE":
        return "Not Applicable"
    elif reason == 'HIGH ACCOUNT FEE' or reason == 'HIGH TRANSACTION FEE':
        clus_frame = fund_total[fund_total["FUND_ID"] == fund_id]
        clus = list(clus_frame["cluster"])[0]
        clus_frame["charge"] = clus_frame[["SUPER_FEES","PENSION_FEES","FUND_FEES_CHARGES"]].apply(lambda x: (x[0]+x[1]+x[2])/3, axis=1)
        total_charge = list(clus_frame["charge"])[0]
        group = fund_total[fund_total["cluster"] == clus]
        group["avg_fee"] = group[["SUPER_FEES","PENSION_FEES","FUND_FEES_CHARGES"]].apply(lambda x: (x[0]+x[1]+x[2])/3, axis=1)
        group["rec"] = group["avg_fee"].apply(lambda x: 1 if x < total_charge else 0)
        group = group.sort_values("avg_return")
        recommendations = list(group[group["rec"] == 1]["FUND_ID"])
        if fund_id in recommendations:
            recommendations.remove(fund_id)
        recommendations = recommendations[:5]
        if len(recommendations) >= 1:
            recommendations = np.random.choice(recommendations,1)[0]
        else:
            return "Not Applicable"
        return recommendations

    elif reason == 'FUND REPUTATION DECLINING' or reason == 'POOR FUND PERFORMANCE':
        clus_frame = fund_total[fund_total["FUND_ID"] == fund_id]
        clus = list(clus_frame["cluster"])[0]
        group = fund_total[fund_total["cluster"] == clus]
        group = group.sort_values("avg_return")
        recommendations = list(group["FUND_ID"])
        if fund_id in recommendations:
            recommendations.remove(fund_id)
        recommendations = recommendations[:5]
        if len(recommendations) >= 1:
            recommendations = np.random.choice(recommendations,1)[0]
        else:
            return "Not Applicable"
        return recommendations
    
    else:
        return "Not Applicable"
        
            
            
            
rec = recommendation("FID000014","HIGH ACCOUNT FEE")
print(len(rec))

9


In [29]:
rec

'FID000016'

In [30]:
from tqdm import tqdm

In [31]:
rec_fund = []
for fund_id, f_reason, churn in tqdm(zip(df["FUND_ID"], df["CHURN_REASON"], df["CHURN_PREDICTED"])):
    if churn == 1:
        rec = recommendation(fund_id,f_reason)
        rec_fund.append(rec)
    else:
        rec_fund.append("Not Applicable")
    

335999it [06:36, 847.89it/s] 


In [32]:
df["funds_rec"] = rec_fund

In [33]:
df.to_csv("/data/funds_recommendated.csv", index=False)

In [34]:
fund_total.to_csv("/data/FUNDS_COMPLETE_DATA.csv", index=False)

In [100]:
df["FUNDS_RECOMMENDATIONS"] = df["FUNDS_RECOMMENDATIONS"].astype(str)

In [98]:
df = df.rename(columns={'FUNDS RECOMMENDATIONS': 'FUNDS_RECOMMENDATIONS'})

In [87]:
# from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, FloatType

# schema = StructType([
#     StructField("MEMBER_ID", StringType()),
#     StructField("MEMBER_NAME", StringType()),
#     StructField("MEMBER_EMPLOYMENT", StringType()),
#     StructField("MEMBER_GENDER", StringType()),
#     StructField("MEMBER_CITY_TOWN", StringType()),
#     StructField("MEMBER_STATE", StringType()),
#     StructField("MEMBER_CONTACT_VERIFIED", StringType()),
#     StructField("FUND_ID", StringType()),
#     StructField("CHURN_REASON", StringType()),
#     StructField("LATITUDE", FloatType()),
#     StructField("LONGITUDE", FloatType()),
#     StructField("COUNTRY", StringType()),
#     StructField("COUNTRYCODE", StringType()),
#     StructField("CAPITAL", StringType()),
#     StructField("POPULATION", IntegerType()),
#     StructField("FUND_TOTAL_ASSETS", FloatType()),
#     StructField("FUND_RETURN_TARGET_PERCENTAGE", FloatType()),
#     StructField("INVESTMENT_RISK_LEVEL", FloatType()),
#     StructField("INVESTMENT_RISK_CATEGORY", StringType()),
#     StructField("CASH_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("FIXED_INCOME_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("DOMESTIC_LISTED_EQUITY_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("INTERNATIONAL_LISTED_EQUITY_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("UNLISTED_EQUITY_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("EQUITY_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("PROPERTY_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("INFRA_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("COMMODITIES_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("OTHERS_BENCHMARK_ALLOCATION", FloatType()),
#     StructField("FUND_RISK_LEVEL", FloatType()),
#     StructField("FUND_RISK_CATEGORY", StringType()),
#     StructField("NEG_NETRETURN_SINCE_INCEPTION", StringType()),
#     StructField("YEAR_1_RETURNS", FloatType()),
#     StructField("YEAR_3_RETURNS", FloatType()),
#     StructField("YEAR_5_RETURNS", FloatType()),
#     StructField("YEAR_7_RETURNS", FloatType()),
#     StructField("YEAR_10_RETURNS", FloatType()),
#     StructField("SUPER_FEES", FloatType()),
#     StructField("PENSION_FEES", FloatType()),
#     StructField("CHURN_FLAG", StringType()),
#     StructField("MEMBER_AGE", IntegerType()),
#     StructField("INVESTMENT_AGE_GROUP", StringType()),
#     StructField("RETIREMENT_AGE", IntegerType()),
#     StructField("RETIREMENT_AGE_GROUP", StringType()),
#     StructField("TOTAL_FUNDS_INVESTED", IntegerType()),
#     StructField("NO_OF_CHURN", StringType()),
#     StructField("MEMBER_DOB_DT", StringType()),
#     StructField("ALLOCATION_DT", StringType()),
#     StructField("CHURN_DT", StringType()),
#     StructField("RETIREMENT_DT", StringType()),
#     StructField("CHURN_PREDICTED", IntegerType()),
#     StructField("unique_id", IntegerType()),
#     StructField("CHURN_REASON_PREDICTED", StringType()),
#     StructField("funds_rec", StringType())
# ])

In [101]:
df_snowflake = session.createDataFrame(df.values.tolist(), schema=df.columns.tolist())

TypeError: Unable to infer the type of the field FUNDS_RECOMMENDATIONS.

In [80]:
df_snowflake.write.mode("overwrite").save_as_table("FDC_Banking_FS.BFS_SUPER_ANNU_SCHEMA.FUNDS_COMPLETE_DATA")

SnowparkSQLException: (1304): 252011: Python data type [str_] cannot be automatically mapped to Snowflake data type. Specify the snowflake data type explicitly.

In [None]:
temp_ = {}
for i, val in enumerate(df['FUNDS_RECOMMENDATIONS']):
    df["FUNDS_RECOMMENDATIONS"].iloc[i] = str(val)
    temp_[val] =type(val)
    

In [106]:
temp_

{'Not Applicable': str,
 'FID000100': numpy.str_,
 'FID000033': numpy.str_,
 'FID000096': numpy.str_,
 'FID000034': numpy.str_,
 'FID000117': numpy.str_,
 'FID000192': numpy.str_,
 'FID000136': numpy.str_,
 'FID000080': numpy.str_,
 'FID000121': numpy.str_,
 'FID000106': numpy.str_,
 'FID000119': numpy.str_,
 'FID000097': numpy.str_,
 'FID000011': numpy.str_,
 'FID000105': numpy.str_,
 'FID000005': numpy.str_,
 'FID000103': numpy.str_,
 'FID000116': numpy.str_,
 'FID000049': numpy.str_,
 'FID000135': numpy.str_,
 'FID000060': numpy.str_,
 'FID000122': numpy.str_,
 'FID000059': numpy.str_,
 'FID000102': numpy.str_,
 'FID000185': numpy.str_,
 'FID000146': numpy.str_,
 'FID000114': numpy.str_,
 'FID000107': numpy.str_,
 'FID000108': numpy.str_,
 'FID000006': numpy.str_,
 'FID000067': numpy.str_,
 'FID000016': numpy.str_,
 'FID000171': numpy.str_,
 'FID000163': numpy.str_,
 'FID000115': numpy.str_,
 'FID000113': numpy.str_,
 'FID000008': numpy.str_,
 'FID000010': numpy.str_,
 'FID000118': 