# Import libraries

In [1]:
from pyspark.sql.types import StructType, StructField, IntegerType,\
LongType, FloatType, StringType, BooleanType, DateType
import pyspark.sql.functions as psf
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

import re

# Import custom libs
import config
import crm_schema

# Reload the project modules so that edits made to config.py / crm_schema.py
# after the kernel started are picked up without restarting it.
import importlib
importlib.reload(config)
importlib.reload(crm_schema)

<module 'crm_schema' from '/mnt/d/git/lgbm/crm_schema.py'>

# Load data

In [2]:
# Reuse the active Spark session if there is one; otherwise create a new
# session under the application name "crm modelling".
spark = SparkSession.builder.appName("crm modelling").getOrCreate()

In [3]:
# Read the modelling extract using the explicit schema declared in crm_schema;
# the first line of the CSV is treated as the header row.
input_data_file = "MODELLING_DATA_SUB.csv"
sdf_raw = spark.read.csv(
    path=input_data_file,
    schema=crm_schema.input_data_schema,
    header=True,
)

In [4]:
# Preview the first five rows; limit(5) already caps the frame at five rows,
# so the pandas frame is displayed as-is.
sdf_raw.limit(5).toPandas()

Unnamed: 0,ID_CUID,EFFECTIVE_DATE,REPORT_DATE,RUN_DATE,CA_START_DT,CA_OFFER_TYPE_CODE,CA_F_CLX_ELIGIBLE,CA_LIMIT_OF,CA_MAXINST_OF,CA_RBP_OF,...,PCB_CNT_CLX_LOAN,PCB_CNT_CLX_NON_HOME_LOAN,PCB_BALANCE_VOL_REMAINING,PCB_CNT_TENOR_REMAINING,PCB_AVG_SCORE,SC_GR,FLAG_SIGN_WO_CANCEL,FLAG_BEFORE_REPORT,FLAG_CAL_CONTRACT_SIGNED_1M,FLAG_CCX_CONTRACT_SIGNED_1M
0,6160752,2020-05-14,2019-08-01,2020-05-14,2019-07-26,,1,17338116.0,1015092.0,RBP 03,...,0,0,,,,OC3Y,1,1,0,0
1,8228212,2020-05-14,2019-08-01,2020-05-14,2019-07-25,,1,11000000.0,2598660.0,RBP 03,...,0,0,0.0,0.0,506.0,MFLC_12,1,1,0,0
2,12744249,2020-05-14,2019-08-01,2020-05-14,2019-07-10,,1,200000000.0,11914759.0,RBP 01,...,0,0,,,0.0,MFLC_05,1,1,0,0
3,10409324,2020-05-14,2019-08-01,2020-05-14,2019-07-17,,1,12000000.0,2096486.0,RBP 05,...,0,0,,,,OC2Y,1,1,0,0
4,10218197,2020-05-14,2019-08-01,2020-05-14,2019-07-12,,1,15000000.0,2323125.0,RBP 03,...,0,0,0.0,0.0,494.0,OC2Y,1,1,0,0


# Pre-processing data

In [5]:
# Unique model name: every file created during modelling gets a name
# ending in this suffix.
unique_name = '_cl'

# Predictor columns (from the ADM) to use, as listed in the project config.
pred_cols = config.pred_cols

# Pre-processing settings (columns to drop / log / fix); all left empty here.
cols_to_drop, cols_to_log = [], []
cols_to_fix = dict()

## Label encoding

In [6]:
# Collect the columns to label-encode: every column whose Spark dtype is
# "string", plus any column whose name matches the "SK<optional char>_" pattern.
# The dtype is compared with ==: `typo in ("string")` would be a substring test,
# because ("string") is a plain str and not a one-element tuple.
cat_cols = [
    name
    for name, dtype in sdf_raw.dtypes
    if dtype == "string" or re.search(".*SK.?_", name) is not None
]

# Replace nulls with -1.
# NOTE(review): DataFrame.fillna with a numeric value appears to touch numeric
# columns only, so null strings would still be null after this step — confirm
# that this is intended.
sdf_pre_processed = sdf_raw.fillna(-1)

