In [1]:
#Import Libraries

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
from pyspark import SparkContext

In [2]:
#Initialise the SparkSeesion

spark = SparkSession \
.builder \
.appName("customer churn example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

In [3]:
#Read the training data 

df = spark.read.load('churn-train.csv',format="csv",inferschema=True, header=True)

In [4]:
#Schema of the training data

df.printSchema()

root
 |-- st: string (nullable = true)
 |-- acclen: integer (nullable = true)
 |-- arcode: integer (nullable = true)
 |-- phnum: string (nullable = true)
 |-- intplan: string (nullable = true)
 |-- voice: string (nullable = true)
 |-- nummailmes: integer (nullable = true)
 |-- tdmin: double (nullable = true)
 |-- tdcal: integer (nullable = true)
 |-- tdchar: double (nullable = true)
 |-- temin: double (nullable = true)
 |-- tecal: integer (nullable = true)
 |-- tecahr: double (nullable = true)
 |-- tnmin: double (nullable = true)
 |-- tncal: integer (nullable = true)
 |-- tnchar: double (nullable = true)
 |-- timin: double (nullable = true)
 |-- tical: integer (nullable = true)
 |-- tichar: double (nullable = true)
 |-- ncsc: integer (nullable = true)
 |-- label: string (nullable = true)



In [5]:
#Convert the strings to double using StringIndexer

indexer = StringIndexer(inputCol="st", outputCol="indSt")
indexed_df = indexer.fit(df).transform(df)

indexer = StringIndexer(inputCol="phnum", outputCol="indPhnum")
indexed_df = indexer.fit(indexed_df).transform(indexed_df)

indexer = StringIndexer(inputCol="intplan", outputCol="indIntplan")
indexed_df = indexer.fit(indexed_df).transform(indexed_df)

indexer = StringIndexer(inputCol="voice", outputCol="indVoice")
indexed_df = indexer.fit(indexed_df).transform(indexed_df)
    
indexed_df.printSchema()

root
 |-- st: string (nullable = true)
 |-- acclen: integer (nullable = true)
 |-- arcode: integer (nullable = true)
 |-- phnum: string (nullable = true)
 |-- intplan: string (nullable = true)
 |-- voice: string (nullable = true)
 |-- nummailmes: integer (nullable = true)
 |-- tdmin: double (nullable = true)
 |-- tdcal: integer (nullable = true)
 |-- tdchar: double (nullable = true)
 |-- temin: double (nullable = true)
 |-- tecal: integer (nullable = true)
 |-- tecahr: double (nullable = true)
 |-- tnmin: double (nullable = true)
 |-- tncal: integer (nullable = true)
 |-- tnchar: double (nullable = true)
 |-- timin: double (nullable = true)
 |-- tical: integer (nullable = true)
 |-- tichar: double (nullable = true)
 |-- ncsc: integer (nullable = true)
 |-- label: string (nullable = true)
 |-- indSt: double (nullable = false)
 |-- indPhnum: double (nullable = false)
 |-- indIntplan: double (nullable = false)
 |-- indVoice: double (nullable = false)



In [12]:
#dropping the columns that are indexed

indexed_df = indexed_df.drop('st')
indexed_df = indexed_df.drop('phnum')
indexed_df = indexed_df.drop('intplan')
indexed_df = indexed_df.drop('voice')

In [15]:
#assemble the features in one column = features

vectorAss = VectorAssembler(inputCols=["acclen",
                                       "arcode",
                                       "nummailmes",
                                       "tdmin",
                                      "tdcal","tdchar","temin",
                                      "tecal","tecahr","tnmin",
                                      "tncal","tnchar","timin","tical","tichar",
                                      "ncsc"]
                            ,outputCol="features")

fi_train_df = vectorAss.transform(indexed_df)

In [16]:
#index the features

featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=20).fit(fi_train_df)
    
fi_df = featureIndexer.transform(fi_train_df)

In [17]:
#Run the Ranfom-Forest Classifier

rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=100, maxBins=32)

In [18]:
#Index the label and convert the predictions to string

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(fi_df)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

In [19]:
#Pipeline the features

pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

In [20]:
#train the model

model = pipeline.fit(fi_df)

In [21]:
#Add the testing data

TEST = spark.read.load('churn-test.csv',format="csv",inferschema=True, header=True)

TEST.count()

1667

In [22]:
#prepare the testing data

indexer = StringIndexer(inputCol="st", outputCol="indSt")
TEST = indexer.fit(TEST).transform(TEST)

indexer = StringIndexer(inputCol="phnum", outputCol="indPhnum")
TEST = indexer.fit(TEST).transform(TEST)

indexer = StringIndexer(inputCol="intplan", outputCol="indIntplan")
TEST = indexer.fit(TEST).transform(TEST)

