<table style="border: none" align="left">
   <tr style="border: none">
      <th style="border: none"><font face="verdana" size="5" color="black"><b>Mortgage Default Prediction</b></th>
      
   </tr>
</table>

### Step 1: Download Data as Spark Dataframe from Local Storage

Read the three Mortgage files from local storage - Insert them as SPARK Dataframes

In [1]:
import os
from pyspark.sql import SQLContext
# Add asset from file system
df_data_1 = SQLContext(sc).read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/MortgageCustomer.csv', header='true', inferSchema = 'true')

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200802100021-0000
KERNEL_ID = 7a34d030-180a-4b0e-9b04-b066cf5aa65e


KeyError: 'DSX_PROJECT_DIR'

In [13]:
# Add asset from file system
df_data_2 = SQLContext(sc).read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/Property.csv', header='true', inferSchema = 'true')

In [14]:
# Add asset from file system
df_data_3 = SQLContext(sc).read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/Default.csv', header='true', inferSchema = 'true')

In [15]:
customer = df_data_1
property = df_data_2
default = df_data_3

In [16]:
customer.cache()
property.cache()
default.cache()

DataFrame[ID: double, MortgageDefault: string]

View the data within the three data frames created above. 

In [17]:
customer.show(5)
#property.show(5)
#default.show(5)

+--------+-------+-------------+--------------+----------------------+-------------------------+---------------+---------------+-----+-----------+
|      ID| Income|AppliedOnline|     Residence|Yrs at Current Address|Yrs with Current Employer|Number of Cards|Creditcard Debt|Loans|Loan Amount|
+--------+-------+-------------+--------------+----------------------+-------------------------+---------------+---------------+-----+-----------+
|100522.0|43982.0|          YES|Owner Occupier|                  13.0|                     11.0|            2.0|         1055.0|  0.0|     9405.0|
|101756.0|59944.0|          YES|Owner Occupier|                  20.0|                     11.0|            2.0|         3894.0|  0.0|     9880.0|
|101354.0|57718.0|          YES|Owner Occupier|                  25.0|                     16.0|            2.0|         1555.0|  1.0|     6285.0|
|100512.0|45621.0|          YES|Owner Occupier|                   1.0|                     19.0|            1.0|      

### Step 2: Merge Files

In [18]:
# Join the Customer, Property and Default tables together with ID being the key field.
# Keep all fields from the Customer and Default tables but only SalePrice and Location from the Property table

merged = customer.join(property, customer['ID'] == property['ID'])\
                   .join(default, customer['ID']==default['ID'])\
                   .select(customer['ID'],customer['Income'],customer['AppliedOnline'],customer['Residence'],\
                           customer['Yrs at Current Address'],customer['Yrs with Current Employer'],customer['Number of Cards'],\
                           customer['Creditcard Debt'],customer['Loans'],customer['Loan Amount'],\
                           property['SalePrice'], property['Location'], default['MortgageDefault'])

# Preview  5 rows
merged.toPandas().head()

Unnamed: 0,ID,Income,AppliedOnline,Residence,Yrs at Current Address,Yrs with Current Employer,Number of Cards,Creditcard Debt,Loans,Loan Amount,SalePrice,Location,MortgageDefault
0,100522.0,43982.0,YES,Owner Occupier,13.0,11.0,2.0,1055.0,0.0,9405.0,500000,110,NO
1,101756.0,59944.0,YES,Owner Occupier,20.0,11.0,2.0,3894.0,0.0,9880.0,750000,110,NO
2,101354.0,57718.0,YES,Owner Occupier,25.0,16.0,2.0,1555.0,1.0,6285.0,155000,130,YES
3,100512.0,45621.0,YES,Owner Occupier,1.0,19.0,1.0,1878.0,0.0,9260.0,195000,100,YES
4,100537.0,45081.0,NO,Owner Occupier,14.0,15.0,2.0,713.0,1.0,8430.0,140000,110,NO


### Step 3: Simple Data Preparation - Rename some columns and ensure correct data types 
This step is to remove spaces from columns names

In [19]:
merged = merged.withColumnRenamed("Yrs at Current Address", "YearCurrentAddress").withColumnRenamed("Yrs with Current Employer","YearsCurrentEmployer")\
                .withColumnRenamed("Number of Cards","NumberOfCards").withColumnRenamed("Creditcard Debt","CCDebt").withColumnRenamed("Loan Amount", "LoanAmount")
merged.toPandas().head(3)

Unnamed: 0,ID,Income,AppliedOnline,Residence,YearCurrentAddress,YearsCurrentEmployer,NumberOfCards,CCDebt,Loans,LoanAmount,SalePrice,Location,MortgageDefault
0,100522.0,43982.0,YES,Owner Occupier,13.0,11.0,2.0,1055.0,0.0,9405.0,500000,110,NO
1,101756.0,59944.0,YES,Owner Occupier,20.0,11.0,2.0,3894.0,0.0,9880.0,750000,110,NO
2,101354.0,57718.0,YES,Owner Occupier,25.0,16.0,2.0,1555.0,1.0,6285.0,155000,130,YES


