In [1]:
# Load the packages needed for this part
# create spark and sparkcontext objects
from pyspark.sql import SparkSession
import numpy as np

# Reuse the active SparkSession if there is one (otherwise create it) and keep
# a handle to its SparkContext.
# NOTE(review): the name `sc` is rebound to a StandardScaler further down in this notebook.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

import pyspark
from pyspark.ml import feature, regression, Pipeline, classification, pipeline, evaluation
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import functions as fn, Row
from pyspark.sql.functions import when, regexp_extract, col
from pyspark import sql
from pyspark.sql.functions import *

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorIndexer

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import sys

In [2]:
# Show the version of the Python interpreter running this notebook
sys.version

In [3]:
# Let pandas render up to 500 columns so wide previews are not truncated
pd.set_option('display.max_columns', 500)

# Load the loan CSV: first row is the header, column types are inferred by Spark
loan_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/FileStore/tables/loan_default.csv')
)

In [4]:
# Working handle for the analysis. This is an alias of loan_df, not a physical
# copy; later cells rebind loan_copy_df to derived frames and leave loan_df
# pointing at the data as loaded.
loan_copy_df = loan_df

# Preview the first rows. limit(5) keeps the preview from collecting the whole
# dataset to the driver just to show five rows.
loan_copy_df.limit(5).toPandas()

#Exploratory Data Analysis

In [6]:
# Count missing values per column: NULLs for every column, plus NaN for
# floating-point columns.
def _missing_count(name, dtype):
    """Return an aggregate Column counting the missing entries of column `name`.

    `dtype` is the Spark type string taken from DataFrame.dtypes; NaN is only
    tested on 'double' and 'float' columns.
    """
    is_missing = loan_copy_df[name].isNull()
    if dtype in ('double', 'float'):
        is_missing = is_missing | isnan(loan_copy_df[name])
    # Count a literal 1 rather than the column value: count() skips NULLs, so
    # counting the (NULL) value itself would always report zero.
    return count(when(is_missing, 1)).alias(name)

loan_copy_df.select([_missing_count(name, dtype) for name, dtype in loan_copy_df.dtypes]).toPandas().head()

In [7]:
# Row counts per level of each categorical variable
loan_copy_df.groupBy('year').count().show()

# Each (label column, code column) pair is counted together and shown sorted by the code
label_and_code_columns = [
    ('home_ownership', 'home_ownership_cat'),
    ('income_category', 'income_cat'),
    ('interest_payments', 'interest_payment_cat'),
    ('term', 'term_cat'),
    ('application_type', 'application_type_cat'),
    ('purpose', 'purpose_cat'),
    ('grade', 'grade_cat'),
]
for label_column, code_column in label_and_code_columns:
    loan_copy_df.groupBy(label_column, code_column).count().sort(code_column).show()

In [8]:
# Column names of the DataFrame together with their Spark types
loan_copy_df.dtypes

In [9]:
# Descriptive statistics of the loan dataset, computed in pandas after
# collecting the full dataset to the driver
loan_copy_df.toPandas().describe()

In [10]:
# Preview the first rows (the whole dataset is collected to pandas first)
loan_copy_df.toPandas().head()

In [11]:
# Rename the target column loan_condition_cat to default
loan_copy_df = loan_copy_df.withColumnRenamed('loan_condition_cat', 'default')

In [12]:
# Correlation heatmap of the numeric columns of the dataset
plt.figure(figsize=(18, 12))
# Restrict to numeric/boolean columns before calling corr(): pandas >= 2.0
# raises on string columns instead of silently dropping them.
numeric_pdf = loan_copy_df.toPandas().select_dtypes(include=['number', 'bool'])
corr = numeric_pdf.corr()
ax = sns.heatmap(corr, cmap="YlGnBu", annot=True)
# Tilt the x tick labels so the column names stay readable
ax.set_xticklabels(
    ax.get_xticklabels(),
    rotation=45,
    horizontalalignment='right'
);
display(ax.figure)

In [13]:
# Number of loan applications per year
plt.figure(figsize=(12, 6))
year_plot_pdf = loan_copy_df.toPandas()
ax_yearCat = sns.countplot(data=year_plot_pdf, x='year')
ax_yearCat.set(title='Loan applicants per Year', ylabel='Count', xlabel='Year')
display(ax_yearCat.figure)

In [14]:
# Categorical variables: loan counts per income category, split by default flag
plt.figure(figsize=(12, 6))
income_plot_pdf = loan_copy_df.toPandas()
ax_incomeCat = sns.countplot(data=income_plot_pdf, x='income_category', hue='default')
ax_incomeCat.set(
    title='Loan defaults based on income',
    ylabel='Count',
    xlabel='Income Category',
)
display(ax_incomeCat.figure)
# Observation: a comparatively larger number of low-income borrowers have defaulted on their loans

In [15]:
# Loan counts per purpose category, split by default flag
plt.figure(figsize=(20, 7))
purpose_plot_pdf = loan_copy_df.toPandas()
ax_purposeCat = sns.countplot(data=purpose_plot_pdf, x='purpose', hue='default')
# Tilt the category names slightly so they do not overlap
ax_purposeCat.set_xticklabels(ax_purposeCat.get_xticklabels(), rotation=20)
ax_purposeCat.set(title='Purpose for applying Loan', ylabel='Count', xlabel='Purpose')
display(ax_purposeCat.figure)

# Observation: purpose category 6 (debt consolidation) has a comparatively larger number of defaulted loans

In [16]:
# Loan counts per term, split by default flag
plt.figure(figsize=(7, 4))
term_plot_pdf = loan_copy_df.toPandas()
ax_termCat = sns.countplot(data=term_plot_pdf, x='term', hue='default')
ax_termCat.set(title='Loan Duration', ylabel='Count', xlabel='Terms')
display(ax_termCat.figure)

# Observation: a comparatively larger number of 36-month-term borrowers have defaulted on their loans

In [17]:
# Relationship between two features that the correlation chart shows as
# strongly related: loan amount and installment (94% correlated)
scatter_source_pdf = loan_copy_df.toPandas()
loan_inst = scatter_source_pdf.plot.scatter(
    x='loan_amount',
    y='installment',
    color='DarkBlue',
)
loan_inst.title.set_text('Loan Amount Vs Installment')

display(loan_inst.figure)

In [18]:
# Scatter plot of interest rate against grade category
loan_purpose = loan_copy_df.toPandas().plot.scatter(x = 'grade_cat', y = 'interest_rate', color = 'Green')
# The y axis is interest_rate, so the title names the interest rate (it previously said "Installment Rate")
loan_purpose.title.set_text('Interest Rate Vs Grade')
display(loan_purpose.figure)

In [19]:
# Regression-line plot of interest rate against grade category
regplot_source_pdf = loan_copy_df.toPandas()
loan_grade = sns.regplot(data=regplot_source_pdf, x='grade_cat', y='interest_rate')
display(loan_grade.figure)

In [20]:
# Box plot of annual income per default flag, to spot outliers
plt.figure(figsize=(5, 5))
sns.set_style("whitegrid")
income_box_pdf = loan_copy_df.toPandas()
annual_inc_box = sns.boxplot(data=income_box_pdf, x='default', y='annual_inc')
annual_inc_box.set_title('Default based on Annual Income ')

