# Step 7 : Data Mining

## PD Model Estimation

In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so the
# notebook is not tied to one machine; fall back to the path the notebook was
# originally written against when the variable is not set.
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7/')

# findspark.init makes the pyspark package importable, so it has to run before
# the pyspark imports below.
findspark.init(SPARK_HOME)

import pyspark
from pyspark.sql import SparkSession

# Reuse an already-running session if there is one, otherwise start a new one
# registered under the application name 'creditRisk'.
spark = SparkSession.builder.appName('creditRisk').getOrCreate()

In [2]:
# Load the model-ready loan data set. The first CSV row supplies the column
# names and Spark infers each column's type from the file contents.
data = spark.read.csv(
    'loan_data_to_model.csv',
    header=True,
    inferSchema=True,
)

In [3]:
# Print the column names and inferred types of the loaded data.
data.printSchema()

root
 |-- grade:A: integer (nullable = true)
 |-- grade:B: integer (nullable = true)
 |-- grade:C: integer (nullable = true)
 |-- grade:D: integer (nullable = true)
 |-- grade:E: integer (nullable = true)
 |-- grade:F: integer (nullable = true)
 |-- home_ownership:OWN: integer (nullable = true)
 |-- home_ownership:MORTGAGE: integer (nullable = true)
 |-- verification_status:Source Verified: integer (nullable = true)
 |-- verification_status:Not Verified: integer (nullable = true)
 |-- purpose:oth_med_vacation: integer (nullable = true)
 |-- purpose:debt_consolidation: integer (nullable = true)
 |-- purpose:major_car_home: integer (nullable = true)
 |-- purpose:credit_card: integer (nullable = true)
 |-- addr_state:AL_FL: integer (nullable = true)
 |-- addr_state:NY: integer (nullable = true)
 |-- addr_state:VA_CA_UT: integer (nullable = true)
 |-- addr_state:NM_OK_SD_LA_NJ: integer (nullable = true)
 |-- addr_state:NC_MD: integer (nullable = true)
 |-- addr_state:MO_AZ: integer (nullab

In [4]:
# List every column name; the VectorAssembler input list below is taken from these names.
data.columns

['grade:A',
 'grade:B',
 'grade:C',
 'grade:D',
 'grade:E',
 'grade:F',
 'home_ownership:OWN',
 'home_ownership:MORTGAGE',
 'verification_status:Source Verified',
 'verification_status:Not Verified',
 'purpose:oth_med_vacation',
 'purpose:debt_consolidation',
 'purpose:major_car_home',
 'purpose:credit_card',
 'addr_state:AL_FL',
 'addr_state:NY',
 'addr_state:VA_CA_UT',
 'addr_state:NM_OK_SD_LA_NJ',
 'addr_state:NC_MD',
 'addr_state:MO_AZ',
 'addr_state:PA_RI_MI_AR',
 'addr_state:MA_MN_DE',
 'addr_state:OH_WA_KY',
 'addr_state:OR_GA',
 'addr_state:IN_TN_WI_AK',
 'addr_state:TX',
 'addr_state:MT_CT',
 'addr_state:SC_KS_CO_IL',
 'addr_state:WV_WY_VT_NH',
 'addr_state:MS_DC_NE_ND_ME',
 'initial_list_status:w',
 'term:36',
 'emp_length:1',
 'emp_length:2-3',
 'emp_length:4',
 'emp_length:5',
 'emp_length:6',
 'emp_length:7',
 'emp_length:8-9',
 'emp_length:10',
 'mths_since_issue_d:<4',
 'mths_since_issue_d:4-5',
 'mths_since_issue_d:6-7',
 'mths_since_issue_d:8-11',
 'mths_since_issue_d:

### Project the data

In [5]:
from pyspark.ml.feature import VectorAssembler

In [6]:
# Combine the 101 listed columns into a single vector column named 'features'.
# The label column 'good_bad' is intentionally not part of the inputs.
assembler = VectorAssembler(inputCols=['grade:A',
 'grade:B',
 'grade:C',
 'grade:D',
 'grade:E',
 'grade:F',
 'home_ownership:OWN',
 'home_ownership:MORTGAGE',
 'verification_status:Source Verified',
 'verification_status:Not Verified',
 'purpose:oth_med_vacation',
 'purpose:debt_consolidation',
 'purpose:major_car_home',
 'purpose:credit_card',
 'addr_state:AL_FL',
 'addr_state:NY',
 'addr_state:VA_CA_UT',
 'addr_state:NM_OK_SD_LA_NJ',
 'addr_state:NC_MD',
 'addr_state:MO_AZ',
 'addr_state:PA_RI_MI_AR',
 'addr_state:MA_MN_DE',
 'addr_state:OH_WA_KY',
 'addr_state:OR_GA',
 'addr_state:IN_TN_WI_AK',
 'addr_state:TX',
 'addr_state:MT_CT',
 'addr_state:SC_KS_CO_IL',
 'addr_state:WV_WY_VT_NH',
 'addr_state:MS_DC_NE_ND_ME',
 'initial_list_status:w',
 'term:36',
 'emp_length:1',
 'emp_length:2-3',
 'emp_length:4',
 'emp_length:5',
 'emp_length:6',
 'emp_length:7',
 'emp_length:8-9',
 'emp_length:10',
 'mths_since_issue_d:<4',
 'mths_since_issue_d:4-5',
 'mths_since_issue_d:6-7',
 'mths_since_issue_d:8-11',
 'mths_since_issue_d:12-15',
 'mths_since_issue_d:16-19',
 'mths_since_issue_d:20-25',
 'mths_since_issue_d:26-29',
 'mths_since_issue_d:30-37',
 'mths_since_issue_d:38-49',
 'mths_since_earliest_cr_line:120-153',
 'mths_since_earliest_cr_line:154-186',
 'mths_since_earliest_cr_line:187-219',
 'mths_since_earliest_cr_line:220-252',
 'mths_since_earliest_cr_line:253-269',
 'mths_since_earliest_cr_line:270-335',
 'mths_since_earliest_cr_line:336-384',
 'mths_since_earliest_cr_line:385-451',
 'mths_since_earliest_cr_line:>451',
 'delinq_2yrs:0',
 'delinq_2yrs:1-3',
 'inq_last_6mths:0',
 'inq_last_6mths:1',
 'inq_last_6mths:2-4',
 'open_acc:8-9',
 'open_acc:10-12',
 'open_acc:13-16',
 'open_acc:17-21',
 'open_acc:22-25',
 'open_acc:>25',
 'pub_rec:1-2',
 'pub_rec:>2',
 'total_acc:12-17',
 'total_acc:18-27',
 'total_acc:28-34',
 'total_acc:35-41',
 'total_acc:42-48',
 'total_acc:>48',
 'acc_now_delinq:>0',
 'annual_inc:20K-30K',
 'annual_inc:30K-40K',
 'annual_inc:40K-50K',
 'annual_inc:50K-60K',
 'annual_inc:60K-70K',
 'annual_inc:70K-80K',
 'annual_inc:80K-90K',
 'annual_inc:90K-100K',
 'annual_inc:>100K',
 'mths_since_last_delinq:Missing',
 'mths_since_last_delinq:16-30',
 'mths_since_last_delinq:31-37',
 'mths_since_last_delinq:38-52',
 'mths_since_last_delinq:53-75',
 'mths_since_last_delinq:>=76',
 'mths_since_last_record:Missing',
 'mths_since_last_record:21-30',
 'mths_since_last_record:31-46',
 'mths_since_last_record:47-61',
 'mths_since_last_record:62-67',
 'mths_since_last_record:68-79',
 'mths_since_last_record:>=80'],outputCol='features')

In [7]:
# Apply the assembler: the result is the loaded data plus the new 'features' vector column.
output = assembler.transform(data)

In [8]:
# Keep only what the model needs: the assembled feature vector and the 'good_bad' label.
final_data = output.select('features','good_bad')

In [9]:
# Total number of rows available for modelling.
final_data.count()

887379

### Logistic Regression

In [10]:
# Randomly partition the rows into a training set (weight 0.8) and a test set
# (weight 0.2); the seed is fixed so the split can be repeated.
training, test = final_data.randomSplit(weights=[0.8, 0.2], seed=22)

In [11]:
# Row count of the training split.
training.count()

710129

In [12]:
# Row count of the test split.
test.count()

177250

In [13]:
from pyspark.ml.classification import LogisticRegression

In [14]:
# Logistic regression estimator for the 'good_bad' label.
# elasticNetParam=1 selects a pure L1 penalty; regParam=0.003 is its strength.
# NOTE(review): the recorded run of this cell aborted with a Py4JError
# ("Answer from Java side is empty"), so pdModel was never assigned there.
lrReg = LogisticRegression(labelCol="good_bad", regParam=0.003, elasticNetParam=1)

# Train the PD model on the training split.
pdModel = lrReg.fit(training)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1035, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 883, in send_command
    response = connection.send_command(command)
  File "/home/ubuntu/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1040, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving


Py4JError: An error occurred while calling o52.fit

In [15]:
# Print the fitted coefficients and intercept of the first PD model.
# NOTE(review): in the recorded run this raised NameError because the preceding
# fit failed and pdModel was never defined.
print("Coefficients: " + str(pdModel.coefficientMatrix))
print("Intercept: " + str(pdModel.interceptVector))

NameError: name 'pdModel' is not defined

In [None]:
# Training summary of the first PD model (requires the fit above to have succeeded).
training_sum = pdModel.summary

In [None]:
# Show summary statistics of the predictions the model made on its training data.
training_sum.predictions.describe().show()

### Re-Model

In [8]:
from pyspark.ml.feature import VectorAssembler

In [9]:
# Re-model feature set: the same input list as the first assembler except that
# 'term:36', 'delinq_2yrs:0', 'delinq_2yrs:1-3' and 'acc_now_delinq:>0' are left
# out, giving 97 input columns instead of 101. Output column is still 'features'.
assembler = VectorAssembler(inputCols=['grade:A',
 'grade:B',
 'grade:C',
 'grade:D',
 'grade:E',
 'grade:F',
 'home_ownership:OWN',
 'home_ownership:MORTGAGE',
 'verification_status:Source Verified',
 'verification_status:Not Verified',
 'purpose:oth_med_vacation',
 'purpose:debt_consolidation',
 'purpose:major_car_home',
 'purpose:credit_card',
 'addr_state:AL_FL',
 'addr_state:NY',
 'addr_state:VA_CA_UT',
 'addr_state:NM_OK_SD_LA_NJ',
 'addr_state:NC_MD',
 'addr_state:MO_AZ',
 'addr_state:PA_RI_MI_AR',
 'addr_state:MA_MN_DE',
 'addr_state:OH_WA_KY',
 'addr_state:OR_GA',
 'addr_state:IN_TN_WI_AK',
 'addr_state:TX',
 'addr_state:MT_CT',
 'addr_state:SC_KS_CO_IL',
 'addr_state:WV_WY_VT_NH',
 'addr_state:MS_DC_NE_ND_ME',
 'initial_list_status:w',
 'emp_length:1',
 'emp_length:2-3',
 'emp_length:4',
 'emp_length:5',
 'emp_length:6',
 'emp_length:7',
 'emp_length:8-9',
 'emp_length:10',
 'mths_since_issue_d:<4',
 'mths_since_issue_d:4-5',
 'mths_since_issue_d:6-7',
 'mths_since_issue_d:8-11',
 'mths_since_issue_d:12-15',
 'mths_since_issue_d:16-19',
 'mths_since_issue_d:20-25',
 'mths_since_issue_d:26-29',
 'mths_since_issue_d:30-37',
 'mths_since_issue_d:38-49',
 'mths_since_earliest_cr_line:120-153',
 'mths_since_earliest_cr_line:154-186',
 'mths_since_earliest_cr_line:187-219',
 'mths_since_earliest_cr_line:220-252',
 'mths_since_earliest_cr_line:253-269',
 'mths_since_earliest_cr_line:270-335',
 'mths_since_earliest_cr_line:336-384',
 'mths_since_earliest_cr_line:385-451',
 'mths_since_earliest_cr_line:>451',
 'inq_last_6mths:0',
 'inq_last_6mths:1',
 'inq_last_6mths:2-4',
 'open_acc:8-9',
 'open_acc:10-12',
 'open_acc:13-16',
 'open_acc:17-21',
 'open_acc:22-25',
 'open_acc:>25',
 'pub_rec:1-2',
 'pub_rec:>2',
 'total_acc:12-17',
 'total_acc:18-27',
 'total_acc:28-34',
 'total_acc:35-41',
 'total_acc:42-48',
 'total_acc:>48',
 'annual_inc:20K-30K',
 'annual_inc:30K-40K',
 'annual_inc:40K-50K',
 'annual_inc:50K-60K',
 'annual_inc:60K-70K',
 'annual_inc:70K-80K',
 'annual_inc:80K-90K',
 'annual_inc:90K-100K',
 'annual_inc:>100K',
 'mths_since_last_delinq:Missing',
 'mths_since_last_delinq:16-30',
 'mths_since_last_delinq:31-37',
 'mths_since_last_delinq:38-52',
 'mths_since_last_delinq:53-75',
 'mths_since_last_delinq:>=76',
 'mths_since_last_record:Missing',
 'mths_since_last_record:21-30',
 'mths_since_last_record:31-46',
 'mths_since_last_record:47-61',
 'mths_since_last_record:62-67',
 'mths_since_last_record:68-79',
 'mths_since_last_record:>=80'],outputCol='features')

In [10]:
# Apply the reduced assembler: the result is the loaded data plus the 'features' vector column.
output = assembler.transform(data)

In [11]:
# Keep only the assembled feature vector and the 'good_bad' label.
final_data = output.select('features','good_bad')

In [12]:
# Randomly partition the rows into a training set (weight 0.8) and a test set
# (weight 0.2), using the same fixed seed as the first modelling attempt.
training, test = final_data.randomSplit(weights=[0.8, 0.2], seed=22)

In [13]:
from pyspark.ml.classification import LogisticRegression

In [14]:
# Logistic regression estimator for the 'good_bad' label, configured exactly as
# in the first attempt: pure L1 penalty (elasticNetParam=1) of strength 0.003.
lrReg2 = LogisticRegression(labelCol="good_bad", regParam=0.003, elasticNetParam=1)

# Train the PD model on the training split built from the reduced feature set.
pdModel2 = lrReg2.fit(training)

In [16]:
# Print the fitted coefficients and intercept of the re-modelled PD model.
# Coefficient column indices follow the order of the assembler's input column list.
print("Coefficients: " + str(pdModel2.coefficientMatrix))
print("Intercept: " + str(pdModel2.interceptVector))

Coefficients: 1 X 97 CSRMatrix
(0,0) 0.9178
(0,1) 0.3808
(0,3) -0.2516
(0,4) -0.5321
(0,5) -0.7485
(0,7) 0.1079
(0,13) 0.0448
(0,27) 0.0263
(0,30) 0.1546
(0,38) 0.0216
(0,39) 2.2321
(0,40) 1.5401
(0,41) 1.1341
(0,42) 0.7484
(0,43) 0.2823
(0,46) -0.1028
..
..
Intercept: [2.022923880727304]


In [18]:
# Training summary of the re-modelled PD model.
training_sum = pdModel2.summary

In [19]:
# Summary statistics of the label and the predicted class on the training data.
# NOTE(review): in the recorded output the prediction column is constantly 1.0
# (stddev 0.0), i.e. every training row is assigned class 1.
training_sum.predictions.describe().show()

+-------+------------------+----------+
|summary|          good_bad|prediction|
+-------+------------------+----------+
|  count|            710129|    710129|
|   mean|0.9308703066625923|       1.0|
| stddev|0.2536747316053523|       0.0|
|    min|               0.0|       1.0|
|    max|               1.0|       1.0|
+-------+------------------+----------+



## PD Model Validation (Test)

### Out-of-sample validation (test)

In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [21]:
# Score the held-out test split with the fitted model; the scored rows are
# available through the returned object's .predictions DataFrame.
pred_and_labels = pdModel2.evaluate(test)

In [22]:
# Display the first scored test rows: features, label, rawPrediction, probability and prediction.
pred_and_labels.predictions.show()

+--------------------+--------+--------------------+--------------------+----------+
|            features|good_bad|       rawPrediction|         probability|prediction|
+--------------------+--------+--------------------+--------------------+----------+
|(97,[0,6,8,10,14,...|       1|[-5.5017100546146...|[0.00406321176227...|       1.0|
|(97,[0,6,8,10,15,...|       1|[-2.2086008233564...|[0.09898078677842...|       1.0|
|(97,[0,6,8,10,16,...|       1|[-5.3473571181573...|[0.00473815936560...|       1.0|
|(97,[0,6,8,10,16,...|       1|[-2.2924393634781...|[0.09175106837840...|       1.0|
|(97,[0,6,8,10,16,...|       0|[-2.9925224225357...|[0.04776483065660...|       1.0|
|(97,[0,6,8,10,17,...|       1|[-5.4311956582790...|[0.00435877614556...|       1.0|
|(97,[0,6,8,10,17,...|       1|[-2.5088660313053...|[0.07523897083992...|       1.0|
|(97,[0,6,8,10,19,...|       1|[-2.5088660313053...|[0.07523897083992...|       1.0|
|(97,[0,6,8,10,20,...|       1|[-2.7224802923672...|[0.0616598057

### Accuracy and Area under the Curve

In [23]:
# Evaluate with the model's continuous scores, not the hard class label.
# Area under the ROC curve needs a score that ranks rows; the 'prediction' column
# is a 0/1 class (constant 1.0 in the outputs recorded earlier in this notebook),
# which collapses the ROC curve and yields an uninformative AUC. The
# 'rawPrediction' column produced by the logistic regression carries the scores.
good_bad_eval = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='good_bad',
    metricName='areaUnderROC',  # the evaluator's default, spelled out for clarity
)

In [None]:
# Apply the evaluator to the scored test rows; the resulting metric value is stored as `auc`.
auc = good_bad_eval.evaluate(pred_and_labels.predictions)

In [None]:
# Display the metric value computed on the test split.
auc