<table style="border: none" align="left">
   <tr style="border: none">
      <th style="border: none"><font face="verdana" size="5" color="black"><b>Mortgage Default Machine Learning Model</b></font></th>
      <th style="border: none"><img src="https://github.com/pmservice/customer-satisfaction-prediction/blob/master/app/static/images/ml_icon_gray.png?raw=true" alt="Watson Machine Learning icon" height="40" width="40"></th>
   </tr>
</table>

This notebook walks you through these steps:
- Access the data
- Cleanse data for analysis
- Explore data
- Build a classification model
- Save the model in the ML repository with its associated metadata


### Step 1: Load Data as Spark Dataframe

Read the mortgage data from the remote `MORTGAGE_JOIN` table over JDBC and load it into a Spark DataFrame.

In [54]:
import dsx_core_utils
from pyspark.sql import SparkSession
# Look up the remote data set and its connection details
dataSet = dsx_core_utils.get_remote_data_set_info('MORTGAGE_JOIN')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
sparkSession = SparkSession.builder.getOrCreate()
# Qualify the table name with its schema when one is set
dbTableOrQuery = (dataSet['schema'] + '.' if len(dataSet['schema'].strip()) != 0 else '') + dataSet['table']
# Load JDBC data into a Spark DataFrame
df = (sparkSession.read.format("jdbc")
      .option("url", dataSource['URL'])
      .option("dbtable", dbTableOrQuery)
      .option("user", dataSource['user'])
      .option("password", dataSource['password'])
      .load())
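
The table-name expression in the cell above prefixes the table with its schema only when a schema is set. A minimal sketch of the same logic as a helper follows; `qualified_table_name` and the sample dictionaries are hypothetical names used for illustration and are not part of `dsx_core_utils`. Unlike the original expression, this sketch also strips surrounding whitespace from the schema before using it.

```python
def qualified_table_name(data_set):
    """Return 'SCHEMA.TABLE' when a schema is set, otherwise just 'TABLE'."""
    schema = data_set.get('schema', '').strip()
    table = data_set['table']
    return schema + '.' + table if schema else table


# Hypothetical metadata dictionaries shaped like the one used above
print(qualified_table_name({'schema': 'BANK', 'table': 'MORTGAGE_JOIN'}))
print(qualified_table_name({'schema': '  ', 'table': 'MORTGAGE_JOIN'}))
```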



In [58]:
merged = df

In [59]:
merged.cache()

DataFrame[ID: int, SALE_PRICE: int, LOCATION: int, INCOME: int, APPLIED_ONLINE: string, RESIDENCE: string, YRS_CURRENT_ADD: int, YRS_CURRENT_EMP: int, NO_OF_CARDS: int, CARD_DEBT: int, CURRENT_LOANS: int, LOAN_AMOUNT: int, MORTGAGE_DEFAULT: string]

In [60]:
print("Dataframe contains these fields:")
print(merged.schema.names)
print("")

Dataframe contains these fields:
['ID', 'SALE_PRICE', 'LOCATION', 'INCOME', 'APPLIED_ONLINE', 'RESIDENCE', 'YRS_CURRENT_ADD', 'YRS_CURRENT_EMP', 'NO_OF_CARDS', 'CARD_DEBT', 'CURRENT_LOANS', 'LOAN_AMOUNT', 'MORTGAGE_DEFAULT']



View the first few rows of the DataFrame created above.

In [61]:
merged.show(5)

+------+----------+--------+------+--------------+---------+---------------+---------------+-----------+---------+-------------+-----------+----------------+
|    ID|SALE_PRICE|LOCATION|INCOME|APPLIED_ONLINE|RESIDENCE|YRS_CURRENT_ADD|YRS_CURRENT_EMP|NO_OF_CARDS|CARD_DEBT|CURRENT_LOANS|LOAN_AMOUNT|MORTGAGE_DEFAULT|
+------+----------+--------+------+--------------+---------+---------------+---------------+-----------+---------+-------------+-----------+----------------+
|100272|    180000|     130| 43593|             Y|        O|             13|              0|          1|     2315|            0|      12820|               N|
|100273|    145000|     100| 45706|             Y|        O|             17|             16|          2|      373|            1|       7275|               Y|
|100280|    170000|     100| 44202|             Y|        O|              8|              0|          2|      748|            0|      10455|               N|
|100283|    259000|     100| 43800|             Y|  

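### Step 2: Merge Files

No merge is performed in this notebook: the `MORTGAGE_JOIN` table is loaded as a single DataFrame, so the cell below only previews it.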

In [62]:
# Preview 5 rows in pandas
merged.toPandas().head()

Unnamed: 0,ID,SALE_PRICE,LOCATION,INCOME,APPLIED_ONLINE,RESIDENCE,YRS_CURRENT_ADD,YRS_CURRENT_EMP,NO_OF_CARDS,CARD_DEBT,CURRENT_LOANS,LOAN_AMOUNT,MORTGAGE_DEFAULT
0,100272,180000,130,43593,Y,O,13,0,1,2315,0,12820,N
1,100273,145000,100,45706,Y,O,17,16,2,373,1,7275,Y
2,100280,170000,100,44202,Y,O,8,0,2,748,0,10455,N
3,100283,259000,100,43800,Y,O,0,4,2,725,0,7340,N
4,100292,95000,101,56087,N,P,27,18,1,4818,1,8910,N


### Step 3: Simple Data Preparation - Rename some columns and ensure correct data types 
This step renames the upper-case, underscore-separated column names to CamelCase.

In [63]:
merged = merged.withColumnRenamed("INCOME", "Income").withColumnRenamed("APPLIED_ONLINE", "AppliedOnline").withColumnRenamed("RESIDENCE", "Residence")\
                .withColumnRenamed("YRS_CURRENT_ADD", "YearCurrentAddress").withColumnRenamed("YRS_CURRENT_EMP","YearsCurrentEmployer")\
                .withColumnRenamed("NO_OF_CARDS","NumberOfCards").withColumnRenamed("CARD_DEBT","CCDebt").withColumnRenamed("CURRENT_LOANS", "Loans")\
                .withColumnRenamed("LOAN_AMOUNT", "LoanAmount").withColumnRenamed("SALE_PRICE", "SalePrice").withColumnRenamed("LOCATION", "Location")\
                .withColumnRenamed("MORTGAGE_DEFAULT", "MortgageDefault")
merged.toPandas().head(3)

Unnamed: 0,ID,SalePrice,Location,Income,AppliedOnline,Residence,YearCurrentAddress,YearsCurrentEmployer,NumberOfCards,CCDebt,Loans,LoanAmount,MortgageDefault
0,100272,180000,130,43593,Y,O,13,0,1,2315,0,12820,N
1,100273,145000,100,45706,Y,O,17,16,2,373,1,7275,Y
2,100280,170000,100,44202,Y,O,8,0,2,748,0,10455,N
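
The chain of `withColumnRenamed` calls above can also be expressed as a lookup table. The sketch below computes the new column names in plain Python from the names printed in Step 1; `RENAMES`, `old_names`, and `new_names` are illustrative names, not part of the original notebook. On a Spark DataFrame, the resulting list could then be applied in one call with `merged.toDF(*new_names)`, which is shown only as a comment so the block runs without Spark.

```python
# Old-to-new column names, copied from the rename cell above
RENAMES = {
    "INCOME": "Income",
    "APPLIED_ONLINE": "AppliedOnline",
    "RESIDENCE": "Residence",
    "YRS_CURRENT_ADD": "YearCurrentAddress",
    "YRS_CURRENT_EMP": "YearsCurrentEmployer",
    "NO_OF_CARDS": "NumberOfCards",
    "CARD_DEBT": "CCDebt",
    "CURRENT_LOANS": "Loans",
    "LOAN_AMOUNT": "LoanAmount",
    "SALE_PRICE": "SalePrice",
    "LOCATION": "Location",
    "MORTGAGE_DEFAULT": "MortgageDefault",
}

# Column names as printed by merged.schema.names in Step 1
old_names = ['ID', 'SALE_PRICE', 'LOCATION', 'INCOME', 'APPLIED_ONLINE',
             'RESIDENCE', 'YRS_CURRENT_ADD', 'YRS_CURRENT_EMP', 'NO_OF_CARDS',
             'CARD_DEBT', 'CURRENT_LOANS', 'LOAN_AMOUNT', 'MORTGAGE_DEFAULT']

# Columns without an entry (here only ID) keep their original name
new_names = [RENAMES.get(name, name) for name in old_names]
print(new_names)

# With Spark available: merged = merged.toDF(*new_names)
```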


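Check the data types and cast the numeric fields to **integers**. The fields are already `int` here, so the cast acts as a safeguard; the `select` also reorders the columns.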

In [64]:
merged.dtypes

[('ID', 'int'),
 ('SalePrice', 'int'),
 ('Location', 'int'),
 ('Income', 'int'),
 ('AppliedOnline', 'string'),
 ('Residence', 'string'),
 ('YearCurrentAddress', 'int'),
 ('YearsCurrentEmployer', 'int'),
 ('NumberOfCards', 'int'),
 ('CCDebt', 'int'),
 ('Loans', 'int'),
 ('LoanAmount', 'int'),
 ('MortgageDefault', 'string')]

In [65]:
merged = merged.select(merged.ID.cast("integer"),merged.Income.cast('integer'),merged.AppliedOnline,merged.Residence,\
                   merged.YearCurrentAddress.cast('integer'),merged.YearsCurrentEmployer.cast('integer'),\
                   merged.NumberOfCards.cast('integer'),merged.CCDebt.cast('integer'),merged.Loans.cast('integer'),\
                   merged.LoanAmount.cast('integer'),merged.SalePrice,merged.Location,merged.MortgageDefault)
merged.dtypes

[('ID', 'int'),
 ('Income', 'int'),
 ('AppliedOnline', 'string'),
 ('Residence', 'string'),
 ('YearCurrentAddress', 'int'),
 ('YearsCurrentEmployer', 'int'),
 ('NumberOfCards', 'int'),
 ('CCDebt', 'int'),
 ('Loans', 'int'),
 ('LoanAmount', 'int'),
 ('SalePrice', 'int'),
 ('Location', 'int'),
 ('MortgageDefault', 'string')]

### Step 4: Data Exploration

1) Summarize the shape of the data: the number of fields and records <br>
2) Explore distributions and scatterplots using two different graphics packages (Brunel and PixieDust)

#### Feel free to play around with the charts to explore other features that are in the data set

In [66]:
print("There are " + str(merged.count()) + " records and " + str(len(merged.columns)) + " fields in the dataset.")

There are 419 records and 13 fields in the dataset.


In [67]:
import brunel
df = merged.toPandas()
%brunel data('df') bar x(Residence) y(Income) mean(CCDebt) color(MortgageDefault) stack tooltip(Income) | x(YearCurrentAddress) y(YearsCurrentEmployer) point color(MortgageDefault) tooltip(YearCurrentAddress, YearsCurrentEmployer) :: width=1100, height=400 


In [None]:
from pixiedust.display import *
display(merged)

### Step 5: Build the Spark pipeline and the Random Forest model
A `Pipeline` in Spark ML chains feature transformers and an estimator into a single workflow, so that training and scoring apply the same steps in the same order.
Additional information on Spark ML: https://spark.apache.org/docs/2.0.2/ml-guide.html

In [69]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Prepare string variables so that they can be used by the algorithm
stringIndexer1 = StringIndexer(inputCol='AppliedOnline', outputCol='AppliedOnlineEncoded')
stringIndexer2 = StringIndexer(inputCol='Residence',outputCol='ResidenceEncoded')
stringIndexer3 = StringIndexer(inputCol='MortgageDefault', outputCol='label')

# Instantiate the algorithm
rf = RandomForestClassifier(labelCol="label", featuresCol="features")


# The Pipelines API requires the input variables to be assembled into a single vector column
assembler = VectorAssembler(inputCols=["Income", "AppliedOnlineEncoded", "ResidenceEncoded", "YearCurrentAddress", "YearsCurrentEmployer", "NumberOfCards",
                                       "CCDebt", "Loans", "LoanAmount", "SalePrice", "Location"], outputCol="features")

pipeline = Pipeline(stages=[stringIndexer1, stringIndexer2, stringIndexer3, assembler, rf])
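
By default, `StringIndexer` assigns indices in order of descending label frequency, so the most common value in a column becomes 0.0. The plain-Python sketch below mimics that behavior on the `Residence` values from the five preview rows shown earlier; `fit_string_index` is a hypothetical helper, and breaking ties alphabetically is an assumption of this sketch rather than a statement about every Spark version.

```python
from collections import Counter


def fit_string_index(values):
    """Map each distinct string to an index, most frequent label first."""
    counts = Counter(values)
    # Sort by descending frequency; ties are broken alphabetically here,
    # which is an assumption of this sketch
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Residence values from the five preview rows shown earlier
residence = ['O', 'O', 'O', 'O', 'P']
mapping = fit_string_index(residence)
encoded = [mapping[value] for value in residence]
print(mapping)
print(encoded)
```

Because the index depends on frequency in the training data, the same string can map to a different number if the pipeline is refit on data with a different class mix.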

In [70]:
# Split data into train and test datasets
train, test = merged.randomSplit([80.0,20.0], seed=6)
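
`randomSplit` normalizes its weights when they do not sum to 1, so `[80.0, 20.0]` behaves like `[0.8, 0.2]`, and the split sizes are only approximately 80/20 because each row is assigned at random. The sketch below illustrates the idea in plain Python; `normalize` and `random_split` are hypothetical helpers, and Python's random number generator differs from Spark's, so the rows selected will not match the notebook's split.

```python
import random


def normalize(weights):
    """Scale the weights so that they sum to 1."""
    total = float(sum(weights))
    return [w / total for w in weights]


def random_split(rows, weights, seed):
    """Assign each row to a split by comparing a uniform draw with cumulative weights."""
    fractions = normalize(weights)
    rng = random.Random(seed)
    splits = [[] for _ in fractions]
    for row in rows:
        draw = rng.random()
        cumulative = 0.0
        for i, fraction in enumerate(fractions):
            cumulative += fraction
            # The last split also catches any floating-point remainder
            if draw < cumulative or i == len(fractions) - 1:
                splits[i].append(row)
                break
    return splits


print(normalize([80.0, 20.0]))
# 419 stand-in row ids, matching the record count reported in Step 4
train_rows, test_rows = random_split(list(range(419)), [80.0, 20.0], seed=6)
print(len(train_rows) + len(test_rows))
```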

In [71]:
# Build model based upon the pipeline defined in the above cell
model = pipeline.fit(train)

### Step 6: Score the test data set

In [72]:
results = model.transform(test)
results.toPandas().head(3)

Unnamed: 0,ID,Income,AppliedOnline,Residence,YearCurrentAddress,YearsCurrentEmployer,NumberOfCards,CCDebt,Loans,LoanAmount,SalePrice,Location,MortgageDefault,AppliedOnlineEncoded,ResidenceEncoded,label,features,rawPrediction,probability,prediction
0,100282,45715,Y,O,8,14,2,772,1,12985,137000,100,N,0.0,0.0,0.0,"[45715.0, 0.0, 0.0, 8.0, 14.0, 2.0, 772.0, 1.0...","[16.36755712368833, 3.6324428763116674]","[0.8183778561844166, 0.1816221438155834]",0.0
1,100284,45049,Y,P,6,16,2,1345,1,9085,280000,110,N,0.0,1.0,0.0,"[45049.0, 0.0, 1.0, 6.0, 16.0, 2.0, 1345.0, 1....","[9.240368299158618, 10.75963170084138]","[0.4620184149579309, 0.537981585042069]",1.0
2,100285,44974,Y,P,14,18,2,2772,0,9515,264000,130,N,0.0,1.0,0.0,"[44974.0, 0.0, 1.0, 14.0, 18.0, 2.0, 2772.0, 0...","[10.084639136815953, 9.915360863184047]","[0.5042319568407977, 0.4957680431592023]",0.0
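
The `rawPrediction`, `probability`, and `prediction` columns are related. For a random forest, `rawPrediction` holds the per-class votes summed over the trees, `probability` is that vector divided by its sum, and `prediction` is the index of the largest probability. The sketch below checks this on the three scored rows above; the raw vectors sum to about 20, which is consistent with the default of 20 trees.

```python
# rawPrediction vectors copied from the three scored rows above
raw_predictions = [
    [16.36755712368833, 3.6324428763116674],
    [9.240368299158618, 10.75963170084138],
    [10.084639136815953, 9.915360863184047],
]

probabilities = []
predictions = []
for raw in raw_predictions:
    total = sum(raw)  # about 20 for each row
    probability = [value / total for value in raw]
    probabilities.append(probability)
    # The predicted class is the index of the largest probability
    predictions.append(float(probability.index(max(probability))))

print(probabilities)
print(predictions)
```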


### Step 7: Model Evaluation 

In [73]:
print('Accuracy model1 = {:.2f}.'.format(results.filter(results.label == results.prediction).count() / float(results.count())))

Accuracy model1 = 0.68.
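
The ratio of rows where the prediction matches the label to all rows is the model's accuracy. Precision and recall are different quantities that focus on the positive class. The sketch below computes all three from confusion counts; the labels and predictions are hypothetical values chosen for illustration, not results from this notebook.

```python
def confusion_counts(labels, predictions, positive=1.0):
    """Count true/false positives and negatives for a binary problem."""
    tp = fp = tn = fn = 0
    for label, prediction in zip(labels, predictions):
        if prediction == positive:
            if label == positive:
                tp += 1
            else:
                fp += 1
        else:
            if label == positive:
                fn += 1
            else:
                tn += 1
    return tp, fp, tn, fn


# Hypothetical labels and predictions, for illustration only
labels      = [0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0]
predictions = [0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0]

tp, fp, tn, fn = confusion_counts(labels, predictions)
accuracy = (tp + tn) / float(tp + fp + tn + fn)   # share of all rows predicted correctly
precision = tp / float(tp + fp)                   # share of predicted positives that are correct
recall = tp / float(tp + fn)                      # share of actual positives that are found
print("accuracy=%.2f precision=%.2f recall=%.2f" % (accuracy, precision, recall))
```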


In [74]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

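# Evaluate model
# Note: rawPredictionCol is set to the hard 0/1 "prediction" column here, so the ROC curve has a single
# threshold; pointing it at "rawPrediction" would score the model on its continuous outputs instead.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")
print('Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(results)))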

Area under ROC curve = 0.68.


In [75]:
evaluatorRF = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction", metricName="areaUnderROC")
# The evaluator returns the area under the ROC curve, not accuracy
auc = evaluatorRF.evaluate(results)

print("Area under ROC = %g" % auc)

Area under ROC = 0.676587
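
Both evaluators above are given the hard 0/1 `prediction` column as the score. With only two distinct score values, the area under the ROC curve reduces to the average of the true positive rate and the true negative rate, which can differ from the area computed on continuous scores. The sketch below illustrates this with a pairwise-ranking definition of the area; `area_under_roc` is a hypothetical helper, and the labels and scores are made-up values, not results from this notebook.

```python
def area_under_roc(labels, scores):
    """Probability that a random positive outranks a random negative; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1.0]
    negatives = [s for y, s in zip(labels, scores) if y == 0.0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical labels and class-1 probabilities, for illustration only
labels = [0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0]
scores = [0.10, 0.55, 0.30, 0.80, 0.45, 0.20, 0.90, 0.40]
hard = [1.0 if s >= 0.5 else 0.0 for s in scores]  # thresholded predictions

auc_scores = area_under_roc(labels, scores)
auc_hard = area_under_roc(labels, hard)
print("AUC on scores = %.4f" % auc_scores)
print("AUC on hard predictions = %.4f" % auc_hard)
```

In the notebook, passing `rawPredictionCol="rawPrediction"` to `BinaryClassificationEvaluator` would evaluate the continuous outputs; the resulting value is not shown here because it was not computed in the original run.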


### Step 8: Save Model in ML repository


In [76]:
from dsx_ml.ml import save

model_name = "Predict Mortgage Default LOS"
save(name = model_name,
     model = model,
     algorithm_type = 'Classification',
     test_data = test)

## Summary:
You are now at the end of this notebook and should have successfully:
- Performed basic data preparation on the loaded data
- Explored the data graphically
- Built a Spark model in the form of a pipeline
- Evaluated the model's accuracy and area under the ROC curve
- Stored the model in the IBM ML repository, making it ready for deployment