#### Loading the packages and creating spark and sparkcontext objects

In [1]:
# Load the packages needed for this part
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, evaluation
from pyspark.sql import functions as fn, Row
from pyspark import sql

from pyspark.ml.stat import Correlation 
from pyspark.ml.feature import VectorAssembler

import matplotlib.pyplot as plt
import pandas as pd

from pyspark.ml.stat import Correlation
from sklearn.preprocessing import Imputer

#### Loading the dataset

In [2]:
#Loading the dataset
df = spark.read.format("csv").option("header", "true").option("inferSchema","true").load("data_master.csv")

#### Excluding extra column added during spark dataframe creation

In [3]:
#Dropping indexed column '_c0', 'Unnamed: 0'
df = df.drop(fn.col('_c0')).drop(fn.col('Unnamed: 0'))

In [4]:
#Dataset dimension
print((df.count(), len(df.columns)))

(564400, 140)


#### Converting output variable into 0's & 1's

In [5]:
df= df.withColumn('loan_status', fn.when(fn.col('loan_status') == 'Default',0).otherwise(1))

#### Dropping features with Null value count greater than 100000

In [6]:
df = df.drop(fn.col('mths_since_last_delinq')).\
drop(fn.col('mths_since_last_record')).\
drop(fn.col('mths_since_last_major_derog')).\
drop(fn.col('mths_since_recent_bc_dlq')).\
drop(fn.col('mths_since_recent_revol_delinq')).\
drop(fn.col('deferral_term')).\
drop(fn.col('hardship_amount')).\
drop(fn.col('hardship_length')).\
drop(fn.col('hardship_dpd')).\
drop(fn.col('orig_projected_additional_accrued_interest')).\
drop(fn.col('hardship_payoff_balance_amount')).\
drop(fn.col('hardship_last_payment_amount')).\
drop(fn.col('settlement_amount')).\
drop(fn.col('settlement_percentage')).\
drop(fn.col('settlement_term')).\
drop(fn.col('annual_inc_joint')).\
drop(fn.col('next_pymnt_d')).\
drop(fn.col('hardship_type')).\
drop(fn.col('dti_joint')).\
drop(fn.col('verification_status_joint')).\
drop(fn.col('revol_bal_joint')).\
drop(fn.col('sec_app_earliest_cr_line')).\
drop(fn.col('sec_app_inq_last_6mths')).\
drop(fn.col('sec_app_mort_acc')).\
drop(fn.col('sec_app_open_acc')).\
drop(fn.col('sec_app_revol_util')).\
drop(fn.col('sec_app_open_act_il')).\
drop(fn.col('sec_app_num_rev_accts')).\
drop(fn.col('sec_app_chargeoff_within_12_mths')).\
drop(fn.col('sec_app_collections_12_mths_ex_med')).\
drop(fn.col('sec_app_mths_since_last_major_derog')).\
drop(fn.col('hardship_reason')).\
drop(fn.col('hardship_status')).\
drop(fn.col('hardship_start_date')).\
drop(fn.col('hardship_end_date')).\
drop(fn.col('payment_plan_start_date')).\
drop(fn.col('hardship_loan_status')).\
drop(fn.col('debt_settlement_flag_date')).\
drop(fn.col('settlement_status')).\
drop(fn.col('settlement_date'))

In [7]:
#Dataset dimension
print((df.count(), len(df.columns)))

(564400, 100)


#### Dropping na values across the dataset

In [8]:
df = df.na.drop()

In [9]:
#Dataset dimension
print((df.count(), len(df.columns)))

(404922, 100)


#### Dropping the feature policy_code as it has no unique value to contribute to the model

In [10]:
df = df.drop(fn.col('policy_code'))

#### Balancing the number of data points in the dataframe based on loan_status value
#### Here, we are trying to have equal number of data points for loan_status values of 0 and 1

In [11]:
df_zero = df.where('loan_status = 0')
df_one = df.where('loan_status = 1')

In [12]:
df_zero.count()

329912

In [13]:
df_one.count()

75010

In [14]:
df_zero_to_add, df_removed = df_zero.randomSplit([0.24, 0.76], seed=0)
df_new = df_one.union(df_zero_to_add)
df_new.groupBy("loan_status").count().show()

+-----------+-----+
|loan_status|count|
+-----------+-----+
|          1|75010|
|          0|79400|
+-----------+-----+



