# In this Notebook we create a Machine Learning Model using Spark and save the Model

#### The key reason we are using PySpark here is to show how to create a Machine Learning Model from a large volume of data that may not fit in the memory of a single Python process. Such cases need a distributed computing infrastructure, and here we use Spark for that. PySpark is the library that exposes Spark's capabilities through Python.

#### First, we import PySpark.

In [6]:
import os
from pyspark.sql import SparkSession

In [7]:
spark = SparkSession\
    .builder\
    .appName("WOSM")\
    .getOrCreate()

#### Next, we read in a dataset that we will use to develop a Machine Learning model. Once we read in the data, it can potentially reside on multiple machines/VMs in a distributed fashion. 

#### We can read the data in various ways. Here we show two mechanisms: reading data from a CSV file and reading data from a database.

#### If you are reading the data from a CSV file, use the next cell to read the file you stored in the previous step, using the same file name. Otherwise, skip the next cell.

In [8]:
cmergedDf = spark.read.csv(os.environ['DSX_PROJECT_DIR']+'/datasets/enhanced_customers.csv', header='true', inferSchema = 'true')
cmergedDf.show(5)

+---+------------+-------------+-----+-------+---------+-------------+--------------------+-----+--------+-----+------+------+--------+---------+--------+---------+
| ID|LONGDISTANCE|INTERNATIONAL|LOCAL|DROPPED|PAYMETHOD|LOCALBILLTYPE|LONGDISTANCEBILLTYPE|USAGE|RATEPLAN|CHURN|GENDER|STATUS|CHILDREN|ESTINCOME|CAROWNER|      AGE|
+---+------------+-------------+-----+-------+---------+-------------+--------------------+-----+--------+-----+------+------+--------+---------+--------+---------+
|  1|          23|            0|  206|      0|       CC|       Budget|      Intnl_discount|  229|       3|    T|     F|     S|       1|  38000.0|       N|24.393333|
|  6|          29|            0|   45|      0|       CH|    FreeLocal|            Standard|   75|       2|    F|     M|     M|       2|  29616.0|       N|49.426667|
|  8|          24|            0|   22|      0|       CC|    FreeLocal|            Standard|   47|       3|    F|     M|     M|       0|  19732.8|       N|50.673333|
| 11|     

#### If you are reading the data from the database, read it using the instructions provided in the Hands On Lab document. Otherwise, skip the next 2 cells.

In [3]:
import dsx_core_utils
from pyspark.sql import SparkSession

# Add asset from remote connection
df83 = None
dataSet = dsx_core_utils.get_remote_data_set_info('USER1009.CUSTOMER_MERGED_HISTORY_VIEW')
dataSource = dsx_core_utils.get_data_source_info(dataSet['datasource'])
# Reuse the active SparkSession (does not depend on a predefined `sc` variable)
sparkSession = SparkSession.builder.getOrCreate()
# Load JDBC data to Spark dataframe: use "SCHEMA"."TABLE", or wrap a custom query as an aliased subquery
dbTableOrQuery = '"' + (dataSet['schema'] + '"."' if(len(dataSet['schema'].strip()) != 0) else '') + dataSet['table'] + '"'
if (dataSet['query']):
    dbTableOrQuery = "(" + dataSet['query'] + ") TBL"
df83 = sparkSession.read.format("jdbc") \
    .option("url", dataSource['URL']) \
    .option("dbtable", dbTableOrQuery) \
    .option("user", dataSource['user']) \
    .option("password", dataSource['password']) \
    .load()
df83.show(5)

+---------+--------+--------+---------+------+---+------+-----+-------+-------------+-----+-------------+------------+--------------------+---------+--------+-----+
|      AGE|CAROWNER|CHILDREN|ESTINCOME|GENDER| ID|STATUS|CHURN|DROPPED|INTERNATIONAL|LOCAL|LOCALBILLTYPE|LONGDISTANCE|LONGDISTANCEBILLTYPE|PAYMETHOD|RATEPLAN|USAGE|
+---------+--------+--------+---------+------+---+------+-----+-------+-------------+-----+-------------+------------+--------------------+---------+--------+-----+
|24.393333|       N|       1|  38000.0|     F|  1|     S|    T|      0|            0|  206|       Budget|          23|      Intnl_discount|       CC|       3|  229|
|49.426667|       N|       2|  29616.0|     M|  6|     M|    F|      0|            0|   45|    FreeLocal|          29|            Standard|       CH|       2|   75|
|50.673333|       N|       0|  19732.8|     M|  8|     M|    F|      0|            0|   22|    FreeLocal|          24|            Standard|       CC|       3|   47|
|56.473333
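The `dbtable` value built in the database cell is either a quoted `"SCHEMA"."TABLE"` name or a custom query wrapped as an aliased subquery. A minimal plain-Python sketch of that logic, using a hypothetical helper `build_dbtable` (not part of `dsx_core_utils`), makes the two cases easier to see:

```python
def build_dbtable(schema, table, query=None):
    """Hypothetical helper mirroring the notebook's JDBC 'dbtable' construction."""
    if query:
        # A custom query must be parenthesized and aliased to be read like a table
        return "(" + query + ") TBL"
    # Quote schema and table separately; omit the schema part when it is blank
    prefix = '"' + schema + '".' if schema.strip() else ''
    return prefix + '"' + table + '"'
```

Assuming the data set info for `USER1009.CUSTOMER_MERGED_HISTORY_VIEW` splits into schema `USER1009` and table `CUSTOMER_MERGED_HISTORY_VIEW`, the helper returns `"USER1009"."CUSTOMER_MERGED_HISTORY_VIEW"`; quoting keeps the identifiers exactly as written.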

#### The last line of the previous cell calls show() on the DataFrame it created. In the next cell, assign that DataFrame to cmergedDf.

In [7]:
# Uncomment the next line, replacing df83 with the DataFrame name used in the show() call on the last line of the previous cell. Then execute this cell.
#cmergedDf = df83

#### With this data we can create a Binary Classification model that predicts whether a customer will churn (the CHURN column) or not

In [9]:
cmergedDf.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LONGDISTANCE: integer (nullable = true)
 |-- INTERNATIONAL: integer (nullable = true)
 |-- LOCAL: integer (nullable = true)
 |-- DROPPED: integer (nullable = true)
 |-- PAYMETHOD: string (nullable = true)
 |-- LOCALBILLTYPE: string (nullable = true)
 |-- LONGDISTANCEBILLTYPE: string (nullable = true)
 |-- USAGE: integer (nullable = true)
 |-- RATEPLAN: integer (nullable = true)
 |-- CHURN: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- CHILDREN: integer (nullable = true)
 |-- ESTINCOME: double (nullable = true)
 |-- CAROWNER: string (nullable = true)
 |-- AGE: double (nullable = true)



#### In the next 2 cells we create a distributed temporary view from the DataFrame so that we can query the data with SQL syntax (through Spark SQL), and then use SQL to derive a PHASE column (INFANT, Child or Adult) from AGE

In [10]:
cmergedDf.createOrReplaceTempView("mergedt")

In [11]:
custDf = spark.sql("select *, case when AGE < 1 then 'INFANT' when AGE < 18 then 'Child' else 'Adult' End as PHASE from mergedt").cache()
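The CASE expression checks its branches in order, so AGE < 1 maps to INFANT, 1 <= AGE < 18 to Child, and everything else to Adult. The plain-Python function below (`phase` is an illustrative name) mirrors it, including one subtle point: in SQL a comparison with a NULL AGE is not true, so a missing AGE falls through to the ELSE branch and becomes Adult.

```python
def phase(age):
    """Plain-Python mirror of the notebook's CASE WHEN expression for PHASE."""
    # SQL: NULL < 1 is not true, so NULL skips every WHEN and hits ELSE
    if age is None:
        return 'Adult'
    # Branches are evaluated in order, exactly like CASE WHEN
    if age < 1:
        return 'INFANT'
    if age < 18:
        return 'Child'
    return 'Adult'
```

With the DataFrame API the same column could be derived with `pyspark.sql.functions.when(...).otherwise(...)` instead of SQL.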

In [12]:
custDf.count()

1415

#### To develop the Binary Classification Model we use Spark MLlib. Spark MLlib implements various machine learning algorithms that can build models on distributed datasets. With Spark MLlib, one can train a model on a dataset as large as a real-life Machine Learning problem needs, as long as the data is available in the distributed dataset we created in the previous step.

#### Next, we import the Spark MLlib components we need to develop the model

#### We import Pipeline because it chains the model development steps into a pipeline that can be applied to the training dataset easily. We also import StringIndexer and IndexToString to encode categorical columns and decode predictions, VectorAssembler to assemble the attributes we want to use as model features into a single vector, and RandomForestClassifier as the learning algorithm.
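Conceptually, VectorAssembler takes the listed numeric columns of each row and concatenates them, in order, into one feature vector. A minimal plain-Python sketch of that idea for a single row held as a dict (`assemble` is a hypothetical name; the real transformer also flattens vector columns and works on whole DataFrames):

```python
def assemble(row, input_cols):
    """Concatenate the named numeric columns of one row (a dict) into a feature list."""
    features = []
    for col in input_cols:
        value = row[col]
        if value is None:
            # Spark's VectorAssembler fails on nulls by default (handleInvalid="error")
            raise ValueError("null value in column " + col)
        features.append(float(value))  # vector entries are stored as doubles
    return features
```

For example, using values from the first row of the CSV output, `assemble({'CHILDREN': 1, 'ESTINCOME': 38000.0, 'AGE': 24.393333}, ['CHILDREN', 'ESTINCOME', 'AGE'])` returns `[1.0, 38000.0, 24.393333]`.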

In [13]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

In [14]:
# Prepare string variables so that they can be used by the tree-based Random Forest algorithm
# StringIndexer encodes a string column of labels to a column of label indices
SI1 = StringIndexer(inputCol='GENDER', outputCol='GenderEncoded')
SI2 = StringIndexer(inputCol='STATUS', outputCol='StatusEncoded')
SI3 = StringIndexer(inputCol='CAROWNER', outputCol='CarOwnerEncoded')
SI4 = StringIndexer(inputCol='PAYMETHOD', outputCol='PaymethodEncoded')
SI5 = StringIndexer(inputCol='LOCALBILLTYPE', outputCol='LocalBilltypeEncoded')
SI6 = StringIndexer(inputCol='LONGDISTANCEBILLTYPE', outputCol='LongDistanceBilltypeEncoded')
SI7 = StringIndexer(inputCol='PHASE', outputCol='PhaseEncoded')
# CHURN is the target; its indexed form becomes the 'label' column
S18 = StringIndexer(inputCol='CHURN', outputCol='label')

# Spark ML algorithms expect all input features in a single vector column
assembler = VectorAssembler(inputCols=["GenderEncoded", "StatusEncoded", "CarOwnerEncoded", "PaymethodEncoded",
                                       "LocalBilltypeEncoded", "LongDistanceBilltypeEncoded", "PhaseEncoded",
                                       "CHILDREN", "ESTINCOME", "AGE", "LONGDISTANCE", "INTERNATIONAL", "LOCAL",
                                       "DROPPED", "USAGE", "RATEPLAN"], outputCol="features")
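By default StringIndexer orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most frequent value gets index 0.0; recent Spark versions break frequency ties alphabetically. The sketch below reproduces that ordering in plain Python, together with the inverse mapping that IndexToString performs. It is an illustration, not Spark's implementation, and the function names are hypothetical.

```python
from collections import Counter

def fit_string_indexer(values):
    """Return the labels list: most frequent value first, ties broken alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda v: (-counts[v], v))

def string_index(values, labels):
    """Map each string to its position in labels, as a double (like StringIndexerModel)."""
    index = {label: float(i) for i, label in enumerate(labels)}
    # An unseen value raises KeyError, loosely like Spark's default handleInvalid="error"
    return [index[v] for v in values]

def index_to_string(indices, labels):
    """Inverse mapping, as IndexToString does for the prediction column."""
    return [labels[int(i)] for i in indices]
```

This ordering explains the results further down: F is the more frequent CHURN value, so it is indexed as label 0.0 and T as 1.0.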

In [15]:
labelIndexer = S18.fit(custDf)

In [16]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

#### We use the Random Forest algorithm for Binary Classification. Another supervised learning algorithm, such as a Decision Tree or XGBoost, could be used instead.

In [17]:
# instantiate the algorithm, take the default settings
rf=RandomForestClassifier(labelCol="label", featuresCol="features")
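A random forest combines many decision trees. As a simplified sketch of how Spark's RandomForestClassifier combines them: each tree contributes the class distribution of the leaf a row falls into, these are summed into rawPrediction, normalized into probability, and the most probable class becomes the prediction. The plain-Python function below (`forest_predict`, a hypothetical name) takes per-tree probability vectors as given rather than training any trees.

```python
def forest_predict(tree_probs):
    """Combine per-tree class-probability vectors the way the forest's output columns do."""
    n_classes = len(tree_probs[0])
    # rawPrediction: sum of each tree's class distribution
    raw = [sum(p[k] for p in tree_probs) for k in range(n_classes)]
    # probability: raw normalized to sum to 1 (i.e. averaged over trees)
    total = sum(raw)
    probability = [r / total for r in raw]
    # prediction: index of the most probable class, as a double
    prediction = float(max(range(n_classes), key=lambda k: probability[k]))
    return raw, probability, prediction
```

Averaging the trees is what makes the forest's probability column smoother than any single tree's output.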

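#### Next, we define the pipeline. Its stages run in order: the seven StringIndexers encode the categorical columns, the fitted label indexer encodes CHURN as the label, the VectorAssembler assembles the features, the Random Forest Classifier builds the model, and the IndexToString converter maps predictions back to the original CHURN values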

In [18]:
pipeline = Pipeline(stages=[SI1,SI2,SI3,SI4,SI5,SI6,SI7,labelIndexer, assembler, rf, labelConverter])
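Pipeline.fit() walks the stages in order: each estimator (such as a StringIndexer or the RandomForestClassifier) is fitted on the output of the stages before it and replaced by its fitted model, while transformers (VectorAssembler, IndexToString, or the already fitted labelIndexer) are used as they are. The toy classes below sketch that control flow on lists of dicts; they are illustrative stand-ins, not Spark classes, and the toy indexer orders labels alphabetically for brevity.

```python
class Indexer:
    """Toy estimator: learns a string -> index mapping for one column."""
    def __init__(self, col, out):
        self.col, self.out = col, out

    def fit(self, rows):
        labels = sorted({r[self.col] for r in rows})  # alphabetical, for brevity
        return IndexerModel(self.col, self.out, labels)

class IndexerModel:
    """Toy transformer produced by Indexer.fit (or built directly, like labelIndexer)."""
    def __init__(self, col, out, labels):
        self.col, self.out, self.labels = col, out, labels

    def transform(self, rows):
        # list.index raises ValueError for values unseen during fitting
        return [dict(r, **{self.out: float(self.labels.index(r[self.col]))}) for r in rows]

class Pipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the current data; transformers pass through unchanged
            model = stage.fit(rows) if hasattr(stage, 'fit') else stage
            rows = model.transform(rows)  # feed this stage's output to the next stage
            fitted.append(model)
        return PipelineModel(fitted)

class PipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows
```

Because the fitted model only contains transformers, `model.transform(test)` later applies exactly the encodings learned on the training data.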

In [19]:
# Split data into train and test datasets
train, test = custDf.randomSplit([0.7,0.3], seed=6)
train.cache()
test.cache()

DataFrame[ID: int, LONGDISTANCE: int, INTERNATIONAL: int, LOCAL: int, DROPPED: int, PAYMETHOD: string, LOCALBILLTYPE: string, LONGDISTANCEBILLTYPE: string, USAGE: int, RATEPLAN: int, CHURN: string, GENDER: string, STATUS: string, CHILDREN: int, ESTINCOME: double, CAROWNER: string, AGE: double, PHASE: string]
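randomSplit normalizes the weights and assigns each row to exactly one split by random sampling, so the resulting sizes are close to, but not exactly, 70% and 30%. A plain-Python sketch of the idea follows; `random_split` is a hypothetical name, and its draws will not match Spark's for the same seed.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split by comparing a uniform draw with cumulative weights."""
    total = sum(weights)
    # Normalize into cumulative boundaries, e.g. [0.7, 0.3] -> [0.7, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for i, b in enumerate(bounds):
            if x < b:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against float rounding in the last bound
    return splits
```

For instance, `random_split(list(range(1415)), [0.7, 0.3], seed=6)` yields two disjoint lists that together contain all 1415 rows, with roughly 70% in the first.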

#### In the next step, the model is created by fitting the pipeline on the training dataset. pipeline.fit() returns the fitted model (a PipelineModel).

In [20]:
# Build models
model = pipeline.fit(train)

#### Next, we test the model by using the Test dataset

In [21]:
model.transform(test)

DataFrame[ID: int, LONGDISTANCE: int, INTERNATIONAL: int, LOCAL: int, DROPPED: int, PAYMETHOD: string, LOCALBILLTYPE: string, LONGDISTANCEBILLTYPE: string, USAGE: int, RATEPLAN: int, CHURN: string, GENDER: string, STATUS: string, CHILDREN: int, ESTINCOME: double, CAROWNER: string, AGE: double, PHASE: string, GenderEncoded: double, StatusEncoded: double, CarOwnerEncoded: double, PaymethodEncoded: double, LocalBilltypeEncoded: double, LongDistanceBilltypeEncoded: double, PhaseEncoded: double, label: double, features: vector, rawPrediction: vector, probability: vector, prediction: double, predictedLabel: string]

In [22]:
results = model.transform(test)
results=results.select(results["ID"],results["CHURN"],results["label"],results["predictedLabel"],results["prediction"],results["probability"])
results.toPandas().head(6)

   ID  CHURN  label  predictedLabel  prediction                                probability
0   6      F    0.0               F         0.0  [0.8201269331036564, 0.17987306689634358]
1  17      F    0.0               F         0.0  [0.9021391200887166, 0.09786087991128342]
2  22      F    0.0               F         0.0   [0.5969959289050152, 0.4030040710949847]
3  24      F    0.0               F         0.0  [0.9252897734458483, 0.07471022655415167]
4  29      T    1.0               T         1.0  [0.16958321269672788, 0.8304167873032722]
5  38      T    1.0               F         0.0   [0.7682895609145737, 0.2317104390854264]


In [23]:
# Fraction of test rows whose prediction matches the label, i.e. accuracy (not precision)
print('Accuracy = {:.2f}.'.format(results.filter(results.label == results.prediction).count() / float(results.count())))

Accuracy = 0.93.
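The cell above measures accuracy: the share of all rows where the prediction equals the label. Precision is a different quantity, the share of predicted churners (prediction 1.0) that really churned. A plain-Python sketch of both, plus recall, with a hypothetical function name; in Spark ML, MulticlassClassificationEvaluator offers metrics such as "accuracy" and "weightedPrecision".

```python
def classification_metrics(labels, predictions, positive=1.0):
    """Return (accuracy, precision, recall) for binary labels and predictions."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if p == positive and y == positive)  # true positives
    fp = sum(1 for y, p in pairs if p == positive and y != positive)  # false positives
    fn = sum(1 for y, p in pairs if p != positive and y == positive)  # false negatives
    correct = sum(1 for y, p in pairs if y == p)
    accuracy = correct / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return accuracy, precision, recall
```

On just the six rows printed earlier (labels 0, 0, 0, 0, 1, 1 and predictions 0, 0, 0, 0, 1, 0), this gives accuracy 5/6, precision 1.0 and recall 0.5, which shows how far apart the metrics can be.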


#### Next, we import the BinaryClassificationEvaluator class from Spark ML to evaluate the model

#### We evaluate the model using the area under the Receiver Operating Characteristic (ROC) curve.

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

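# Evaluate model
# Note: rawPredictionCol="prediction" scores the hard 0/1 predictions; "probability" would rank rows by predicted probability
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC")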
print('Area under ROC curve = {:.2f}.'.format(evaluator.evaluate(results)))

Area under ROC curve = 0.93.
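Because rawPredictionCol points at the hard 0/1 prediction column, the ROC curve has a single operating point and the area reduces to (TPR + TNR) / 2. BinaryClassificationEvaluator also accepts a vector column such as probability, which ranks rows by their predicted probability of churn. The plain-Python sketch below (`area_under_roc` is a hypothetical name) computes AUC as the probability that a randomly chosen positive scores higher than a randomly chosen negative, counting ties as one half, which matches the area under the empirical ROC curve.

```python
def area_under_roc(labels, scores):
    """Pairwise AUC: P(score of a positive > score of a negative), ties count 1/2."""
    pos = [s for y, s in zip(labels, scores) if y == 1.0]
    neg = [s for y, s in zip(labels, scores) if y == 0.0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))

# The six rows printed earlier, purely as an illustration (not the test-set AUC)
labels = [0.0, 0.0, 0.0, 0.0, 1.0, 1.0]
hard = [0.0, 0.0, 0.0, 0.0, 1.0, 0.0]  # prediction column
probs = [0.17987306689634358, 0.09786087991128342, 0.4030040710949847,
         0.07471022655415167, 0.8304167873032722, 0.2317104390854264]  # P(T)
```

On these rows the hard predictions give 0.75, which equals (TPR + TNR) / 2 = (0.5 + 1.0) / 2, while ranking by probability gives 0.875.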


#### Now we save the Model. Note the scoring endpoint URL that is returned after the model is saved.

In [25]:
from dsx_ml.ml import save

In [26]:
model_name = "Telco Churn Model 2"

In [27]:
save(name = model_name, model = model, algorithm_type = "Classification", test_data = test)

Using TensorFlow backend.


{'path': '/user-home/1001/DSX_Projects/CustomerChurnDemo/models/Telco Churn Model 2/1',
 'scoring_endpoint': 'https://dsxl-api/v3/project/score/Python36/spark-2.3/CustomerChurnDemo/Telco%20Churn%20Model%202/1'}