## Requirement : Customer Churn

## Creating Spark Session

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Customer_Churn").getOrCreate()

## Reading the input file

In [3]:
df = spark.read.csv("customer_churn.csv", inferSchema = True, header = True)

In [4]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [5]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [6]:
df.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [7]:
df.show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [8]:
df.head()

Row(Names='Cameron Williams', Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Years=7.22, Num_Sites=8.0, Onboard_date=datetime.datetime(2013, 8, 30, 7, 0, 40), Location='10265 Elizabeth Mission Barkerburgh, AK 89518', Company='Harvey LLC', Churn=1)

## Checking the distribution of the target

In [9]:
df.groupBy("Churn").count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



## Balancing the data

In [10]:
df_0 = df.filter(df["Churn"] == 0)
df_1 = df.filter(df["Churn"] == 1)

count_0 = df_0.count()
count_1 = df_1.count()

print("Count 0: ", count_0)
print("Count 1: ", count_1)

Count 0:  750
Count 1:  150


## Oversampling the data

In [11]:
ratio = count_0/count_1

print(ratio)

5.0


In [12]:
df_1_oversampled = df_1.sample(withReplacement = True, fraction = ratio)

df = df_0.unionAll(df_1_oversampled)

df.groupBy("Churn").count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|    1|  734|
|    0|  750|
+-----+-----+



## Selecting the required features

In [13]:
my_data = df.select(["Age", "Total_Purchase", "Account_Manager", "Years", "Num_Sites", "Churn"])

my_data.show()

+----+--------------+---------------+-----+---------+-----+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+----+--------------+---------------+-----+---------+-----+
|35.0|      15571.26|              0| 6.45|      9.0|    0|
|39.0|      10268.87|              1| 3.68|      6.0|    0|
|44.0|      12328.03|              1|  4.6|      9.0|    0|
|52.0|       9782.83|              0| 3.96|      7.0|    0|
|29.0|       9378.24|              0| 4.93|      8.0|    0|
|37.0|      10314.67|              1| 5.86|      8.0|    0|
|30.0|       8403.78|              1| 4.13|      7.0|    0|
|46.0|       5570.45|              0| 2.23|      7.0|    0|
|43.0|       8042.76|              0| 4.95|      8.0|    0|
|44.0|      10309.15|              1| 6.35|      9.0|    0|
|35.0|      12357.31|              0| 5.03|     10.0|    0|
|47.0|       11306.1|              0| 6.01|      7.0|    0|
|32.0|      13630.93|              0| 4.38|     10.0|    0|
|36.0|      12284.58|              1|  5

## Checking for Null in the features

In [14]:
from pyspark.sql.functions import *

my_data.select([count(when(isnan(x)|col(x).isNull(), x)).alias(x) for x in my_data.columns]).show()

+---+--------------+---------------+-----+---------+-----+
|Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+---+--------------+---------------+-----+---------+-----+
|  0|             0|              0|    0|        0|    0|
+---+--------------+---------------+-----+---------+-----+



## OneHot encoding for Account_Manager

In [15]:
from pyspark.ml.feature import OneHotEncoder

Account_Manager_OneHot_Encoder = OneHotEncoder(inputCol = "Account_Manager", outputCol = "Account_Manager_Encode")

## Converting all the data in the form of array for Pyspark ML algorithim

In [16]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer

assembler = VectorAssembler(inputCols = ["Age", "Total_Purchase", "Account_Manager_Encode", "Years", "Num_Sites"],
                            outputCol = "Features")

## Splitting the dataset into Train and Test

In [17]:
train_data, test_data = my_data.randomSplit([0.7, 0.3])

In [18]:
train_data.columns

['Age', 'Total_Purchase', 'Account_Manager', 'Years', 'Num_Sites', 'Churn']

## Creating Model

In [19]:
from pyspark.ml.classification import LogisticRegression

model = LogisticRegression(featuresCol = "Features", labelCol = "Churn")

