In [1]:
# Import required libraries

from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('Churn').getOrCreate()


# save the file location 
file_path = "./churn.csv"


In [2]:
# Read the input file
df = spark.read.csv(file_path, inferSchema = True, 
                    header = True, sep = "," ,
                    nanValue = ' ', nullValue = ' ')

In [3]:
df.columns

['churn',
 'accountlength',
 'internationalplan',
 'voicemailplan',
 'numbervmailmessages',
 'totaldayminutes',
 'totaldaycalls',
 'totaldaycharge',
 'totaleveminutes',
 'totalevecalls',
 'totalevecharge',
 'totalnightminutes',
 'totalnightcalls',
 'totalnightcharge',
 'totalintlminutes',
 'totalintlcalls',
 'totalintlcharge',
 'numbercustomerservicecalls']

In [4]:
# Lets look at the schema from the file
df.printSchema()

root
 |-- churn: string (nullable = true)
 |-- accountlength: integer (nullable = true)
 |-- internationalplan: string (nullable = true)
 |-- voicemailplan: string (nullable = true)
 |-- numbervmailmessages: integer (nullable = true)
 |-- totaldayminutes: double (nullable = true)
 |-- totaldaycalls: integer (nullable = true)
 |-- totaldaycharge: double (nullable = true)
 |-- totaleveminutes: double (nullable = true)
 |-- totalevecalls: integer (nullable = true)
 |-- totalevecharge: double (nullable = true)
 |-- totalnightminutes: double (nullable = true)
 |-- totalnightcalls: integer (nullable = true)
 |-- totalnightcharge: double (nullable = true)
 |-- totalintlminutes: double (nullable = true)
 |-- totalintlcalls: integer (nullable = true)
 |-- totalintlcharge: double (nullable = true)
 |-- numbercustomerservicecalls: integer (nullable = true)



In [5]:
# In some situations, spark may not be able to infer the data schema as easily as above
# especially in cases where read from json formats.
# we can force it to enforce the schema which we need, while it reads the dataset.
# Lets explore that a bit before we move on with our analysis

#from pyspark.sql.types import StructField, StringType, IntegerType, StructType
#data_schema = [StructField('age',IntegerType(), True),
#               StructField('name', StringType(), True)]
#final_struc = StructType(fields=data_schema)
#
# once we defined the final structure, we can just use it while reading the dataset
#df = spark.read.json('data.json', schema=final_struc)

In [6]:
# Check for NUll Values
from pyspark.sql.functions import isnan, when, count, col

df.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in df.columns]).show()

+-----+-------------+-----------------+-------------+-------------------+---------------+-------------+--------------+---------------+-------------+--------------+-----------------+---------------+----------------+----------------+--------------+---------------+--------------------------+
|churn|accountlength|internationalplan|voicemailplan|numbervmailmessages|totaldayminutes|totaldaycalls|totaldaycharge|totaleveminutes|totalevecalls|totalevecharge|totalnightminutes|totalnightcalls|totalnightcharge|totalintlminutes|totalintlcalls|totalintlcharge|numbercustomerservicecalls|
+-----+-------------+-----------------+-------------+-------------------+---------------+-------------+--------------+---------------+-------------+--------------+-----------------+---------------+----------------+----------------+--------------+---------------+--------------------------+
|    0|            0|                0|            0|                  0|              0|            0|             0|            

In [7]:
# We do see that some of the fields have null values so we will need to impute them 
# before feeding them into the model

In [8]:
# Check the count of output variable in the dataset
df.groupBy('churn').count().show()

+-----+-----+
|churn|count|
+-----+-----+
|   No| 4293|
|  Yes|  707|
+-----+-----+



In [9]:
# Check the distribution of data for the AccountLength, totaldaycharge, totalevecharge, totalNightcharge
df.select('accountlength', 'totaldaycharge', 'totalnightcharge', 'totalevecharge').describe().show()

+-------+-----------------+------------------+-----------------+------------------+
|summary|    accountlength|    totaldaycharge| totalnightcharge|    totalevecharge|
+-------+-----------------+------------------+-----------------+------------------+
|  count|             5000|              5000|             4994|              4993|
|   mean|         100.2586|30.649668000000023|9.017503003604375|17.053647105948343|
| stddev|39.69455954726711| 9.162068691639355|2.273414183416672| 4.292381665441585|
|    min|                1|               0.0|              0.0|               0.0|
|    max|              243|             59.76|            17.77|             30.91|
+-------+-----------------+------------------+-----------------+------------------+