# Cap the y axis at 500000 so the boxes stay readable
annual_inc_box.set_ylim(0, 500000)
display(annual_inc_box.figure)

In [21]:
# Box plot of monthly installment per default flag, to spot outliers
plt.figure(figsize=(5, 5))
sns.set_style("whitegrid")
installment_box_pdf = loan_copy_df.toPandas()
loan_box = sns.boxplot(data=installment_box_pdf, x='default', y='installment')
loan_box.set_title('Default based on Monthly Installment ')
display(loan_box.figure)

In [22]:
# Box plot of installment per loan purpose, to spot outliers
plt.figure(figsize=(6,5))
sns.set_style("whitegrid")
purpose_box = sns.boxplot(x='purpose', y='installment', data=loan_copy_df.toPandas())
# Rotate this plot's own tick labels. Borrowing the labels of another cell's
# axes (ax_purposeCat) makes the cell depend on that cell having run and can
# mislabel the boxes if the category order differs.
purpose_box.set_xticklabels(purpose_box.get_xticklabels(), rotation=35)
purpose_box.set_title('Purpose for applying Loan')
purpose_box.set_ylabel('Installments')
purpose_box.set_xlabel('Purpose')

display(purpose_box.figure)

In [23]:
# Select the bad loans (default == 1) for separate analysis.
# The dataset is collected to pandas once instead of once for the frame and
# once more for the boolean mask.
bad_loans_source_pdf = loan_copy_df.toPandas()
bad_loans = bad_loans_source_pdf.loc[bad_loans_source_pdf['default'] == 1]

In [24]:
# Select the good loans (default == 0) for separate analysis.
# The dataset is collected to pandas once instead of once for the frame and
# once more for the boolean mask.
good_loans_source_pdf = loan_copy_df.toPandas()
good_loans = good_loans_source_pdf.loc[good_loans_source_pdf['default'] == 0]

In [25]:
# Preview the first rows of the bad-loan subset
bad_loans.head()

In [26]:
# Box plot of loan amount per purpose within the bad loans, to spot outliers
plt.figure(figsize=(7, 7))
sns.set_style("whitegrid")
home_ann_box = sns.boxplot(data=bad_loans, x='purpose', y='loan_amount')
# Cap the y axis at 50000
home_ann_box.set_ylim(0, 50000)
display(home_ann_box.figure)

In [27]:
# Box plot of annual income per purpose within the good loans, to spot outliers
plt.figure(figsize=(10, 7))
sns.set_style("whitegrid")
home_good_box = sns.boxplot(data=good_loans, x='purpose', y='annual_inc')
# Cap the y axis at 500000
home_good_box.set_ylim(0, 500000)
display(home_good_box.figure)

In [28]:
# Count the bad loans per home-ownership category
data = bad_loans['home_ownership'].value_counts()
# Name the output columns explicitly: 'index' holds the category and
# 'home_ownership' holds the count. Relying on the default names breaks on
# pandas >= 2.0, where value_counts() names its result 'count'.
Bad_loans_home_sum = data.rename_axis('index').reset_index(name='home_ownership')

In [29]:
# Pie chart of the home-ownership categories among the bad loans
# Colour palette (from iWantHue)
colors = ["#E13F29", "#D69A80", "#D63B59", "#AE5552", "#CB5C3B", "#EB8076", "#96624E", "#96624F", "#96524E"]

# Pull the third slice out of the pie. The tuple is sized from the data,
# because plt.pie requires exactly one entry per slice and the number of
# categories is not fixed.
explode = tuple(0.15 if position == 2 else 0 for position in range(len(Bad_loans_home_sum)))

fig = plt.figure(figsize=(4,4))
plt.pie(
    # slice sizes: the 'home_ownership' column of the summary frame
    Bad_loans_home_sum['home_ownership'],
    # slice labels: the 'index' column of the summary frame
    labels=Bad_loans_home_sum['index'],
    # draw a shadow beneath the pie
    shadow=True,
    # slice colours
    colors=colors,
    # which slice is pulled out, and by how much
    explode=explode,
    # start drawing at 90 degrees
    startangle=90,
    # print each share as a percentage with one decimal
    autopct='%1.1f%%',
    )

# Equal aspect ratio so the pie is drawn as a circle
plt.axis('equal')

# Tidy the layout, add the title and show the figure
plt.tight_layout()
plt.title('Types of bad loan home owners')
display(fig)

In [30]:
# Count the good loans per home-ownership category
data = good_loans['home_ownership'].value_counts()
# Name the output columns explicitly: 'index' holds the category and
# 'home_ownership' holds the count. Relying on the default names breaks on
# pandas >= 2.0, where value_counts() names its result 'count'.
good_loans_home_sum = data.rename_axis('index').reset_index(name='home_ownership')
good_loans_home_sum

In [31]:
# Pie chart of the home-ownership categories among the good loans
# Colour palette (from iWantHue)
colors = ["#E13F29", "#D69A80", "#D63B59", "#AE5552", "#CB5C3B", "#EB8076", "#96624E", "#96624F", "#96524E"]

# Pull the third slice out of the pie. The tuple is sized from the data,
# because plt.pie requires exactly one entry per slice and the number of
# categories is not fixed.
explode = tuple(0.15 if position == 2 else 0 for position in range(len(good_loans_home_sum)))

fig_good = plt.figure(figsize=(4,4))
plt.pie(
    # slice sizes: the 'home_ownership' column of the summary frame
    good_loans_home_sum['home_ownership'],
    # slice labels: the 'index' column of the summary frame
    labels=good_loans_home_sum['index'],
    # draw a shadow beneath the pie
    shadow=True,
    # slice colours
    colors=colors,
    # which slice is pulled out, and by how much
    explode=explode,
    # start drawing at 90 degrees
    startangle=90,
    # print each share as a percentage with one decimal
    autopct='%1.1f%%',
    )

# Equal aspect ratio so the pie is drawn as a circle
plt.axis('equal')

# Tidy the layout, add the title and show the figure
plt.tight_layout()
plt.title('Types of good loan home owners')
display(fig_good)

In [32]:
# Number of loans per grade
# Keep a handle on the figure so it can be passed to display();
# plt.draw() returns None, so display(plt.draw()) handed display() nothing to show.
grade_fig = plt.figure(figsize = (6,6))
sns.countplot(x="grade",data=loan_copy_df.toPandas(), palette= "YlGnBu")
plt.xticks(rotation=10)
plt.title("Grade", fontsize=20)
plt.xlabel("Grade", fontsize=10)
plt.ylabel("Number of Loans", fontsize=20)
display(grade_fig)

In [33]:
# Number of loan_condition entries per issue date, drawn as a line
fig, ax = plt.subplots(figsize=(7, 5))
issue_date_pdf = loan_copy_df.toPandas()
condition_counts_by_date = issue_date_pdf.groupby('issue_d')['loan_condition'].count()
condition_counts_by_date.plot(ax=ax)
display(fig)

In [34]:
# Pie chart of the share of each loan condition
f, ax = plt.subplots(figsize=(12, 8))

colors = ["#3791D7", "#D72626"]
labels = ("Good Loan", "Bad Loan")

