# Credit Risk Modeling with Spark

Credit Approval Application processing by Risk Profile

## Explore Data

In [52]:

# @hidden_cell
# This connection object is used to access your data and contains your credentials.
# You might want to remove those credentials before you share your notebook.

from project_lib import Project
project = Project.access()

from pyspark.sql import SparkSession
sparkSession = SparkSession(spark).builder.getOrCreate()

#APPLICATIONS_credentials = project.get_connected_data(name="APPLICATIONS")

#OMIT, JUST AN EXAMPLE OF HOW YOU WOULD CONNECT VIA JDBC
#data_df_1 = sparkSession.read.format('jdbc') \
#    .option('url', 'jdbc:db2://{}:{}/{}'.format(APPLICATIONS_credentials['host'],APPLICATIONS_credentials['port'],APPLICATIONS_credentials['database'])) \
#    .option('dbtable', 'PQS03924.APPLICATIONS') \
#    .option('user', APPLICATIONS_credentials['username']) \
#    .option('password', APPLICATIONS_credentials['password']).load()
#data_df_1.show(5)



In [53]:

from pyspark.sql import SQLContext


df_data = SQLContext(sc).read.csv('/project_data/data_asset/credit_risk_training_1_csv_88gm6dxdayi44o402rl5udxif.csv', header='true', inferSchema = 'true')
df_data.show(5)


+--------------+------------+--------------------+-----------+----------+---------------+------------------+------------------+------+------------+------------------------+-----------------+---+----------------+-------+--------------------+-------+----------+---------+-------------+-------+
|CheckingStatus|LoanDuration|       CreditHistory|LoanPurpose|LoanAmount|ExistingSavings|EmploymentDuration|InstallmentPercent|   Sex|OthersOnLoan|CurrentResidenceDuration|     OwnsProperty|Age|InstallmentPlans|Housing|ExistingCreditsCount|    Job|Dependents|Telephone|ForeignWorker|   Risk|
+--------------+------------+--------------------+-----------+----------+---------------+------------------+------------------+------+------------+------------------------+-----------------+---+----------------+-------+--------------------+-------+----------+---------+-------------+-------+
|      0_to_200|          31|credits_paid_to_date|      other|      1889|     100_to_500|            less_1|                

In [54]:
df_data.printSchema()