In [10]:
# Lets create a sql view for the data frame so that we can run some queries interactively.
# Get some insights on the effect of certain predictor variables on the dependent variable
# First, lets look at churn count by TotalCustomerservicecalls
df.createOrReplaceTempView("Churn_data")
query = "select numbercustomerservicecalls, churn, count(*) as churn_count \
        from Churn_data \
        group by numbercustomerservicecalls, churn \
        order by numbercustomerservicecalls,churn desc, churn_count"

spark.sql(query).show()

+--------------------------+-----+-----------+
|numbercustomerservicecalls|churn|churn_count|
+--------------------------+-----+-----------+
|                         0|  Yes|        121|
|                         0|   No|        902|
|                         1|  Yes|        190|
|                         1|   No|       1596|
|                         2|  Yes|        122|
|                         2|   No|       1005|
|                         3|  Yes|         73|
|                         3|   No|        592|
|                         4|  Yes|        111|
|                         4|   No|        141|
|                         5|  Yes|         58|
|                         5|   No|         38|
|                         6|  Yes|         22|
|                         6|   No|         12|
|                         7|  Yes|          7|
|                         7|   No|          6|
|                         8|  Yes|          1|
|                         8|   No|          1|
|            

In [11]:
# Next lets look at Voicemail plan predictor varible with the churn variable

query = "select voicemailplan, churn, count(*) as plan_count \
         from Churn_data \
         group by voicemailplan, churn"
spark.sql(query).show()

+-------------+-----+----------+
|voicemailplan|churn|plan_count|
+-------------+-----+----------+
|          yes|   No|      1221|
|           no|   No|      3072|
|          yes|  Yes|       102|
|           no|  Yes|       605|
+-------------+-----+----------+



In [12]:
# Let's Look at Account Length variable and see if it can show any insights in current form

df.stat.crosstab('accountlength', 'churn').show()

+-------------------+---+---+
|accountlength_churn| No|Yes|
+-------------------+---+---+
|                 69| 31|  5|
|                138| 34|  5|
|                101| 48|  7|
|                 88| 40| 10|
|                170|  8|  2|
|                115| 39|  9|
|                217|  3|  0|
|                  5|  2|  0|
|                120| 47|  5|
|                202|  2|  0|
|                 10|  3|  0|
|                 56| 22|  2|
|                142| 26|  1|
|                153| 11|  1|
|                174| 10|  2|
|                185|  9|  1|
|                 42| 24|  2|
|                 24|  7|  3|
|                 37| 13|  2|
|                 25| 10|  2|
+-------------------+---+---+
only showing top 20 rows



In [13]:
# Lets try a different view - Understand description statistics for this variable

df.select('accountlength').describe().show()


+-------+-----------------+
|summary|    accountlength|
+-------+-----------------+
|  count|             5000|
|   mean|         100.2586|
| stddev|39.69455954726711|
|    min|                1|
|    max|              243|
+-------+-----------------+



In [14]:
# Let us look closely at International plan Variable and understand how it is  
# impacting churn

df.stat.crosstab('internationalplan', 'churn').show()

+-----------------------+----+---+
|internationalplan_churn|  No|Yes|
+-----------------------+----+---+
|                     no|4019|508|
|                    yes| 274|199|
+-----------------------+----+---+



In [15]:
# This tells us that customers who have international plan tend to churn less.

In [16]:
# Lets start building our Model - We will use the pipeline functionality in Spark ML to transform our dataset
# As we have less data overall, I am choosing a train/test split of 0.8,0.2

In [17]:
churn_df = df
(train_data, test_data) = churn_df.randomSplit([0.8, 0.2], 24)

print("Records for training: " + str(train_data.count()))
print("Records for validation: " + str(test_data.count()))

Records for training: 4016
Records for validation: 984


In [18]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

catColumns = ["internationalplan", "voicemailplan"]

In [19]:
# Let us set the stages for our pipeline and then transform the variables accordingly
# The first stage for our Pipeline is the transformer which performs StringIndexing and categorical Encoding
stages = []