plt.suptitle('Loan Condition', fontsize=20)
plt.axis('off')

# NOTE(review): the labels are applied in value_counts() order — presumably
# "Good Loan" is the more frequent condition; confirm against the data
condition_counts = loan_copy_df.toPandas().loan_condition.value_counts()
condition_counts.plot.pie(
    explode=[0, 0.25],
    autopct='%1.2f%%',
    shadow=True,
    colors=colors,
    labels=labels,
    fontsize=12,
    startangle=70,
)
display(f)

In [35]:
# Outlier check per grade: violin plot of loan amount and box plot of total payment
fig, ((ax1), (ax2))= plt.subplots(nrows=1, ncols=2, figsize=(14,6))

# Collect the dataset to pandas once and reuse it for both panels
grade_plot_pdf = loan_copy_df.toPandas()
sns.violinplot(x="grade", y="loan_amount", data=grade_plot_pdf, palette="Set2", ax=ax1)
sns.boxplot(x="grade", y="total_pymnt", data=grade_plot_pdf, palette="Set2", ax=ax2)
display(fig)

#Feature Engineering

In [37]:
# One-hot encode the categorical columns in pandas and append the indicator
# columns to the right of the original columns, in the order listed below.
dummy = loan_copy_df.toPandas()

encoded_source_columns = [
    'loan_condition',
    'grade',
    'home_ownership',
    'income_category',
    'purpose',
    'term_cat',
]
indicator_frames = [pd.get_dummies(dummy[source_column]) for source_column in encoded_source_columns]

# Original columns first, then one block of indicator columns per source column
loan_dummy_df = pd.concat([dummy] + indicator_frames, axis=1)

In [38]:
# Drop the reference level of each set of categorical dummies
reference_level_columns = ['G', 'Low', 'Good Loan', 'Bad Loan', 'ANY', 'other']
loan_dummy_df_final = loan_dummy_df.drop(columns=reference_level_columns)
# The column labelled with the integer 2 is dropped separately
# (presumably it is one of the term_cat dummies; confirm)
loan_dummy_df_final1 = loan_dummy_df_final.drop(columns=2)

In [39]:
# Round the interest rate to zero decimal places (overwrites the column in the pandas frame)
loan_dummy_df_final1['interest_rate'] = loan_dummy_df_final1['interest_rate'].round(0)

In [40]:
# Convert the pandas frame that holds the dummy columns back into a Spark
# DataFrame, which the modelling cells below use
loan_linear_reg = spark.createDataFrame(loan_dummy_df_final1)

In [41]:
# Preview the pandas frame that holds the dummy columns
loan_dummy_df_final1.head()

In [42]:
# Columns used by the models; each one is cast to the Spark integer type
# NOTE(review): the list includes continuous columns such as dti and
# interest_rate — presumably the integer cast truncates them; confirm that is intended
columns_list = ['loan_amount','default','dti','total_pymnt','total_rec_prncp','recoveries','installment','interest_rate','A','B','C','D','E','F','MORTGAGE','NONE','OTHER','OWN','RENT','High','Medium','car','credit_card','debt_consolidation','educational','home_improvement','house','major_purchase','medical','moving','renewable_energy','small_business','vacation','wedding','1']

# The loop variable is not called `col`, so it no longer overwrites the
# pyspark.sql.functions.col imported at the top of the notebook
for column_name in columns_list:
  loan_linear_reg = loan_linear_reg.withColumn(column_name, loan_linear_reg[column_name].cast('Integer'))

#Model Creation

In [44]:
# Randomly split the data into training / validation / testing sets with
# weights 0.6 / 0.3 / 0.1, using a fixed seed
training_df, validation_df, testing_df = loan_linear_reg.randomSplit([0.6, 0.3, 0.1], seed=100)

In [45]:
#Importing pyspark libraries for machine learning
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

##Building a classification model using Logistic Regression

In [47]:
# First step of the class-imbalance remedy: size of each class in the training set.
# This notebook treats default == 0 as the "positive" class, so numPositives is
# the number of rows with default == 0 and numNegatives is everything else.
dataset_size=float(training_df.select("default").count())
numPositives=training_df.select("default").where('default == 0').count()
# Percentage of rows with default == 0 (the variable keeps its original name)
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
# The messages name the class that is actually counted (default == 0); they previously said "ones"
print('The number of zeros (default == 0) is {}'.format(numPositives))
print('Percentage of zeros (default == 0) is {}'.format(per_ones))

In [48]:
# Balancing ratio: numNegatives divided by the training-set size, i.e. the
# share of training rows whose default is not 0
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

In [49]:
# Add a per-row weight column: rows with default == 0 get BalancingRatio,
# every other row gets 1 - BalancingRatio
training_df=training_df.withColumn("classWeights", fn.when(training_df.default == 0,BalancingRatio).otherwise(1-BalancingRatio))
# Show the first five weights as a sanity check
training_df.select("classWeights").show(5)

In [50]:
# Feature columns fed to the logistic regression
cols = [
    'total_pymnt', 'loan_amount', 'dti', 'installment', 'total_rec_prncp', 'interest_rate',
    'A', 'B', 'C', 'D', 'E', 'F',
    'MORTGAGE', 'NONE', 'OTHER', 'OWN', 'RENT',
    'car', 'credit_card', 'debt_consolidation', 'educational', 'home_improvement', 'house',
    'major_purchase', 'medical', 'moving', 'renewable_energy', 'small_business', 'vacation',
    'wedding', '1',
]

# Pipeline stages: assemble the feature vector, mean-centre it (no scaling to
# unit variance), then fit a logistic regression weighted by classWeights.
# NOTE(review): `sc` was the SparkContext handle from the first cell; this rebinds it.
va = VectorAssembler(inputCols=cols, outputCol='features')
sc = StandardScaler(inputCol='features', outputCol='std_features', withMean=True, withStd=False)
lr = LogisticRegression(featuresCol='std_features', labelCol='default', weightCol='classWeights')

# Fit the pipeline on the training data and score the testing data
lr_pipeline_stages = [va, sc, lr]
lr_Model = Pipeline(stages=lr_pipeline_stages).fit(training_df)
lr_prediction = lr_Model.transform(testing_df)

In [51]:
# Show the predicted label next to the actual label and the assembled feature vector
lr_prediction.select("prediction", "default", "features").show()

In [52]:
# Validation score of the logistic regression.
# BinaryClassificationEvaluator defaults to metricName="areaUnderROC", so the
# value below is the area under the ROC curve, not the classification accuracy;
# the printed labels say so. The variable name is kept for other cells.
evaluator = BinaryClassificationEvaluator(labelCol = 'default')
lr_accuracy = evaluator.evaluate(lr_Model.transform(validation_df))
print("Validation areaUnderROC of LogisticRegression = %g"% (lr_accuracy))
print("1 - validation areaUnderROC of LogisticRegression = %g " % (1.0 - lr_accuracy))

In [53]:
# Score the logistic regression on the testing DataFrame with the same evaluator.
# NOTE(review): the evaluator was created without a metricName, so this is
# presumably areaUnderROC (the Spark default) rather than accuracy — confirm
AUC_test = evaluator.evaluate(lr_Model.transform(testing_df))
print(AUC_test)

