In [None]:
# Load (or reload) the sparkmagic IPython extension and open its %manage_spark
# UI for configuring an endpoint and creating/selecting a Spark session.
# NOTE(review): presumably this session supplies the `spark` object used by the
# cells below — confirm against the kernel configuration.
%reload_ext sparkmagic.magics
%manage_spark

In [1]:
# Print a label identifying this notebook in the cell output.
banner = "Notebook 4"
print(banner)

The code failed because of a fatal error:
	Error sending http request and maximum retry encountered..

Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType, IntegerType
import pyspark.sql.functions as F
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from time import time

In [None]:
# Read the joined parquet output (every part* file) for each year.
join_paths = {
    2016: "/Fannie-Mae/2016/FNMA_2016_Join_result_test.parquet/part*",
    2017: "/Fannie-Mae/2017/FNMA_2017_Join_result_test.parquet/part*",
}
df1 = spark.read.parquet(join_paths[2016])
df2 = spark.read.parquet(join_paths[2017])

In [None]:
# ForeclosureDate becomes the label column 'Default' for both years
# (it is turned into a 0/1 flag in the following cell).
df1, df2 = [frame.withColumnRenamed('ForeclosureDate', 'Default') for frame in (df1, df2)]

In [None]:
# Binarize the label: a missing foreclosure date -> 0, any date present -> 1.
default_flag = when(col("Default").isNull(), 0).otherwise(1)
df1 = df1.withColumn("Default", default_flag)
df2 = df2.withColumn("Default", default_flag)

In [None]:
# Columns excluded from the 2016 feature set before modelling.
# NOTE(review): 'PricipleForgiven' is spelled as in the source data, presumably
# matching the upstream column name — verify the schema before "fixing" it.
cols_to_drop_2016 = [
    'LoanID', 'Channel', 'SellerName', 'OrDate', 'FirstPayment', 'FTHomeBuyer',
    'LoanPurpose', 'PropertyType', 'OccStatus', 'PropertyState', 'ProductType',
    'RelMortInd', 'Servicer', 'MaturityDate', 'CurDelStatus', 'ModFlag',
    'ZeroBalEffDate', 'LastInstallDate', 'DispositionDate', 'PricipleForgiven',
    'RMWPF', 'FPWA', 'ServicingIndicator', 'OrLTV', 'Zip', 'MortInsPerc',
    'CoCreditScore', 'MortInsType', 'CurrInterestRate', 'CAUPB', 'MSA',
    'ForeclosureCost', 'RepairCost', 'AssetRecCost', 'MiscCostsPF', 'ATFHP',
    'NetSaleProceeds', 'CreditEnhProceeds', 'RPMWP', 'OtherForePro',
    'NonInterestUPB', 'ReportingDate',
]
df1 = df1.drop(*cols_to_drop_2016)

In [None]:
# Columns excluded from the 2017 feature set (same list as used for 2016).
# NOTE(review): 'PricipleForgiven' is spelled as in the source data, presumably
# matching the upstream column name — verify the schema before "fixing" it.
cols_to_drop_2017 = [
    'LoanID', 'Channel', 'SellerName', 'OrDate', 'FirstPayment', 'FTHomeBuyer',
    'LoanPurpose', 'PropertyType', 'OccStatus', 'PropertyState', 'ProductType',
    'RelMortInd', 'Servicer', 'MaturityDate', 'CurDelStatus', 'ModFlag',
    'ZeroBalEffDate', 'LastInstallDate', 'DispositionDate', 'PricipleForgiven',
    'RMWPF', 'FPWA', 'ServicingIndicator', 'OrLTV', 'Zip', 'MortInsPerc',
    'CoCreditScore', 'MortInsType', 'CurrInterestRate', 'CAUPB', 'MSA',
    'ForeclosureCost', 'RepairCost', 'AssetRecCost', 'MiscCostsPF', 'ATFHP',
    'NetSaleProceeds', 'CreditEnhProceeds', 'RPMWP', 'OtherForePro',
    'NonInterestUPB', 'ReportingDate',
]
df2 = df2.drop(*cols_to_drop_2017)

In [None]:
# Replace remaining nulls with 0 (Spark applies an int fill value only to
# numeric columns). fillna is the DataFrame-level alias of na.fill.
df_2016, df_2017 = df1.fillna(0), df2.fillna(0)

In [None]:
# Foreclosures (Default == 1) are rare, so the data is down-sampled below;
# first count the 2016 positives, which sizes the negative sample.
positive_count_2016 = df_2016.where(col('Default') == 1.0).count()

In [None]:
# Display the number of 2016 rows with Default == 1.
positive_count_2016

In [None]:
# Count the 2017 positives (Default == 1); sizes the 2017 negative sample.
positive_count_2017 = df_2017.where(col('Default') == 1.0).count()

In [None]:
# Display the number of 2017 rows with Default == 1.
positive_count_2017

In [None]:
# Down-sample the majority class (Default == 0) so both classes end up roughly
# the same size: every positive is kept and negatives are sampled at
# positives / negatives. sampleBy does Bernoulli sampling, so class sizes are
# approximate. The fraction is capped at 1.0 because sampleBy rejects larger
# values, and guarded against an all-positive frame. A fixed seed makes the
# sample reproducible across notebook runs.
data_size_2016 = df_2016.count()
negative_count_2016 = data_size_2016 - positive_count_2016
neg_fraction_2016 = (
    min(1.0, float(positive_count_2016) / negative_count_2016)
    if negative_count_2016 > 0 else 0.0
)
strat_data_2016 = df_2016.sampleBy(
    'Default', fractions={0: neg_fraction_2016, 1: 1.0}, seed=42
)

