In [3]:
print(sc)

<SparkContext master=spark://spark-master221-svc:7077 appName=py3-spark221-notebook>


In [4]:
import os
from pyspark.sql import SQLContext
import json
# Add asset from file system
pd_data = SQLContext(sc).read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/credit_risk_training.csv', header='true', inferSchema = 'true')
pd_data.show(5)

# Add asset from file system
df_data = SQLContext(sc).read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/german_credit_data_biased_training.csv', header='true', inferSchema = 'true')
df_data.show(5)


+--------------+------------+--------------------+-----------+----------+---------------+------------------+------------------+------+------------+------------------------+-----------------+---+----------------+-------+--------------------+-------+----------+---------+-------------+-------+
|CheckingStatus|LoanDuration|       CreditHistory|LoanPurpose|LoanAmount|ExistingSavings|EmploymentDuration|InstallmentPercent|   Sex|OthersOnLoan|CurrentResidenceDuration|     OwnsProperty|Age|InstallmentPlans|Housing|ExistingCreditsCount|    Job|Dependents|Telephone|ForeignWorker|   Risk|
+--------------+------------+--------------------+-----------+----------+---------------+------------------+------------------+------+------------+------------------------+-----------------+---+----------------+-------+--------------------+-------+----------+---------+-------------+-------+
|      0_to_200|          31|credits_paid_to_date|      other|      1889|     100_to_500|            less_1|                

In [5]:
df_data.printSchema()

root
 |-- CheckingStatus: string (nullable = true)
 |-- LoanDuration: integer (nullable = true)
 |-- CreditHistory: string (nullable = true)
 |-- LoanPurpose: string (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- ExistingSavings: string (nullable = true)
 |-- EmploymentDuration: string (nullable = true)
 |-- InstallmentPercent: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- OthersOnLoan: string (nullable = true)
 |-- CurrentResidenceDuration: integer (nullable = true)
 |-- OwnsProperty: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- InstallmentPlans: string (nullable = true)
 |-- Housing: string (nullable = true)
 |-- ExistingCreditsCount: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Dependents: integer (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- ForeignWorker: string (nullable = true)
 |-- Risk: string (nullable = true)



In [6]:
print("Number of records: " + str(df_data.count()))

Number of records: 5000


In [7]:
spark_df = df_data
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24)

MODEL_NAME = "Spark German Risk Model - Final"
DEPLOYMENT_NAME = "Spark German Risk Deployment - Final"

print("Number of records for training: " + str(train_data.count()))
print("Number of records for evaluation: " + str(test_data.count()))

spark_df.printSchema()

Number of records for training: 4016
Number of records for evaluation: 984
root
 |-- CheckingStatus: string (nullable = true)
 |-- LoanDuration: integer (nullable = true)
 |-- CreditHistory: string (nullable = true)
 |-- LoanPurpose: string (nullable = true)
 |-- LoanAmount: integer (nullable = true)
 |-- ExistingSavings: string (nullable = true)
 |-- EmploymentDuration: string (nullable = true)
 |-- InstallmentPercent: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- OthersOnLoan: string (nullable = true)
 |-- CurrentResidenceDuration: integer (nullable = true)
 |-- OwnsProperty: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- InstallmentPlans: string (nullable = true)
 |-- Housing: string (nullable = true)
 |-- ExistingCreditsCount: integer (nullable = true)
 |-- Job: string (nullable = true)
 |-- Dependents: integer (nullable = true)
 |-- Telephone: string (nullable = true)
 |-- ForeignWorker: string (nullable = true)
 |-- Risk: string (nullable = 

# Train the model

In [8]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model

si_CheckingStatus = StringIndexer(inputCol = 'CheckingStatus', outputCol = 'CheckingStatus_IX')
si_CreditHistory = StringIndexer(inputCol = 'CreditHistory', outputCol = 'CreditHistory_IX')
si_LoanPurpose = StringIndexer(inputCol = 'LoanPurpose', outputCol = 'LoanPurpose_IX')
si_ExistingSavings = StringIndexer(inputCol = 'ExistingSavings', outputCol = 'ExistingSavings_IX')
si_EmploymentDuration = StringIndexer(inputCol = 'EmploymentDuration', outputCol = 'EmploymentDuration_IX')
si_Sex = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_IX')
si_OthersOnLoan = StringIndexer(inputCol = 'OthersOnLoan', outputCol = 'OthersOnLoan_IX')
si_OwnsProperty = StringIndexer(inputCol = 'OwnsProperty', outputCol = 'OwnsProperty_IX')
si_InstallmentPlans = StringIndexer(inputCol = 'InstallmentPlans', outputCol = 'InstallmentPlans_IX')
si_Housing = StringIndexer(inputCol = 'Housing', outputCol = 'Housing_IX')
si_Job = StringIndexer(inputCol = 'Job', outputCol = 'Job_IX')
si_Telephone = StringIndexer(inputCol = 'Telephone', outputCol = 'Telephone_IX')
si_ForeignWorker = StringIndexer(inputCol = 'ForeignWorker', outputCol = 'ForeignWorker_IX')

In [9]:
si_Label = StringIndexer(inputCol="Risk", outputCol="label").fit(spark_df)
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_Label.labels)

In [10]:
va_features = VectorAssembler(inputCols=["CheckingStatus_IX", "CreditHistory_IX", "LoanPurpose_IX", "ExistingSavings_IX", "EmploymentDuration_IX", "Sex_IX", \
                                         "OthersOnLoan_IX", "OwnsProperty_IX", "InstallmentPlans_IX", "Housing_IX", "Job_IX", "Telephone_IX", "ForeignWorker_IX", \
                                         "LoanDuration", "LoanAmount", "InstallmentPercent", "CurrentResidenceDuration", "LoanDuration", "Age", "ExistingCreditsCount", \
                                         "Dependents"], outputCol="features")

In [11]:
from pyspark.ml.classification import RandomForestClassifier
classifier = RandomForestClassifier(featuresCol="features")

pipeline = Pipeline(stages=[si_CheckingStatus, si_CreditHistory, si_EmploymentDuration, si_ExistingSavings, si_ForeignWorker, si_Housing, si_InstallmentPlans, si_Job, si_LoanPurpose, si_OthersOnLoan,\
                               si_OwnsProperty, si_Sex, si_Telephone, si_Label, va_features, classifier, label_converter])
model = pipeline.fit(train_data)

In [12]:
predictions = model.transform(test_data)
evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="prediction")
area_under_curve = evaluatorDT.evaluate(predictions)