In [15]:
df = df_new

In [16]:
#Dataset dimension
print((df.count(), len(df.columns)))

(154410, 99)


#### Selecting columns of type 'string' as stringColumnList

In [17]:
stringColumnList = [item[0] for item in df.dtypes if item[1].startswith('string') or item[0]=='loan_status']

#### String Indexing the categorical variables and applying OneHotEncoding to convert string categorical features to numerics

In [18]:
categorical_columns = stringColumnList
string_indexer_models = []
one_hot_encoders = []
for col_name in categorical_columns:
    # OneHotEncoders map number indices column to column of binary vectors
    string_indexer_model = feature.StringIndexer(inputCol=col_name, outputCol="{0}_indexed".format(col_name)).fit(df)
    df = string_indexer_model.transform(df)
    string_indexer_models.append(string_indexer_model)
    
    one_hot_encoder = feature.OneHotEncoder(inputCol="{0}_indexed".format(col_name), outputCol="{0}_encoded".format(col_name), dropLast=False)
    df = one_hot_encoder.transform(df)
    one_hot_encoders.append(one_hot_encoder)

#### Creating Train, Validation and Test Splits

In [19]:
training_df, validation_df, testing_df = df.randomSplit([0.6, 0.3, 0.1], seed=0)

In [20]:
[training_df.count(), validation_df.count(), testing_df.count()]

[92575, 46167, 15668]

In [23]:
from pyspark.ml.classification import LogisticRegression

#### Logistic regression on all 99 features (numeric and encoded categorical) to identify logistic regression coefficients for feature selection 


In [24]:
va = feature.VectorAssembler(inputCols=['loan_amnt',
 'funded_amnt',
 'term',
 'emp_length',
 'delinq_2yrs',
 'inq_last_6mths',
 'open_acc',
 'pub_rec',
 'revol_bal',
 'total_acc',
 'collections_12_mths_ex_med',
 'acc_now_delinq',
 'tot_coll_amt',
 'tot_cur_bal',
 'open_acc_6m',
 'open_act_il',
 'open_il_12m',
 'open_il_24m',
 'total_bal_il',
 'open_rv_12m',
 'open_rv_24m',
 'max_bal_bc',
 'total_rev_hi_lim',
 'inq_fi',
 'total_cu_tl',
 'inq_last_12m',
 'acc_open_past_24mths',
 'chargeoff_within_12_mths',
 'delinq_amnt',
 'mo_sin_old_rev_tl_op',
 'mo_sin_rcnt_rev_tl_op',
 'mo_sin_rcnt_tl',
 'mort_acc',
 'num_accts_ever_120_pd',
 'num_actv_bc_tl',
 'num_actv_rev_tl',
 'num_bc_sats',
 'num_bc_tl',
 'num_il_tl',
 'num_op_rev_tl',
 'num_rev_accts',
 'num_rev_tl_bal_gt_0',
 'num_sats',
 'num_tl_30dpd',
 'num_tl_90g_dpd_24m',
 'num_tl_op_past_12m',
 'pub_rec_bankruptcies',
 'tax_liens',
 'tot_hi_cred_lim',
 'total_bal_ex_mort',
 'total_bc_limit',
 'total_il_high_credit_limit',
 'funded_amnt_inv',
 'int_rate',
 'installment',
 'annual_inc',
 'dti',
 'out_prncp',
 'out_prncp_inv',
 'total_pymnt',
 'total_pymnt_inv',
 'total_rec_prncp',
 'total_rec_int',
 'total_rec_late_fee',
 'recoveries',
 'collection_recovery_fee',
 'last_pymnt_amnt',
 'mths_since_rcnt_il',
 'il_util',
 'all_util',
 'avg_cur_bal',
 'bc_open_to_buy',
 'bc_util',
 'mo_sin_old_il_acct',
 'mths_since_recent_bc',
 'mths_since_recent_inq',
 'num_tl_120dpd_2m',
 'pct_tl_nvr_dlq',
 'percent_bc_gt_75',
 'grade_encoded',
 'sub_grade_encoded',
 'emp_title_encoded',
 'home_ownership_encoded',
 'verification_status_encoded',
 'issue_d_encoded',
 'pymnt_plan_encoded',
 'purpose_encoded',
 'title_encoded',
 'zip_code_encoded',
 'addr_state_encoded',
 'earliest_cr_line_encoded',
 'revol_util_encoded',
 'initial_list_status_encoded',
 'last_pymnt_d_encoded',
 'last_credit_pull_d_encoded',
 'hardship_flag_encoded',
 'disbursement_method_encoded',
 'debt_settlement_flag_encoded'] 
 ,outputCol='features')

