# Models

Matt Thomas, Max McGaw, Liam Mulcahy, Will Carruthers

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
import pandas as pd

In [29]:
from pyspark.ml import Pipeline  
from pyspark.ml.feature import *  
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
df = spark.read.csv('train_data.csv', inferSchema=True, header=True)

In [4]:
df.show(2)

+---+------------------+---------+----------+--------+--------------+-----------+-----------+----------+----------+--------------+---------------+--------+---------+--------------+
|_c0|loan_status_binary|loan_amnt|      term|int_rate|home_ownership|tot_cur_bal|total_pymnt|annual_inc|addr_state|fico_range_low|last_pymnt_amnt|grade_CD|grade_EFG|emp_length_low|
+---+------------------+---------+----------+--------+--------------+-----------+-----------+----------+----------+--------------+---------------+--------+---------+--------------+
|  0|                 0|  11000.0| 36 months|    7.21|           ANY|    28511.0|    1354.03|   40000.0|        PA|         715.0|         340.71|       0|        0|             0|
|  1|                 0|   4000.0| 36 months|    22.9|           ANY|   108997.0|    1386.67|   40000.0|        MI|         665.0|         154.64|       0|        1|             0|
+---+------------------+---------+----------+--------+--------------+-----------+-----------+--

In [5]:
dfs = df.select([col for col in df.columns if col not in ['_c0']])

In [6]:
dfs.show(5)

+------------------+---------+----------+--------+--------------+-----------+-----------+----------+----------+--------------+---------------+--------+---------+--------------+
|loan_status_binary|loan_amnt|      term|int_rate|home_ownership|tot_cur_bal|total_pymnt|annual_inc|addr_state|fico_range_low|last_pymnt_amnt|grade_CD|grade_EFG|emp_length_low|
+------------------+---------+----------+--------+--------------+-----------+-----------+----------+----------+--------------+---------------+--------+---------+--------------+
|                 0|  11000.0| 36 months|    7.21|           ANY|    28511.0|    1354.03|   40000.0|        PA|         715.0|         340.71|       0|        0|             0|
|                 0|   4000.0| 36 months|    22.9|           ANY|   108997.0|    1386.67|   40000.0|        MI|         665.0|         154.64|       0|        1|             0|
|                 0|  10000.0| 36 months|   17.97|           ANY|    20320.0|    1435.54|   60000.0|        CA|    

In [7]:
dfs.printSchema()

root
 |-- loan_status_binary: integer (nullable = true)
 |-- loan_amnt: double (nullable = true)
 |-- term: string (nullable = true)
 |-- int_rate: double (nullable = true)
 |-- home_ownership: string (nullable = true)
 |-- tot_cur_bal: double (nullable = true)
 |-- total_pymnt: double (nullable = true)
 |-- annual_inc: double (nullable = true)
 |-- addr_state: string (nullable = true)
 |-- fico_range_low: double (nullable = true)
 |-- last_pymnt_amnt: double (nullable = true)
 |-- grade_CD: integer (nullable = true)
 |-- grade_EFG: integer (nullable = true)
 |-- emp_length_low: integer (nullable = true)



In [8]:
dfs.filter(dfs['tot_cur_bal'].isNull()).count()
#Need to do something abou this

70018

In [6]:
train, test = dfs.randomSplit([0.8, 0.2], seed=12345)

In [7]:
categories = ['home_ownership', 'term']
stages = []

In [8]:
for col in categories:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col + "_Index")
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=col + "classVec")
    stages += [stringIndexer, encoder]

In [9]:
scaled_vectors = VectorAssembler(inputCols = ['loan_amnt',\
                                             'total_pymnt', 'annual_inc', \
                                             'fico_range_low', 'last_pymnt_amnt',\
                                             'int_rate','emp_length_low'], 
                                 outputCol='vector_features')

In [10]:
scaler = StandardScaler(inputCol='vector_features', outputCol='scaled_features')
#scaler = StandardScaler(inputCol='vector_features', outputCol='scaled_features')

In [11]:
labelIndexer = StringIndexer(inputCol='loan_status_binary', outputCol='label')

In [12]:
#AT added
assembler = VectorAssembler(inputCols=['scaled_features', 'home_ownershipclassVec', 'termclassVec', 'grade_EFG', 'grade_CD' ], outputCol='features')

