# The Decision Tree on the Churn Dataset in Spark with Cross Validation

In [38]:
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
from pyspark.ml.feature import IndexToString, Normalizer, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics

## Select the churn file 

In [39]:
inputFile = "../data/churn.csv"

## Create the Spark Session 

In [40]:
#create a SparkSession
spark = (SparkSession
       .builder
       .appName("ChurnDecisionTree")
       .getOrCreate())
# create a DataFrame using an ifered Schema 
df = spark.read.option("header", "true") \
       .option("inferSchema", "true") \
       .option("delimiter", ";") \
       .csv(inputFile)   

## Data Preparation
### Transform labels into index

In [41]:
labelIndexer = StringIndexer().setInputCol("LEAVE").setOutputCol("label").fit(df)
collegeIndexer = StringIndexer().setInputCol("COLLEGE").setOutputCol("COLLEGE_NUM").fit(df)
satIndexer = StringIndexer().setInputCol("REPORTED_SATISFACTION").setOutputCol("REPORTED_SATISFACTION_NUM").fit(df)
usageIndexer = StringIndexer().setInputCol("REPORTED_USAGE_LEVEL").setOutputCol("REPORTED_USAGE_LEVEL_NUM").fit(df)
changeIndexer = StringIndexer().setInputCol("CONSIDERING_CHANGE_OF_PLAN").setOutputCol("CONSIDERING_CHANGE_OF_PLAN_NUM").fit(df)

 ### Build the feature vector

In [42]:
featureCols = df.columns.copy()
featureCols.remove("LEAVE")
featureCols.remove("COLLEGE")
featureCols.remove("REPORTED_SATISFACTION")
featureCols.remove("REPORTED_USAGE_LEVEL")
featureCols.remove("CONSIDERING_CHANGE_OF_PLAN")
featureCols = featureCols +["COLLEGE_NUM","REPORTED_SATISFACTION_NUM","REPORTED_USAGE_LEVEL_NUM","CONSIDERING_CHANGE_OF_PLAN_NUM"]

### Build the feature Vector Assembler

In [43]:
assembler =  VectorAssembler(outputCol="features", inputCols=list(featureCols))

### Build a featureIndexer 

Automatically identify categorical features, and index them.
Features with > 5 distinct values are treated as continuous.

In [44]:
featureIndexer = VectorIndexer(inputCol="features",outputCol="indexedFeatures", maxCategories=6) 

### Convert indexed labels back to original labels

In [45]:
predConverter = IndexToString(inputCol="prediction",outputCol="predictedLabel",labels=labelIndexer.labels)

## Build the decision tree model

In [46]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

## Data Preparation
### Build a network parameters grid

In [47]:
paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [ 10 ]) \
				              .addGrid(dt.minInfoGain, [ 0.02, 0.01]) \
				              .addGrid(dt.minInstancesPerNode, [5, 10, 15]) \
                              .addGrid(dt.maxBins, [5, 6, 9]) \
				              .build()

### Split the data

In [48]:
splits = df.randomSplit([0.6, 0.4 ], 1234)
train = splits[0]
test = splits[1]

### Build a pipeline

In [49]:
pipeline = Pipeline(stages= [labelIndexer, collegeIndexer, satIndexer,
				usageIndexer, changeIndexer, assembler, featureIndexer,  dt, predConverter])

## Build an evaluator

In [50]:
evaluator =  BinaryClassificationEvaluator(labelCol="label",rawPredictionCol="prediction", metricName="areaUnderROC")

## Build the Cross Validator

In [51]:
cv = CrossValidator(estimator=pipeline, evaluator=evaluator,estimatorParamMaps=paramGrid,numFolds=10, parallelism=2)

## Train the Model 

In [52]:
cvModel = cv.fit(train)

## Find out what is the best model

In [53]:
treeModel = cvModel.bestModel.stages[7]
print("Learned classification tree model:\n",treeModel)
print("Best Params: \n", treeModel.explainParams())

Learned classification tree model:
 DecisionTreeClassificationModel (uid=DecisionTreeClassifier_ef48a7872baa) of depth 10 with 29 nodes