for catCol in catColumns:
    
    stringIndexer = StringIndexer(inputCol= catCol, outputCol=catCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols = [catCol + "catVec"])
    
    stages += [stringIndexer, encoder]

In [20]:
# The Second stage for our Transformer pipeline is the Imputer to impute missing values

from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols = ["totalevecharge", "totalnightcharge"], \
                  outputCols=["out_totalevecharge", "out_totalnightcharge"])

stages += [imputer]

In [21]:
# Create bins for Account Length variable as using it as a numeric variable might not add much value

from pyspark.ml.feature import QuantileDiscretizer

acctlen_bin = QuantileDiscretizer(numBuckets=4, inputCol = "accountlength", outputCol="acctlen_bin")

stages += [acctlen_bin]


In [22]:
# Create the label_Idx for the Output Column

label_Idx = StringIndexer(inputCol= "churn", outputCol = "label")
stages += [label_Idx]

In [23]:
# Let us create a vector assembler which will convert the input values into array format which 
# can be fed to the model

numericCols = ["acctlen_bin", "numbervmailmessages", "totaldayminutes","totaldaycalls", \
               "totaldaycharge", "totaleveminutes", "totalevecalls", \
               "out_totalevecharge", "totalnightminutes", "totalnightcalls", \
               "out_totalnightcharge", "totalintlminutes", "totalintlcalls", \
               "totalintlcharge", "numbercustomerservicecalls"]
inputFeatures = [c + "catVec" for c in catColumns] + numericCols
assembler = VectorAssembler(inputCols=inputFeatures, outputCol="features")
stages += [assembler]

In [24]:
stages

[StringIndexer_382ffd84ebd4,
 OneHotEncoderEstimator_eb0073c4b65b,
 StringIndexer_adf0adb2e342,
 OneHotEncoderEstimator_45c993d4a763,
 Imputer_b1b0da79b1ab,
 QuantileDiscretizer_e221cc75a074,
 StringIndexer_d9ef30bc785c,
 VectorAssembler_1cb58fdaf39c]

In [25]:
# Create our stages into a pipeline
pipeline = Pipeline().setStages(stages)
pipelineModel = pipeline.fit(train_data)

In [26]:
# Run our data through the pipeline

trainfinalDF = pipelineModel.transform(train_data)
testfinalDF = pipelineModel.transform(test_data)

In [27]:
# Lets fit a Random forest Algorithm to get insights on feature importances

from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="label", featuresCol="features")

rf_model = rf.fit(trainfinalDF)

In [28]:
# Look at feature importance value for the features

feature_dict = {}
for feature, imp in zip(inputFeatures, rf_model.featureImportances):
    feature_dict[feature] = feature_dict.get(feature,0) + imp
    
feature_list = sorted(feature_dict.items(), key = lambda x:x[1], reverse = True)

feature_list

[('totaldaycharge', 0.21485353351969486),
 ('totaldayminutes', 0.16679368310247106),
 ('numbercustomerservicecalls', 0.14772046100045796),
 ('internationalplancatVec', 0.09774993907102503),
 ('totaleveminutes', 0.06741407409952797),
 ('totalintlcalls', 0.052604088424604845),
 ('totalintlminutes', 0.05076341770100635),
 ('voicemailplancatVec', 0.049605674517355855),
 ('totalintlcharge', 0.04128566467198688),
 ('out_totalevecharge', 0.03524340854747537),
 ('numbervmailmessages', 0.022639067948644623),
 ('totalnightminutes', 0.019059749971999992),
 ('out_totalnightcharge', 0.015985548654851377),
 ('totalnightcalls', 0.0055304158138380945),
 ('totalevecalls', 0.0052188884977352355),
 ('totaldaycalls', 0.004925527496499302),
 ('acctlen_bin', 0.0026068569608253263)]

In [29]:
# Let us rebuild the pipeline only considering the high significant features above

stages = []

for catCol in catColumns:
    
    stringIndexer = StringIndexer(inputCol= catCol, outputCol=catCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols = [catCol + "catVec"])
    
    stages += [stringIndexer, encoder]

imputer = Imputer(inputCols = ["totalevecharge", "totalnightcharge"], \
                  outputCols=["out_totalevecharge", "out_totalnightcharge"])

stages += [imputer]

label_Idx = StringIndexer(inputCol= "churn", outputCol = "label")
stages += [label_Idx]

