# Spark Preparation
We first check whether the notebook is running in Google Colab. If it is, we install all the necessary packages.

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 3.2.1 with Hadoop 3.2, Java 8, and findspark (which locates the Spark installation on the system). The installation can be carried out from inside the Colab notebook itself.
Learn more from [A Must-Read Guide on How to Work with PySpark on Google Colab for Data Scientists!](https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/)

credit: Natawut Nupairoj

In [1]:
# Detect Google Colab: the `google.colab` package can only be imported there.
# Catch ImportError specifically so that unrelated failures (including
# KeyboardInterrupt) are not silently interpreted as "not running in Colab".
try:
  import google.colab
  IN_COLAB = True
except ImportError:
  IN_COLAB = False

In [2]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
    !tar xf spark-3.2.1-bin-hadoop3.2.tgz
    !mv spark-3.2.1-bin-hadoop3.2 spark
    !pip install -q findspark

In [3]:
# Point the process at the Java 8 JDK and the Spark directory prepared by the
# Colab install cell (only needed when running in Colab).
if IN_COLAB:
  import os
  os.environ.update({
      "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
      "SPARK_HOME": "/content/spark",
  })

In [4]:
# findspark is used (see the introduction) to locate the Spark installation so
# that the pyspark imports below work.
# NOTE: this cell runs unconditionally, while findspark is only pip-installed by
# the Colab branch above - outside Colab it must already be installed.
import findspark
findspark.init()

# Pyspark_Classification_Pipeline_Churn

In [5]:
#1 - import module
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [6]:
#2 - Create spark context
# getOrCreate() reuses the SparkContext that is already running in this kernel
# if there is one, so re-running the cell does not try to start a second context.
sc = SparkContext.getOrCreate()

In [7]:
# Bare expression: the notebook shows the SparkContext as the cell output.
sc

In [8]:
# List the effective Spark configuration as (key, value) pairs.
# Use the public getConf() accessor (as the next cell does) instead of the
# private `_conf` attribute.
sc.getConf().getAll()