In [13]:
stages += [scaled_vectors, scaler, labelIndexer, assembler]

In [14]:
# AT commented
#assembler = VectorAssembler(inputCols=['scaled_features', 'home_ownershipclassVec',\
#                                       'grade_CD', 'termclassVec',\
#                                       'grade_EFG'], outputCol='features')
#stages += [scaler, labelIndexer, assembler]

In [15]:
lr = LogisticRegression(maxIter=10, regParam=0.01)
stages += [lr]
stages

[StringIndexer_6005f44a6947,
 OneHotEncoder_1dae6122d929,
 StringIndexer_9f97983e5709,
 OneHotEncoder_38d70a2b8720,
 VectorAssembler_450d16e17d97,
 StandardScaler_48efb0c23378,
 StringIndexer_65989b05f8c8,
 VectorAssembler_3ec489b46a50,
 LogisticRegression_ddafdde72b1a]

In [16]:
pipeline = Pipeline(stages=stages)

In [17]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

In [18]:
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)

In [19]:
# Fit the pipeline
model = crossval.fit(train)

In [21]:
# Make a prediction
prediction = model.transform(test)
prediction.show(2)

+------------------+---------+----------+--------+--------------+-----------+-----------+----------+----------+--------------+---------------+--------+---------+--------------+--------------------+----------------------+----------+-------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|loan_status_binary|loan_amnt|      term|int_rate|home_ownership|tot_cur_bal|total_pymnt|annual_inc|addr_state|fico_range_low|last_pymnt_amnt|grade_CD|grade_EFG|emp_length_low|home_ownership_Index|home_ownershipclassVec|term_Index| termclassVec|     vector_features|     scaled_features|label|            features|       rawPrediction|         probability|prediction|
+------------------+---------+----------+--------+--------------+-----------+-----------+----------+----------+--------------+---------------+--------+---------+--------------+--------------------+----------------------+----------+-------------+-------------------

In [22]:
prediction = prediction.withColumn("loan_status_binary", df["loan_status_binary"].cast('float'))
#This is necessary to compare predictions with actual values

In [23]:
matches = prediction.filter(prediction['loan_status_binary'] == prediction['prediction']).count()

In [33]:
#Calculates area under Precision-Recall curve
evaluator = BinaryClassificationEvaluator(labelCol='loan_status_binary', \
                                          metricName='areaUnderPR')

In [26]:
evaluator.evaluate(prediction)

0.3174520618693921

In [27]:
counts = prediction.count()

In [28]:
accuracy = matches / counts
print(f"accuracy = {accuracy}")

accuracy = 0.8811348828571491


In [30]:
f1_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction',\
                                                 labelCol='loan_status_binary',\
                                                 metricName='f1')

In [31]:
f1_evaluator.evaluate(prediction) #f1 score

0.8318080297768002

In [37]:
!jupyter nbconvert --to pdf '../Spark_ML_DS5559'/*.ipynb

[NbConvertApp] Converting notebook ../Spark_ML_DS5559/Final_Project_DS5559_data.ipynb to pdf
[NbConvertApp] Support files will be in Final_Project_DS5559_data_files/
[NbConvertApp] Making directory ./Final_Project_DS5559_data_files
[NbConvertApp] Writing 80572 bytes to ./notebook.tex
[NbConvertApp] Building PDF
[NbConvertApp] Running xelatex 3 times: ['xelatex', './notebook.tex', '-quiet']
[NbConvertApp] Running bibtex 1 time: ['bibtex', './notebook']
[NbConvertApp] PDF successfully created
[NbConvertApp] Writing 96996 bytes to ../Spark_ML_DS5559/Final_Project_DS5559_data.pdf
[NbConvertApp] Converting notebook ../Spark_ML_DS5559/Project_Models.ipynb to pdf
[NbConvertApp] Writing 43276 bytes to ./notebook.tex
[NbConvertApp] Building PDF
[NbConvertApp] Running xelatex 3 times: ['xelatex', './notebook.tex', '-quiet']
[NbConvertApp] Running bibtex 1 time: ['bibtex', './notebook']
[NbConvertApp] PDF successfully created
[NbConvertApp] Writing 43651 bytes to ../Spark_ML_DS5559/Project_Models