# Customer Churn Prediction
This sample Jupyter notebook illustrates how to use SparkML to train an ML model that predicts customer churn.

**Please confirm the environment for this notebook is Python 3.5 with Spark**.


### Install PySpark
If your environment already includes Spark, you do NOT need to install the PySpark libraries explicitly as we do here. If you are using a plain Python 3.5 environment, however, run the following cell to install PySpark.
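To decide whether the install cell is needed, you can check whether PySpark is already importable in the current kernel. This is a minimal sketch using the standard library's `importlib.util.find_spec`; the helper name `is_installed` is our own and not part of any library:

```python
import importlib.util


def is_installed(module_name):
    """Return True if `module_name` can be imported in this environment."""
    # find_spec returns None when no top-level module with that name is found
    return importlib.util.find_spec(module_name) is not None


if is_installed("pyspark"):
    print("PySpark is already available; the pip install step can be skipped.")
else:
    print("PySpark not found; run the install cell.")
```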

In [1]:
!pip install pyspark==2.1.3

Collecting pyspark==2.1.3
  Downloading https://files.pythonhosted.org/packages/bd/e4/00786837b5f61c0d7ff7f75b116f1c6595f833f3984c25c1da7dbce36cc0/pyspark-2.1.3.tar.gz (181.3MB)
    100% |████████████████████████████████| 181.3MB 5.5kB/s eta 0:00:01
Collecting py4j==0.10.7 (from pyspark==2.1.3)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
    100% |████████████████████████████████| 204kB 5.0MB/s eta 0:00:01
Building wheels for collected packages: pyspark
  Running setup.py bdist_wheel for pyspark ... done
  Stored in directory: /home/dsxuser/.cache/pip/wheels/0a/ac/b0/5b8145c9aeb42e735e5796b6539ffee1a17eb5aa9202a007b2
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.1.3


### Load Modules
In the next cell, we import a number of modules that will be used in subsequent cells in this notebook.

In [36]:
import os
import urllib.request  # urlretrieve lives in the urllib.request submodule
import pandas as pd

### The Dataset

We use a sample telco dataset to predict customer churn. It includes information about:
- Customers who left within the last month – the column is called Churn

- Services that each customer has signed up for – phone, multiple lines, internet, online security, online backup, device protection, tech support, and streaming TV and movies

- Customer account information – how long they’ve been a customer, contract, payment method, paperless billing, monthly charges, and total charges

- Demographic info about customers – gender, age range, and if they have partners and dependents

Link for getting the dataset: [https://community.watsonanalytics.com/wp-content/uploads/2015/03/WA_Fn-UseC_-Telco-Customer-Churn.csv](https://community.watsonanalytics.com/wp-content/uploads/2015/03/WA_Fn-UseC_-Telco-Customer-Churn.csv)

### Loading the Dataset

There are several ways to load the dataset. In the next cells, we take a simple approach: download the CSV file from the web with urllib and then read it into a Pandas dataframe.

In [37]:
# Download a file from a URL and return its local path
def download_file_from_url(file_url, save_directory=None):
    # Save into save_directory if one is given; otherwise use a local temp_cos_files directory
    working_directory = "temp_cos_files"
    if save_directory is not None:
        working_directory = save_directory
    os.makedirs(working_directory, exist_ok=True)

    file_name = os.path.basename(file_url)
    # Some URLs include query parameters; split those off to get a valid file name
    file_name = file_name.split('?')[0]
    # Delete any existing copy, since an earlier download may have failed and left a corrupted file
    file_path = os.path.join(working_directory, file_name)
    if os.path.exists(file_path):
        os.remove(file_path)

    file_path, _ = urllib.request.urlretrieve(file_url, file_path)
    stat_info = os.stat(file_path)
    print('Downloaded', file_path, stat_info.st_size, 'bytes.')
    return file_path
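The function above strips query parameters with `split('?')`. An alternative sketch (our own helper, `file_name_from_url`, not part of the original notebook) uses the standard library's `urllib.parse.urlparse`, which isolates the URL path and therefore also ignores `#fragment` suffixes; the `example.com` URL in the usage is hypothetical:

```python
import os
from urllib.parse import urlparse


def file_name_from_url(file_url):
    """Return the last path component of a URL, ignoring any query string or fragment."""
    # urlparse separates scheme, host, path, query and fragment; only the path is used
    path = urlparse(file_url).path
    return os.path.basename(path)


print(file_name_from_url(
    "https://community.watsonanalytics.com/wp-content/uploads/2015/03/WA_Fn-UseC_-Telco-Customer-Churn.csv"))
print(file_name_from_url("https://example.com/data/churn.csv?version=2#top"))  # churn.csv
```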

In [38]:
file_url = "https://community.watsonanalytics.com/wp-content/uploads/2015/03/WA_Fn-UseC_-Telco-Customer-Churn.csv"
file_path = download_file_from_url(file_url)

Downloaded temp_cos_files/WA_Fn-UseC_-Telco-Customer-Churn.csv 977501 bytes.


In [39]:
# Read into a pandas dataframe
df_data_1 = pd.read_csv(file_path)
df_data_1.head()

| | customerID | gender | SeniorCitizen | Partner | Dependents | tenure | PhoneService | MultipleLines | InternetService | OnlineSecurity | ... | DeviceProtection | TechSupport | StreamingTV | StreamingMovies | Contract | PaperlessBilling | PaymentMethod | MonthlyCharges | TotalCharges | Churn |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 7590-VHVEG | Female | 0 | Yes | No | 1 | No | No phone service | DSL | No | ... | No | No | No | No | Month-to-month | Yes | Electronic check | 29.85 | 29.85 | No |
| 1 | 5575-GNVDE | Male | 0 | No | No | 34 | Yes | No | DSL | Yes | ... | Yes | No | No | No | One year | No | Mailed check | 56.95 | 1889.5 | No |
| 2 | 3668-QPYBK | Male | 0 | No | No | 2 | Yes | No | DSL | Yes | ... | No | No | No | No | Month-to-month | Yes | Mailed check | 53.85 | 108.15 | Yes |
| 3 | 7795-CFOCW | Male | 0 | No | No | 45 | No | No phone service | DSL | Yes | ... | Yes | Yes | No | No | One year | No | Bank transfer (automatic) | 42.3 | 1840.75 | No |
| 4 | 9237-HQITU | Female | 0 | No | No | 2 | Yes | No | Fiber optic | No | ... | No | No | No | No | Month-to-month | Yes | Electronic check | 70.7 | 151.65 | Yes |


### Spark
In the next cells, we start a Spark session and convert the Pandas dataframe into a Spark dataframe.

In [40]:
# Start a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [41]:
# Verify the Spark version
spark.version

'2.1.3'

In [42]:
# Parse the Pandas dataframe into a Spark dataframe
customer_data = spark.createDataFrame(df_data_1)

In [43]:
# Verify the schema for the data
customer_data.printSchema()

root
 |-- customerID: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: long (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: long (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: string (nullable = true)
 |-- Churn: string (nullable = true)



In [44]:
# Drop customerID field as it is not needed
customer_data = customer_data.drop('customerID')

In [45]:
# Cast the TotalCharges column to double; blank (non-numeric) entries become null
from pyspark.sql.functions import col

customer_data = customer_data.withColumn("TotalCharges", col("TotalCharges").cast("double"))
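`TotalCharges` arrives as a string because not every entry is numeric: in this dataset a few rows contain a blank value instead of a number. Casting to double turns those entries into nulls, which the `na.drop()` cell further down removes. The pure-Python sketch below imitates that cast-then-drop behavior on a few made-up rows; `to_double` is a hypothetical helper and only approximates Spark's string-to-double parsing rules:

```python
def to_double(value):
    """Parse a value as a float, returning None when it is not a number.

    This roughly mirrors how Spark's cast("double") turns unparseable
    strings (such as a blank TotalCharges entry) into null.
    """
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


rows = [
    {"tenure": 1, "TotalCharges": "29.85"},
    {"tenure": 0, "TotalCharges": " "},  # hypothetical blank entry
    {"tenure": 34, "TotalCharges": "1889.5"},
]

# Cast the column, then keep rows where the cast succeeded
# (similar to na.drop(), which drops rows with a null in any column)
cast_rows = [dict(row, TotalCharges=to_double(row["TotalCharges"])) for row in rows]
clean_rows = [row for row in cast_rows if row["TotalCharges"] is not None]
print([row["TotalCharges"] for row in clean_rows])  # [29.85, 1889.5]
```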

In [46]:
# Feature-relevance scores from an earlier analysis (how they were computed is not shown here);
# higher scores mark the features that looked more useful for predicting Churn:
#   gender            2.587e-01   SeniorCitizen      1.344e+02   Partner           8.241e+01
#   Dependents        1.330e+02   tenure             1.628e+04   PhoneService      9.726e-02
#   MultipleLines     9.747e+00   InternetService    9.821e+00   OnlineSecurity    5.516e+02
#   OnlineBackup      2.301e+02   DeviceProtection   1.913e+02   TechSupport       5.233e+02
#   StreamingTV       7.490e+00   StreamingMovies    8.235e+00   Contract          1.116e+03
#   PaperlessBilling  1.057e+02   PaymentMethod      5.849e+01   MonthlyCharges    7.945e+04
#   TotalCharges      4.572e+05

# The best features seem to be TotalCharges (or MonthlyCharges), tenure, Contract, OnlineSecurity, and TechSupport
# OnlineBackup, DeviceProtection, Dependents, and SeniorCitizen could possibly be included as well
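The notebook does not say how the scores above were computed. One common way to score a categorical feature against a binary label such as `Churn` is Pearson's chi-squared statistic on their contingency table, where a larger value means the feature and the label are further from independent. The sketch below computes it from scratch on toy data; treat it as an illustration of the technique, not as a reproduction of the numbers above:

```python
from collections import Counter


def chi_squared(pairs):
    """Pearson's chi-squared statistic for a list of (feature_value, label) pairs."""
    observed = Counter(pairs)
    row_totals = Counter(value for value, _ in pairs)
    col_totals = Counter(label for _, label in pairs)
    n = len(pairs)

    statistic = 0.0
    for value, row_total in row_totals.items():
        for label, col_total in col_totals.items():
            # Expected count if the feature and the label were independent
            expected = row_total * col_total / n
            statistic += (observed[(value, label)] - expected) ** 2 / expected
    return statistic


# Toy contingency table: 10 A/No, 20 A/Yes, 30 B/No, 40 B/Yes
toy = ([("A", "No")] * 10 + [("A", "Yes")] * 20
       + [("B", "No")] * 30 + [("B", "Yes")] * 40)
print(round(chi_squared(toy), 4))  # 0.7937
```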


### Spark Pipeline
In the next cell, we define the Spark Pipeline, which lists the operations to run on the dataset to train the ML model.
Specifically, the pipeline consists of the following steps:

- Map the output Churn column to an index label (0/1)
- Map the string-typed input columns that could be used for training to numeric indices
- Assemble the features to be used for training into a FEATURES column
- Train a classifier, in this case a RandomForestClassifier
- Convert the predicted index back to the original Churn label
- Define the pipeline to include all the previous steps
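By default, `StringIndexer` assigns indices by descending label frequency, so the most frequent value gets index 0.0, and `IndexToString` reverses the mapping using the indexer's `labels`. This is why `Churn='No'`, the majority class, shows up as `label=0.0` in the predictions further down. A pure-Python sketch of the idea; the helper names are ours, and breaking ties alphabetically is an assumption (Spark versions differ in how they order ties):

```python
from collections import Counter


def fit_string_indexer(values):
    """Return the distinct labels ordered by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    return sorted(counts, key=lambda v: (-counts[v], v))


def index_of(labels, value):
    # Position in the frequency-ordered list, as a float like Spark's indices
    return float(labels.index(value))


def index_to_string(labels, index):
    # Inverse mapping, analogous to IndexToString
    return labels[int(index)]


churn = ["No", "Yes", "No", "No", "Yes"]
labels = fit_string_indexer(churn)
print(labels)                                 # ['No', 'Yes']
print([index_of(labels, v) for v in churn])   # [0.0, 1.0, 0.0, 0.0, 1.0]
print(index_to_string(labels, 1.0))           # Yes
```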

In [47]:
from pyspark.ml import Pipeline
#from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Index the output label
labelIndexer = StringIndexer(inputCol="Churn", outputCol="label").fit(customer_data)

# Index the string-typed input columns
labelIndexerContract = StringIndexer(inputCol="Contract", outputCol="Contract_index").fit(customer_data)
labelIndexerOnlineSecurity = StringIndexer(inputCol="OnlineSecurity", outputCol="OnlineSecurity_index").fit(customer_data)
labelIndexerTechSupport = StringIndexer(inputCol="TechSupport", outputCol="TechSupport_index").fit(customer_data)

# Identify relevant features
# For this example, we use only two features, TotalCharges and tenure, to predict churn.
# The Contract, OnlineSecurity and TechSupport indexers defined above still run in the pipeline,
# but their output columns are not listed here, so the model does not use them.
# In practice, data scientists combine data visualization, statistical techniques, and business
# intuition to identify the most relevant features.
featureColumns = ["TotalCharges","tenure"]


# Assemble the features of interest into one vector column
assemblerFeatures = VectorAssembler(inputCols = featureColumns, outputCol = "FEATURES")


# Define a random forest classifier with 10 trees; it is trained when the pipeline is fit
rfmodel = RandomForestClassifier(labelCol="label", featuresCol="FEATURES", numTrees=10)

# Convert indexed output label back to original label.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedChurn",labels=labelIndexer.labels)

pipeline = Pipeline(stages=[labelIndexer, labelIndexerContract, labelIndexerOnlineSecurity, labelIndexerTechSupport, assemblerFeatures, rfmodel, labelConverter])
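`VectorAssembler` concatenates the listed input columns, in the order given by `inputCols`, into a single vector column. With `featureColumns = ["TotalCharges","tenure"]`, a row with tenure 1 and TotalCharges 25.25 becomes `FEATURES=[25.25, 1.0]`, as the predictions below show. A hypothetical pure-Python equivalent for a single row (the `assemble` helper is ours, for illustration only):

```python
def assemble(row, input_cols):
    """Collect the named columns of `row`, in `input_cols` order, into a list of floats."""
    return [float(row[col]) for col in input_cols]


row = {"tenure": 1, "TotalCharges": 25.25, "Contract_index": 0.0}
print(assemble(row, ["TotalCharges", "tenure"]))  # [25.25, 1.0]
```

Because columns are selected by name, the order of the columns in the input data does not matter; only the order of `inputCols` determines the layout of the vector.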



In [48]:
# Drop rows that contain null values (such as TotalCharges entries that could not be cast to double)
customer_data = customer_data.na.drop()

### Split data for Training and Testing
To evaluate the performance of your machine learning model, you need to test it on data that is different from the data it was trained on.

To do so, we randomly split the data into two sets: a training set (about 80% of the rows) and a test set (about 20%). We train on the training set and evaluate on the test set.
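`randomSplit([0.8, 0.2])` normalizes the weights and assigns each row to a split at random, so the split sizes are only approximately 80/20, and without a `seed` argument the split changes on every run (`randomSplit([0.8, 0.2], seed=42)` makes it reproducible). The pure-Python sketch below shows per-row random assignment; `random_split` is a hypothetical helper, and Spark's actual per-partition sampling differs in detail:

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.8, 1.0] for weights [0.8, 0.2]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the top bound
            if x < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(10000), [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 8000 and 2000
```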

In [49]:

customer_data_train, customer_data_test = customer_data.randomSplit([0.8, 0.2])

In [50]:
# Train model.  This also runs the indexers.
model = pipeline.fit(customer_data_train)

In [51]:

# Make predictions.
predictions = model.transform(customer_data_test)

In [52]:
predictions.head(4)

[Row(gender='Female', SeniorCitizen=0, Partner='No', Dependents='No', tenure=1, PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='No', OnlineBackup='No', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='No', PaymentMethod='Electronic check', MonthlyCharges=25.25, TotalCharges=25.25, Churn='No', label=0.0, Contract_index=0.0, OnlineSecurity_index=0.0, TechSupport_index=0.0, FEATURES=DenseVector([25.25, 1.0]), rawPrediction=DenseVector([6.0101, 3.9899]), probability=DenseVector([0.601, 0.399]), prediction=0.0, predictedChurn='No'),
 Row(gender='Female', SeniorCitizen=0, Partner='No', Dependents='No', tenure=1, PhoneService='Yes', MultipleLines='No', InternetService='DSL', OnlineSecurity='No', OnlineBackup='No', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='No', PaymentMethod='Electronic check
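In the first row above, `rawPrediction` ([6.0101, 3.9899]) sums to 10, the number of trees. As we understand Spark 2.x's random forest classifier, each tree contributes the class distribution of the leaf the row lands in, `rawPrediction` is the sum of those distributions, `probability` is that sum normalized to 1, and `prediction` is the index of the largest entry (here 0.0, which `IndexToString` maps back to "No"). A sketch of that arithmetic with three made-up trees (`combine_tree_votes` is a hypothetical helper):

```python
def combine_tree_votes(tree_distributions):
    """Sum per-tree class distributions, normalize them, and pick the top class."""
    num_classes = len(tree_distributions[0])
    raw = [sum(dist[k] for dist in tree_distributions) for k in range(num_classes)]
    total = sum(raw)
    probability = [r / total for r in raw]
    prediction = float(max(range(num_classes), key=lambda k: raw[k]))
    return raw, probability, prediction


# Hypothetical leaf class distributions [P(No), P(Yes)] from three trees
trees = [[1.0, 0.0], [0.5, 0.5], [0.25, 0.75]]
raw, probability, prediction = combine_tree_votes(trees)
print(raw, prediction)  # [1.75, 1.25] 0.0
```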

In [53]:
## Evaluate model
# Select example rows to display.
predictions.select("prediction", "label", "FEATURES").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy of trained ML model: ", 100*accuracy)


+----------+-----+-----------+
|prediction|label|   FEATURES|
+----------+-----+-----------+
|       0.0|  0.0|[25.25,1.0]|
|       1.0|  1.0|[45.15,1.0]|
|       1.0|  1.0|[45.95,1.0]|
|       1.0|  0.0|[55.45,1.0]|
|       1.0|  0.0| [49.9,1.0]|
+----------+-----+-----------+
only showing top 5 rows

Test Error = 0.224897
Accuracy of trained ML model:  77.51031636863824
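The test error is simply 1 minus the accuracy (1 − 0.7751 ≈ 0.2249). Because most customers in this dataset did not churn, a useful sanity check is the accuracy of always predicting the most common label: the model only adds value to the extent that it beats this baseline. A minimal sketch with our own helper names, applied here to just the five rows displayed above (not to the full test set):

```python
from collections import Counter


def accuracy(predictions, labels):
    """Fraction of positions where the prediction equals the label."""
    return sum(p == l for p, l in zip(predictions, labels)) / len(labels)


def majority_baseline(labels):
    """Accuracy of always predicting the most common label."""
    most_common_count = Counter(labels).most_common(1)[0][1]
    return most_common_count / len(labels)


# (prediction, label) values from the five rows shown above
predictions = [0.0, 1.0, 1.0, 1.0, 1.0]
labels = [0.0, 1.0, 1.0, 0.0, 0.0]
acc = accuracy(predictions, labels)
print("accuracy=%.2f test_error=%.2f baseline=%.2f"
      % (acc, 1.0 - acc, majority_baseline(labels)))
# accuracy=0.60 test_error=0.40 baseline=0.60
```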


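### Deploy to Watson Machine Learning

In the following cells, we install the Watson Machine Learning client, create an API client, store the trained pipeline model in the Watson Machine Learning repository, deploy it as an online service, and send it test records to score.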

In [54]:
!pip install watson-machine-learning-client

Requirement not upgraded as not directly required: watson-machine-learning-client in /opt/conda/envs/DSX-Python35/lib/python3.5/site-packages
Requirement not upgraded as not directly required: pandas in /opt/conda/envs/DSX-Python35/lib/python3.5/site-packages (from watson-machine-learning-client)
Requirement not upgraded as not directly required: certifi in /opt/conda/envs/DSX-Python35/lib/python3.5/site-packages (from watson-machine-learning-client)
Requirement not upgraded as not directly required: requests in /opt/conda/envs/DSX-Python35/lib/python3.5/site-packages (from watson-machine-learning-client)
Requirement not upgraded as not directly required: urllib3 in /opt/conda/envs/DSX-Python35/lib/python3.5/site-packages (from watson-machine-learning-client)
Requirement not upgraded as not directly required: ibm-cos-sdk in /opt/conda/envs/DSX-Python35/lib/python3.5/site-packages (from watson-machine-learning-client)
Requirement not upgraded as not directly required: tqdm in /opt/conda

In [55]:
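# The code was removed by Watson Studio for sharing.
# (This cell defined wml_creds, the Watson Machine Learning service credentials used to create the API client below.)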

In [56]:
# Create API client

from watson_machine_learning_client import WatsonMachineLearningAPIClient

client = WatsonMachineLearningAPIClient(wml_creds)

In [57]:
# Publish model in Watson Machine Learning repository on Cloud

model_props = {client.repository.ModelMetaNames.AUTHOR_NAME: "Leigh W", 
               client.repository.ModelMetaNames.NAME: "Customer Churn Spark ML January 2019"
               }
model_details = client.repository.store_model(model=model, pipeline = pipeline, meta_props=model_props, training_data=customer_data_train)


In [58]:
model_uid = client.repository.get_model_uid(model_details)

In [59]:
client.repository.list_models()

------------------------------------  ------------------------------------  ------------------------  ---------
GUID                                  NAME                                  CREATED                   FRAMEWORK
6ab8c74d-af1d-4355-832e-7d54cf5a9a2d  Customer Churn Spark ML January 2019  2019-02-11T21:17:35.876Z  mllib-2.1
276b744f-7523-4375-a7a3-2eab1b88e24a  Customer Churn Spark ML January 2019  2019-02-11T20:16:45.014Z  mllib-2.1
------------------------------------  ------------------------------------  ------------------------  ---------


In [60]:
# Create the deployment.
deployment_details = client.deployments.create(model_uid, 'RF Predict Churn SparkML Jan2019')



#######################################################################################

Synchronous deployment creation for uid: '6ab8c74d-af1d-4355-832e-7d54cf5a9a2d' started

#######################################################################################


INITIALIZING
DEPLOY_IN_PROGRESS
DEPLOY_SUCCESS


------------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_uid='25e89c61-d209-460a-8b62-425d99307d0f'
------------------------------------------------------------------------------------------------




In [61]:
# List the deployments.
client.deployments.list()

------------------------------------  --------------------------------  ------  --------------  ------------------------  ---------  -------------
GUID                                  NAME                              TYPE    STATE           CREATED                   FRAMEWORK  ARTIFACT TYPE
25e89c61-d209-460a-8b62-425d99307d0f  RF Predict Churn SparkML Jan2019  online  DEPLOY_SUCCESS  2019-02-11T21:18:12.544Z  mllib-2.1  model
095f70bf-a1fc-45b0-abdf-e0b57ccff0ea  RF Predict Churn SparkML Jan2019  online  DEPLOY_SUCCESS  2019-02-11T20:17:05.421Z  mllib-2.1  model
------------------------------------  --------------------------------  ------  --------------  ------------------------  ---------  -------------


In [62]:
details = deployment_details
print(deployment_details)

{'entity': {'deployable_asset': {'guid': '6ab8c74d-af1d-4355-832e-7d54cf5a9a2d', 'type': 'model', 'url': 'https://us-south.ml.cloud.ibm.com/v3/wml_instances/bf9fe25f-653f-4398-b8c6-de0d7fe00656/published_models/6ab8c74d-af1d-4355-832e-7d54cf5a9a2d', 'created_at': '2019-02-11T21:18:12.518Z', 'name': 'Customer Churn Spark ML January 2019'}, 'runtime_environment': 'spark-2.1', 'scoring_url': 'https://us-south.ml.cloud.ibm.com/v3/wml_instances/bf9fe25f-653f-4398-b8c6-de0d7fe00656/deployments/25e89c61-d209-460a-8b62-425d99307d0f/online', 'type': 'online', 'description': 'Description of deployment', 'status_details': {'status': 'DEPLOY_SUCCESS'}, 'status': 'DEPLOY_SUCCESS', 'model_type': 'mllib-2.1', 'deployed_version': {'guid': '16a87fd1-b439-4353-b520-1c0419b6c4bd', 'url': 'https://us-south.ml.cloud.ibm.com/v3/ml_assets/models/6ab8c74d-af1d-4355-832e-7d54cf5a9a2d/versions/16a87fd1-b439-4353-b520-1c0419b6c4bd'}, 'name': 'RF Predict Churn SparkML Jan2019'}, 'metadata': {'guid': '25e89c61-d20

In [63]:
# Extract the scoring endpoint URL and display it
scoring_url = client.deployments.get_scoring_url(details)
print(scoring_url)

https://us-south.ml.cloud.ibm.com/v3/wml_instances/bf9fe25f-653f-4398-b8c6-de0d7fe00656/deployments/25e89c61-d209-460a-8b62-425d99307d0f/online


In [64]:
customer_data_test.head(2)

[Row(gender='Female', SeniorCitizen=0, Partner='No', Dependents='No', tenure=1, PhoneService='No', MultipleLines='No phone service', InternetService='DSL', OnlineSecurity='No', OnlineBackup='No', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='No', PaymentMethod='Electronic check', MonthlyCharges=25.25, TotalCharges=25.25, Churn='No'),
 Row(gender='Female', SeniorCitizen=0, Partner='No', Dependents='No', tenure=1, PhoneService='Yes', MultipleLines='No', InternetService='DSL', OnlineSecurity='No', OnlineBackup='No', DeviceProtection='No', TechSupport='No', StreamingTV='No', StreamingMovies='No', Contract='Month-to-month', PaperlessBilling='No', PaymentMethod='Electronic check', MonthlyCharges=45.15, TotalCharges=45.15, Churn='Yes')]

In [65]:

# Each test record is [tenure, TotalCharges], matching the field order of the payload below
test1 = [1,25.1]
test2 = [1,35.75]

In [66]:
payload_scoring = {"fields": ["tenure", "TotalCharges"], "values": [test1, test2]}


In [67]:
# Perform prediction and display the result.
response_scoring = client.deployments.score(scoring_url, payload_scoring)
print(response_scoring)

{'values': [[1, 25.1, [25.1, 1.0], [6.0101206538668155, 3.9898793461331854], [0.6010120653866815, 0.39898793461331855], 0.0, 'No'], [1, 35.75, [35.75, 1.0], [4.4829856796476655, 5.5170143203523345], [0.4482985679647665, 0.5517014320352335], 1.0, 'Yes']], 'fields': ['tenure', 'TotalCharges', 'FEATURES', 'rawPrediction', 'probability', 'prediction', 'predictedChurn']}
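The scoring request and response share a simple layout: a `fields` list naming the columns and a `values` list holding one row per record. The hypothetical helpers below (not part of the `watson-machine-learning-client` API) build a payload from dictionaries and turn the response printed above into one dictionary per record:

```python
def build_payload(records, fields):
    """Build a {"fields": ..., "values": ...} scoring payload from a list of dicts."""
    return {"fields": list(fields),
            "values": [[record[f] for f in fields] for record in records]}


def parse_scoring_response(response):
    """Zip each returned row with the field names into a dict."""
    return [dict(zip(response["fields"], row)) for row in response["values"]]


payload = build_payload(
    [{"tenure": 1, "TotalCharges": 25.1}, {"tenure": 1, "TotalCharges": 35.75}],
    ["tenure", "TotalCharges"])
print(payload)  # {'fields': ['tenure', 'TotalCharges'], 'values': [[1, 25.1], [1, 35.75]]}

# Response copied from the output above
response = {'values': [[1, 25.1, [25.1, 1.0], [6.0101206538668155, 3.9898793461331854],
                        [0.6010120653866815, 0.39898793461331855], 0.0, 'No'],
                       [1, 35.75, [35.75, 1.0], [4.4829856796476655, 5.5170143203523345],
                        [0.4482985679647665, 0.5517014320352335], 1.0, 'Yes']],
            'fields': ['tenure', 'TotalCharges', 'FEATURES', 'rawPrediction',
                       'probability', 'prediction', 'predictedChurn']}

for record in parse_scoring_response(response):
    # Print TotalCharges, the predicted label, and the probability of churning
    print(record["TotalCharges"], record["predictedChurn"], record["probability"][1])
```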
