In [38]:
# Install PySpark into the environment of the running kernel (%pip targets the kernel, not the system Python).
%pip install pyspark



## Import packages

In [None]:
#---for google colab / jupyter notebook
import pyspark
from pyspark.sql import SparkSession
#----------------------------------------

import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd


## Create Session

In [None]:

# Start a Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("Churn Prediction with PySpark")
    .getOrCreate()
)

## Download Dataset

In [None]:
!wget https://github.com/urfie/belajar-python/raw/master/Machine%20Learning/churn60.csv.gz

## Load to Spark DataFrame

In [None]:
# Read the downloaded gzipped CSV: the first line supplies the column names and
# Spark infers each column's type from the data.
df_churn = spark.read.csv(
    path='churn60.csv.gz',
    header='true',
    inferSchema='true',
)

## Quick Check

In [None]:
# Preview the first 10 rows of the loaded data.
df_churn.show(10)

### Describe

In [None]:
# Print summary statistics for the columns of df_churn.
df_churn.describe().show()

### Print Schema

In [None]:
# Show the column names and the types inferred at load time.
df_churn.printSchema()

## Check Categorical Data

In [None]:
# List the distinct values of each categorical column with one loop instead of
# one copy-pasted cell per column (same columns, same order as before).
for cat_col in ['account_status', 'subscriber_status', 'occupation',
                'education', 'marital_status', 'sex']:
    df_churn.select(cat_col).distinct().show()

In [None]:
# Rewrite the 'MALE' spelling as 'MAL' (every other value is kept as-is), then
# list the distinct values again to confirm the result.
sex_normalized = F.when(F.col('sex') == 'MALE', 'MAL').otherwise(F.col('sex'))
df_churn = df_churn.withColumn('sex', sex_normalized)
df_churn.select('sex').distinct().show()

### Select features

In [None]:
# Numeric feature columns, grouped by name prefix. They are zero-filled, correlated
# and passed to the model as continuous inputs further down.
num_cols = [
    # subscriber attributes
    'lifetime', 'age',
    # outgoing voice, per destination
    'voice_out_onnet_freq', 'voice_out_onnet_dur', 'voice_out_onnet_chg',
    'voice_out_mobile_freq', 'voice_out_mobile_dur', 'voice_out_mobile_chg',
    'voice_out_pstn_freq', 'voice_out_pstn_dur', 'voice_out_pstn_chg',
    'voice_out_spcnum_freq', 'voice_out_spcnum_dur', 'voice_out_spcnum_chg',
    'voice_out_sli_freq', 'voice_out_sli_dur', 'voice_out_sli_chg',
    # incoming voice
    'voice_in_onnet_freq', 'voice_in_onnet_dur',
    # sms
    'sms_out_onnet_freq', 'sms_out_onnet_chg',
    'sms_out_offnet_freq', 'sms_out_offnet_chg',
    'sms_in_onnet_freq',
    # mms
    'mms_out_onnet_freq', 'mms_out_onnet_chg',
    # data
    'data_freq', 'data_vol', 'data_ch',
    # revenue
    'total_revenue',
]

In [None]:
# Categorical feature columns; these are the ones that get indexed and one-hot encoded.
cat_cols = [
    'account_status',
    'subscriber_status',
    'sex',
    'occupation',
    'education',
    'marital_status',
]

### Null handling

In [None]:
# Replace missing values with 0, but only in the numeric feature columns.
df_churn = df_churn.fillna(value=0, subset=num_cols)

### Plot churn distribution

In [None]:
# Count rows per churn class; order by class so the bars appear in the same
# order on every run (groupBy alone does not guarantee an output order).
dfplot = df_churn.groupBy('churn').count().orderBy('churn').toPandas()

# Plot the classes as categories so the x axis shows only the actual churn values,
# and label the figure so it can be read on its own.
fig, ax = plt.subplots()
ax.bar(dfplot['churn'].astype(str), dfplot['count'])
ax.set(title='Churn distribution', xlabel='churn', ylabel='count');

## Show Correlation

In [None]:
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# Assemble all numeric columns into one vector column per row; only that vector
# column is kept, as the input of the correlation computation.
vector_col = 'corr_features'
assembler = VectorAssembler(outputCol=vector_col, inputCols=num_cols)
df_assembled = assembler.transform(df_churn)
df_vector = df_assembled.select(vector_col)

In [None]:
# Compute the correlation matrix of the assembled feature vectors, then pull the
# single result cell to the driver and convert it to a nested Python list.
matrix = Correlation.corr(df_vector, vector_col)
corr_rows = matrix.collect()
corr_dense = corr_rows[0][0]
corrmatrix = corr_dense.toArray().tolist()

In [None]:
# Label both axes of the correlation matrix with the feature names.
dfp = pd.DataFrame(corrmatrix, columns=num_cols, index=num_cols)

# Use a larger canvas so the many feature labels stay legible, and anchor a
# diverging colour scale to [-1, 1] around 0 so the sign and strength of each
# correlation can be read directly from the colour.
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(dfp, ax=ax, vmin=-1, vmax=1, center=0, cmap='coolwarm')
ax.set_title('Correlation between numeric features');

## Encode