root
 |-- CheckingStatus: string (nullable = true)
 |-- LoanDuration: integer (nullable = true)
 |-- CreditHistory: string (nullable = true)
 |-- LoanPurpose: string (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- ExistingSavings: string (nullable = true)
 |-- EmploymentDuration: string (nullable = true)
 |-- InstallmentPercent: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- OthersOnLoan: string (nullable = true)
 |-- CurrentResidenceDuration: integer (nullable = true)
 |-- OwnsProperty: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- InstallmentPlans: string (nullable = true)
 |-- Housing: string (nullable = true)
 |-- ExistingCreditsCount: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Dependents: integer (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- ForeignWorker: string (nullable = true)
 |-- Risk: string (nullable = true)



### Age by Risk (is Age and Gender Bias?)

In [55]:
import pixiedust

display(df_data)

### Interactive Data Browser

In [None]:
display(df_data)

CheckingStatus,LoanDuration,CreditHistory,LoanPurpose,LoanAmount,ExistingSavings,EmploymentDuration,InstallmentPercent,Sex,OthersOnLoan,CurrentResidenceDuration,OwnsProperty,Age,InstallmentPlans,Housing,ExistingCreditsCount,Job,Dependents,Telephone,ForeignWorker,Risk
less_0,13,all_credits_paid_back,radio_tv,1045,100_to_500,less_1,2,female,none,1,real_estate,24,none,rent,1,skilled,1,none,yes,No Risk
greater_200,23,credits_paid_to_date,car_used,2963,greater_1000,greater_7,4,male,none,4,car_other,46,none,own,2,skilled,1,none,yes,Risk
less_0,11,all_credits_paid_back,furniture,2016,less_100,less_1,1,female,none,1,car_other,34,bank,own,1,unemployed,1,none,yes,No Risk
0_to_200,25,outstanding_credit,furniture,1244,100_to_500,1_to_4,2,female,none,3,savings_insurance,40,stores,free,2,skilled,1,yes,yes,No Risk
less_0,4,all_credits_paid_back,car_new,250,less_100,unemployed,1,female,none,2,real_estate,19,none,rent,1,skilled,1,none,yes,No Risk
0_to_200,17,prior_payments_delayed,car_new,4188,greater_1000,greater_7,4,male,none,3,unknown,43,none,own,2,skilled,1,none,yes,No Risk
0_to_200,44,prior_payments_delayed,appliances,5045,unknown,greater_7,4,male,guarantor,4,unknown,36,stores,free,2,unskilled,1,yes,yes,No Risk
no_checking,39,prior_payments_delayed,repairs,6418,500_to_1000,4_to_7,5,male,none,4,car_other,43,none,free,2,management_self-employed,1,yes,no,No Risk
less_0,9,prior_payments_delayed,car_new,4609,less_100,1_to_4,3,male,none,2,savings_insurance,36,none,rent,1,skilled,2,none,yes,No Risk
less_0,12,all_credits_paid_back,furniture,250,less_100,less_1,1,male,none,2,real_estate,19,none,rent,1,skilled,1,none,yes,No Risk


## Create a Model

Split the data for training and for testing the model

In [57]:
spark_df = df_data
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24)

MODEL_NAME = "AIOS Spark German Risk Model - Final"
DEPLOYMENT_NAME = "AIOS Spark German Risk Deployment - Final"

print("Number of records for training: " + str(train_data.count()))
print("Number of records for evaluation: " + str(test_data.count()))

spark_df.printSchema()

Number of records for training: 4016
Number of records for evaluation: 984
root
 |-- CheckingStatus: string (nullable = true)
 |-- LoanDuration: integer (nullable = true)
 |-- CreditHistory: string (nullable = true)
 |-- LoanPurpose: string (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- ExistingSavings: string (nullable = true)
 |-- EmploymentDuration: string (nullable = true)
 |-- InstallmentPercent: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- OthersOnLoan: string (nullable = true)
 |-- CurrentResidenceDuration: integer (nullable = true)
 |-- OwnsProperty: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- InstallmentPlans: string (nullable = true)
 |-- Housing: string (nullable = true)
 |-- ExistingCreditsCount: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Dependents: integer (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- ForeignWorker: string (nullable = true)
 |-- Risk: string (nullable = 

### Create the steps for the pipeline

Encode non-numeric values with numeric values for the model. These will be added to the pipeline. Notice the outputCol names are part of the VectorAssembler later.

In [58]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model

si_CheckingStatus = StringIndexer(inputCol = 'CheckingStatus', outputCol = 'CheckingStatus_IX')
si_CreditHistory = StringIndexer(inputCol = 'CreditHistory', outputCol = 'CreditHistory_IX')
si_LoanPurpose = StringIndexer(inputCol = 'LoanPurpose', outputCol = 'LoanPurpose_IX')
si_ExistingSavings = StringIndexer(inputCol = 'ExistingSavings', outputCol = 'ExistingSavings_IX')
si_EmploymentDuration = StringIndexer(inputCol = 'EmploymentDuration', outputCol = 'EmploymentDuration_IX')
si_Sex = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_IX')
si_OthersOnLoan = StringIndexer(inputCol = 'OthersOnLoan', outputCol = 'OthersOnLoan_IX')
si_OwnsProperty = StringIndexer(inputCol = 'OwnsProperty', outputCol = 'OwnsProperty_IX')
si_InstallmentPlans = StringIndexer(inputCol = 'InstallmentPlans', outputCol = 'InstallmentPlans_IX')
si_Housing = StringIndexer(inputCol = 'Housing', outputCol = 'Housing_IX')
si_Job = StringIndexer(inputCol = 'Job', outputCol = 'Job_IX')
si_Telephone = StringIndexer(inputCol = 'Telephone', outputCol = 'Telephone_IX')
si_ForeignWorker = StringIndexer(inputCol = 'ForeignWorker', outputCol = 'ForeignWorker_IX')

This is the value we want to predict, Risk.

In [59]:
#INDEX THE RISK COLUMN TO THE OUTPUT COLUMN RISK
si_Label = StringIndexer(inputCol="Risk", outputCol="label").fit(spark_df)


label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_Label.labels)

In [60]:
#DEBUG TO SEE THE LABELS
si_Label.labels

['No Risk', 'Risk']

In [61]:
va_features = VectorAssembler(inputCols=["CheckingStatus_IX", "CreditHistory_IX", "LoanPurpose_IX", "ExistingSavings_IX", "EmploymentDuration_IX", "Sex_IX", \
                                         "OthersOnLoan_IX", "OwnsProperty_IX", "InstallmentPlans_IX", "Housing_IX", "Job_IX", "Telephone_IX", "ForeignWorker_IX", \
                                         "LoanDuration", "LoanAmount", "InstallmentPercent", "CurrentResidenceDuration", "LoanDuration", "Age", "ExistingCreditsCount", \
                                         "Dependents"], outputCol="features")

In [62]:
from pyspark.ml.classification import RandomForestClassifier
classifier = RandomForestClassifier(featuresCol="features")

pipeline = Pipeline(stages=[si_CheckingStatus, si_CreditHistory, si_EmploymentDuration, si_ExistingSavings, si_ForeignWorker, si_Housing, si_InstallmentPlans, si_Job, si_LoanPurpose, si_OthersOnLoan,\
                               si_OwnsProperty, si_Sex, si_Telephone, si_Label, va_features, classifier, label_converter])
model = pipeline.fit(train_data)

In [63]:
predictions = model.transform(test_data)
evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="prediction")
area_under_curve = evaluatorDT.evaluate(predictions)

#default evaluation is areaUnderROC
print("areaUnderROC = %g" % area_under_curve)

areaUnderROC = 0.697499


## Saving the Model

In [64]:
from project_lib import Project

project = Project.access()

In [65]:
model.save( "credit-risk-model" )

Py4JJavaError: An error occurred while calling o4410.save.
: java.io.IOException: Path credit-risk-model already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:702)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:179)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:819)