In [None]:
# Cache the stratified 2016 sample: it is reused by the class-count check below
# and, as train_data, for cross-validated training.
strat_data_2016.persist()

In [None]:
# Check the class balance of the stratified 2016 sample.
class_counts_2016 = strat_data_2016.groupBy('Default').count()
print(class_counts_2016.toPandas())

In [None]:
# Down-sample the 2017 negatives to roughly match the positives: every positive
# is kept and negatives are sampled at positives / negatives (approximate,
# Bernoulli sampling). The fraction is capped at 1.0 because sampleBy rejects
# larger values, and guarded against an all-positive frame. A fixed seed makes
# the sample reproducible.
# NOTE(review): this sample becomes test_data; evaluating on a balanced set
# means precision/F1 will not reflect the true (imbalanced) default rate.
data_size_2017 = df_2017.count()
negative_count_2017 = data_size_2017 - positive_count_2017
neg_fraction_2017 = (
    min(1.0, float(positive_count_2017) / negative_count_2017)
    if negative_count_2017 > 0 else 0.0
)
strat_data_2017 = df_2017.sampleBy(
    'Default', fractions={0: neg_fraction_2017, 1: 1.0}, seed=42
)

In [None]:
# Cache the stratified 2017 sample: it is reused by the class-count check below
# and, as test_data, for scoring the trained model.
strat_data_2017.persist()

In [None]:
# Check the class balance of the stratified 2017 sample.
class_counts_2017 = strat_data_2017.groupBy('Default').count()
print(class_counts_2017.toPandas())

In [None]:
# Temporal split: fit on the stratified 2016 data ...
train_data = strat_data_2016

In [None]:
# ... and evaluate on the stratified 2017 data.
test_data = strat_data_2017

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.evaluation import MulticlassMetrics

In [None]:
# NOTE(review): train_data is the same DataFrame object as strat_data_2016,
# which was already persisted earlier, so this call is redundant (harmless).
train_data.persist()

In [None]:
# Every remaining column except the label (and an 'id' column, if one exists —
# drop ignores missing names) is a model feature; pack them into one vector.
feature_cols_2016 = df_2016.drop('Default', 'id').columns
assembler_2016 = VectorAssembler(outputCol='features', inputCols=feature_cols_2016)

In [None]:
# Classifier: reads the assembled 'features' vector and predicts the 0/1 'Default' label.
lr = LogisticRegression(
    featuresCol='features',
    labelCol='Default',
)

In [None]:
# Two-stage pipeline: assemble the feature vector, then fit the classifier.
pipeline_stages = [assembler_2016, lr]
pipeline = Pipeline(stages=pipeline_stages)

In [None]:
# Hyper-parameter grid searched by cross-validation:
# 3 iteration caps x 2 regularization strengths = 6 candidate models.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.maxIter, [1, 10, 100])
    .addGrid(lr.regParam, [0.1, 0.01])
    .build()
)

In [None]:
# 3-fold cross-validation of the pipeline over paramGrid.
# - metricName='f1' is MulticlassClassificationEvaluator's default; it is spelled
#   out so the metric printed later is unambiguous.
# - A fixed seed makes the random fold assignment reproducible between runs.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(
        labelCol='Default', predictionCol='prediction', metricName='f1'),
    numFolds=3,
    seed=42,
)


In [None]:
# Fit every grid candidate on every fold and keep the best model;
# report the wall-clock time the fit took, in seconds.
time_s = time()
cv_model = crossval.fit(train_data)
time_e = time()
elapsed = time_e - time_s
print('Total training time: %f' % elapsed)

In [None]:
def print_metrics(predictions_and_labels):
    """Print per-class and overall classification metrics for a 0/1 label.

    Args:
        predictions_and_labels: RDD of ``(prediction, label)`` float pairs, as
            accepted by ``pyspark.mllib.evaluation.MulticlassMetrics``.

    Labels are passed as floats (1.0 / 0.0) because ``MulticlassMetrics``
    takes labels as doubles.
    """
    metrics = MulticlassMetrics(predictions_and_labels)
    print('Precision of True ', metrics.precision(1.0))
    print('Precision of False', metrics.precision(0.0))
    print('Recall of True    ', metrics.recall(1.0))
    print('Recall of False   ', metrics.recall(0.0))
    # fMeasure() without a label was deprecated in Spark 2.0 (it returned the
    # micro-averaged F-measure, i.e. plain accuracy) and removed in Spark 3.0,
    # so report the positive-class F1, the label-weighted F1 and accuracy
    # explicitly instead.
    print('F-1 of True       ', metrics.fMeasure(1.0))
    print('Weighted F-1      ', metrics.weightedFMeasure())
    print('Accuracy          ', metrics.accuracy)
    print('Confusion Matrix\n', metrics.confusionMatrix().toArray())

In [None]:
# NOTE(review): test_data is the same DataFrame object as strat_data_2017,
# which was already persisted earlier, so this call is redundant (harmless).
test_data.persist()

In [None]:
# Score the 2017 data with the best cross-validated model and report the
# cross-validation evaluator's metric on it (F1 for MulticlassClassificationEvaluator
# unless another metricName is set).
# NOTE(review): despite its name, `accuracy` holds that F1 value, not accuracy.
predictions = cv_model.transform(test_data)
evaluator = cv_model.getEvaluator()
accuracy = evaluator.evaluate(predictions)
print('F1 Accuracy: %f' % accuracy)

In [None]:
# The RDD-based MulticlassMetrics API wants an RDD of (prediction, label) float pairs.
predictions_and_labels = (
    predictions
    .select("prediction", "Default")
    .rdd
    .map(lambda row: (float(row[0]), float(row[1])))
)

In [None]:
# Print precision/recall per class, F-1 and the confusion matrix for the 2017 predictions.
print_metrics(predictions_and_labels)