## Creating Pipeline

In [20]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [Account_Manager_OneHot_Encoder, assembler, model])

In [21]:
fit_model = pipeline.fit(train_data)

In [22]:
prediction = fit_model.transform(test_data)

In [23]:
prediction.head()

Row(Age=22.0, Total_Purchase=11254.38, Account_Manager=1, Years=4.96, Num_Sites=8.0, Churn=0, Account_Manager_Encode=SparseVector(1, {}), Features=DenseVector([22.0, 11254.38, 0.0, 4.96, 8.0]), rawPrediction=DenseVector([2.7246, -2.7246]), probability=DenseVector([0.9385, 0.0615]), prediction=0.0)

In [24]:
prediction.select(["Churn", "prediction"]).show()

+-----+----------+
|Churn|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       1.0|
|    0|       0.0|
|    0|       1.0|
+-----+----------+
only showing top 20 rows



## Model Evaluation

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# Note: We can use MulticlassClassificationEvaluator even if the target is Binary.

my_eval = BinaryClassificationEvaluator(rawPredictionCol = "prediction", labelCol = "Churn")

## AUC Score

In [26]:
AUC_Score = my_eval.evaluate(prediction)

print("AUC Score: ", AUC_Score)

AUC Score:  0.8548048048048048


## Classification Report using SKLearn

In [27]:
from sklearn.metrics import classification_report

print(classification_report(prediction.select('Churn').toPandas(), prediction.select('prediction').toPandas()))

             precision    recall  f1-score   support

          0       0.83      0.89      0.86       222
          1       0.88      0.82      0.85       225

avg / total       0.86      0.85      0.85       447



## F1 Score using SKlearn

In [28]:
from sklearn.metrics import f1_score

print("F1 Score evaluator : ", f1_score(prediction.select('Churn').toPandas(), 
                                        prediction.select('prediction').toPandas()))

F1 Score evaluator :  0.8505747126436781


## F1 Score using PySpark

In [46]:
F1_eval = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "Churn", 
                                                  metricName= "f1")


print("F1 Score: ", F1_eval.evaluate(prediction))

F1 Score DTC:  0.8544550376914889


## Confusion Matrix

In [29]:
from sklearn.metrics import confusion_matrix

print(confusion_matrix(prediction.select('Churn').toPandas(), prediction.select('prediction').toPandas()))

[[197  25]
 [ 40 185]]


## Accuracy Score

In [30]:
from sklearn.metrics import accuracy_score

print(accuracy_score(prediction.select('Churn').toPandas(), prediction.select('prediction').toPandas()))

0.854586129753915


## Prediction on new data

## Reading the new file

In [31]:
new_customers = spark.read.csv("new_customers.csv", inferSchema = True, header = True)

new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



## Creating new Logistic model

In [None]:
from pyspark.ml.classification import LogisticRegression

Logistic_model = LogisticRegression(featuresCol = "Features", labelCol = "Churn")

## Creating Pipeline

In [35]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [Account_Manager_OneHot_Encoder, assembler, Logistic_model])

## Training the model

In [36]:
fit_model = pipeline.fit(my_data)

## Prediction on new data

In [42]:
final_results = fit_model.transform(new_customers)

In [44]:
final_results.select("Company", "prediction").show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       1.0|
|   Parks-Robbins|       1.0|
+----------------+----------+



In [45]:
new_customers.describe().show()

+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+
|summary|        Names|               Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|         Company|
+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+
|  count|            6|                 6|                6|                 6|                6|                 6|                   6|               6|
|   mean|         null|35.166666666666664|7607.156666666667|0.8333333333333334|6.808333333333334|12.333333333333334|                null|            null|
| stddev|         null| 15.71517313511584|4346.008232825459| 0.408248290463863|3.708737880555414|3.3862466931200785|                null|            null|
|    min|Andrew Mccall|              22.0|            100.0|          