# Upper-case the values of every categorical column, one column per pass.
for cat_col in cat_cols:
    upper_values = psf.upper(psf.col(cat_col))
    sdf_pre_processed = sdf_pre_processed.withColumn(cat_col, upper_values)

# Label-encode each categorical column into a new "<column>_index" column.
# handleInvalid="keep" is set on every indexer so that null values are kept
# rather than rejected.
indexers = []
for column in list(cat_cols):
    indexer = StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="keep")
    indexers.append(indexer.fit(sdf_pre_processed))

# Every stage is already fitted above; the pipeline chains their transforms.
pipeline = Pipeline(stages=indexers)
fitted_pipeline = pipeline.fit(sdf_pre_processed)
sdf_pre_processed = fitted_pipeline.transform(sdf_pre_processed)

# Replace each original categorical column by its encoded counterpart:
# drop the raw column, then give "<column>_index" the original column name.
for cat_col in cat_cols:
    without_raw = sdf_pre_processed.drop(cat_col)
    sdf_pre_processed = without_raw.withColumnRenamed(cat_col + "_index", cat_col)

# Sanity check on the predictor list: number of configured predictors, number
# of columns in the raw data, and number of predictors present in the raw data.
column_report = f"""
# len(pred_cols) = {len(pred_cols)}
# len(sdf_raw.columns) = {len(sdf_raw.columns)}
# len(set(sdf_raw.columns) & set(pred_cols) = {len(set(sdf_raw.columns) & set(pred_cols))}
"""
print(column_report)

# Split the predictors into near-constant columns (the most frequent value
# accounts for at least 99.99% of all rows) and the remaining columns.
total_rows = sdf_pre_processed.count()
const_cols, not_const_cols = [], []
for pred_col in pred_cols:
    count_name = pred_col + "_count"
    per_value_counts = (
        sdf_pre_processed
        .groupby(pred_col)
        .agg(psf.count(pred_col).alias(count_name))
    )
    # Largest group first; the single row taken is (value, count).
    top_row = per_value_counts.orderBy(count_name, ascending=False).take(1)[0]
    value_count = top_row[1]
    bucket = const_cols if value_count / total_rows >= 0.9999 else not_const_cols
    bucket.append(pred_col)
print(f"Columns to drop: {const_cols}")

# Keep only the predictors that are not near-constant, plus the target column.
IND_NAME = 'FLAG_CAL_CONTRACT_SIGNED_1M'  # target
model_cols = not_const_cols + [IND_NAME]
sdf_pre_processed = sdf_pre_processed.select(model_cols)
print(f"# len(sdf_pre_processed.columns) = {len(sdf_pre_processed.columns)}")

# Summary statistics (count, mean, stddev, min, max) of the target column.
print("Check target column stats")
target_stats = sdf_pre_processed.describe(IND_NAME)
target_stats.show()

# Down-sample the majority class: keep at most MAX_ZERO_ROWS randomly chosen
# rows with target = 0, and every row with target = 1.
# The random ordering is seeded so that re-running the notebook selects the
# same rows (given the same input data and partitioning). With an unseeded
# rand() the sample changed on every run, which made the seeded train/test
# split that follows non-reproducible.
# NOTE: despite its name, sdf_oversample holds all positives together with the
# down-sampled negatives; the name is kept because later cells refer to it.
SAMPLING_SEED = 42
MAX_ZERO_ROWS = 600000
sdf_zeros_shuffle = (
    sdf_pre_processed
    .filter(f"{IND_NAME} = 0")
    .orderBy(psf.rand(seed=SAMPLING_SEED))
    .limit(MAX_ZERO_ROWS)
)
sdf_ones = sdf_pre_processed.filter(f"{IND_NAME} = 1")
sdf_oversample = sdf_ones.union(sdf_zeros_shuffle)




# len(pred_cols) = 170
# len(sdf_raw.columns) = 191
# len(set(sdf_raw.columns) & set(pred_cols) = 170