#default evaluation is areaUnderROC
print("areaUnderROC = %g" % area_under_curve)

areaUnderROC = 0.703615


# Save the model in Analytics Project

In [13]:
from dsx_ml.ml import save

model_name = "CreditScore_ML_model_1022"
save(name = model_name,
     model = model,
     algorithm_type = 'Classification',
     test_data = test_data)

Using TensorFlow backend.


{'path': '/user-home/999/DSX_Projects/german-credit-risk-mode/models/CreditScore_ML_model_1022/1',
 'scoring_endpoint': 'https://dsxl-api/v3/project/score/Python35/spark-2.2/german-credit-risk-mode/CreditScore_ML_model_1022/1'}

# Save the Model to WML

In [14]:
from watson_machine_learning_client import WatsonMachineLearningAPIClient

In [15]:
wml_credentials = {
    "url": "https://10.1.1.1",
    "username": "admin",
    "password": "password",
    "instance_id": "icp"               
}

wml_client = WatsonMachineLearningAPIClient(wml_credentials)

In [16]:
model_props = {
    wml_client.repository.ModelMetaNames.NAME: "CreditScore_ML_model_1022",
    wml_client.repository.ModelMetaNames.EVALUATION_METHOD: "binary",
}

In [17]:
published_model_details = wml_client.repository.store_model(model=model, meta_props=model_props, training_data=train_data, pipeline=pipeline)

In [18]:
wml_client.repository.list_models()

------------------------------------  ----------------------------  ------------------------  ---------
GUID                                  NAME                          CREATED                   FRAMEWORK
43e9a417-f322-4672-8bee-db3f46a42a06  CreditScore_ML_model_1022     2019-10-22T12:21:40.458Z  mllib-2.2
8736acd9-5803-4d8f-b977-3fd134fae6bd  churn-model-in-cp4d-wml12     2019-10-20T20:00:22.622Z  mllib-2.2
326da861-1298-4f8c-9ac0-6bbaa26f2e8e  churn-model-in-cp4d-wml12     2019-10-19T21:25:15.266Z  mllib-2.2
636eb9a3-5db4-4440-ae16-9ab1cd9f9101  CreditScore_ML_model_spark22  2019-10-18T15:04:13.970Z  mllib-2.2
9616c14c-4ed9-405e-8ecf-92b5a43d8b14  churn-model-in-cp4d-wml12     2019-10-18T03:53:47.674Z  mllib-2.2
------------------------------------  ----------------------------  ------------------------  ---------