indexer = StringIndexer(inputCol="voice", outputCol="indVoice")
TEST = indexer.fit(TEST).transform(TEST)



TEST.count()

1667

In [23]:
TEST.count()

1667

In [24]:
#assemble the features in the testing data

newvectorAss = VectorAssembler(inputCols=["acclen",
                                       "arcode",
                                       "nummailmes",
                                       "tdmin",
                                      "tdcal","tdchar","temin",
                                      "tecal","tecahr","tnmin",
                                      "tncal","tnchar","timin","tical","tichar",
                                      "ncsc"]
                            ,outputCol="features")

TEST = newvectorAss.transform(TEST)

In [25]:
#index the features in the testing data

featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(TEST)

    
TEST = featureIndexer.transform(TEST)

In [26]:
#make predictions in the testing data

predictions = model.transform(TEST)

In [27]:
#Evaluate the model

evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy = %g" % (accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

Test Error = 0.0995801
Accuracy = 0.90042
RandomForestClassificationModel (uid=RandomForestClassifier_4524ad6b7625469e0710) with 100 trees


In [28]:
tp = predictions[(predictions.indexedLabel == 1) & (predictions.prediction == 1)].count()
tn = predictions[(predictions.indexedLabel == 0) & (predictions.prediction == 0)].count()
fp = predictions[(predictions.indexedLabel == 0) & (predictions.prediction == 1)].count()
fn = predictions[(predictions.indexedLabel == 1) & (predictions.prediction == 0)].count()
print ("True Positives:", tp)
print ("True Negatives:", tn)
print ("False Positives:", fp)
print ("False Negatives:", fn)
print ("Total", predictions.count())

r = float(tp)/(tp + fn)
print ("recall", r)

p = float(tp) / (tp + fp)
print ("precision - true", p)

p1 = float(tn) / (tn + fn)
print ("precision - false", p1)

True Positives: 58
True Negatives: 1443
False Positives: 0
False Negatives: 166
Total 1667
recall 0.25892857142857145
precision - true 1.0
precision - false 0.8968303293971411


In [33]:
predictions.printSchema()

root
 |-- st: string (nullable = true)
 |-- acclen: integer (nullable = true)
 |-- arcode: integer (nullable = true)
 |-- phnum: string (nullable = true)
 |-- intplan: string (nullable = true)
 |-- voice: string (nullable = true)
 |-- nummailmes: integer (nullable = true)
 |-- tdmin: double (nullable = true)
 |-- tdcal: integer (nullable = true)
 |-- tdchar: double (nullable = true)
 |-- temin: double (nullable = true)
 |-- tecal: integer (nullable = true)
 |-- tecahr: double (nullable = true)
 |-- tnmin: double (nullable = true)
 |-- tncal: integer (nullable = true)
 |-- tnchar: double (nullable = true)
 |-- timin: double (nullable = true)
 |-- tical: integer (nullable = true)
 |-- tichar: double (nullable = true)
 |-- ncsc: integer (nullable = true)
 |-- label: string (nullable = true)
 |-- indSt: double (nullable = false)
 |-- indPhnum: double (nullable = false)
 |-- indIntplan: double (nullable = false)
 |-- indVoice: double (nullable = false)
 |-- features: vector (nullable = true

In [40]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

selector = ChiSqSelector(numTopFeatures=4, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="indexedLabel")

result = selector.fit(predictions).transform(predictions)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.select("features","selectedFeatures").show(20, False)

ChiSqSelector output with top 4 features selected
+-----------------------------------------------------------------------------------------+---------------------+
|features                                                                                 |selectedFeatures     |
+-----------------------------------------------------------------------------------------+---------------------+
|[101.0,510.0,0.0,70.9,123.0,12.05,211.9,73.0,18.01,236.0,73.0,10.62,10.6,3.0,2.86,3.0]   |[70.9,12.05,3.0,3.0] |
|[137.0,510.0,0.0,223.6,86.0,38.01,244.8,139.0,20.81,94.2,81.0,4.24,9.5,7.0,2.57,0.0]     |[223.6,38.01,7.0,0.0]|
|[103.0,408.0,29.0,294.7,95.0,50.1,237.3,105.0,20.17,300.3,127.0,13.51,13.7,6.0,3.7,1.0]  |[294.7,50.1,6.0,1.0] |
|[99.0,415.0,0.0,216.8,123.0,36.86,126.4,88.0,10.74,220.6,82.0,9.93,15.7,2.0,4.24,1.0]    |[216.8,36.86,2.0,1.0]|
|[108.0,415.0,0.0,197.4,78.0,33.56,124.0,101.0,10.54,204.5,107.0,9.2,7.7,4.0,2.08,2.0]    |[197.4,33.56,4.0,2.0]|
|[117.0,415.0,0.0,226.5,85.0,38.51,141