[('spark.app.id', 'local-1647833646313'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.app.startTime', '1647833644326'),
 ('spark.driver.port', '36569'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'ac394efb26d5'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [9]:
# Same configuration as above, printed as one `key=value` line per setting.
print (sc.getConf().toDebugString())

spark.app.id=local-1647833646313
spark.app.name=pyspark-shell
spark.app.startTime=1647833644326
spark.driver.host=ac394efb26d5
spark.driver.port=36569
spark.executor.id=driver
spark.master=local[*]
spark.rdd.compress=True
spark.serializer.objectStreamReset=100
spark.submit.deployMode=client
spark.submit.pyFiles=
spark.ui.showConsoleProgress=true


In [10]:
#3 - Setup SparkSession(SparkSQL)
# Build (or reuse) the session that provides the DataFrame reader used below.
session_builder = SparkSession.builder.appName("Pyspark_Classification_Pipeline_Churn")
spark = session_builder.getOrCreate()
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f2310c836d0>


In [11]:
# Download the churn dataset; wget saves it as ./churn.csv, which is the path
# read in step 4.
!wget https://github.com/kaopanboonyuen/GISTDA2022/raw/main/dataset/churn.csv

--2022-03-21 03:34:09--  https://github.com/kaopanboonyuen/GISTDA2022/raw/main/dataset/churn.csv
Resolving github.com (github.com)... 140.82.113.4
Connecting to github.com (github.com)|140.82.113.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/kaopanboonyuen/GISTDA2022/main/dataset/churn.csv [following]
--2022-03-21 03:34:09--  https://raw.githubusercontent.com/kaopanboonyuen/GISTDA2022/main/dataset/churn.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 255025 (249K) [text/plain]
Saving to: ‘churn.csv’


2022-03-21 03:34:09 (41.5 MB/s) - ‘churn.csv’ saved [255025/255025]



In [12]:
#4 - Read file to spark DataFrame
# First CSV row is the header; column types are inferred from the values.
csv_reader = spark.read.option("header", "true").option("inferSchema", "true")
data = csv_reader.csv("churn.csv")

# NOTE(review): cache() is lazy in Spark - it only marks the DataFrame for
# caching, and the data is materialised by the first action that follows.
data.cache()
print("finish caching data")

finish caching data


In [13]:
#5 - Understand data and problems
# Yes/No string predictors (indexed with StringIndexer in step 10)
category = [
    'International plan',
    'Voice mail plan',
]

# Numeric predictors
continuous = [
    'Number vmail messages',
    'Total day minutes', 'Total day calls', 'Total day charge',
    'Total eve minutes', 'Total eve calls', 'Total eve charge',
    'Total night minutes', 'Total night calls', 'Total night charge',
    'Total intl minutes', 'Total intl calls', 'Total intl charge',
    'Customer service calls',
]

# Target column
label = 'churn'

# Columns dropped in step 8 and never used as predictors
unique_features = [
    'State',
    'Account length',
    'Area code',
]

# Also dropped in step 8: each charge appears to be the matching minutes column
# scaled by a fixed per-period rate (e.g. night charge ~= night minutes / 22.2252),
# so the charge columns carry no extra information.
unused_features = [
    'Total day charge',
    'Total eve charge',
    'Total night charge',
    'Total intl charge',
]

# Number of candidate predictors before step 8 removes the unused ones
print(len(category) + len(continuous))


16


In [14]:
# Summary statistics (count / mean / stddev / min / max) per column, converted
# to pandas for display.
data.describe().toPandas()

Unnamed: 0,summary,State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls
0,count,3000,3000.0,3000.0,3000,3000,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0,3000.0
1,mean,,101.09333333333332,437.04566666666665,,,8.011,179.5585000000005,100.56133333333334,30.525506666666693,201.1894666666669,100.01833333333332,17.101323333333337,201.26770000000008,99.97733333333332,9.057113333333342,10.263133333333348,4.489,2.771566666666659,1.5616666666666668
2,stddev,,39.56984390169308,42.26102514103134,,,13.629996500422669,54.64951726833965,20.05754238578949,9.290379267048117,51.20153176094548,19.975977500853205,4.352130347501413,50.48089883403788,19.5420090129182,2.271682573422988,2.8103131739364486,2.4672335547743733,0.7587626676487821,1.3206482976923195
3,min,AK,1.0,408.0,No,No,0.0,0.0,0.0,0.0,0.0,0.0,0.0,23.2,33.0,1.04,0.0,0.0,0.0,0.0
4,max,WY,243.0,510.0,Yes,Yes,51.0,350.8,165.0,59.64,363.7,170.0,30.91,395.0,175.0,17.77,20.0,20.0,5.4,9.0


In [15]:
# Column names and the types inferred while reading the CSV.
data.printSchema()

root
 |-- State: string (nullable = true)
 |-- Account length: integer (nullable = true)
 |-- Area code: integer (nullable = true)
 |-- International plan: string (nullable = true)
 |-- Voice mail plan: string (nullable = true)
 |-- Number vmail messages: integer (nullable = true)
 |-- Total day minutes: double (nullable = true)
 |-- Total day calls: integer (nullable = true)
 |-- Total day charge: double (nullable = true)
 |-- Total eve minutes: double (nullable = true)
 |-- Total eve calls: integer (nullable = true)
 |-- Total eve charge: double (nullable = true)
 |-- Total night minutes: double (nullable = true)
 |-- Total night calls: integer (nullable = true)
 |-- Total night charge: double (nullable = true)
 |-- Total intl minutes: double (nullable = true)
 |-- Total intl calls: integer (nullable = true)
 |-- Total intl charge: double (nullable = true)
 |-- Customer service calls: integer (nullable = true)
 |-- Churn: boolean (nullable = true)



In [16]:
# Peek at a small random sample of rows (fixed seed, sampling without replacement).
data.sample(withReplacement=False, fraction=0.001, seed=1234).toPandas()

Unnamed: 0,State,Account length,Area code,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total day charge,Total eve minutes,Total eve calls,Total eve charge,Total night minutes,Total night calls,Total night charge,Total intl minutes,Total intl calls,Total intl charge,Customer service calls,Churn
0,OR,21,510,No,Yes,31,135.9,90,23.1,271.0,84,23.04,179.1,89,8.06,9.5,7,2.57,6,False
1,OR,159,510,No,No,0,114.8,98,19.52,192.6,101,16.37,259.0,108,11.66,12.2,5,3.29,0,False
2,MN,74,510,No,Yes,33,193.7,91,32.93,246.1,96,20.92,138.0,92,6.21,14.6,3,3.94,2,False
3,SC,64,510,No,Yes,40,210.0,116,35.7,232.7,89,19.78,168.8,94,7.6,5.9,4,1.59,8,False
4,NM,41,415,No,No,0,232.1,74,39.46,327.1,88,27.8,226.5,119,10.19,10.9,2,2.94,3,True


In [17]:
# Class balance of the target column: number of rows per churn value.
data.groupBy(label).count().toPandas()

Unnamed: 0,churn,count
0,True,438
1,False,2562


In [18]:
#6 - Change column type from boolean to string
# Schema of the label column before the cast
data.select(label).printSchema()

# Replace the boolean label column with its string form
label_as_string = data[label].cast("string")
data = data.withColumn(label, label_as_string)

# Schema of the label column after the cast
data.select(label).printSchema()

root
 |-- churn: boolean (nullable = true)

root
 |-- churn: string (nullable = true)



In [19]:
#8 - Remove unused variables
# Every column that must not be used as a predictor (built once, reused below).
dropped_features = unique_features + unused_features

# Count columns from the schema instead of len(head()): head() launches a Spark
# job just to fetch a row, and returns None for an empty DataFrame, which would
# make len() raise a TypeError.
print ("number of features : " + str(len(data.drop(label).columns)))
for unused_feature in dropped_features:
    print (unused_feature)
    data = data.drop(unused_feature)
print ("\nnumber of features remain : " + str(len(data.drop(label).columns)))


# Keep the feature-name lists in sync with the columns that are left in `data`.
category = [feature for feature in category if feature not in dropped_features]
continuous = [feature for feature in continuous if feature not in dropped_features]

print ("\nnumber of features remain : " + str(len(category) + len(continuous)))

number of features : 19
State
Account length
Area code
Total day charge
Total eve charge
Total night charge
Total intl charge

number of features remain : 12

number of features remain : 12


In [20]:
#9 - split Train and Test data
# Order rows by label, then split 70/30 with a fixed seed.
data = data.sort(label)
trainingData, testData = data.randomSplit([0.7, 0.3], seed=50)

# (message prefix, DataFrame) pairs so the three frames are reported uniformly
frames_to_report = [
    ("data count : ", data),
    ("trainingData count : ", trainingData),
    ("testData count : ", testData),
]

# Object types
for _, frame in frames_to_report:
    print(type(frame))

# Row counts
for count_prefix, frame in frames_to_report:
    print(count_prefix + str(frame.count()))

# Label distribution of each frame
for _, frame in frames_to_report:
    frame.groupBy(label).count().show()

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
data count : 3000
trainingData count : 2106
testData count : 894
+-----+-----+
|churn|count|
+-----+-----+
|false| 2562|
| true|  438|
+-----+-----+

+-----+-----+
|churn|count|
+-----+-----+
|false| 1799|
| true|  307|
+-----+-----+

+-----+-----+
|churn|count|
+-----+-----+
|false|  763|
| true|  131|
+-----+-----+



In [21]:
#10 - String indexer
# Indexer for the target: string churn value -> numeric 'label' column
label_indexer = StringIndexer(inputCol=label, outputCol='label')

# One indexer per categorical predictor: '<name>' -> '<name>idx'
category_indexers = [
    StringIndexer(inputCol=name, outputCol=name + 'idx')
    for name in category
]

featureidx_list = [label_indexer] + category_indexers

print(featureidx_list)

[StringIndexer_19f977e1d8cb, StringIndexer_692f6a480c27, StringIndexer_bd5e5fa7840c]


In [22]:
#11 - Create Vector
# Model inputs: the numeric columns followed by the indexed categorical columns.
# The position in this list is the "feature N" index shown in the tree dump.
indexed_category_cols = [name + 'idx' for name in category]
features = continuous + indexed_category_cols

assem = VectorAssembler(outputCol="features", inputCols=features)

print(type(assem))

<class 'pyspark.ml.feature.VectorAssembler'>


In [23]:
#12 - Create model
# Decision tree reading the assembled 'features' vector and the indexed 'label'.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
)

print(dt)


DecisionTreeClassifier_3d8f108fbd37


In [24]:
#13 - Set ML pipeline
# Show the individual building blocks first (followed by a blank separator).
for building_block in (featureidx_list, assem, dt, "\n"):
    print(building_block)

# Stage order matters: indexers -> vector assembler -> classifier.
all_process_list = [*featureidx_list, assem, dt]
print(all_process_list)

pipeline = Pipeline(stages=all_process_list)
print("\n")
print(pipeline)


[StringIndexer_19f977e1d8cb, StringIndexer_692f6a480c27, StringIndexer_bd5e5fa7840c]
VectorAssembler_27aa1c505744
DecisionTreeClassifier_3d8f108fbd37


[StringIndexer_19f977e1d8cb, StringIndexer_692f6a480c27, StringIndexer_bd5e5fa7840c, VectorAssembler_27aa1c505744, DecisionTreeClassifier_3d8f108fbd37]


Pipeline_391c3f146e56


In [25]:
#14 - Train model
# Fit all pipeline stages (indexers, assembler, decision tree) on the training
# split; `model` is the fitted pipeline used for prediction in step 17.
model = pipeline.fit(trainingData)

In [26]:
#15 - (Optional) Assign multiple parameter lists used to train multiple models
# Candidate values per hyper-parameter; the grid is their cross product
# (3 x 2 x 2 = 12 parameter maps).
search_space = [
    (dt.maxDepth, [4, 5, 6]),
    (dt.minInstancesPerNode, [1, 10]),
    (dt.impurity, ["gini", "entropy"]),
]

grid_builder = ParamGridBuilder()
for hyper_param, candidates in search_space:
    grid_builder = grid_builder.addGrid(hyper_param, candidates)
paramGrid = grid_builder.build()

# Print each parameter map followed by blank lines
for param in paramGrid:
    print(param)
    print("\n\n")

{Param(parent='DecisionTreeClassifier_3d8f108fbd37', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4, Param(parent='DecisionTreeClassifier_3d8f108fbd37', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1, Param(parent='DecisionTreeClassifier_3d8f108fbd37', name='impurity', doc='Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini'): 'gini'}



{Param(parent='DecisionTreeClassifier_3d8f108fbd37', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 4, Param(parent='DecisionTreeClassifier_3d8f108fbd37', name='minInstancesPerNode', 

In [27]:
# #16 - (Optional) Train multiple models with multiple parameters
# crossval = CrossValidator(estimator=pipeline,
#                       estimatorParamMaps=paramGrid,
#                       evaluator=MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1"),
#                       numFolds=3)
# cvModel = crossval.fit(trainingData)
# model = cvModel.bestModel

# print (model)

In [28]:
#17 - Make predictions
# Run the fitted pipeline on the held-out test split; the result keeps the input
# columns and adds label, features, rawPrediction, probability and prediction.
predictions = model.transform(testData)

In [29]:
# Show the predictions.
# NOTE: despite the word "sample" in the original comment, toPandas() collects
# the whole prediction DataFrame; only the rendering is shortened by pandas.
predictions.toPandas()

Unnamed: 0,International plan,Voice mail plan,Number vmail messages,Total day minutes,Total day calls,Total eve minutes,Total eve calls,Total night minutes,Total night calls,Total intl minutes,Total intl calls,Customer service calls,churn,label,International planidx,Voice mail planidx,features,rawPrediction,probability,prediction
0,No,No,0,39.5,78,264.3,106,185.8,90,10.0,6,0,false,0.0,0.0,0.0,"[0.0, 39.5, 78.0, 264.3, 106.0, 185.8, 90.0, 1...","[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0
1,No,No,0,48.4,101,281.1,138,218.5,87,18.2,1,1,false,0.0,0.0,0.0,"[0.0, 48.4, 101.0, 281.1, 138.0, 218.5, 87.0, ...","[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0
2,No,No,0,51.1,106,208.6,137,198.0,92,12.3,3,1,false,0.0,0.0,0.0,"[0.0, 51.1, 106.0, 208.6, 137.0, 198.0, 92.0, ...","[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0
3,No,No,0,54.7,131,256.1,105,176.6,135,11.1,4,1,false,0.0,0.0,0.0,"[0.0, 54.7, 131.0, 256.1, 105.0, 176.6, 135.0,...","[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0
4,No,No,0,58.2,94,138.7,118,136.8,91,11.9,1,5,true,1.0,0.0,0.0,"[0.0, 58.2, 94.0, 138.7, 118.0, 136.8, 91.0, 1...","[5.0, 57.0]","[0.08064516129032258, 0.9193548387096774]",1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
889,Yes,Yes,38,220.1,78,234.3,71,237.3,85,10.1,4,4,false,0.0,1.0,1.0,"[38.0, 220.1, 78.0, 234.3, 71.0, 237.3, 85.0, ...","[69.0, 14.0]","[0.8313253012048193, 0.1686746987951807]",0.0
890,Yes,Yes,39,126.8,94,293.6,115,174.1,91,8.4,4,0,false,0.0,1.0,1.0,"[39.0, 126.8, 94.0, 293.6, 115.0, 174.1, 91.0,...","[109.0, 3.0]","[0.9732142857142857, 0.026785714285714284]",0.0
891,Yes,Yes,39,149.7,122,211.1,75,114.3,90,9.2,4,1,false,0.0,1.0,1.0,"[39.0, 149.7, 122.0, 211.1, 75.0, 114.3, 90.0,...","[109.0, 3.0]","[0.9732142857142857, 0.026785714285714284]",0.0
892,Yes,Yes,41,146.8,128,285.6,96,213.6,80,4.3,2,1,true,1.0,1.0,1.0,"[41.0, 146.8, 128.0, 285.6, 96.0, 213.6, 80.0,...","[0.0, 27.0]","[0.0, 1.0]",1.0


In [30]:
# Show only the model outputs next to the true label and the input vector.
result_columns = ["prediction", "rawPrediction", "probability", "label", "features"]
predictions.select(*result_columns).toPandas()

Unnamed: 0,prediction,rawPrediction,probability,label,features
0,0.0,"[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0,"[0.0, 39.5, 78.0, 264.3, 106.0, 185.8, 90.0, 1..."
1,0.0,"[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0,"[0.0, 48.4, 101.0, 281.1, 138.0, 218.5, 87.0, ..."
2,0.0,"[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0,"[0.0, 51.1, 106.0, 208.6, 137.0, 198.0, 92.0, ..."
3,0.0,"[1341.0, 34.0]","[0.9752727272727273, 0.024727272727272726]",0.0,"[0.0, 54.7, 131.0, 256.1, 105.0, 176.6, 135.0,..."
4,1.0,"[5.0, 57.0]","[0.08064516129032258, 0.9193548387096774]",1.0,"[0.0, 58.2, 94.0, 138.7, 118.0, 136.8, 91.0, 1..."
...,...,...,...,...,...
889,0.0,"[69.0, 14.0]","[0.8313253012048193, 0.1686746987951807]",0.0,"[38.0, 220.1, 78.0, 234.3, 71.0, 237.3, 85.0, ..."
890,0.0,"[109.0, 3.0]","[0.9732142857142857, 0.026785714285714284]",0.0,"[39.0, 126.8, 94.0, 293.6, 115.0, 174.1, 91.0,..."
891,0.0,"[109.0, 3.0]","[0.9732142857142857, 0.026785714285714284]",0.0,"[39.0, 149.7, 122.0, 211.1, 75.0, 114.3, 90.0,..."
892,1.0,"[0.0, 27.0]","[0.0, 1.0]",1.0,"[41.0, 146.8, 128.0, 285.6, 96.0, 213.6, 80.0,..."


In [31]:
#18 - Evaluate model
# Score the test predictions with each metric; for accuracy also report the
# complementary error rate.
for metricName in ('accuracy', 'weightedPrecision', 'weightedRecall', 'f1'):
    evaluator = MulticlassClassificationEvaluator(
        metricName=metricName,
        labelCol="label",
        predictionCol="prediction",
    )
    result = evaluator.evaluate(predictions)
    print('%s = %g' % (metricName, result))
    if metricName != 'accuracy':
        continue
    print("error = %g " % (1.0 - result))


accuracy = 0.938479
error = 0.0615213 
weightedPrecision = 0.938123
weightedRecall = 0.938479
f1 = 0.933282


In [32]:
#19 - Show tree diagram
# The classifier was appended last to the pipeline stages, so the fitted tree is
# the last stage of the fitted pipeline.
treeModel = model.stages[-1]
# toDebugString is read as an attribute (no call); it holds the textual If/Else
# dump of the tree. "feature N" refers to position N in the `features` list
# passed to the VectorAssembler.
treeModel_debug_str = treeModel.toDebugString
print (treeModel_debug_str)


DecisionTreeClassificationModel: uid=DecisionTreeClassifier_3d8f108fbd37, depth=5, numNodes=47, numClasses=2, numFeatures=12
  If (feature 1 <= 265.25)
   If (feature 9 <= 3.5)
    If (feature 10 in {1.0})
     If (feature 7 <= 13.05)
      If (feature 8 <= 2.5)
       Predict: 1.0
      Else (feature 8 > 2.5)
       Predict: 0.0
     Else (feature 7 > 13.05)
      Predict: 1.0
    Else (feature 10 not in {1.0})
     If (feature 1 <= 222.05)
      Predict: 0.0
     Else (feature 1 > 222.05)
      If (feature 3 <= 268.95000000000005)
       Predict: 0.0
      Else (feature 3 > 268.95000000000005)
       Predict: 1.0
   Else (feature 9 > 3.5)
    If (feature 1 <= 160.7)
     If (feature 3 <= 261.65)
      Predict: 1.0
     Else (feature 3 > 261.65)
      If (feature 6 <= 99.5)
       Predict: 0.0
      Else (feature 6 > 99.5)
       Predict: 1.0
    Else (feature 1 > 160.7)
     If (feature 3 <= 134.95)
      If (feature 1 <= 211.75)
       Predict: 1.0
      Else (feature 1 > 211.75)
  

In [33]:
#20 - Save model
model_dir = "/user/admin/"
modelFile = "dt_churn"

# Both artefacts share the same base path and differ only in the suffix.
model_base_path = model_dir + modelFile

# Whole fitted pipeline (indexers + assembler + tree); overwrites an existing save
pipeline_writer = model.write().overwrite()
pipeline_writer.save(model_base_path + ".plmodel")

# Only the fitted decision tree stage; overwrites an existing save
tree_writer = treeModel.write().overwrite()
tree_writer.save(model_base_path + ".model")

print("finish save model")


finish save model


In [34]:
#21 - Load Pipeline model
# Read back the pipeline saved in step 20 and list its fitted stages.
plmodel_path = model_dir + modelFile + ".plmodel"
pipeline_reader = PipelineModel.read()
read_plmodel = pipeline_reader.load(plmodel_path)
print(read_plmodel.stages)


[StringIndexerModel: uid=StringIndexer_19f977e1d8cb, handleInvalid=error, StringIndexerModel: uid=StringIndexer_692f6a480c27, handleInvalid=error, StringIndexerModel: uid=StringIndexer_bd5e5fa7840c, handleInvalid=error, VectorAssembler_27aa1c505744, DecisionTreeClassificationModel: uid=DecisionTreeClassifier_3d8f108fbd37, depth=5, numNodes=47, numClasses=2, numFeatures=12]


In [35]:
#22 - Load DecisionTree model
# Read back the stand-alone tree saved in step 20 and print its key properties.
tree_model_path = model_dir + modelFile + ".model"
tree_reader = DecisionTreeClassificationModel.read()
read_model = tree_reader.load(tree_model_path)

print(f"depth : {read_model.depth}")
print(f"numNodes : {read_model.numNodes}")
print(f"featureImportances : {read_model.featureImportances}")


# These attributes are available in Spark 2.1 or above:
#print(read_model.numClasses)
#print(read_model.numFeatures)


depth : 5
numNodes : 47
featureImportances : (12,[1,2,3,5,6,7,8,9,10,11],[0.3056839226789766,0.015313021400614635,0.14650209207474457,0.023816694422276154,0.004208276239732733,0.09278431411154443,0.11381185168461758,0.15130828941580798,0.10188517032502856,0.04468636764665689])