In [54]:
# ROC curve from the training summary of the fitted logistic regression
trainingSummary = lr_Model.stages[-1].summary
roc = trainingSummary.roc.toPandas()
# FPR is plotted on the x axis and TPR on the y axis; the axis labels follow
# the plotted data (they were previously swapped)
plt.plot(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
a = plt.show()
display(a)

In [55]:
# Area under the ROC curve as reported by the training summary
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [56]:
# Precision-recall curve from the training summary
pr = trainingSummary.pr.toPandas()
recall_values = pr['recall']
precision_values = pr['precision']
plt.plot(recall_values, precision_values)
plt.xlabel('Recall')
plt.ylabel('Precision')
b = plt.show()
display(b)

In [57]:
# Confusion-matrix counts for the logistic regression on the test predictions.
# Class 0 is treated as the "positive" class here.
# NOTE(review): assigning to `fn` rebinds the alias under which
# pyspark.sql.functions was imported; cells that call fn.when(...) need the
# import cell re-run after this one.
actual_is_zero = lr_prediction.default == 0
actual_is_one = lr_prediction.default == 1
predicted_zero = lr_prediction.prediction == 0
predicted_one = lr_prediction.prediction == 1

tp = lr_prediction.filter(actual_is_zero & predicted_zero).count()
tn = lr_prediction.filter(actual_is_one & predicted_one).count()
fp = lr_prediction.filter(actual_is_one & predicted_zero).count()
fn = lr_prediction.filter(actual_is_zero & predicted_one).count()

for caption, value in (
    ("True Positives:", tp),
    ("True Negatives:", tn),
    ("False Positives:", fp),
    ("False Negatives:", fn),
):
    print(caption, value)

In [58]:
# Heatmap of the confusion matrix: first row [tp, fn], second row [fp, tn]
df_cm = pd.DataFrame([[tp, fn], [fp, tn]], index=range(2), columns=range(2))

plt.figure(figsize=(10, 7))
sns.set(font_scale=1.2)  # label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')
plt.title('Confusion Matrix for Logistic Regression \n ')
plt.xlabel('Predicted')
plt.ylabel('Actual')

# Flip both axes of the current plot
current_axes = plt.gca()
current_axes.invert_xaxis()
current_axes.invert_yaxis()
display()

In [59]:
# Pair every feature name with its logistic-regression coefficient, sorted by
# ascending weight. The names come straight from `cols`, the list the
# VectorAssembler was built from, so the training set is no longer collected
# to pandas only to read back the same column names.
lr_feature = pd.DataFrame(list(zip(cols, lr_Model.stages[-1].coefficients.toArray())),
            columns = ['column', 'weight']).sort_values('weight')

##Hyper Parameter Tuning

In [61]:
# Parameter grid for cross-validating the logistic regression:
# three regParam values x two elasticNetParam values
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

lr_grid_builder = ParamGridBuilder()
lr_grid_builder = lr_grid_builder.addGrid(lr.regParam, [0.05, 0.01, 0.2])
lr_grid_builder = lr_grid_builder.addGrid(lr.elasticNetParam, [0.3, 0.1])
paramGrid_lr = lr_grid_builder.build()

In [62]:
# Import only the timer instead of every public name of the time module
from time import time
start_time = time()

# 3-fold CrossValidator over the logistic-regression grid; the seed makes the
# fold assignment reproducible between runs
cv_lr = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid_lr,
                    evaluator=evaluator, numFolds=3, seed=100)

# Run the cross-validation as the last stage of the pipeline
# (likely to take a fair amount of time)
cvModel = Pipeline(stages=[va, sc, cv_lr]).fit(training_df)
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [63]:
# Take the best model found by the cross-validator (the last pipeline stage)
# and show its parameter map
bestModel = cvModel.stages[-1].bestModel
bestModel.extractParamMap()

In [64]:
# Build a pipeline around the best cross-validated model and score the testing data.
# NOTE(review): bestModel is already a fitted model, so presumably only the
# assembler/scaler stages are (re)fitted here — confirm
reg_lr_Model = Pipeline(stages=[va, sc, bestModel]).fit(training_df)
reg_lr_prediction = reg_lr_Model.transform(testing_df)
reg_lr_prediction.select("prediction", "default", "features").show()

In [65]:
# Validation score of the tuned logistic regression.
# BinaryClassificationEvaluator defaults to metricName="areaUnderROC", so the
# value below is the area under the ROC curve, not the classification accuracy;
# the printed labels say so. The variable name is kept for other cells.
evaluator = BinaryClassificationEvaluator(labelCol = 'default')
reg_lr_accuracy = evaluator.evaluate(reg_lr_Model.transform(validation_df))
print("Validation areaUnderROC of LogisticRegression = %g"% (reg_lr_accuracy))
print("1 - validation areaUnderROC of LogisticRegression = %g " % (1.0 - reg_lr_accuracy))