# Deploy the model

In [19]:
model_uid = wml_client.repository.get_model_uid(published_model_details)
deployment_details = wml_client.deployments.create(model_uid, 'CreditScore_ML_model_1022')



#######################################################################################

Synchronous deployment creation for uid: '43e9a417-f322-4672-8bee-db3f46a42a06' started

#######################################################################################


INITIALIZING
DEPLOY_IN_PROGRESS.
DEPLOY_SUCCESS


------------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_uid='aeb581bb-f4b2-4405-a33e-855440908444'
------------------------------------------------------------------------------------------------




In [20]:
wml_client.deployments.list()

------------------------------------  -----------------------------  ------  --------------  ------------------------  ---------  -------------
GUID                                  NAME                           TYPE    STATE           CREATED                   FRAMEWORK  ARTIFACT TYPE
aeb581bb-f4b2-4405-a33e-855440908444  CreditScore_ML_model_1022      online  DEPLOY_SUCCESS  2019-10-22T12:22:35.478Z  mllib-2.2  model
69443b4d-ba91-4fbd-a43e-39e9e18ddb9e  churn-model-in-cp4d-wml12      online  DEPLOY_SUCCESS  2019-10-21T22:30:28.588Z  mllib-2.2  model
bec4f2ff-c979-42de-95b4-bdc5ddf61e64  churn-model-in-cp4d-wml12      online  DEPLOY_SUCCESS  2019-10-20T20:02:44.064Z  mllib-2.2  model
d4618e31-59ed-4bc3-8d2d-0875f8e70384  churn-model-in-cp4d-wml-qijun  online  DEPLOY_SUCCESS  2019-10-20T17:25:46.141Z  mllib-2.2  model
06797c28-064e-4dfd-a3d2-72dd148b7799  CreditScore_ML_model_spark22   online  DEPLOY_SUCCESS  2019-10-18T15:19:20.557Z  mllib-2.2  model
a1ef1537-033e-4877-a0e3-bf4b12be

In [21]:
deployment_details = wml_client.deployments.get_details('aeb581bb-f4b2-4405-a33e-855440908444')

In [22]:
icp4d_churn_scoring_url = deployment_details['entity']['scoring_url']
print(icp4d_churn_scoring_url)

https://10.1.1.1:31843/v3/scoring/online/aeb581bb-f4b2-4405-a33e-855440908444


# Save Training Data to db2 table 

In [23]:
jdbcuri = "jdbc:db2://10.1.1.4:30285/BLUDB"

properties = {
    "user": "user999",
    "password": "y39cFCX__x4i%G6*",
    "driver": "com.ibm.db2.jcc.DB2Driver",
    "sslConnection":"false"
}

TABLE_NAME = "modeltrn_german_risk"

In [24]:
df_data.write.jdbc(url=jdbcuri, table=TABLE_NAME, mode="append", properties=properties)

# Payload Scoring

In [26]:

fields = ["CheckingStatus","LoanDuration","CreditHistory","LoanPurpose","LoanAmount","ExistingSavings","EmploymentDuration","InstallmentPercent","Sex","OthersOnLoan","CurrentResidenceDuration","OwnsProperty","Age","InstallmentPlans","Housing","ExistingCreditsCount","Job","Dependents","Telephone","ForeignWorker"]
values = [
  ["no_checking",13,"credits_paid_to_date","car_new",1343,"100_to_500","1_to_4",2,"female","none",3,"savings_insurance",46,"none","own",2,"skilled",1,"none","yes"],
  ["no_checking",24,"prior_payments_delayed","furniture",4567,"500_to_1000","1_to_4",4,"male","none",4,"savings_insurance",36,"none","free",2,"management_self-employed",1,"none","yes"],
  ["0_to_200",26,"all_credits_paid_back","car_new",863,"less_100","less_1",2,"female","co-applicant",2,"real_estate",38,"none","own",1,"skilled",1,"none","yes"],
  ["0_to_200",14,"no_credits","car_new",2368,"less_100","1_to_4",3,"female","none",3,"real_estate",29,"none","own",1,"skilled",1,"none","yes"],
  ["0_to_200",4,"no_credits","car_new",250,"less_100","unemployed",2,"female","none",3,"real_estate",23,"none","rent",1,"management_self-employed",1,"none","yes"],
  ["no_checking",17,"credits_paid_to_date","car_new",832,"100_to_500","1_to_4",2,"male","none",2,"real_estate",42,"none","own",1,"skilled",1,"none","yes"],
  ["no_checking",33,"outstanding_credit","appliances",5696,"unknown","greater_7",4,"male","co-applicant",4,"unknown",54,"none","free",2,"skilled",1,"yes","yes"],
  ["0_to_200",13,"prior_payments_delayed","retraining",1375,"100_to_500","4_to_7",3,"male","none",3,"real_estate",37,"none","own",2,"management_self-employed",1,"none","yes"]
]