Check data types and re-cast numeric fields to **Integers**

In [20]:
merged.dtypes

[('ID', 'double'),
 ('Income', 'double'),
 ('AppliedOnline', 'string'),
 ('Residence', 'string'),
 ('YearCurrentAddress', 'double'),
 ('YearsCurrentEmployer', 'double'),
 ('NumberOfCards', 'double'),
 ('CCDebt', 'double'),
 ('Loans', 'double'),
 ('LoanAmount', 'double'),
 ('SalePrice', 'int'),
 ('Location', 'int'),
 ('MortgageDefault', 'string')]

In [21]:
merged = merged.select(merged.ID.cast("integer"),merged.Income.cast('integer'),merged.AppliedOnline,merged.Residence,\
                   merged.YearCurrentAddress.cast('integer'),merged.YearsCurrentEmployer.cast('integer'),\
                   merged.NumberOfCards.cast('integer'),merged.CCDebt.cast('integer'),merged.Loans.cast('integer'),\
                   merged.LoanAmount.cast('integer'),merged.SalePrice,merged.Location,merged.MortgageDefault)
merged.dtypes

[('ID', 'int'),
 ('Income', 'int'),
 ('AppliedOnline', 'string'),
 ('Residence', 'string'),
 ('YearCurrentAddress', 'int'),
 ('YearsCurrentEmployer', 'int'),
 ('NumberOfCards', 'int'),
 ('CCDebt', 'int'),
 ('Loans', 'int'),
 ('LoanAmount', 'int'),
 ('SalePrice', 'int'),
 ('Location', 'int'),
 ('MortgageDefault', 'string')]

### Step 4: Build the Spark pipeline and the Random Forest model
"Pipeline" is an API in SparkML that's used for building models.
Additional information on SparkML: https://spark.apache.org/docs/2.0.2/ml-guide.html

In [22]:
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Prepare string variables so that they can be used by the algorithm
stringIndexer1 = StringIndexer(inputCol='AppliedOnline', outputCol='AppliedOnlineEncoded')
stringIndexer2 = StringIndexer(inputCol='Residence',outputCol='ResidenceEncoded')
stringIndexer3 = StringIndexer(inputCol='MortgageDefault', outputCol='label')

# Instanciate the algorithm
rf=RandomForestClassifier(labelCol="label", featuresCol="features")


# Pipelines API requires that input variables are passed in  a vector
assembler = VectorAssembler(inputCols=["Income", "AppliedOnlineEncoded", "ResidenceEncoded", "YearCurrentAddress", "YearsCurrentEmployer", "NumberOfCards", \
                                       "CCDebt", "Loans", "LoanAmount", "SalePrice", "Location"], outputCol="features")

pipeline = Pipeline(stages=[stringIndexer1, stringIndexer2, stringIndexer3, assembler, rf])

In [23]:
# Split data into train and test datasets
train, test = merged.randomSplit([80.0,20.0], seed=6)

In [27]:
# Build model based upon the pipeline defined in the above cell
model = pipeline.fit(train)

### Step 6: Score the test data set

In [30]:
results = model.transform(test)
results.toPandas().head(3)

Unnamed: 0,ID,Income,AppliedOnline,Residence,YearCurrentAddress,YearsCurrentEmployer,NumberOfCards,CCDebt,Loans,LoanAmount,SalePrice,Location,MortgageDefault,AppliedOnlineEncoded,ResidenceEncoded,label,features,rawPrediction,probability,prediction
0,100282,45715,YES,Owner Occupier,8,14,2,772,1,12985,137000,100,NO,0.0,0.0,0.0,"[45715.0, 0.0, 0.0, 8.0, 14.0, 2.0, 772.0, 1.0...","[14.298486693013132, 5.701513306986869]","[0.7149243346506566, 0.2850756653493435]",0.0
1,100284,45049,YES,Public Housing,6,16,2,1345,1,9085,280000,110,NO,0.0,2.0,0.0,"[45049.0, 0.0, 2.0, 6.0, 16.0, 2.0, 1345.0, 1....","[9.209687987949586, 10.790312012050412]","[0.4604843993974793, 0.5395156006025206]",1.0
2,100285,44974,YES,Public Housing,14,18,2,2772,0,9515,264000,130,NO,0.0,2.0,0.0,"[44974.0, 0.0, 2.0, 14.0, 18.0, 2.0, 2772.0, 0...","[7.9321583794028125, 12.067841620597187]","[0.39660791897014064, 0.6033920810298594]",1.0


In [31]:
evaluatorRF = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
accuracy = evaluatorRF.evaluate(results)

print("Accuracy = %g" % accuracy)

NameError: name 'BinaryClassificationEvaluator' is not defined

### Step 8: Save Model in ML repository


In [None]:
from dsx_ml.ml import save

model_name = "Predict Mortgage Default model- openscale"
save(name = model_name,
     model = model,
     algorithm_type = 'Classification',
     test_data = test)