# Data Loading

## Load data files

In [1]:
import ibmos2spark
# @hidden_cell
# Connection settings for the IBM Cloud Object Storage instance that holds the
# input files. The secret values are masked in this copy of the notebook.
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'service_id': '****',
    'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
    'api_key': '****'
}

configuration_name = 'os_8cbf59ff782d4a5db7683b1dceef1190_configs'
# `sc` is not defined anywhere in this notebook; presumably it is the
# SparkContext pre-created by the hosting kernel -- confirm when porting.
cos = ibmos2spark.CloudObjectStorage(
    sc,
    credentials,
    configuration_name,
    'bluemix_cos',
)

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Load the deployment sample dataset (CSV with a header row; column types are inferred)
df = (
    spark.read
    .option("header", True)
    .option('inferSchema', 'true')
    .csv(cos.url('Deployment_Sample_Data.csv', 'advanceddatasciencecapstone-donotdelete-pr-****'))
)

# Load the LSM_Imputer lookup table that is used later to impute missing lsm values
df_lsm = spark.read.parquet(
    cos.url('LSM_Imputer.parquet', 'advanceddatasciencecapstone-donotdelete-pr-****')
)


Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200812181037-0000
KERNEL_ID = ab6d100c-41a2-44b5-9950-b81698638ae3


## Load saved model files

In [2]:
import types
import pandas as pd
from botocore.client import Config
import ibm_boto3


def __iter__(self):
    """Stand-in ``__iter__`` bound to a streaming body that lacks one; always returns 0."""
    return 0


# @hidden_cell
# This cell reads files from IBM Cloud Object Storage and embeds credentials.
# Strip the credentials before sharing the notebook.
client_8cbf59ff782d4a5db7683b1dceef1190 = ibm_boto3.client(
    service_name='s3',
    ibm_api_key_id='****',
    ibm_auth_endpoint="https://iam.cloud.ibm.com/oidc/token",
    config=Config(signature_version='oauth'),
    endpoint_url='https://s3-api.us-geo.objectstorage.service.networklayer.com',
)

# Each object is exposed through the 'Body' entry of the get_object response
# (a botocore.response.StreamingBody according to the generated boilerplate).
# ibm_boto3 documentation: https://ibm.github.io/ibm-cos-sdk-python/
# pandas documentation: http://pandas.pydata.org/

# Logistic regression model archive
streaming_lr_model = client_8cbf59ff782d4a5db7683b1dceef1190.get_object(
    Bucket='advanceddatasciencecapstone-donotdelete-pr-****',
    Key='LR_Model_Final.mdl.tar',
)['Body']
# Attach the stand-in __iter__ only when the body does not already provide one
if not hasattr(streaming_lr_model, "__iter__"):
    streaming_lr_model.__iter__ = types.MethodType(__iter__, streaming_lr_model)

# Transformation pipeline archive
streaming_pipeline_model = client_8cbf59ff782d4a5db7683b1dceef1190.get_object(
    Bucket='advanceddatasciencecapstone-donotdelete-pr-****',
    Key='Transformation_Pipeline.ppl.tar',
)['Body']
# Attach the stand-in __iter__ only when the body does not already provide one
if not hasattr(streaming_pipeline_model, "__iter__"):
    streaming_pipeline_model.__iter__ = types.MethodType(__iter__, streaming_pipeline_model)
    

In [3]:
# save model files
def save_stream(input_stream, output_filename):
    """Write the entire contents of a binary stream to a local file.

    Args:
        input_stream: Object exposing ``read()`` that returns bytes.
        output_filename: Path of the file to create; an existing file is
            overwritten.

    Returns:
        None.
    """
    # Read first so that a failed read does not leave a truncated file behind.
    payload = input_stream.read()
    # The context manager closes the handle even when the write raises.
    with open(output_filename, "wb") as output_file:
        output_file.write(payload)
    
# Persist the Logistic Regression model archive in the working directory
save_stream(input_stream=streaming_lr_model, output_filename="LR_Model_Final.mdl.tar")

# Persist the transformation pipeline archive in the working directory
save_stream(input_stream=streaming_pipeline_model, output_filename="Transformation_Pipeline.ppl.tar")

