In [None]:
!pip install pyspark



In [None]:
# Mount Google Drive under /content/drive; the data file and the saved model
# read by later cells are addressed through paths below this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
#spark modules
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import time

#PySpark ML modules
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegressionModel

#Numpy and Matplotlib modules to plot curves
import matplotlib.pyplot as plt
import numpy as np

In [None]:
# Obtain the Spark session for this notebook (an already-running session is
# reused by getOrCreate).
spark = (
    SparkSession.builder
    .appName('loan_default_prediction_DE_OW')
    .getOrCreate()
)

In [None]:
# Explicit schema for the tab-separated input file, declared as an ordered
# (column name, Spark type) table and expanded into StructFields below.
# NOTE(review): the order presumably has to match the column order of the
# file (including the P1, P10, P11, P12, P2, ... sequence) - confirm against
# the data before reordering anything.
schema = StructType(
    [
        StructField(field_name, field_type())
        for field_name, field_type in [
            ("FDICCert", IntegerType),
            ("Snapshotdate", StringType),
            ("PeriodDate", StringType),
            ("Period", StringType),
            ("foliolossrate", DoubleType),
            ("TotalAssets", DoubleType),
            ("folioloan", DoubleType),
            ("State", StringType),
            ("Quarter_Period", IntegerType),
            ("Year_Period", IntegerType),
            ("MacroMergeKey", StringType),
            ("foliolossrateLag1", DoubleType),
            ("foliolossrateLag2", DoubleType),
            ("foliolossrateLag3", DoubleType),
            ("foliolossrateLag4", DoubleType),
            ("unemployment", DoubleType),
            ("unemployment_lag1", DoubleType),
            ("unemployment_lag6", DoubleType),
            ("unemployment_lag8", DoubleType),
            ("unemployment_lag2growth", DoubleType),
            ("house_prices_all_change", DoubleType),
            ("house_purchase_prices_growth", DoubleType),
            ("house_purchase_prices", DoubleType),
            ("house_prices_all", DoubleType),
            ("CommercialPriceNat", DoubleType),
            ("CommercialPriceNat_lag8", DoubleType),
            ("nominal_gdp_lag8", DoubleType),
            ("nominal_personalincome_lag5change", DoubleType),
            ("real_disposableincome_lag3change", DoubleType),
            ("real_gdp", DoubleType),
            ("RepDate", StringType),
            ("MovingAverage", DoubleType),
            ("Target", IntegerType),
            ("P1", IntegerType),
            ("P10", IntegerType),
            ("P11", IntegerType),
            ("P12", IntegerType),
            ("P2", IntegerType),
            ("P3", IntegerType),
            ("P4", IntegerType),
            ("P5", IntegerType),
            ("P6", IntegerType),
            ("P7", IntegerType),
            ("P8", IntegerType),
            ("P9", IntegerType),
        ]
    ]
)

In [None]:
# Load the tab-separated data file (first line is a header) from the mounted
# Drive, applying the explicit schema instead of inferring column types.
input_df = (
    spark.read
    .schema(schema)
    .option('sep', '\t')
    .option('header', True)
    .csv('/content/drive/MyDrive/DE OW Case Study/data.txt')
)

In [None]:
# Columns that are string-indexed and one-hot encoded by the feature pipeline.
categoricalColumns = [
    'FDICCert',
    'Snapshotdate',
    'PeriodDate',
    'Period',
    'State',
    'Quarter_Period',
    'Year_Period',
    'MacroMergeKey',
    'RepDate',
]

# Every remaining input column except the 'Target' label is passed to the
# assembler as-is, in the order it appears in input_df.
numericCols = [
    column_name
    for column_name in input_df.columns
    if not (column_name in categoricalColumns or column_name == 'Target')
]

In [None]:
# Ordered list of pipeline stages: per-column indexers/encoders, the label
# indexer, and finally the feature assembler.
stages = []

# Each categorical column gets a StringIndexer (-> "<col>Index") followed by a
# OneHotEncoder (-> "<col>classVec").
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=f"{categoricalCol}Index",
    )
    encoder = OneHotEncoder(
        inputCols=[stringIndexer.getOutputCol()],
        outputCols=[f"{categoricalCol}classVec"],
    )
    stages.extend([stringIndexer, encoder])

# 'Target' is indexed into the 'label' column.
# NOTE(review): StringIndexer's default ordering is presumably by label
# frequency, so which Target value becomes label 0 depends on the data -
# confirm this matches how the saved model was trained.
label_stringIdx = StringIndexer(inputCol='Target', outputCol='label')
stages.append(label_stringIdx)

# Encoded categorical vectors first, then the untouched numeric columns, are
# concatenated into the single 'features' vector.
assemblerInputs = [f"{name}classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages.append(assembler)

In [None]:
# Fit the preprocessing pipeline on the loaded data, then keep only the
# columns a classifier needs.
# NOTE(review): the indexers/encoders are fitted here on input_df rather than
# loaded from a persisted PipelineModel; the 'features' layout only matches a
# previously trained model if that model saw the same fitted stages - confirm.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(input_df)

selectedCols = ['label', 'features']
df = pipelineModel.transform(input_df).select(selectedCols)

In [None]:
# Location on the mounted Drive of the persisted logistic-regression model.
path = "/content/drive/MyDrive/DE OW Case Study/lrModel"

# Restore the previously saved LogisticRegressionModel from that location.
saved_lrModel = LogisticRegressionModel.load(path)

In [None]:
# Score a 20-row sample of the input with the loaded model.
# TODO: replace input_df.limit(20) with a validation dataset for evaluation.
selectedCols = ['features']
new_df = (
    pipelineModel
    .transform(input_df.limit(20))
    .select(selectedCols)
)

# The model's transform output (features plus prediction columns) is printed
# in full width, without truncating cell contents.
new_pred = saved_lrModel.transform(new_df)
new_pred.show(truncate=False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------+-------------------------------------------+----------+
|features                                                                                                                                                                                                                                                                                                                                                                                              |rawPrediction                           |probability                                |prediction|
+---------------------