<table style="border: none" align="left">
   <tr style="border: none">
      <th style="border: none"><font face="verdana" size="5" color="black"><b>Mortgage Complaints using Watson Machine Learning</b></font></th>
      <th style="border: none"><img src="https://github.com/pmservice/customer-satisfaction-prediction/blob/master/app/static/images/ml_icon_gray.png?raw=true" alt="Watson Machine Learning icon" height="40" width="40"></th>
   </tr> 
   <tr style="border: none">
       <td style="border: none"><img src="https://github.com/pmservice/wml-sample-models/raw/master/spark/drug-selection/images/learning_banner-05.png" width="600" alt="Icon"></td>
   </tr>
</table>

This notebook contains steps and code to train a Spark Naive Bayes model for multi-class text classification. It introduces commands for getting data, persisting the model to the Watson Machine Learning repository, deploying the model, and scoring.

This notebook uses Python 3.5 and Apache Spark 2.1.

You will use the MORTGAGE_ISSUES_TRAINING.csv and MORTGAGE_ISSUES_TEST.csv files, which contain anonymized mortgage complaints taken from the <a href="https://www.consumerfinance.gov/data-research/consumer-complaints/search/?from=0&searchField=all&searchText=&size=25&sort=created_date_desc">Consumer Complaint Database</a>.

## Learning goals

This notebook teaches you how to:
-  Publish a sample model in the Watson Machine Learning (WML) repository

You will also learn how to use the WML API to:
-  Deploy a model for online scoring 


## Contents

This notebook contains the following parts:

1.	[Set up the environment](#setup)
2.	[Create the Spark machine learning model](#model)
3.	[Store the model](#store)
4.	[Deploy and score](#score)

<a id="setup"></a>
## 1. Set up the environment

Before you use the sample code in this notebook, you must perform the following setup tasks:

- Create a [Watson Machine Learning (WML) Service](https://console.ng.bluemix.net/catalog/services/ibm-watson-machine-learning/) instance (a free plan is offered and information about how to create the instance is [here](https://dataplatform.ibm.com/docs/content/analyze-data/wml-setup.html))
- Create a [Spark Service](https://console.ng.bluemix.net/catalog/services/spark/) instance (an entry plan is offered). [Associate](https://dataplatform.cloud.ibm.com/docs/content/getting-started/assoc-services.html?audience=wdp&linkInPage=true) the Spark service with your Watson Studio project.
- Create a [Db2 Warehouse on Cloud Service](https://console.bluemix.net/catalog/services/db2-warehouse-on-cloud/) instance (an entry plan is offered).
  + Download the [MORTGAGE_ISSUES_TRAINING.csv](https://github.com/mohanie/MortgageBento/raw/master/data/MORTGAGE_ISSUES_TRAINING.csv) and [MORTGAGE_ISSUES_TEST.csv](https://github.com/mohanie/MortgageBento/raw/master/data/MORTGAGE_ISSUES_TEST.csv) files from the Git repository.
  + Click **Open the console** to get started with **Db2 Warehouse on Cloud**.
  + Select **Load Data** and the **Desktop** load type.
  + **Drag and drop** the previously downloaded file and press **Next**.
  + Select the **Schema** to import the data into and click **New Table**.
  + Enter the name **MORTGAGE_ISSUES_TRAINING** for the **new table**, then click **Next** to finish the data import.
  + Use `,` as the **field separator**.
  + Click **Next** to create a table with the uploaded data.
  + Repeat these steps to load the **MORTGAGE_ISSUES_TEST.csv** file into a table named **MORTGAGE_ISSUES_TEST**.

<a id="model"></a>
## 2. Create the Spark machine learning model

In this section you will learn how to prepare data, create an Apache Spark machine learning pipeline, and train a model.

- [2.1 Load the training data from Db2 Warehouse on Cloud](#load)
- [2.2 Explore the data](#explore)
- [2.3 Create the pipeline](#pipe)
- [2.4 Train the model](#train)
- [2.5 Test the accuracy](#accuracy)

### 2.1 Load the training data from Db2 Warehouse on Cloud<a id="load"></a>

Run the following cell to load the contents of the **MORTGAGE_ISSUES_TRAINING** and **MORTGAGE_ISSUES_TEST** tables into Spark DataFrames.

Enter your authentication data as required. 

**Tip:** The authentication information can be found under the **Service Credentials** tab of the Db2 Warehouse on Cloud service instance that you created in IBM Cloud. Click **New credential** to create credentials if you do not have any.
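
To keep secrets out of the notebook, the connection settings could be read from environment variables rather than typed into the cell. The sketch below assumes hypothetical variable names (`DB2_HOST`, `DB2_USERNAME`, `DB2_PASSWORD`) that are not defined by IBM Cloud or by this notebook, and it uses plain string keys as stand-ins for the `Connectors.DASHDB.*` constants.

```python
import os

# Hypothetical environment variable names; set them yourself before running.
REQUIRED_VARS = ("DB2_HOST", "DB2_USERNAME", "DB2_PASSWORD")


def load_db2_settings(table_name, env=None):
    """Collect Db2 connection settings for one table from the environment."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {
        "host": env["DB2_HOST"],
        "database": "BLUDB",  # database name used in this notebook
        "username": env["DB2_USERNAME"],
        "password": env["DB2_PASSWORD"],
        "table": table_name,
    }
```

The returned values could then be mapped onto the connector option keys used in the cell below.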

In [1]:
from ingest.Connectors import Connectors
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

dashDBloadTraining = {Connectors.DASHDB.HOST              : 'dashdb-entry-yp-dal10-01.services.dal.bluemix.net',
                      Connectors.DASHDB.DATABASE          : 'BLUDB',
                      Connectors.DASHDB.USERNAME          : '***',
                      Connectors.DASHDB.PASSWORD          : '***',
                      Connectors.DASHDB.SOURCE_TABLE_NAME : 'MORTGAGE_ISSUES_TRAINING'}

dashDBloadTest = { Connectors.DASHDB.HOST              : 'dashdb-entry-yp-dal09-10.services.dal.bluemix.net',
                      Connectors.DASHDB.DATABASE          : 'BLUDB',
                      Connectors.DASHDB.USERNAME          : '***',
                      Connectors.DASHDB.PASSWORD          : '***',
                      Connectors.DASHDB.SOURCE_TABLE_NAME : 'MORTGAGE_ISSUES_TEST'}

mortgage_training_data = sqlContext.read.format("com.ibm.spark.discover").options(**dashDBloadTraining).load()
mortgage_test_data = sqlContext.read.format("com.ibm.spark.discover").options(**dashDBloadTest).load()

<a id="explore"></a>
### 2.2 Explore the data

In this subsection you will explore the data:
- Show the schema and total row count of the MORTGAGE_ISSUES_TRAINING Spark DataFrame.
- Show the schema and total row count of the MORTGAGE_ISSUES_TEST Spark DataFrame.
- List the most frequent complaint narratives.
- List the complaint categories.

In [2]:
print("MORTGAGE_ISSUES_TRAIN")
mortgage_training_data.printSchema()
mortgage_training_data.show()
print("Total Rows : "+str(mortgage_training_data.count())+"\n")

print("MORTGAGE_ISSUES_TEST")
mortgage_test_data.printSchema()
mortgage_test_data.show()
print("Total Rows : "+str(mortgage_test_data.count())+"\n")

print("The most complaints")
from pyspark.sql.functions import col
mortgage_training_data.groupBy("CONSUMER_COMPLAINT_NARRATIVE") \
        .count() \
        .orderBy(col("count").desc()) \
        .show()

print("Complaints categories.")
mortgage_training_data.select("ISSUE").distinct().show()

MORTGAGE_ISSUES_TRAIN
root
 |-- CONSUMER_COMPLAINT_NARRATIVE: string (nullable = true)
 |-- ISSUE: string (nullable = true)

+----------------------------+--------------------+
|CONSUMER_COMPLAINT_NARRATIVE|               ISSUE|
+----------------------------+--------------------+
|        I AM ATTEMPTING T...|Loan modification...|
|        On XX/XX/2016 I c...|Loan modification...|
|        SHERIFF SALE TODA...|Loan modification...|
|        I am a Mortgage b...|Loan servicing pa...|
|        I recently sold m...|Settlement proces...|
|        SLS had requested...|Settlement proces...|
|        I refinanced my m...|Settlement proces...|
|        On XX/XX/2017 - X...|Settlement proces...|
|        post closing the ...|Settlement proces...|
|        I contacted Resid...|Settlement proces...|
|        On  XXXX    XXXX ...|Settlement proces...|
|        Ditech collects e...|Loan servicing pa...|
|        I recently purcha...|Loan servicing pa...|
|        I have a mortgage...|Loan servicin
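
Counting rows per category, in the same way the cell counts repeated narratives, also shows how balanced the classes are. The following pure-Python sketch mimics a `groupBy(...).count()` ordered by descending count. The sample rows are made up for illustration and are not taken from the data set.

```python
from collections import Counter


def count_by_key(rows, key):
    """Return (value, count) pairs sorted by descending count."""
    counts = Counter(row[key] for row in rows)
    return counts.most_common()


# Illustrative rows only; the real data lives in the Spark DataFrame.
sample_rows = [
    {"ISSUE": "Loan servicing payments escrow account"},
    {"ISSUE": "Loan modification collection foreclosure"},
    {"ISSUE": "Loan servicing payments escrow account"},
    {"ISSUE": "Struggling to pay mortgage"},
    {"ISSUE": "Loan servicing payments escrow account"},
    {"ISSUE": "Loan modification collection foreclosure"},
]

issue_counts = count_by_key(sample_rows, "ISSUE")
```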

<a id="pipe"></a>
### 2.3 Create the pipeline

- regexTokenizer: Tokenizes the CONSUMER_COMPLAINT_NARRATIVE text by using a regular expression.
- stopwordsRemover: Removes the stop words in a supplied list.
- countVectors: Converts the tokenized output to vectors of token counts.
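
The three stages can be pictured with a small pure-Python sketch. It only approximates the Spark transformers: the tokenizer lowercases and splits on whitespace, and the vocabulary is fixed by hand, whereas `CountVectorizer` learns its vocabulary from the data. The function names and the sample sentence are illustrative assumptions.

```python
import re
from collections import Counter


def tokenize(text):
    """Lowercase the text and split it on runs of whitespace."""
    return [tok for tok in re.split(r"\s+", text.lower()) if tok]


def remove_stop_words(tokens, stop_words):
    """Drop tokens that appear in the stop-word list (case-insensitive)."""
    stop = {word.lower() for word in stop_words}
    return [tok for tok in tokens if tok not in stop]


def count_vector(tokens, vocabulary):
    """Return token counts in vocabulary order; unknown tokens are ignored."""
    counts = Counter(tokens)
    return [counts[word] for word in vocabulary]


tokens = tokenize("I have a loan and the loan is in foreclosure")
filtered = remove_stop_words(tokens, ["I", "have", "a", "and", "the", "is", "in"])
vocabulary = ["loan", "foreclosure", "escrow"]  # hand-picked for the example
vector = count_vector(filtered, vocabulary)
```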

In [3]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer

regexTokenizer = RegexTokenizer(inputCol="CONSUMER_COMPLAINT_NARRATIVE", outputCol="words")

add_stopwords = [ "a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "as", "at", "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", "could", "did", "do", "does", "doing", "down", "during", "each", "few", "for", "from", "further", "had", "has", "have", "having", "he", "he'd", "he'll", "he's", "her", "here", "here's", "hers", "herself", "him", "himself", "his", "how", "how's", "i", "i'd", "i'll", "i'm", "i've", "if", "in", "into", "is", "it", "it's", "its", "itself", "let's", "me", "more", "most", "my", "myself", "nor", "of", "on", "once", "only", "or", "other", "ought", "our", "ours", "ourselves", "out", "over", "own", "same", "she", "she'd", "she'll", "she's", "should", "so", "some", "such", "than", "that", "that's", "the", "their", "theirs", "them", "themselves", "then", "there", "there's", "these", "they", "they'd", "they'll", "they're", "they've", "this", "those", "through", "to", "too", "under", "until", "up", "very", "was", "we", "we'd", "we'll", "we're", "we've", "were", "what", "what's", "when", "when's", "where", "where's", "which", "while", "who", "who's", "whom", "why", "why's", "with", "would", "you", "you'd", "you'll", "you're", "you've", "your", "yours", "yourself", "yourselves","XXXX", "XX", "My","I","To","Hello" ]
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

- StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so the most frequent label gets index 0.
- IndexToString converts the encoded issue types back to strings.
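
The frequency-ordered encoding and its inverse can be sketched in plain Python. This is an illustration of the idea rather than Spark's implementation; the helper names and the sample labels are assumptions, and the example avoids ties in frequency.

```python
from collections import Counter


def fit_label_index(labels):
    """Order distinct labels by descending frequency; the position is the index."""
    return [label for label, _ in Counter(labels).most_common()]


def to_index(label, ordered_labels):
    """Encode a label as a float index, as StringIndexer produces doubles."""
    return float(ordered_labels.index(label))


def to_label(index, ordered_labels):
    """Decode an index back to its label, as IndexToString does."""
    return ordered_labels[int(index)]


sample_labels = (
    ["Loan servicing payments escrow account"] * 3
    + ["Loan modification collection foreclosure"] * 2
    + ["Struggling to pay mortgage"]
)
ordered = fit_label_index(sample_labels)
```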

In [4]:
from pyspark.ml.feature import StringIndexer, IndexToString

label_stringIdx = StringIndexer(inputCol = "ISSUE", outputCol = "label").fit(mortgage_training_data)

label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=label_stringIdx.labels)

- Define a Naive Bayes multiclass classification.
- Create the Spark pipeline.

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes

NB = NaiveBayes(smoothing=1)

SparkPipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx,NB,label_converter])
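
The `smoothing=1` argument corresponds to additive (Laplace) smoothing of the per-class token probabilities. The following is a textbook multinomial Naive Bayes in pure Python, written only to illustrate the idea. It is not Spark's implementation, and the tiny token lists and class names are made up.

```python
import math
from collections import Counter, defaultdict


def train_nb(docs, labels, smoothing=1.0):
    """Estimate log priors and smoothed log likelihoods per class."""
    vocab = sorted({tok for doc in docs for tok in doc})
    class_counts = Counter(labels)
    token_counts = defaultdict(Counter)
    for tokens, label in zip(docs, labels):
        token_counts[label].update(tokens)
    model = {}
    for label, n_docs in class_counts.items():
        total = sum(token_counts[label].values())
        denom = total + smoothing * len(vocab)
        log_prior = math.log(n_docs / len(labels))
        log_lik = {
            tok: math.log((token_counts[label][tok] + smoothing) / denom)
            for tok in vocab
        }
        model[label] = (log_prior, log_lik)
    return model


def predict_nb(model, tokens):
    """Return the class with the highest posterior log score."""
    def score(label):
        log_prior, log_lik = model[label]
        # Tokens outside the training vocabulary are skipped.
        return log_prior + sum(log_lik[t] for t in tokens if t in log_lik)
    return max(model, key=score)


docs = [["escrow", "payment"], ["escrow", "account"], ["foreclosure", "modification"]]
labels = ["servicing", "servicing", "modification"]
model = train_nb(docs, labels, smoothing=1.0)
```

Without smoothing, a token never seen in a class would yield a zero probability and rule that class out entirely.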

<a id="train"></a>
### 2.4 Train the model

Now you can train your Naive Bayes model by using the previously defined pipeline and the training data.

In [6]:
NBmodel = SparkPipeline.fit(mortgage_training_data)
PredictedLabel = NBmodel.transform(mortgage_training_data)
PredictedLabel.select('CONSUMER_COMPLAINT_NARRATIVE', 'ISSUE', 'prediction','predictedLabel').show(5)

+----------------------------+--------------------+----------+--------------------+
|CONSUMER_COMPLAINT_NARRATIVE|               ISSUE|prediction|      predictedLabel|
+----------------------------+--------------------+----------+--------------------+
|        filed s previous ...|Credit decision U...|       0.0|Loan servicing pa...|
|        This is a complai...|Credit decision U...|       7.0|Credit decision U...|
|        I obtained a home...|Credit decision U...|       0.0|Loan servicing pa...|
|        I purchased a hom...|Credit decision U...|       1.0|Loan modification...|
|        It has been 10 we...|Credit decision U...|       7.0|Credit decision U...|
+----------------------------+--------------------+----------+--------------------+
only showing top 5 rows



<a id="accuracy"></a>
### 2.5 Test the accuracy

In [7]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

predictions = NBmodel.transform(mortgage_test_data)
predictions.filter(predictions['prediction'] == 0) \
    .select("CONSUMER_COMPLAINT_NARRATIVE","ISSUE","probability","label","prediction","predictedLabel") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

+------------------------------+------------------------------+------------------------------+-----+----------+------------------------------+
|  CONSUMER_COMPLAINT_NARRATIVE|                         ISSUE|                   probability|label|prediction|                predictedLabel|
+------------------------------+------------------------------+------------------------------+-----+----------+------------------------------+
|In XXXX 2015 we obtained a ...|Loan servicing payments esc...|[1.0,2.4174454747610357E-28...|  0.0|       0.0|Loan servicing payments esc...|
|SENECA MORTGAGE SERVICING X...|Loan servicing payments esc...|[1.0,1.614037692951306E-41,...|  0.0|       0.0|Loan servicing payments esc...|
|We paid our mortgage to Car...|Loan servicing payments esc...|[1.0,4.492948716573328E-65,...|  0.0|       0.0|Loan servicing payments esc...|
|My previous lender sold my ...|    Struggling to pay mortgage|[0.9999999999999987,3.51717...|  3.0|       0.0|Loan servicing payments esc...|
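
The `accuracy` metric requested from the evaluator is the fraction of rows whose prediction equals the label. A minimal pure-Python sketch, using the `label` and `prediction` values of the four rows visible in the output above as example input (the function name is an assumption):

```python
def accuracy_score(labels, predictions):
    """Return the fraction of positions where prediction equals label."""
    if len(labels) != len(predictions):
        raise ValueError("labels and predictions must have the same length")
    if not labels:
        raise ValueError("at least one row is required")
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


example_labels = [0.0, 0.0, 0.0, 3.0]
example_predictions = [0.0, 0.0, 0.0, 0.0]
example_accuracy = accuracy_score(example_labels, example_predictions)
```

Here three of the four rows match, so the example accuracy is 0.75. This covers only the displayed rows and says nothing about the accuracy on the full test set.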

<a id="store"></a>
## 3. Store the model

In this section you will learn how to store the sample model in the Watson Machine Learning repository by using the repository client.

First, install and import the client library.

In [8]:
!rm -rf $PIP_BUILD
!pip install --upgrade watson-machine-learning-client==1.0.260

Requirement already up-to-date: watson-machine-learning-client==1.0.260 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/s0d3-9feb51e0b4d68d-c1ce9e972176/.local/lib/python3.5/site-packages (1.0.260)
Requirement not upgraded as not directly required: tqdm in /usr/local/src/conda3_runtime.v47/home/envs/DSX-Python35-Spark/lib/python3.5/site-packages (from watson-machine-learning-client==1.0.260) (4.19.4)
Requirement not upgraded as not directly required: tabulate in /usr/local/src/conda3_runtime.v47/home/envs/DSX-Python35-Spark/lib/python3.5/site-packages (from watson-machine-learning-client==1.0.260) (0.8.2)
Requirement not upgraded as not directly required: urllib3 in /usr/local/src/conda3_runtime.v47/home/envs/DSX-Python35-Spark/lib/python3.5/site-packages (from watson-machine-learning-client==1.0.260) (1.22)
Requirement not upgraded as not directly required: certifi in /usr/local/src/conda3_runtime.v47/home/envs/DSX-Python35-Spark/lib/python3.5/site-packages (from watson-machine-learn

**Note**: Apache Spark 2.1 is required.

In [9]:
from watson_machine_learning_client import WatsonMachineLearningAPIClient



Authenticate to the Watson Machine Learning service on IBM Cloud.

**Tip**: Authentication information (your credentials) can be found in the <a href="https://console.bluemix.net/docs/services/service_credentials.html#service_credentials" target="_blank" rel="noopener noreferrer">Service Credentials</a> tab of the service instance that you created on IBM Cloud.

If you cannot see the **instance_id** field in **Service Credentials**, click **New credential (+)** to generate new authentication information. 

**Action**: Enter your Watson Machine Learning service instance credentials here.

In [10]:
wml_credentials = {
  "apikey": "***",
  "iam_apikey_description": "***",
  "iam_apikey_name": "***",
  "iam_role_crn": "***",
  "iam_serviceid_crn": "***",
  "instance_id": "***",
  "password": "***",
  "url": "***",
  "username": "***"
}

Create the WatsonMachineLearningAPIClient.

In [11]:
client = WatsonMachineLearningAPIClient(wml_credentials)
client.version

'1.0.260'

#### Prepare the metadata

**Tip**: If the accuracy value falls below the threshold value, retraining action is required.

Prepare the additional information to be saved as model's metadata:
* TRAINING_DATA_REFERENCE
* OUTPUT_DATA_SCHEMA
* EVALUATION_METHOD: **multiclass**
* EVALUATION_METRICS name: **accuracy** (metric name used to evaluate the model)
* EVALUATION_METRICS value: **0.61** (accuracy value calculated a few steps above)
* EVALUATION_METRICS threshold: **0.6** (if the accuracy after evaluation using feedback data is below this threshold, auto-retraining is triggered)
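
The threshold rule amounts to a simple comparison. The helper below is a hypothetical illustration of that rule only; the actual retraining decision is made by the service, not by notebook code.

```python
def needs_retraining(metric_value, threshold):
    """Return True when the metric has fallen below the threshold."""
    return metric_value < threshold


# Values taken from the list above: accuracy 0.61 against a threshold of 0.6.
retrain_now = needs_retraining(0.61, 0.6)
```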

**Tip**: All required fields can be found on the **Service Credentials** tab of the Db2 Warehouse on Cloud service instance that you created in IBM Cloud.

In [12]:
db2_service_credentials = {
  "port": 50000,
  "db": "BLUDB",
  "username": "***",
  "ssljdbcurl": "***",
  "host": "***",
  "https_url": "***",
  "dsn": "***",
  "hostname": "***",
  "jdbcurl": "***",
  "ssldsn": "***",
  "uri": "***",
  "password": "***"
}

training_data_reference = {
 "name": "MortgageComplaints_training_reference",
 "connection": db2_service_credentials,
 "source": {
  "tablename": "MORTGAGE_ISSUES_TRAINING",
  "type": "dashdb"
 }
}

Define OUTPUT_DATA_SCHEMA for the model

In [13]:
train_data_schema = mortgage_training_data.schema
label_field = next(f for f in train_data_schema.fields if f.name == "ISSUE")
label_field.metadata['values'] = label_stringIdx.labels

Set up modelling roles in OUTPUT_DATA_SCHEMA

In [14]:
from pyspark.sql.types import StructType, DoubleType, StringType, ArrayType

# Keep every input column except the label column.
input_fields = filter(lambda f: f.name != "ISSUE", train_data_schema.fields)

# Append the model output columns and tag each with its modeling role.
output_data_schema = StructType(list(input_fields)). \
    add("prediction", DoubleType(), True, {'modeling_role': 'prediction'}). \
    add("predictedLabel", StringType(), True, {'modeling_role': 'decoded-target', 'values': label_stringIdx.labels}). \
    add("probability", ArrayType(DoubleType()), True, {'modeling_role': 'probability'})

In [15]:
import json
print(json.dumps(output_data_schema.jsonValue(),indent=3))

{
   "fields": [
      {
         "metadata": {
            "columnInfo": {
               "columnType": 12,
               "columnLength": 32592,
               "columnSigned": false,
               "columnScale": 0,
               "columnTypeName": "varchar",
               "columnNullable": true,
               "columnPrimaryKey": false
            }
         },
         "type": "string",
         "name": "CONSUMER_COMPLAINT_NARRATIVE",
         "nullable": true
      },
      {
         "metadata": {
            "modeling_role": "prediction"
         },
         "type": "double",
         "name": "prediction",
         "nullable": true
      },
      {
         "metadata": {
            "modeling_role": "decoded-target",
            "values": [
               "Loan servicing payments escrow account",
               "Loan modification collection foreclosure",
               "Trouble during payment process",
               "Struggling to pay mortgage",
               "Application ori

Add all the information to model meta props.

In [17]:
model_properties = {
    client.repository.ModelMetaNames.NAME: "Spark_Mortgage_Complaints",
    client.repository.ModelMetaNames.TRAINING_DATA_REFERENCE: training_data_reference,
    client.repository.ModelMetaNames.EVALUATION_METHOD: "multiclass",
    client.repository.ModelMetaNames.OUTPUT_DATA_SCHEMA: output_data_schema.jsonValue(),
    client.repository.ModelMetaNames.EVALUATION_METRICS: [{"name": "accuracy", "value": accuracy, "threshold": 0.6}]
}

Save the model.

In [18]:
published_model_details = client.repository.store_model(model=NBmodel, meta_props=model_properties, training_data=mortgage_training_data, pipeline=SparkPipeline)

List all models in the repository and print the UID of the model you just stored.

In [19]:
model_uid = client.repository.get_model_uid(published_model_details)
client.repository.list_models()
print(model_uid)

------------------------------------  ------------------------------------  ------------------------  --------------
GUID                                  NAME                                  CREATED                   FRAMEWORK
9f72ffe8-eafb-4f78-9453-b44f09a6860c  Spark_Mortgage_Complaints1            2018-11-26T16:28:24.194Z  mllib-2.1
b93e16f7-882b-47d3-a7b3-7bd007654bd6  Spark_Mortgage_Complaints             2018-11-23T10:19:23.558Z  mllib-2.1
44f31ee1-3888-47d6-afaf-1fae29293131  Keras Consumer Complaints K80         2018-11-15T15:44:47.410Z  tensorflow-1.5
eaadfeb9-5a95-4e92-8b47-0140b04102d8  MortgageComplaintsWithKerasAITest     2018-11-15T11:43:08.960Z  tensorflow-1.5
03b44eb1-b159-4e74-a1b7-c073bf64f2ab  MNIST model                           2018-11-14T14:27:23.079Z  tensorflow-1.5
6f104e6d-31d9-4edb-b07a-f0a15ebc86bf  CC_FUN                                2018-11-08T13:16:00.605Z  tensorflow-1.5
d5960e54-ef94-4dd0-951e-c147db5f4907  CC_F                                  201

Get the details of the model you just stored.

In [20]:
model_details = client.repository.get_model_details(model_uid)

<a id="score"></a>
## 4. Deploy and score

Deploy the previously stored model as a web service.

In [21]:
deployment_details = client.deployments.create(model_uid=model_uid, name='Spark_Mortgage_Complaints')



#######################################################################################

Synchronous deployment creation for uid: '9f72ffe8-eafb-4f78-9453-b44f09a6860c' started

#######################################################################################


INITIALIZING
DEPLOY_SUCCESS


------------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_uid='910d5f62-77ec-40b6-8ea3-50a2919c5d51'
------------------------------------------------------------------------------------------------




Get the scoring endpoint.

In [22]:
scoring_url = client.deployments.get_scoring_url(deployment_details)
print(scoring_url)

https://us-south.ml.cloud.ibm.com/v3/wml_instances/34399854-47de-41c8-949e-267347c29223/deployments/910d5f62-77ec-40b6-8ea3-50a2919c5d51/online


Test the recommended action returned by the model.

In [23]:
fields = ['CONSUMER_COMPLAINT_NARRATIVE']
values = ['I have a loan in foreclosure with my Mortgage company, Loan # XXXX, and I re-applied for a loan modification on XXXX XXXX, 2016.']

In [24]:
payload_scoring = {"fields": fields,"values": [values]}
scoring_response = client.deployments.score(scoring_url, payload_scoring)

print("Recommended action: " + json.dumps(scoring_response['values'][0][7], indent=3))
#print(json.dumps(scoring_response,indent=3))

Recommended action: "Loan modification collection foreclosure"
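
The hard-coded position `[7]` depends on the column order of the pipeline output. A more robust option is to look the column up by name. The sketch below assumes that the scoring response carries a `fields` list alongside `values`; the mock response is a shortened stand-in rather than real service output.

```python
def get_field(scoring_response, field_name, row=0):
    """Return one field of one scored row, looked up by column name."""
    position = scoring_response["fields"].index(field_name)
    return scoring_response["values"][row][position]


# Mock response with the assumed shape; values are shortened placeholders.
mock_response = {
    "fields": ["CONSUMER_COMPLAINT_NARRATIVE", "prediction", "predictedLabel"],
    "values": [
        [
            "I have a loan in foreclosure ...",
            1.0,
            "Loan modification collection foreclosure",
        ]
    ],
}

label = get_field(mock_response, "predictedLabel")
```

With a real response, `get_field(scoring_response, "predictedLabel")` would replace the index lookup, provided the assumption about `fields` holds.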