payload_scoring = {"fields": fields,"values": values}
scoring_response = wml_client.deployments.score(icp4d_churn_scoring_url, payload_scoring)

print('Single record scoring result:', '\n fields:', scoring_response['fields'], '\n values: ', scoring_response['values'][0])



Single record scoring result: 
 fields: ['CheckingStatus', 'LoanDuration', 'CreditHistory', 'LoanPurpose', 'LoanAmount', 'ExistingSavings', 'EmploymentDuration', 'InstallmentPercent', 'Sex', 'OthersOnLoan', 'CurrentResidenceDuration', 'OwnsProperty', 'Age', 'InstallmentPlans', 'Housing', 'ExistingCreditsCount', 'Job', 'Dependents', 'Telephone', 'ForeignWorker', 'CheckingStatus_IX', 'CreditHistory_IX', 'EmploymentDuration_IX', 'ExistingSavings_IX', 'ForeignWorker_IX', 'Housing_IX', 'InstallmentPlans_IX', 'Job_IX', 'LoanPurpose_IX', 'OthersOnLoan_IX', 'OwnsProperty_IX', 'Sex_IX', 'Telephone_IX', 'features', 'rawPrediction', 'probability', 'prediction', 'predictedLabel'] 
 values:  ['no_checking', 13, 'credits_paid_to_date', 'car_new', 1343, '100_to_500', '1_to_4', 2, 'female', 'none', 3, 'savings_insurance', 46, 'none', 'own', 2, 'skilled', 1, 'none', 'yes', 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, [21, [1, 3, 5, 13, 14, 15, 16, 17, 18, 19, 20], [1.0, 1.0, 1.0, 13

# More payload Scoring to get fairness records enough

In [27]:
score_data = SQLContext(sc).read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/german_credit_data_biased_scoring.csv', header='true', inferSchema = 'true')
df_data.show(5)

+--------------+------------+--------------------+-----------+----------+---------------+------------------+------------------+------+------------+------------------------+-----------------+---+----------------+-------+--------------------+-------+----------+---------+-------------+-------+
|CheckingStatus|LoanDuration|       CreditHistory|LoanPurpose|LoanAmount|ExistingSavings|EmploymentDuration|InstallmentPercent|   Sex|OthersOnLoan|CurrentResidenceDuration|     OwnsProperty|Age|InstallmentPlans|Housing|ExistingCreditsCount|    Job|Dependents|Telephone|ForeignWorker|   Risk|
+--------------+------------+--------------------+-----------+----------+---------------+------------------+------------------+------+------------+------------------------+-----------------+---+----------------+-------+--------------------+-------+----------+---------+-------------+-------+
|      0_to_200|          31|credits_paid_to_date|      other|      1889|     100_to_500|            less_1|                

In [None]:
score_data_pandas = score_data.toPandas()
r1=score_data_pandas.values[0].tolist()
payload_scoring = {"fields": fields,"values": [r1]}
scoring_response = wml_client.deployments.score(icp4d_churn_scoring_url, payload_scoring, "custchurn_rfc1")
scoring_response

In [35]:
values=[]
for i in range(500):
  values.append(score_data_pandas.values[i].tolist())


In [36]:
payload_scoring = {"fields": fields,"values": values}
scoring_response = wml_client.deployments.score(icp4d_churn_scoring_url, payload_scoring, "custchurn_rfc1")