In [4]:
# extract compressed models

!tar xvf LR_Model_Final.mdl.tar

!tar xvf Transformation_Pipeline.ppl.tar

LR_Model_Final.mdl/
LR_Model_Final.mdl/data/
LR_Model_Final.mdl/data/.part-00000-8afd7252-7efb-4e4d-87ac-9983903394cc-c000.snappy.parquet.crc
LR_Model_Final.mdl/data/._SUCCESS.crc
LR_Model_Final.mdl/data/part-00000-8afd7252-7efb-4e4d-87ac-9983903394cc-c000.snappy.parquet
LR_Model_Final.mdl/data/_SUCCESS
LR_Model_Final.mdl/metadata/
LR_Model_Final.mdl/metadata/.part-00000.crc
LR_Model_Final.mdl/metadata/._SUCCESS.crc
LR_Model_Final.mdl/metadata/part-00000
LR_Model_Final.mdl/metadata/_SUCCESS
Transformation_Pipeline.ppl/
Transformation_Pipeline.ppl/metadata/
Transformation_Pipeline.ppl/metadata/.part-00000.crc
Transformation_Pipeline.ppl/metadata/._SUCCESS.crc
Transformation_Pipeline.ppl/metadata/part-00000
Transformation_Pipeline.ppl/metadata/_SUCCESS
Transformation_Pipeline.ppl/stages/
Transformation_Pipeline.ppl/stages/00_StringIndexer_4aacbf961efb/
Transformation_Pipeline.ppl/stages/00_StringIndexer_4aacbf961efb/data/
Transformation_Pipeline.ppl/stages/00_StringIndexer_4aacbf961efb/d

# Data Cleansing

## 1. Missing values treatment / Imputing

In [5]:
# Missing-value treatment

# Fixed placeholders: province and marital_status -> "Unknown", years_occupation -> 0
df = (
    df.na.fill("Unknown", 'province')
      .na.fill(0, 'years_occupation')
      .na.fill("Unknown", 'marital_status')
)

# LSM: a missing lsm is replaced by med_lsm from the imputer table, matched on
# province and income_code. Rows without a match keep a NULL lsm (LEFT JOIN).

# Register both DataFrames as temporary views so they can be joined in SQL
df.createOrReplaceTempView("dftable")
df_lsm.createOrReplaceTempView("df_lsm")

df = spark.sql("""
SELECT d.*, CASE WHEN d.lsm IS NULL THEN l.med_lsm ELSE d.lsm END AS lsm_fixed 
FROM dftable d
LEFT JOIN df_lsm l ON l.province=d.province AND l.income_code=d.income_code
""")

# Swap the imputed column in under the original name (lsm becomes the last column)
df = df.drop('lsm').withColumnRenamed('lsm_fixed', 'lsm')

### Remove unnecessary values

In [6]:
# Refresh the temporary view so the SQL below sees the imputed DataFrame
df.createOrReplaceTempView("dftable")

# Only DIRECTORSHIP ACTIVE and DIRECTORSHIP RESIGNED are kept as-is; every other
# non-NULL value becomes NO DIRECTORSHIP.
# NOTE(review): a NULL directorship makes both <> comparisons NULL, so it falls
# through to ELSE and stays NULL -- confirm that is intended.
df = spark.sql("""
SELECT *,
CASE WHEN (directorship<>'DIRECTORSHIP ACTIVE' AND directorship<>'DIRECTORSHIP RESIGNED') 
     THEN 'NO DIRECTORSHIP'
     ELSE directorship 
END AS directorship_fixed
FROM dftable
""")

# Replace the original column with the normalised one under the same name
df = df.drop('directorship').withColumnRenamed('directorship_fixed', 'directorship')

## 2. Remove correlated variables

In [7]:
# months_risk_score is removed under the "Remove correlated variables" step
df = df.drop('months_risk_score')

## 3. Remove outliers

In [8]:
# Cap outliers: the three months_*_phone columns at 300, years_occupation at 35

df.createOrReplaceTempView('dftable')

