In [1]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LinearSVC
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.mllib.stat import Statistics
from pyspark.ml.linalg import DenseVector
from pyspark.sql import functions as F

In [2]:
import random, os
import numpy as np
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.stat import Statistics

In [3]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import abs

In [5]:
#from LC_Helper import vectorizerFunction, SmoteSampling

In [6]:
spark = SparkSession\
    .builder\
    .appName("Baseline_Model")\
    .config("spark.hadoop.fs.s3a.s3guard.ddb.region","us-east-2")\
    .config("spark.yarn.access.hadoopFileSystems",os.environ["STORAGE"])\
    .getOrCreate()

Setting spark.hadoop.yarn.resourcemanager.principal to pauldefusco


In [7]:
df = spark.sql("SELECT * FROM default.lc_smote_subset")

Hive Session ID = d8810fc1-63a7-4dd7-a956-fd4379df6765


In [8]:
#Creating list of categorical and numeric features
num_cols = [item[0] for item in df.dtypes if item[1].startswith('in') or item[1].startswith('dou')]

In [9]:
df = df.dropna()

In [10]:
df = df.select(['acc_now_delinq', 'acc_open_past_24mths', 'annual_inc', 'avg_cur_bal', 'funded_amnt', 'is_default'])

In [11]:
train = df.sampleBy("is_default", fractions={0: 0.8, 1: 0.8}, seed=10)

In [12]:
test = df.subtract(train)

Creating Model Pipeline

In [13]:
#Creates a Pipeline Object including One Hot Encoding of Categorical Features  
def make_pipeline(spark_df):        
     
    for c in spark_df.columns:
        spark_df = spark_df.withColumn(c, spark_df[c].cast("float"))
    
    stages= []

    cols = ['acc_now_delinq', 'acc_open_past_24mths', 'annual_inc', 'avg_cur_bal', 'funded_amnt']
    
    spark_df = spark_df.withColumn('acc_now_delinq',abs(spark_df.acc_now_delinq))
    spark_df = spark_df.withColumn('acc_open_past_24mths',abs(spark_df.acc_open_past_24mths))
    spark_df = spark_df.withColumn('annual_inc',abs(spark_df.annual_inc))
    spark_df = spark_df.withColumn('avg_cur_bal',abs(spark_df.avg_cur_bal))
    spark_df = spark_df.withColumn('funded_amnt',abs(spark_df.funded_amnt))
    
    #Assembling mixed data type transformations:
    assembler = VectorAssembler(inputCols=cols, outputCol="features")
    stages += [assembler]    
    
    #Scaling features
    #scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=True)
    #stages += [scaler]
    
    
    #RF Classifier
    rf = LinearSVC(featuresCol='features', labelCol='is_default')
    stages += [rf]
    
    #Creating and running the pipeline:
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(spark_df)

    return pipelineModel

In [14]:
pipelineModel = make_pipeline(train)

                                                                                

In [15]:
for c in test.columns:
    test = test.withColumn(c, test[c].cast("float"))

In [16]:
df_model = pipelineModel.transform(test)

In [17]:
df_model.columns

['acc_now_delinq',
 'acc_open_past_24mths',
 'annual_inc',
 'avg_cur_bal',
 'funded_amnt',
 'is_default',
 'features',
 'rawPrediction',
 'prediction']

In [18]:
df_model.show(5)

                                                                                

+--------------+--------------------+-----------+-----------+------------+----------+--------------------+--------------------+----------+
|acc_now_delinq|acc_open_past_24mths| annual_inc|avg_cur_bal| funded_amnt|is_default|            features|       rawPrediction|prediction|
+--------------+--------------------+-----------+-----------+------------+----------+--------------------+--------------------+----------+
|   -0.08590768|            0.225552|-0.17482829|-0.34515885| 0.018119322|       1.0|[-0.0859076827764...|[-2.7969551340209...|       1.0|
|   -0.08590768|           1.3480183|-0.02713315| -0.6537855|    0.640682|       1.0|[-0.0859076827764...|[-2.9291477748199...|       1.0|
|   -0.08590768|          -1.1496115|-0.24321264| -0.7294536| -0.86209434|       0.0|[-0.0859076827764...|[-3.6664050970882...|       1.0|
|   -0.08590768|         -0.23897669|0.056176037|-0.45687118|  0.31276748|       0.0|[-0.0859076827764...|[-1.9227331311467...|       1.0|
|   -0.08590768|         -0

In [19]:
input_data = df_model.rdd.map(lambda x: (x["is_default"], x["prediction"])) #, float(x["probability"][1])))

In [20]:
predictions = spark.createDataFrame(input_data, ["is_default", "prediction"])

                                                                                

In [21]:
predictions.dtypes

[('is_default', 'double'), ('prediction', 'double')]

#### Baseline Model

In [22]:
df_model.select("is_default", "prediction").groupby('prediction').count().show()

                                                                                

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|   32|
|       1.0|  219|
+----------+-----+



In [23]:
df_model.select("is_default", "prediction").groupby('is_default').count().show()

                                                                                

+----------+-----+
|is_default|count|
+----------+-----+
|       1.0|  105|
|       0.0|  146|
+----------+-----+



Model Evaluation

In [24]:
evaluator = BinaryClassificationEvaluator(labelCol="is_default", rawPredictionCol="prediction")

In [25]:
auroc = evaluator.evaluate(df_model, {evaluator.metricName: "areaUnderROC"})
auprc = evaluator.evaluate(df_model, {evaluator.metricName: "areaUnderPR"})

                                                                                

In [26]:
print("Area under ROC Curve: {:.4f}".format(auroc))
print("Area under PR Curve: {:.4f}".format(auprc))

Area under ROC Curve: 0.5768
Area under PR Curve: 0.4604


In [27]:
# Handy Spark Library 
# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output
# from the classifier
#bcm = BinaryClassificationMetrics(df_model, scoreCol='probability', labelCol='is_default')

In [28]:
# Now we can PLOT both ROC and PR curves!
#fig, axs = plt.subplots(1, 2, figsize=(12, 4))
#bcm.plot_roc_curve(ax=axs[0])
#bcm.plot_pr_curve(ax=axs[1])
#plt.show()

In [29]:
# And get the confusion matrix for any threshold we want
y_true = df_model.select(['is_default']).collect()
y_pred = df_model.select(['prediction']).collect()

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

                                                                                

              precision    recall  f1-score   support

         0.0       0.88      0.19      0.31       146
         1.0       0.46      0.96      0.62       105

    accuracy                           0.51       251
   macro avg       0.67      0.58      0.47       251
weighted avg       0.70      0.51      0.44       251



In [30]:
#Saving pipeline to S3:
pipelineModel.write().overwrite().save(os.environ["STORAGE"]+"/pdefusco/pipeline")

                                                                                

In [31]:
#Saving predictions

In [32]:
predictions\
  .write.format("parquet")\
  .mode("overwrite")\
  .saveAsTable(
    'default.LC_predictions'
)

                                                                                

In [33]:
train\
  .write.format("parquet")\
  .mode("overwrite")\
  .saveAsTable(
    'default.LC_train'
)

                                                                                

In [34]:
test\
  .write.format("parquet")\
  .mode("overwrite")\
  .saveAsTable(
    'default.LC_test'
)

                                                                                

In [35]:
spark.stop()