Best Params: 
 cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext (default: 10)
featuresCol: features column name (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name (default: label, current: label)
maxBins: Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categorie

## Test the model 

In [54]:
predictions = cvModel.transform(test)
predictions.select("prediction", "label", "predictedLabel", "LEAVE", "features").show()

+----------+-----+--------------+-----+--------------------+
|prediction|label|predictedLabel|LEAVE|            features|
+----------+-----+--------------+-----+--------------------+
|       0.0|  1.0|          STAY|LEAVE|[20007.0,36.0,23....|
|       1.0|  1.0|         LEAVE|LEAVE|[20009.0,183.0,18...|
|       1.0|  1.0|         LEAVE|LEAVE|[20012.0,246.0,9....|
|       1.0|  0.0|         LEAVE| STAY|[20063.0,58.0,0.0...|
|       1.0|  0.0|         LEAVE| STAY|(11,[0,3,4,6,8,10...|
|       0.0|  0.0|          STAY| STAY|[20078.0,199.0,65...|
|       0.0|  0.0|          STAY| STAY|(11,[0,3,4,6,8],[...|
|       0.0|  0.0|          STAY| STAY|[20278.0,0.0,69.0...|
|       0.0|  0.0|          STAY| STAY|[20284.0,0.0,5.0,...|
|       1.0|  1.0|         LEAVE|LEAVE|[20288.0,0.0,0.0,...|
|       0.0|  1.0|          STAY|LEAVE|[20317.0,85.0,0.0...|
|       1.0|  1.0|         LEAVE|LEAVE|[20320.0,0.0,0.0,...|
|       0.0|  1.0|          STAY|LEAVE|[20326.0,88.0,0.0...|
|       0.0|  0.0|      

# Evaluate the Model 
## Area under ROC

In [55]:
accuracy = evaluator.evaluate(predictions)
print("Test Error = " ,(1.0 - accuracy))

Test Error =  0.2962273746129229


## Confusion Matrix 

In [56]:
predictionAndLabels = predictions.select("prediction", "label").rdd.map(lambda p: [p[0], p[1]]) # Map to RDD prediction|label
metrics =  MulticlassMetrics(predictionAndLabels)


In [57]:
confusion = metrics.confusionMatrix()
print("Confusion matrix: \n" , confusion)

Confusion matrix: 
 DenseMatrix([[2522., 1534.],
             [ 848., 3110.]])


## Statistics per label

In [58]:
labels = predictionAndLabels.map(lambda x: x[1]).distinct().collect()
print(labels)
for label in  labels:
  print("Class %f precision = %f\n" % (label , metrics.precision(label)))
  print("Class %f recall = %f\n" % (label, metrics.recall(label)))
  print("Class %f F1 score = %f\n" % (label, metrics.fMeasure( label)))

[1.0, 0.0]
Class 1.000000 precision = 0.669681

Class 1.000000 recall = 0.785750

Class 1.000000 F1 score = 0.723088

Class 0.000000 precision = 0.748368

Class 0.000000 recall = 0.621795

Class 0.000000 F1 score = 0.679235



## Weighted stats

In [60]:
print("Weighted precision = %s\n" % metrics.weightedPrecision)
print("Weighted recall = %s\n" % metrics.weightedRecall)
print("Weighted F1 score = %s\n" % metrics.weightedFMeasure)
print("Weighted false positive rate = %s\n" % metrics.weightedFalsePositiveRate)

Weighted precision = 0.7095057446104266

Weighted recall = 0.7027701522335912

Weighted F1 score = <bound method MulticlassMetrics.weightedFMeasure of <pyspark.mllib.evaluation.MulticlassMetrics object at 0x11562e790>>

Weighted false positive rate = 0.2952249014594369



## Summary stats 

In [62]:
print("Recall = %s" % metrics.recall())
print("Precision = %s" % metrics.precision())
print("Accuracy = %s" % metrics.accuracy) 

Recall = 0.7027701522335912
Precision = 0.7027701522335912
Accuracy = 0.7027701522335912


In [63]:
spark.stop()