df = spark.sql("""
SELECT *,
CASE WHEN months_mobile_phone>300 THEN 300 ELSE months_mobile_phone END AS months_mobile_phone_fixed,
CASE WHEN months_home_phone>300 THEN 300 ELSE months_home_phone END AS months_home_phone_fixed,
CASE WHEN months_work_phone>300 THEN 300 ELSE months_work_phone END AS months_work_phone_fixed,
CASE WHEN years_occupation>35 THEN 35 ELSE years_occupation END AS years_occupation_fixed
FROM dftable
""")

# Remove the uncapped originals ...
dropped_cols = [
    'months_mobile_phone',
    'months_home_phone',
    'months_work_phone',
    'years_occupation',
]
df = df.drop(*dropped_cols)

# ... then give each capped "<name>_fixed" column its original name back
for col in dropped_cols:
    fixed_name = col + '_fixed'
    df = df.withColumnRenamed(fixed_name, col)

# Feature creation

In [9]:
# Derive age_group from age_buy_property: 5-year buckets from 20 to 64, one
# 65-80 bucket, and 'Unknown' for anything else (including NULL).

# Drop any existing age_group first so the SELECT * below does not duplicate it
df = df.drop('age_group')

df.createOrReplaceTempView('dftable')
df = spark.sql("""
SELECT *,
CASE WHEN age_buy_property >= 20 AND age_buy_property <  25 THEN '20-24'
    WHEN age_buy_property >= 25 AND age_buy_property <  30 THEN '25-29'
    WHEN age_buy_property >= 30 AND age_buy_property <  35 THEN '30-34'
    WHEN age_buy_property >= 35 AND age_buy_property <  40 THEN '35-39'
    WHEN age_buy_property >= 40 AND age_buy_property <  45 THEN '40-44'
    WHEN age_buy_property >= 45 AND age_buy_property <  50 THEN '45-49'
    WHEN age_buy_property >= 50 AND age_buy_property <  55 THEN '50-54'
    WHEN age_buy_property >= 55 AND age_buy_property <  60 THEN '55-59'
    WHEN age_buy_property >= 60 AND age_buy_property <  65 THEN '60-64'
    WHEN age_buy_property >= 65 AND age_buy_property <= 80 THEN '65-80'
    ELSE 'Unknown'
END AS age_group
FROM dftable    
""")


## Drop less important features

In [10]:
# Drop the less important features.
# DataFrame.drop silently ignores names that are not in the schema, so each
# entry must match a real column name exactly ('months_work_phone', with no
# stray leading text), otherwise that column is quietly kept.
# NOTE(review): confirm the saved transformation pipeline does not expect
# months_work_phone as an input column.
df = df.drop(
    'months_home_phone',
    'months_mobile_phone',
    'months_work_phone',
    'contactability_score',
)

# Load Transformation pipeline

In [11]:
from pyspark.ml import PipelineModel

# Restore the fitted transformation pipeline from the directory extracted from
# Transformation_Pipeline.ppl.tar
pipelineModel = PipelineModel.load("Transformation_Pipeline.ppl")

# Load Model

In [12]:
from pyspark.ml.classification import LogisticRegressionModel

# Restore the logistic regression model from the directory extracted from
# LR_Model_Final.mdl.tar
lr = LogisticRegressionModel.load("LR_Model_Final.mdl")


# Transform and Predict data

In [13]:
# Step 1: run the cleansed DataFrame through the loaded transformation pipeline
df_transformed = pipelineModel.transform(df)

# Step 2: score the transformed rows with the loaded logistic regression model
predictions = lr.transform(df_transformed)

# Save the predictions

In [None]:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

# Build the final result straight from `predictions` rather than re-pairing
# rows of `df` and `predictions` by a generated row number.
# monotonically_increasing_id() depends on partitioning, so indexes computed on
# two separately evaluated DataFrames are not guaranteed to line up, and an
# un-partitioned window also moves every row into a single partition.
# Assumes the pipeline and model only append columns, so `predictions` still
# carries every column of `df` -- TODO confirm against the saved pipeline stages.
final_df = predictions.select(df.columns + ['prediction'])

# Save the prediction result as CSV (overwrites any earlier output at this path)
final_df.write.csv(header=True, mode='overwrite', path=cos.url('Deployment_output.csv', 'advanceddatasciencecapstone-donotdelete-pr-****'))