In [66]:
train_data.write.save( "training-data.parquet" )

AnalysisException: 'path file:/home/spark/shared/training-data.parquet already exists.;'

In [67]:
project.save_data("credit-risk-model", "credit-risk-model", set_project_asset = True, overwrite = True)
project.save_data("training-data.parquet", "training-data.parquet", set_project_asset = True, overwrite = True)

{'file_name': 'data_asset/training-data.parquet',
 'message': 'File saved to project storage.',
 'asset_id': 'ca6bc6af-6568-4317-9452-3828f4d26500'}

In [68]:
#DEBUG
!ls

conda			logs	      training-data.parquet
credit-risk-model	pixiedust.db  user-libs
library_content.tar.gz	spark-events  wml


In [1]:
!pwd

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20201116191740-0000
KERNEL_ID = 0366e55d-0e76-45dd-a9bb-790164aa3dd0
/home/spark/shared


### Watson Machine Learning

More info: https://www.ibm.com/support/producthub/icpdata/docs/content/SSQNUZ_current/wsj/wmls/wmls-deploy-python.html

In [69]:
from watson_machine_learning_client import WatsonMachineLearningAPIClient


In [70]:
import sys,os,os.path
token = os.environ['USER_ACCESS_TOKEN']

In [71]:
wml_credentials = {
"token": token,
"instance_id" : "wml_local",
"url": "https://cp4d-cpd-cp4d.sjsu-cp4dv301-505749-73aebe06726e634c608c4167edcc2aeb-0000.sjc04.containers.appdomain.cloud",
"version": "3.0.0"
}

In [72]:
#Initialize the client with the credentials:
client = WatsonMachineLearningAPIClient(wml_credentials)

In [73]:
#Create a deployment space if it doesn't exit

space_details = client.spaces.store(meta_props={client.spaces.ConfigurationMetaNames.NAME: "Credit Deployment Space"})
space_id = client.spaces.get_uid(space_details)
space_id 

'5ca51e99-f4a0-43b8-a693-58595a1164c7'

In [74]:
#Set the space.
client.set.default_space(space_id)

'SUCCESS'

In [75]:
# Store the trained model to the repository and get the model ID. To do so, enter the absolute path of the trained model file, 
# as well as the model name, model type and model runtime. Note that the model name cannot contain characters such as 
# [ ] { } | \ ” % ~ # < > that conflict with forming a valid HTTP request.

#OTHER TYPES HERE
#https://www.ibm.com/support/producthub/icpdata/docs/content/SSQNUZ_current/wsj/wmls/wmls-deploy-python-types.html

# Model metadata
software_spec_uid = client.software_specifications.get_uid_by_name('spark-mllib_2.4')
meta_props = {
    client.repository.ModelMetaNames.NAME: 'Spark Credit Risk Model',
    client.repository.ModelMetaNames.SOFTWARE_SPEC_UID: software_spec_uid,
    client.repository.ModelMetaNames.TYPE: 'mllib_2.4'
}                              


In [76]:
model

PipelineModel_159ad8baa63c

In [77]:
model_artifact = client.repository.store_model(model,
                                              meta_props=meta_props,
                                              training_data=train_data,
                                              training_target=test_data,
                                              pipeline=pipeline)
model_uid = client.repository.get_model_uid(model_artifact)
print("Model UID = " + model_uid)

Model UID = 8da2b736-3a1e-4831-89be-a9c59c247483


In [78]:
client.repository.list_models()

------------------------------------  -----------------------  ------------------------  ---------
GUID                                  NAME                     CREATED                   TYPE
8da2b736-3a1e-4831-89be-a9c59c247483  Spark Credit Risk Model  2020-11-13T21:40:14.002Z  mllib_2.4
------------------------------------  -----------------------  ------------------------  ---------


In [79]:
#Create an online deployment

dep_details = client.deployments.create(artifact_uid=model_uid,meta_props={
     client.deployments.ConfigurationMetaNames.NAME:"Spark Credit Risk Deployment",client.deployments.ConfigurationMetaNames.ONLINE:{}})




#######################################################################################

Synchronous deployment creation for uid: '8da2b736-3a1e-4831-89be-a9c59c247483' started

#######################################################################################


initializing.............
ready


------------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_uid='0ac19d8d-bbf4-4cac-8a53-d323937e431e'
------------------------------------------------------------------------------------------------