In [25]:
lr = LogisticRegression().setFeaturesCol('features').setLabelCol('loan_status')

In [26]:
pipe = Pipeline(stages=[va, lr])
pipe_model = pipe.fit(training_df)

In [27]:
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='loan_status')

In [28]:
evaluator.evaluate(pipe_model.transform(validation_df))#model implemented to extract coefficients

0.9962739509190175

In [29]:
pipe_model.stages[1].coefficients

DenseVector([-0.0026, -0.0026, -0.9218, 0.4505, -0.2581, -1.1329, -0.1354, -1.0207, 0.0, 0.1077, -10.0, 2.4051, 0.0001, -0.0, -1.0324, -0.2681, 0.1455, 0.3267, 0.0, 0.6821, -0.0564, -0.0003, -0.0, 0.8845, -1.0449, -0.2014, 0.0265, -3.1725, -0.0002, -0.0097, -0.0006, 0.059, 0.4172, 0.8444, -0.1605, -0.6437, 0.2797, 0.3422, 0.1296, -0.051, 0.1235, -0.6181, -0.1355, 12.6935, 1.4496, 0.6127, 2.3967, -4.2165, -0.0, 0.0, -0.0001, -0.0, -0.0026, -0.2822, -0.0835, -0.0, -0.2915, -0.0063, -0.0063, 0.0046, 0.0046, 0.0058, -0.0145, -0.7508, -0.1196, -0.6573, 0.0066, -0.0415, 0.0586, 0.0155, -0.0, -0.0, -0.0123, -0.0166, 0.0002, 0.2818, -35.9113, 0.1485, 0.0207, 0.0369, 0.0146, 2.1819, -1.7504, 0.2662, -9.7999, -10.3316, 4.2421, -2.7927, -8.7207, 0.9734, 0.3402, 0.4959, 3.055, 6.7695, 2.1554, -5.7466, 1.5449, 7.763, -1.6195, -1.6701, 2.8494, 0.6924, -0.6795, -6.513, -0.8872, 0.8771, 2.5734, 2.7187, 10.7152, -4.7457, -12.4215, -16.4849, 5.905, 16.5588, 20.3215, -22.3088, -27.2866, 15.8183, -49.145,

#### As the features based on correlation have very less logistic regression coefficients, it was conculded that the impact of below features is higher on 'loan_status' as compared to featues selected based on correlation

#### Please note that during poster we had eliminated a few rows by taking subset, but for final submission we have considered the entire clean dataset i.e. instead of around 100,000 rows,we have considered around 150,000 by balancing the number of o and 1 loan_status values this change may affect the AUC of models along with coefficients of logistic regression and weights of RF model

In [30]:
#based on coefficients -- below are the numeric features with higher impact than others()
# the categorical based on understood impact
va = feature.VectorAssembler(inputCols=['num_tl_30dpd',
 'delinq_2yrs',
 'acc_now_delinq', 
 'num_tl_120dpd_2m',
 'collections_12_mths_ex_med',
 'chargeoff_within_12_mths',
 'mort_acc',
 'num_tl_90g_dpd_24m',
 'pub_rec_bankruptcies',
 'sub_grade_encoded',
 'home_ownership_encoded',
 'verification_status_encoded',
 'purpose_encoded',
 'revol_util_encoded'] 
 ,outputCol='features')

In [31]:
lr = LogisticRegression().setFeaturesCol('features').setLabelCol('loan_status')
pipe = Pipeline(stages=[va, lr])
pipe_model = pipe.fit(training_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='loan_status')
evaluator.evaluate(pipe_model.transform(validation_df))#model 1

0.5732141161366605

#### Implementing logistic regression grid with differnet eleastic new parameters and regularization to improve AUC

In [32]:
lambda_par = 0.02
alpha_par = 0.3
lr = LogisticRegression().\
        setLabelCol('loan_status').\
        setFeaturesCol('features').\
        setRegParam(lambda_par).\
        setMaxIter(100).\
        setElasticNetParam(alpha_par)

In [33]:
pipe = Pipeline(stages=[va, lr])#models 2,3,4,5

In [34]:
from pyspark.ml.tuning import ParamGridBuilder

In [35]:
grid = ParamGridBuilder().\
    addGrid(lr.regParam, [0., 0.02]).\
    addGrid(lr.elasticNetParam, [0.4, 0.8]).\
    build()

In [36]:
all_models = []
for j in range(len(grid)):
    print("Fitting model {}".format(j+1))
    model = pipe.fit(training_df, grid[j])
    all_models.append(model)

Fitting model 1
Fitting model 2
Fitting model 3
Fitting model 4


In [39]:
AUCs = [evaluator.evaluate(m.transform(validation_df)) for m in all_models]

In [41]:
grid

[{Param(parent='LogisticRegression_9392368e22ed', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
  Param(parent='LogisticRegression_9392368e22ed', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.4},
 {Param(parent='LogisticRegression_9392368e22ed', name='regParam', doc='regularization parameter (>= 0).'): 0.0,
  Param(parent='LogisticRegression_9392368e22ed', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.8},
 {Param(parent='LogisticRegression_9392368e22ed', name='regParam', doc='regularization parameter (>= 0).'): 0.02,
  Param(parent='LogisticRegression_9392368e22ed', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.4},
 {P

In [42]:
AUCs

[0.5732141161366606,
 0.5732141161366601,
 0.5598480537513904,
 0.5545736417288462]

#### Random Forest Classifier implementation based on correlation values and intuitive feature selection

In [44]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline

In [45]:
rf = RandomForestClassifier().setFeaturesCol('features').setLabelCol('loan_status')
rf_pipeline = Pipeline(stages=[va, rf]).fit(training_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='loan_status')
evaluator.evaluate(rf_pipeline.transform(validation_df))#model 6 as per poster #AUC

0.5571845219124684

In [46]:
rf_model = rf_pipeline.stages[-1]

In [47]:
inputCols=['num_tl_30dpd',
 'delinq_2yrs',
 'acc_now_delinq', 
 'num_tl_120dpd_2m',
 'collections_12_mths_ex_med',
 'chargeoff_within_12_mths',
 'mort_acc',
 'num_tl_90g_dpd_24m',
 'pub_rec_bankruptcies',
 'sub_grade_encoded',
 'home_ownership_encoded',
 'verification_status_encoded',
 'purpose_encoded',
 'revol_util_encoded']

In [48]:
pd.DataFrame(list(zip(inputCols, rf_model.featureImportances.toArray())),
            columns = ['column', 'weight']).sort_values('weight', ascending = False)
#Feature importance of encoded features is zero as they have multiple sets

Unnamed: 0,column,weight
6,mort_acc,0.122471
8,pub_rec_bankruptcies,0.085555
13,revol_util_encoded,0.003816
10,home_ownership_encoded,0.002303
5,chargeoff_within_12_mths,0.001322
1,delinq_2yrs,0.00097
7,num_tl_90g_dpd_24m,0.000802
12,purpose_encoded,8.4e-05
0,num_tl_30dpd,0.0
2,acc_now_delinq,0.0


#### Since AUC of basic logistic regression i.e. model 1 is the best , 
#### we conclude that model 1 is the best model to classify loan_status as Default or Fully_Paid

In [49]:
best_model = LogisticRegression().setFeaturesCol('features').setLabelCol('loan_status')
pipe_best = Pipeline(stages=[va, best_model])
pipe_model_best = pipe_best.fit(training_df)
evaluator = evaluation.BinaryClassificationEvaluator(labelCol='loan_status')
best_model_validate = evaluator.evaluate(pipe_model_best.transform(validation_df))#AUC of best model on validation df

In [50]:
print("Best Model (Validation)", best_model_validate)

Best Model (Validation) 0.5732141161366604


In [51]:
best_model_test = evaluator.evaluate(pipe_model_best.transform(testing_df))#test the model of testing dataset

In [52]:
print("Best Model (Testing): " ,best_model_test)

Best Model (Testing):  0.5635995497582571


#### read the predictions of tested dataset

In [53]:
predictions = pipe_model_best.transform(testing_df)#read the predictions of tested dataset

In [54]:
evaluator.evaluate(predictions)

0.5635995497582571

#### Thus the AUC of the best model is 0.5635995497582571