In [1]:
# Always neds to be done in Rasberry Pi
import findspark
findspark.init('/home/baxman/spark-2.4.7-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('churn').getOrCreate()

In [2]:
# Import LinReg from MLlib
from pyspark.ml.classification import LogisticRegression

In [3]:
# This notebook will predict customer churn and manager assignment for a local business

In [4]:
# Set data
data = spark.read.csv('/home/baxman/Codes/PySpark/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Logistic_Regression/customer_churn.csv', header = True, inferSchema = True)

In [5]:
# Checking data schema
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [8]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [9]:
data.show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [25]:
# Only choosing relevant columns...
cols = data.select(['Age',
                     'Total_Purchase',
                    'Account_Manager',
                     'Years',
                     'Num_Sites',
                     'Churn'])

In [26]:
# Will drop missing data
data_fin = cols.na.drop()

In [27]:
data_fin.show()

+----+--------------+---------------+-----+---------+-----+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|
+----+--------------+---------------+-----+---------+-----+
|42.0|       11066.8|              0| 7.22|      8.0|    1|
|41.0|      11916.22|              0|  6.5|     11.0|    1|
|38.0|      12884.75|              0| 6.67|     12.0|    1|
|42.0|       8010.76|              0| 6.71|     10.0|    1|
|37.0|       9191.58|              0| 5.56|      9.0|    1|
|48.0|      10356.02|              0| 5.12|      8.0|    1|
|44.0|      11331.58|              1| 5.23|     11.0|    1|
|32.0|       9885.12|              1| 6.92|      9.0|    1|
|43.0|       14062.6|              1| 5.46|     11.0|    1|
|40.0|       8066.94|              1| 7.11|     11.0|    1|
|30.0|      11575.37|              1| 5.22|      8.0|    1|
|45.0|       8771.02|              1| 6.64|     11.0|    1|
|45.0|       8988.67|              1| 4.84|     11.0|    1|
|40.0|       8283.32|              1|  5

In [28]:
# Categorical data already encoded - onhotencoding not required!


from pyspark.ml.feature import (VectorAssembler,VectorIndexer)

In [29]:
# Assemble...

assembler = VectorAssembler(inputCols=['Age',
                     'Total_Purchase',
                    'Account_Manager',
                     'Years',
                     'Num_Sites',],outputCol='features')

In [30]:
# Import Logistic Regression

from pyspark.ml.classification import LogisticRegression

In [31]:
# Seting up a pipeline, train/test split

from pyspark.ml import Pipeline
log_reg_churn = LogisticRegression(featuresCol='features',labelCol='Churn')

# Define pipeline -> stages: define each step in pipeline

pipeline = Pipeline(stages=[assembler,log_reg_churn])
train,test = data_fin.randomSplit([0.7,0.3])

In [37]:
# Fit model

fit_model = pipeline.fit(train)

In [45]:
# Showing summary

logi_reg_summary = fit_model.stages[-1].summary

In [47]:

logi_reg_summary.predictions.show()

+----+--------------+---------------+-----+---------+-----+--------------------+--------------------+--------------------+----------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|            features|       rawPrediction|         probability|prediction|
+----+--------------+---------------+-----+---------+-----+--------------------+--------------------+--------------------+----------+
|22.0|      11254.38|              1| 4.96|      8.0|  0.0|[22.0,11254.38,1....|[4.13087308265087...|[0.98418528097326...|       0.0|
|25.0|       9672.03|              0| 5.49|      8.0|  0.0|[25.0,9672.03,0.0...|[4.30213894990538...|[0.98664130341408...|       0.0|
|26.0|       8787.39|              1| 5.42|     11.0|  1.0|[26.0,8787.39,1.0...|[0.16174760918549...|[0.54034897217393...|       0.0|
|26.0|       8939.61|              0| 4.54|      7.0|  0.0|[26.0,8939.61,0.0...|[5.98261992459265...|[0.99748413590724...|       0.0|
|27.0|        8628.8|              1|  5.3|      7.0|  0.0|[27

In [33]:
results = fit_model.transform(test)

In [34]:
results.show()

+----+--------------+---------------+-----+---------+-----+--------------------+--------------------+--------------------+----------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|Churn|            features|       rawPrediction|         probability|prediction|
+----+--------------+---------------+-----+---------+-----+--------------------+--------------------+--------------------+----------+
|28.0|       9090.43|              1| 5.74|     10.0|    0|[28.0,9090.43,1.0...|[1.08448876153702...|[0.74734249951792...|       0.0|
|28.0|      11245.38|              0| 6.72|      8.0|    0|[28.0,11245.38,0....|[3.46928978743254...|[0.96980122559332...|       0.0|
|29.0|       5900.78|              1| 5.56|      8.0|    0|[29.0,5900.78,1.0...|[3.57134702024637...|[0.97265104394519...|       0.0|
|29.0|       9378.24|              0| 4.93|      8.0|    0|[29.0,9378.24,0.0...|[4.43635179276735...|[0.98829947200724...|       0.0|
|29.0|       9617.59|              0| 5.49|      8.0|    0|[29

In [35]:
# Evaluation

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='Churn')

In [36]:
results.select('Churn','prediction').show()

+-----+----------+
|Churn|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    1|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows



In [48]:
# Area under the curve

AUC = evaluator.evaluate(results)

In [49]:
AUC

0.806641604010025

In [52]:
# Predict on new data!

# First, fit pipeline to all data

final_lr_model = pipeline.fit(data_fin)

In [54]:
new_customers  = spark.read.csv('/home/baxman/Codes/PySpark/Python-and-Spark-for-Big-Data-master/Spark_for_Machine_Learning/Logistic_Regression/new_customers.csv', header = True, inferSchema = True)

In [55]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [58]:
test_new_customers = assembler.transform(new_customers)

In [59]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [63]:
# Assess on new data

final_results = final_lr_model.stages[-1].transform(test_new_customers)

In [67]:
# Showing predictions based on companies

final_results.select('Company','prediction').show()

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+