In [66]:
# ROC curve from the training summary of the best model
trainingSummary = reg_lr_Model.stages[-1].summary
roc = trainingSummary.roc.toPandas()
# FPR is plotted on the x axis and TPR on the y axis; the axis labels follow
# the plotted data (they were previously swapped)
plt.plot(roc['FPR'],roc['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
a = plt.show()
display(a)

In [67]:
# Precision-recall curve from the training summary
pr = trainingSummary.pr.toPandas()
recall_values = pr['recall']
precision_values = pr['precision']
plt.plot(recall_values, precision_values)
plt.xlabel('Recall')
plt.ylabel('Precision')
b = plt.show()
display(b)

In [68]:
# Pair every feature name with the coefficient of the best model, sorted by
# descending weight. The names come straight from `cols`, the list the
# VectorAssembler was built from, so the training set is no longer collected
# to pandas only to read back the same column names.
lr_best_feature = pd.DataFrame(list(zip(cols, reg_lr_Model.stages[-1].coefficients.toArray())),
            columns = ['column', 'weight']).sort_values('weight', ascending=False)

In [69]:
# Display the coefficient table of the best model
lr_best_feature

In [70]:
# Confusion-matrix counts for the tuned logistic regression on the test predictions.
# Class 0 is treated as the "positive" class here.
# NOTE(review): assigning to `fn` rebinds the alias under which
# pyspark.sql.functions was imported; cells that call fn.when(...) need the
# import cell re-run after this one.
actual_is_zero = reg_lr_prediction.default == 0
actual_is_one = reg_lr_prediction.default == 1
predicted_zero = reg_lr_prediction.prediction == 0
predicted_one = reg_lr_prediction.prediction == 1

tp = reg_lr_prediction.filter(actual_is_zero & predicted_zero).count()
tn = reg_lr_prediction.filter(actual_is_one & predicted_one).count()
fp = reg_lr_prediction.filter(actual_is_one & predicted_zero).count()
fn = reg_lr_prediction.filter(actual_is_zero & predicted_one).count()

for caption, value in (
    ("True Positives:", tp),
    ("True Negatives:", tn),
    ("False Positives:", fp),
    ("False Negatives:", fn),
):
    print(caption, value)

In [71]:
# Heatmap of the confusion matrix: first row [tp, fn], second row [fp, tn]
df_cm = pd.DataFrame([[tp, fn], [fp, tn]], index=range(2), columns=range(2))

plt.figure(figsize=(10, 7))
sns.set(font_scale=1.2)  # label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')
plt.xlabel('Predicted')
plt.ylabel('Actual')

# Flip both axes of the current plot
current_axes = plt.gca()
current_axes.invert_xaxis()
current_axes.invert_yaxis()
display()

##Building a classification model using Decision Tree

### Undersampling the dataset due to highly imbalanced classes

In [74]:
# Undersample the good loans (default == 0) so that both classes end up with
# roughly the same number of rows.
good_loans = loan_linear_reg.filter(loan_linear_reg.default == 0)
bad_loans = loan_linear_reg.filter(loan_linear_reg.default == 1)

good_count = good_loans.count()
bad_count = bad_loans.count()

# Fraction of good loans to keep. It is measured against the good-loan count,
# not the total row count, otherwise the sample comes out smaller than the
# bad-loan class. It is capped at 1.0 because a fraction above 1 is invalid
# when sampling without replacement. The cap is written as a plain comparison
# because `from pyspark.sql.functions import *` shadows the built-in min().
sampleRatio = float(bad_count) / good_count if good_count else 0.0
if sampleRatio > 1.0:
    sampleRatio = 1.0

# Fixed seed so the undersampled dataset is the same on every run
good_loansSampleDf = good_loans.sample(False, sampleRatio, seed=100)
# union() replaces the deprecated unionAll(); neither removes duplicates
loan_linear_reg1 = bad_loans.union(good_loansSampleDf)

In [75]:
# Row count per class in the undersampled dataset
loan_linear_reg1.groupby('default').count().show()

In [76]:
# Randomly split the undersampled data into training / validation / testing
# sets with weights 0.6 / 0.3 / 0.1, using a fixed seed
training_df1, validation_df1, testing_df1 = loan_linear_reg1.randomSplit([0.6, 0.3, 0.1], seed=100)

In [77]:
# Stages for the decision tree: assemble the features, mean-centre them
# (no scaling to unit variance), then fit the tree
dt_input_columns = [
    'total_pymnt', 'loan_amount', 'installment', 'total_rec_prncp', 'interest_rate',
    'A', 'B', 'C', 'D', 'E', 'F',
    'MORTGAGE', 'NONE', 'OTHER', 'OWN', 'RENT',
    'car', 'dti', 'credit_card', 'debt_consolidation', 'educational', 'home_improvement',
    'house', 'major_purchase', 'medical', 'moving', 'renewable_energy', 'small_business',
    'vacation', 'wedding', '1',
]
vaD = VectorAssembler(inputCols=dt_input_columns, outputCol='features')
sc = StandardScaler(inputCol='features', outputCol='std_features', withMean=True, withStd=False)
dt = DecisionTreeClassifier(labelCol='default', featuresCol='std_features')

# Chain the stages into a pipeline and fit it on the undersampled training set
dt_pipeline_stages = [vaD, sc, dt]
dt_model = Pipeline(stages=dt_pipeline_stages).fit(training_df1)

In [78]:
# Evaluate the default decision tree on the validation set.
# The metric is areaUnderROC, so the printed labels name it as such instead of
# calling it accuracy/error. The variable name is kept for other cells.
evaluator = BinaryClassificationEvaluator(labelCol="default", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
validation_accuracy = evaluator.evaluate(dt_model.transform(validation_df1))
print("Validation areaUnderROC = %g " % (validation_accuracy))
print("1 - validation areaUnderROC = %g " % (1.0 - validation_accuracy))

In [79]:
# Take the fitted tree (the last pipeline stage) and print its summary line
treeModel = dt_model.stages[-1]
print(treeModel)

In [80]:
# Print the tree's debug string
print(treeModel.toDebugString)

In [81]:
# Hand the fitted tree (last pipeline stage) to the notebook's display()
display(dt_model.stages[-1])

In [82]:
# Score the testing set with the default decision tree and show the predicted
# label next to the actual label and the feature vector
dt_prediction = dt_model.transform(testing_df1)
dt_prediction.select("prediction", "default", "features").show()

In [83]:
# Confusion-matrix counts for the default decision tree on the test predictions.
# Class 0 is treated as the "positive" class here.
# NOTE(review): assigning to `fn` rebinds the alias under which
# pyspark.sql.functions was imported; cells that call fn.when(...) need the
# import cell re-run after this one.
actual_is_zero = dt_prediction.default == 0
actual_is_one = dt_prediction.default == 1
predicted_zero = dt_prediction.prediction == 0
predicted_one = dt_prediction.prediction == 1

tp = dt_prediction.filter(actual_is_zero & predicted_zero).count()
tn = dt_prediction.filter(actual_is_one & predicted_one).count()
fp = dt_prediction.filter(actual_is_one & predicted_zero).count()
fn = dt_prediction.filter(actual_is_zero & predicted_one).count()

for caption, value in (
    ("True Positives:", tp),
    ("True Negatives:", tn),
    ("False Positives:", fp),
    ("False Negatives:", fn),
):
    print(caption, value)

In [84]:
# Heatmap of the confusion matrix: first row [tp, fn], second row [fp, tn]
df_cm = pd.DataFrame([[tp, fn], [fp, tn]], index=range(2), columns=range(2))

plt.figure(figsize=(10, 7))
sns.set(font_scale=1.2)  # label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')
plt.title('Confusion Matrix for Decision Tree - Default model using Under Sampling \n ')
plt.xlabel('Predicted')
plt.ylabel('Actual')

# Flip both axes of the current plot
current_axes = plt.gca()
current_axes.invert_xaxis()
current_axes.invert_yaxis()
display()

## Hyper Parameter Tuning

In [86]:
# Parameter grid for cross-validating the decision tree:
# one maxDepth value x three maxBins values
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

dt_grid_builder = ParamGridBuilder()
dt_grid_builder = dt_grid_builder.addGrid(dt.maxDepth, [20])
dt_grid_builder = dt_grid_builder.addGrid(dt.maxBins, [50, 60, 70])
paramGrid_dt = dt_grid_builder.build()

In [87]:
# Import only the timer instead of every public name of the time module
from time import time
start_time = time()

# Cross-validation scores each candidate by area under the ROC curve; the
# evaluator is named after that metric (it was previously called evaluatorPR)
evaluator_roc = BinaryClassificationEvaluator(labelCol = "default", metricName = "areaUnderROC")
# 3-fold CrossValidator; the seed makes the fold assignment reproducible between runs
cv_dt = CrossValidator(estimator=dt,
                    estimatorParamMaps=paramGrid_dt,
                    evaluator=evaluator_roc, numFolds=3, seed=100)

# Run the cross-validation as the last stage of the pipeline
# (likely to take a fair amount of time)
cvModel = Pipeline(stages=[vaD, sc, cv_dt]).fit(training_df1)
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [88]:
# Take the best model found by the grid search (the last pipeline stage) and
# show its parameter map
dt_bestModel = cvModel.stages[-1].bestModel
dt_bestModel.extractParamMap()

In [89]:
# Build a pipeline from the assembler, the scaler and the best tree, fit it on
# the training set and score the testing set.
# NOTE(review): dt_bestModel is already a fitted model, so presumably only the
# assembler/scaler stages are (re)fitted here — confirm
tuned_dt_Model = Pipeline(stages=[vaD, sc, dt_bestModel]).fit(training_df1)
tuned_dt_prediction = tuned_dt_Model.transform(testing_df1)
tuned_dt_prediction.select("prediction", "default", "features").show()

In [90]:
# Evaluate the tuned decision tree on the validation set.
# The metric here is areaUnderPR (area under the precision-recall curve), so
# the printed labels name it as such instead of calling it accuracy/error.
# NOTE(review): the default tree was scored with areaUnderROC, so the two
# numbers are not directly comparable.
evaluator = BinaryClassificationEvaluator(labelCol="default", metricName="areaUnderPR")
validation_accuracy = evaluator.evaluate(tuned_dt_Model.transform(validation_df1))
print("Validation areaUnderPR = %g " % (validation_accuracy))
print("1 - validation areaUnderPR = %g " % (1.0 - validation_accuracy))

In [91]:
# Take the tree of the tuned pipeline (its last stage) and print its summary line
treeModel = tuned_dt_Model.stages[-1]
print(treeModel)

In [92]:
# Print the tree's debug string
print(treeModel.toDebugString)

In [93]:
# Hand the best tree to the notebook's display()
display(dt_bestModel)

In [94]:
# Confusion-matrix counts for the tuned decision tree on the test predictions.
# Class 0 is treated as the "positive" class here.
# NOTE(review): assigning to `fn` rebinds the alias under which
# pyspark.sql.functions was imported; cells that call fn.when(...) need the
# import cell re-run after this one.
actual_is_zero = tuned_dt_prediction.default == 0
actual_is_one = tuned_dt_prediction.default == 1
predicted_zero = tuned_dt_prediction.prediction == 0
predicted_one = tuned_dt_prediction.prediction == 1

tp = tuned_dt_prediction.filter(actual_is_zero & predicted_zero).count()
tn = tuned_dt_prediction.filter(actual_is_one & predicted_one).count()
fp = tuned_dt_prediction.filter(actual_is_one & predicted_zero).count()
fn = tuned_dt_prediction.filter(actual_is_zero & predicted_one).count()

for caption, value in (
    ("True Positives:", tp),
    ("True Negatives:", tn),
    ("False Positives:", fp),
    ("False Negatives:", fn),
):
    print(caption, value)

In [95]:
# Heatmap of the confusion matrix: first row [tp, fn], second row [fp, tn]
df_cm = pd.DataFrame([[tp, fn], [fp, tn]], index=range(2), columns=range(2))

plt.figure(figsize=(10, 7))
sns.set(font_scale=1.2)  # label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')
plt.title('Confusion Matrix for Decision Tree - Grid Search model using Under Sampling \n ')
plt.xlabel('Predicted')
plt.ylabel('Actual')

# Flip both axes of the current plot
current_axes = plt.gca()
current_axes.invert_xaxis()
current_axes.invert_yaxis()
display()

## Trying Stratified Sampling

In [97]:
# Stratified sampling to soften the class imbalance: keep 90% of the rows with
# default == 1 and 40% of the rows with default == 0. The fixed seed makes the
# sample (and everything trained on it) reproducible between runs.
stratified_data = loan_linear_reg.sampleBy('default', fractions={1: 0.9, 0: 0.40}, seed=100).cache()

# Row count per class after sampling
stratified_data.groupby('default').count().show()

In [98]:
# Randomly split the stratified sample into training / validation / testing
# sets with weights 0.6 / 0.3 / 0.1, using a fixed seed.
# NOTE: this rebinds training_df1 / validation_df1 / testing_df1, which held
# the undersampled splits.
training_df1, validation_df1, testing_df1 = stratified_data.randomSplit([0.6, 0.3, 0.1], seed=100)

In [99]:
# Stages for the decision tree: assemble the features, mean-centre them
# (no scaling to unit variance), then fit the tree
dt_input_columns = [
    'total_pymnt', 'loan_amount', 'installment', 'total_rec_prncp', 'interest_rate',
    'A', 'B', 'C', 'D', 'E', 'F',
    'MORTGAGE', 'NONE', 'OTHER', 'OWN', 'RENT',
    'car', 'dti', 'credit_card', 'debt_consolidation', 'educational', 'home_improvement',
    'house', 'major_purchase', 'medical', 'moving', 'renewable_energy', 'small_business',
    'vacation', 'wedding', '1',
]
vaD = VectorAssembler(inputCols=dt_input_columns, outputCol='features')
sc = StandardScaler(inputCol='features', outputCol='std_features', withMean=True, withStd=False)
dt = DecisionTreeClassifier(labelCol='default', featuresCol='std_features')

# Chain the stages into a pipeline and fit it on the stratified training set
dt_pipeline_stages = [vaD, sc, dt]
dt_model = Pipeline(stages=dt_pipeline_stages).fit(training_df1)

In [100]:
# Evaluate the default decision tree on the validation set.
# The metric is areaUnderROC, so the printed labels name it as such instead of
# calling it accuracy/error. The variable name is kept for other cells.
evaluator = BinaryClassificationEvaluator(labelCol="default", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
validation_accuracy = evaluator.evaluate(dt_model.transform(validation_df1))
print("Validation areaUnderROC = %g " % (validation_accuracy))
print("1 - validation areaUnderROC = %g " % (1.0 - validation_accuracy))

In [101]:
# Score the testing set with the fitted pipeline and show the predicted label
# next to the actual label and the feature vector
dt_prediction = dt_model.transform(testing_df1)
dt_prediction.select("prediction", "default", "features").show()

In [102]:
# Confusion-matrix counts for the default decision tree on the test predictions.
# Class 0 is treated as the "positive" class here.
# NOTE(review): assigning to `fn` rebinds the alias under which
# pyspark.sql.functions was imported; cells that call fn.when(...) need the
# import cell re-run after this one.
actual_is_zero = dt_prediction.default == 0
actual_is_one = dt_prediction.default == 1
predicted_zero = dt_prediction.prediction == 0
predicted_one = dt_prediction.prediction == 1

tp = dt_prediction.filter(actual_is_zero & predicted_zero).count()
tn = dt_prediction.filter(actual_is_one & predicted_one).count()
fp = dt_prediction.filter(actual_is_one & predicted_zero).count()
fn = dt_prediction.filter(actual_is_zero & predicted_one).count()

for caption, value in (
    ("True Positives:", tp),
    ("True Negatives:", tn),
    ("False Positives:", fp),
    ("False Negatives:", fn),
):
    print(caption, value)

In [103]:
# Heatmap of the confusion matrix: first row [tp, fn], second row [fp, tn]
df_cm = pd.DataFrame([[tp, fn], [fp, tn]], index=range(2), columns=range(2))

plt.figure(figsize=(10, 7))
sns.set(font_scale=1.2)  # label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')

plt.title('Confusion Matrix for Decision Tree - Default model using Stratified Sampling \n ')
plt.xlabel('Predicted')
plt.ylabel('Actual')

# Flip both axes of the current plot
current_axes = plt.gca()
current_axes.invert_xaxis()
current_axes.invert_yaxis()
display()

##Hyper Parameter Tuning

In [105]:
# Parameter grid for cross-validating the decision tree:
# three maxDepth values x three maxBins values
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

dt_grid_builder = ParamGridBuilder()
dt_grid_builder = dt_grid_builder.addGrid(dt.maxDepth, [10, 20, 30])
dt_grid_builder = dt_grid_builder.addGrid(dt.maxBins, [50, 60, 70])
paramGrid_dt = dt_grid_builder.build()

In [106]:
# Import only the timer instead of every public name of the time module
from time import time
start_time = time()

# Cross-validation scores each candidate by area under the ROC curve; the
# evaluator is named after that metric (it was previously called evaluatorPR)
evaluator_roc = BinaryClassificationEvaluator(labelCol = "default", metricName = "areaUnderROC")
# 3-fold CrossValidator; the seed makes the fold assignment reproducible between runs
cv_dt = CrossValidator(estimator=dt,
                    estimatorParamMaps=paramGrid_dt,
                    evaluator=evaluator_roc, numFolds=3, seed=100)

# Run the cross-validation as the last stage of the pipeline
# (likely to take a fair amount of time)
cvModel = Pipeline(stages=[vaD, sc, cv_dt]).fit(training_df1)
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [107]:
# Take the best decision tree found by the grid search (the last pipeline
# stage) and show its parameter map
dt_bestModel = cvModel.stages[-1].bestModel
dt_bestModel.extractParamMap()

In [108]:
# Build a pipeline from the assembler, the scaler and the best tree, and fit it
# on the training set.
# NOTE(review): dt_bestModel is already a fitted model, so presumably only the
# assembler/scaler stages are (re)fitted here — confirm
tuned_dt_Model = Pipeline(stages=[vaD, sc, dt_bestModel]).fit(training_df1)

In [109]:
# Score the testing set with the tuned pipeline and show the predicted label
# next to the actual label and the feature vector
tuned_dt_prediction = tuned_dt_Model.transform(testing_df1)
tuned_dt_prediction.select("prediction", "default", "features").show()

In [110]:
# Evaluate the tuned decision tree on the validation set.
# The metric here is areaUnderPR (area under the precision-recall curve), so
# the printed labels name it as such instead of calling it accuracy/error.
# NOTE(review): the default tree was scored with areaUnderROC, so the two
# numbers are not directly comparable.
evaluator = BinaryClassificationEvaluator(labelCol="default", metricName="areaUnderPR")
validation_accuracy = evaluator.evaluate(tuned_dt_Model.transform(validation_df1))
print("Validation areaUnderPR = %g " % (validation_accuracy))
print("1 - validation areaUnderPR = %g " % (1.0 - validation_accuracy))

In [111]:
# Take the tree of the tuned pipeline (its last stage) and print its summary line
treeModel = tuned_dt_Model.stages[-1]
print(treeModel)

In [112]:
# Print the tree's debug string
print(treeModel.toDebugString)

In [113]:
# Hand the best tree to the notebook's display()
display(dt_bestModel)

In [114]:
#Creating the confusion matrix of the best model
tp = tuned_dt_prediction[(tuned_dt_prediction.default == 0) & (tuned_dt_prediction.prediction == 0)].count()
tn = tuned_dt_prediction[(tuned_dt_prediction.default == 1) & (tuned_dt_prediction.prediction == 1)].count()
fp = tuned_dt_prediction[(tuned_dt_prediction.default == 1) & (tuned_dt_prediction.prediction == 0)].count()
fn = tuned_dt_prediction[(tuned_dt_prediction.default == 0) & (tuned_dt_prediction.prediction == 1)].count()
print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)

In [115]:
#Visualizing the confusion matrix
array = [[tp,fn],
     [fp,tn]]        
df_cm = pd.DataFrame(array, range(2),
                  range(2))
plt.figure(figsize = (10,7))
sns.set(font_scale=1.2)#for label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')
plt.title('Confusion Matrix for Decision Tree - Grid Search model using Stratified Sampling \n ')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.gca().invert_xaxis()
plt.gca().invert_yaxis()
display()

In [116]:
%sh
# NOTE(review): no visible cell imports this package, and the version is unpinned —
# confirm it is still needed; if so, pin it and move the install to the top of the notebook.
pip install FeatureImportanceSelector

In [117]:
#Feature Importance of Decision Tree
# Pair every feature name in `cols` with its importance score and sort from most to
# least important. The names are taken from `cols` directly: converting the whole
# prediction DataFrame to pandas only to iterate over its column labels pulled the
# entire test set onto the driver for no benefit.
# NOTE(review): assumes `cols` lists the features in the same order as the model's
# feature vector — TODO confirm.
# NOTE(review): importances are read from `dt_model`, not `tuned_dt_Model` — confirm
# this is the intended model.
dt_importances = dt_model.stages[-1].featureImportances.toArray()
dt_features = pd.DataFrame(list(zip(cols, dt_importances)),
            columns = ['feature', 'importance']).sort_values('importance', ascending = False)

## Building a classification model using Random Forest

In [119]:
#Creating tasks for random forest
# Input columns that are assembled into the single "features" vector, in vector order.
RF_FEATURE_COLS = [
    'loan_amount', 'dti', 'total_pymnt', 'total_rec_prncp', 'recoveries',
    'installment', 'interest_rate',
    'A', 'B', 'C', 'D', 'E', 'F',
    'MORTGAGE', 'NONE', 'OTHER', 'OWN', 'RENT',
    'High', 'Medium',
    'car', 'credit_card', 'debt_consolidation', 'educational', 'home_improvement',
    'house', 'major_purchase', 'medical', 'moving', 'renewable_energy',
    'small_business', 'vacation', 'wedding',
    '1',
]
rf_assembler = VectorAssembler(inputCols=RF_FEATURE_COLS, outputCol="features")
# Random forests sample rows and features at random; a fixed seed makes the fitted
# model, and every metric derived from it, repeatable across runs.
rf = classification.RandomForestClassifier(featuresCol = "features", labelCol = "default", seed = 42)

In [120]:
# Chain the assembler and the random forest, then fit the chain on the training split
rf_default_pipeline = Pipeline(stages=[rf_assembler, rf])
pipe_rf = rf_default_pipeline.fit(training_df1)

In [121]:
# Evaluator used to score the random forest: area under the ROC curve against `default`
evaluator = evaluation.BinaryClassificationEvaluator(
    metricName='areaUnderROC',
    labelCol='default',
)

In [122]:
# Score the validation split with the fitted random forest and report its AUC
rf_validation_scored = pipe_rf.transform(validation_df1)
AUC = evaluator.evaluate(rf_validation_scored)
print(AUC)

In [123]:
# Apply the fitted random forest pipeline to the test split and preview the result
rf_prediction = pipe_rf.transform(testing_df1)
rf_preview = rf_prediction.select("prediction", "default", "features")
rf_preview.show()

In [124]:
#Creating the confusion matrix for the default model of random forest
tp = rf_prediction[(rf_prediction.default == 0) & (rf_prediction.prediction == 0)].count()
tn = rf_prediction[(rf_prediction.default == 1) & (rf_prediction.prediction == 1)].count()
fp = rf_prediction[(rf_prediction.default == 1) & (rf_prediction.prediction == 0)].count()
fn = rf_prediction[(rf_prediction.default == 0) & (rf_prediction.prediction == 1)].count()
print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)

In [125]:
#Visualizing the confusion matrix by graphical representation
array = [[tp,fn],
     [fp,tn]]        
df_cm = pd.DataFrame(array, range(2),
                  range(2))
plt.figure(figsize = (10,7))
sns.set(font_scale=1.2)#for label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')
plt.title('Confusion Matrix for Random Forest - Default model using Stratified Sampling \n ')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.gca().invert_xaxis()
plt.gca().invert_yaxis()
display()

## Hyperparameter Tuning

In [127]:
# Build the hyper-parameter grid searched for the random forest
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
rf_grid_builder = ParamGridBuilder()
rf_grid_builder = rf_grid_builder.addGrid(rf.numTrees, [25,30,35])  # candidate forest sizes
rf_grid_builder = rf_grid_builder.addGrid(rf.maxDepth, [6,10,12])   # candidate tree depths
paramGrid = rf_grid_builder.build()

In [128]:
# Evaluator for the random forest grid search: area under the ROC curve against `default`
rf_evaluator = BinaryClassificationEvaluator(
    metricName = 'areaUnderROC',
    labelCol = 'default',
)

In [129]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# 3-fold cross validation over the random forest grid, scored with rf_evaluator.
# The seed fixes how rows are assigned to folds so the selected model is repeatable.
cv_rf= CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=rf_evaluator, numFolds=3, seed=42)

# Run cross validations
cv_rf_Model = Pipeline(stages = [rf_assembler, cv_rf]).fit(training_df1)

In [130]:
# Report the area under ROC of the cross-validated pipeline on the validation split
rf_cv_validation_scored = cv_rf_Model.transform(validation_df1)
rf_cv_validation_auc = rf_evaluator.evaluate(rf_cv_validation_scored)
print("The area under ROC for validation set after CV  is {}".format(rf_cv_validation_auc))

In [131]:
# The fitted CrossValidator is the last pipeline stage; take its winning random forest
fitted_cv_rf = cv_rf_Model.stages[-1]
best_model = fitted_cv_rf.bestModel
best_model.extractParamMap()

In [132]:
# Report the area under ROC of the cross-validated pipeline on the test split
rf_cv_testing_scored = cv_rf_Model.transform(testing_df1)
rf_cv_testing_auc = rf_evaluator.evaluate(rf_cv_testing_scored)
print("The area under ROC for testing set after CV  is {}".format(rf_cv_testing_auc))

In [133]:
# Assemble a pipeline around the best cross-validated forest and fit it on the training data.
# NOTE(review): best_model is taken from CrossValidator.bestModel, so it is presumably
# already fitted and reused as-is by Pipeline.fit — confirm.
rf_best_stages = [rf_assembler, best_model]
rf_best_pipeline = Pipeline(stages = rf_best_stages)
rf_model = rf_best_pipeline.fit(training_df1)

In [134]:
# Score the test split with the best-model pipeline and preview predictions next to labels.
# This rebinds rf_prediction, which earlier held the default model's predictions.
rf_prediction = rf_model.transform(testing_df1)
rf_best_preview = rf_prediction.select("prediction", "default", "features")
rf_best_preview.show()

In [135]:
#Creating the confusion matrix of the best model
tp = rf_prediction[(rf_prediction.default == 0) & (rf_prediction.prediction == 0)].count()
tn = rf_prediction[(rf_prediction.default == 1) & (rf_prediction.prediction == 1)].count()
fp = rf_prediction[(rf_prediction.default == 1) & (rf_prediction.prediction == 0)].count()
fn = rf_prediction[(rf_prediction.default == 0) & (rf_prediction.prediction == 1)].count()
print("True Positives:", tp)
print("True Negatives:", tn)
print("False Positives:", fp)
print("False Negatives:", fn)

In [136]:
#Visualizing the confusion matrix
array = [[tp,fn],
     [fp,tn]]        
df_cm = pd.DataFrame(array, range(2),
                  range(2))
plt.figure(figsize = (10,7))
sns.set(font_scale=1.2)#for label size
sns.heatmap(df_cm, annot=True, annot_kws={"size": 10}, cmap='YlGnBu', fmt='g')
plt.title('Confusion Matrix for Random Forest - Grid Search model using Stratified Sampling \n ')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.gca().invert_xaxis()
plt.gca().invert_yaxis()
display()

In [137]:
#Extracting the feature importances and sorting them according to their importance from higher order to lower
# Feature names come from the assembler that builds the "features" vector, so they are
# in the same order as the importance array. This also avoids converting the whole
# prediction DataFrame to pandas just to read its column labels.
# NOTE(review): assumes every assembler input is a scalar column, so names and
# importances line up one-to-one — TODO confirm.
rf_feature_names = rf_assembler.getInputCols()
rf_importances = rf_model.stages[-1].featureImportances.toArray()
rf_features = pd.DataFrame(list(zip(rf_feature_names, rf_importances)),
            columns = ['feature', 'importance']).sort_values('importance', ascending = False)

In [138]:
# Show the random forest feature-importance table (sorted by importance, descending)
rf_features

# Inference

In [140]:
# Bar chart of the ten highest-ranked random forest features
plt.figure(figsize=(18,7))
top_rf_features = rf_features.head(10)
ax = sns.barplot(data=top_rf_features, x="feature", y="importance", saturation=.5)
ax.set_xlabel('Features')
ax.set_ylabel('Feature Importance')
ax.set_title('Feature Importance by Random Forest')
display(ax.figure)

In [141]:
# Bar chart of the six highest-ranked decision tree features
plt.figure(figsize=(18,7))
top_dt_features = dt_features.head(6)
ax = sns.barplot(data=top_dt_features, x="feature", y="importance", saturation=.5)
ax.set_xlabel('Features')
ax.set_ylabel('Feature Importance')
ax.set_title('Feature Importance by Decision Tree')
display(ax.figure)

In [142]:
# Bar chart of the first ten rows of the logistic regression feature-weight table
plt.figure(figsize=(18,7))
top_lr_features = lr_best_feature.head(10)
ax = sns.barplot(data=top_lr_features, x="column", y="weight", saturation=.5)
ax.set_xlabel('Features')
ax.set_ylabel('Feature Weights')
ax.set_title('Feature Importance by Logistic Regression')
display(ax.figure)

In [143]:
# Assemble the model-comparison table: one row per model with its score.
# NOTE(review): the scores are typed in by hand rather than read from the evaluators,
# and the other visible cells report area-under-curve metrics — confirm that these
# values are accuracies and that they match the current run.
name = ['Logistic Regression', 'Decision Tree', 'Random Forest']
Accuracy = [0.79, 0.727, 0.855]
result_df = pd.DataFrame({'Model': name, 'Accuracy': Accuracy})
result_df.head()

# Model Comparison

In [145]:
# Bar chart comparing the score recorded for each model in result_df
plt.figure(figsize=(7,5))
ax = sns.barplot(data=result_df, x="Model", y="Accuracy", saturation=.5)
ax.set_xlabel('Models Implemented')
ax.set_ylabel('Model Accuracy')
ax.set_title('Model Comparison')
display(ax.figure)