numericCols = ["numbercustomerservicecalls", "totaldaycharge", "totaldayminutes",\
               "totaleveminutes", "out_totalevecharge", "totalintlcalls"]
inputFeatures = [c + "catVec" for c in catColumns] + numericCols
assembler = VectorAssembler(inputCols=inputFeatures, outputCol="features")
stages += [assembler]

# Create our stages into a pipeline
pipeline = Pipeline().setStages(stages)
pipelineModel = pipeline.fit(train_data)

# Run our data through the pipeline

trainfinalDF = pipelineModel.transform(train_data)
testfinalDF = pipelineModel.transform(test_data)

In [30]:
# Lets fit the Logistic Regression Model 

from pyspark.ml.classification import LogisticRegression

logistic = LogisticRegression(labelCol="label", featuresCol="features", maxIter = 150)

#Train the above model with our training data
logistic_model = logistic.fit(trainfinalDF)

In [31]:
# Lets predict with the test dataset and look at model metrics

from pyspark.mllib.evaluation import BinaryClassificationMetrics

predictions = logistic_model.transform(testfinalDF) # Outputs a df object

results = predictions.select(['prediction', 'label'])
res = results.collect()
results_list = [(float(i[0]), float(i[1])) for i in res]
preds_labels = spark.sparkContext.parallelize(results_list) # Convert to RDD to work with BCM

metrics = BinaryClassificationMetrics(preds_labels)

print("Area under PR = ", metrics.areaUnderPR)
print("Area under ROC = ", metrics.areaUnderROC) 

Area under PR =  0.33972125435540074
Area under ROC =  0.5589878131347326


In [32]:
# Let us look at Precision and Recall Metrics

count=predictions.count()
correct = results.filter(results['prediction'] == results['label']).count()
wrong = results.filter(results['prediction'] != results['label']).count()
tp = results.filter(results['prediction'] == 1.0).filter(results['prediction'] == results['label']).count()
fp = results.filter(results['prediction'] == 1.0).filter(results['prediction'] != results['label']).count()
fn = results.filter(results['prediction'] == 0.0).filter(results['prediction'] != results['label']).count()
tn = results.filter(results['prediction'] == 0.0).filter(results['prediction'] == results['label']).count()

accuracy = (tp+tn)/count

precision = tp/(tp+fp)

recall = tp/(tp+fn)

print("Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s"
      % (correct, wrong, tp, fp, fn, tn, accuracy, precision, recall))

Correct: 843
Wrong: 141
tp: 20
fp: 21
fn: 120
tn: 823
Accuracy: 0.8567073170731707
Precision: 0.4878048780487805
Recall: 0.14285714285714285


In [33]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(logistic.regParam, [0.01, 0.1])
             .build())

evaluatorLR = BinaryClassificationEvaluator(rawPredictionCol="prediction")

cv = CrossValidator(estimator=logistic, estimatorParamMaps=paramGrid, evaluator=evaluatorLR, numFolds=3)

cvModel = cv.fit(trainfinalDF)

In [34]:
predictions = cvModel.bestModel.transform(testfinalDF)
evaluatorLR.evaluate(predictions)

0.5553994583615437

In [35]:
# Calculate the Accuracy, Precision and recall metrics again

results = predictions.select(['prediction', 'label'])

count=predictions.count()
correct = results.filter(results.prediction == results.label).count()
wrong = results.filter(results.prediction != results.label).count()
tp = results.filter(results.prediction == 1.0).filter(results.prediction == results.label).count()
fp = results.filter(results.prediction == 1.0).filter(results.prediction != results.label).count()
fn = results.filter(results.prediction == 0.0).filter(results.prediction != results.label).count()
tn = results.filter(results.prediction == 0.0).filter(results.prediction == results.label).count()

accuracy = (tp+tn)/count

precision = tp/(tp+fp)

recall = tp/(tp+fn)

print("Correct: %s\nWrong: %s\ntp: %s\nfp: %s\nfn: %s\ntn: %s\nAccuracy: %s\nPrecision: %s\nRecall: %s"
      % (correct, wrong, tp, fp, fn, tn, accuracy, precision, recall))

Correct: 847
Wrong: 137
tp: 18
fp: 15
fn: 122
tn: 829
Accuracy: 0.8607723577235772
Precision: 0.5454545454545454
Recall: 0.12857142857142856