Columns to drop: ['CA_OFFER_TYPE_CODE', 'CA_F_CLX_ELIGIBLE', 'TR_F_FIRST_TRAN_1M', 'CNT_WEB_APP_MONTHS_OFFER_TAB']
# len(sdf_pre_processed.columns) = 167
Check target column stats
+-------+---------------------------+
|summary|FLAG_CAL_CONTRACT_SIGNED_1M|
+-------+---------------------------+
|  count|                     100000|
|   mean|                    0.00625|
| stddev|        0.07880989538120262|
|    min|                          0|
|    max|                          1|
+-------+---------------------------+



# Modelling

In [14]:
# Random 70/30 split of the sampled data into train and test, with a fixed seed.
splits = sdf_oversample.randomSplit([0.7, 0.3], seed=42)
sdf_train = splits[0]
sdf_test = splits[1]
print(f"""There are {sdf_train.count()} rows in the training set, and {sdf_test.count()} in the test set""")

There are 69928 rows in the training set, 
and 30072 in the test set


In [26]:
# Row count of the training split (the same figure is printed by the split cell).
sdf_train.count()

69928

In [19]:
# Preview the first five rows of the test split; limit(5) already caps the
# frame at five rows and all columns are kept.
sdf_test.limit(5).toPandas()

Unnamed: 0,CA_LIMIT_OF,CA_MAXINST_OF,CA_RBP_OF,CA_PROD_PRI_GR,CA_TENOR_MAX,CA_TENOR_MIN,APL_F_CL_BOD01_1M,APL_F_CL_BOD02_1M,APL_F_CL_APPROVE_1M,APL_F_CL_SIGN_1M,...,PCB_CNT_HIST_NON_HOME_CONTRACT,PCB_CNT_CLX_LOAN,PCB_CNT_CLX_NON_HOME_LOAN,PCB_BALANCE_VOL_REMAINING,PCB_CNT_TENOR_REMAINING,PCB_AVG_SCORE,PMT_CNT_TOTAL,PMT_MAX_AMT,PMT_AVG_AMT,FLAG_CAL_CONTRACT_SIGNED_1M
0,6000000.0,2637600.0,2.0,5.0,6000000.0,5000000.0,-1.0,-1.0,-1.0,-1.0,...,6,1,1,1048835.0,1.0,0.0,10,971000.0,1021900.0,1
1,7818530.0,560880.0,1.0,5.0,-1.0,-1.0,-1.0,-1.0,-1.0,-1.0,...,1,0,0,9089000.0,6.0,0.0,18,936000.0,1325500.0,1
2,8186148.0,598000.0,2.0,0.0,20000000.0,5000000.0,-1.0,-1.0,-1.0,-1.0,...,0,0,0,-1.0,-1.0,0.0,6,299000.0,298833.0,1
3,15000000.0,1577100.0,2.0,0.0,20000000.0,5000000.0,-1.0,-1.0,-1.0,-1.0,...,0,0,0,-1.0,-1.0,-1.0,12,751000.0,745532.0,1
4,15000000.0,2205432.0,5.0,0.0,20000000.0,5000000.0,-1.0,-1.0,-1.0,-1.0,...,0,0,0,-1.0,-1.0,-1.0,8,604000.0,604000.0,1


In [98]:
# General random split into train and validation sets (30% validation,
# stratified on the target column, fixed random_state).
# (Not used for the CL task; cross-time validation is used instead - see the code below.)
# NOTE(review): `train_test_split`, `data_oversample` and `target` are not
# defined or imported in the cells shown above (the Spark cells build
# `sdf_oversample` instead), so this cell presumably predates the Spark port
# and would raise NameError on a fresh kernel - confirm whether it should be
# removed or ported.
X_train, X_val, Y_train, Y_val = train_test_split( data_oversample.drop([IND_NAME],axis=1)
                                                    , data_oversample[IND_NAME]
                                                    , test_size=0.3, stratify=data_oversample[IND_NAME]
                                                    , random_state=7)

# Report the target rate overall, then shape / target mean / target sum per split.
print ('Mean target: ', target.mean())
print ('Train', (X_train.shape, Y_train.mean(), Y_train.sum()))
print ('Valid', (X_val.shape, Y_val.mean(), Y_val.sum()))