In [None]:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol):
    """Encode categorical columns and assemble a model-ready DataFrame.

    Each column in ``categoricalCols`` is string-indexed and then one-hot
    encoded; the encoded vectors are concatenated with ``continuousCols``
    into a single ``features`` vector column. The pipeline is fitted on
    ``df`` and applied to that same ``df``.

    Parameters:
        df: input Spark DataFrame.
        indexCol: name of the identifier column to carry through.
        categoricalCols: list of column names to index and one-hot encode.
        continuousCols: list of column names appended to the feature vector as-is.
        labelCol: name of the column copied into ``label``.

    Returns:
        DataFrame with exactly three columns: ``indexCol``, ``features``, ``label``.
    """
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    index_stages = []
    encode_stages = []
    encoded_names = []
    for name in categoricalCols:
        indexed_name = "{0}_indexed".format(name)
        encoded_name = "{0}_encoded".format(indexed_name)
        index_stages.append(StringIndexer(inputCol=name, outputCol=indexed_name))
        # OneHotEncoder is left at its defaults (original author's note: dropLast=True).
        encode_stages.append(OneHotEncoder(inputCol=indexed_name, outputCol=encoded_name))
        encoded_names.append(encoded_name)

    # Encoded categorical vectors come first, followed by the continuous columns.
    feature_assembler = VectorAssembler(inputCols=encoded_names + continuousCols,
                                        outputCol="features")

    # All indexers run before all encoders, and the assembler runs last.
    stages = index_stages + encode_stages + [feature_assembler]
    fitted = Pipeline(stages=stages).fit(df)

    encoded = fitted.transform(df).withColumn('label', col(labelCol))
    return encoded.select(indexCol, 'features', 'label')

In [None]:
# Build the model input: 'msisdn' as the row identifier, 'churn' as the label.
df_encoded = get_dummy(
    df=df_churn,
    indexCol='msisdn',
    categoricalCols=cat_cols,
    continuousCols=num_cols,
    labelCol='churn',
)

In [None]:
# Show 5 encoded rows in full (truncate=False keeps the feature vectors from being cut off).
df_encoded.show(5, truncate=False)

In [None]:
# Number of rows in the encoded dataset.
df_encoded.count()

## Split Data

In [None]:
# 70/30 train/test split. The seed is fixed so the split, and therefore every
# metric computed from it, does not change from one run to the next.
(trainingData, testData) = df_encoded.randomSplit([0.7, 0.3], seed=42)

## Create and train Model

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree classifier reading the assembled 'features' vector and the 'label' column.
dTree = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='label',
)

In [None]:
# Train the decision tree on the training split.
dTree_model = dTree.fit(trainingData)

## Make Prediction

In [None]:
# Score the held-out test split with the trained tree; the result carries the
# model's output columns (e.g. 'prediction' and 'probability', used by the evaluation cells).
predictions = dTree_model.transform(testData)
# Show the first 5 scored rows.
predictions.show(5)

## Evaluate

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [None]:
# Accuracy on the test predictions. The metric is selected per call through a
# param override, so the evaluator's own metricName setting is left untouched.
churn_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy_params = {churn_eval.metricName: "accuracy"}
churn_acc = churn_eval.evaluate(predictions, accuracy_params)
print("Decision Tree Performance Measure")
print("Accuracy = %0.2f" % churn_acc)

In [None]:
# Area under the ROC curve, scored from the model's 'probability' column.
# metricName is spelled out; it is the evaluator's default.
churn_eval = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol="probability",
    metricName="areaUnderROC",
)
churn_auc = churn_eval.evaluate(predictions)
print("AUC = %.2f" % churn_auc)

In [None]:
# Confusion matrix as a pandas frame: one row per predicted class (named in the
# 'prediction_label' column) and one column per true label.
cm_result = predictions.crosstab("prediction", "label").toPandas()
# Display the rows ordered by predicted class; cm_result itself is left unsorted.
cm_result.sort_values(by='prediction_label')

In [None]:
# Look up each confusion-matrix cell by (predicted class, true label) instead of by
# row position: crosstab does not guarantee the order of its rows, so positional
# indexing can silently swap TP/FN and FP/TN.
cm = cm_result.set_index("prediction_label")
# crosstab renders its keys as strings ("1.0", "1", ...); normalise both axes to int 0/1.
cm.index = cm.index.astype(float).astype(int)
cm.columns = cm.columns.astype(float).astype(int)
# A class the model never predicted (or that never occurs) becomes a row/column of zeros
# instead of a KeyError.
cm = cm.reindex(index=[0, 1], columns=[0, 1], fill_value=0)

TP = cm.loc[1, 1]  # predicted 1, actual 1
FP = cm.loc[1, 0]  # predicted 1, actual 0
TN = cm.loc[0, 0]  # predicted 0, actual 0
FN = cm.loc[0, 1]  # predicted 0, actual 1
Accuracy = (TP+TN)/(TP+FP+TN+FN)
Sensitivity = TP/(TP+FN)
Specificity = TN/(TN+FP)
Precision = TP/(TP+FP)

print ("Accuracy = %0.2f" %Accuracy )
print ("Sensitivity = %0.2f" %Sensitivity )
print ("Specificity = %0.2f" %Specificity )
print ("Precision = %0.2f" %Precision )