### Binary customer churn classification using PySpark
Data being used for classification is customer_churn.csv which has the following schema:
* Name : Name of the latest contact at Company
* Age: Customer Age
* Total_Purchase: Total Ads Purchased
* Account_Manager: Binary 0=No manager, 1= Account manager assigned
* Years: Totaly Years as a customer
* Num_sites: Number of websites that use the service.
* Onboard_date: Date that the name of the latest contact was onboarded
* Location: Client HQ Address
* Company: Name of Client Company
    

In [2]:
#Start a new Spark Session
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
from pyspark.sql import SparkSession

In [3]:
#New spark session
spark = SparkSession.builder.appName('logreg').getOrCreate()

In [5]:
#read the data
data = spark.read.csv('customer_churn.csv',inferSchema=True, header=True)

In [6]:
data.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [9]:
data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                null|                null|0.16666666666666666|
| stddev|         null|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [12]:
data.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [17]:
from pyspark.ml.feature import VectorAssembler

In [21]:
#Merge all the feature columns in one
assembler = VectorAssembler(inputCols=['Age','Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites'], outputCol='features')

In [22]:
output = assembler.transform(data)

In [23]:
final_data = output.select('features', 'churn')

In [26]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- churn: integer (nullable = true)



In [27]:
# split the data into train and test
train_churn,test_churn=final_data.randomSplit([0.7,0.3])

In [28]:
from pyspark.ml.classification import LogisticRegression

In [30]:
# build the model
lr_churn = LogisticRegression(labelCol = 'churn')

In [31]:
# fit the model to train data
fitted_churn_model = lr_churn.fit(train_churn)

In [33]:
training_sum = fitted_churn_model.summary

In [34]:
training_sum.predictions.describe().show() 

+-------+-------------------+-------------------+
|summary|              churn|         prediction|
+-------+-------------------+-------------------+
|  count|                665|                665|
|   mean|0.16541353383458646|0.12631578947368421|
| stddev| 0.3718330353019494|0.33245498310218435|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [36]:
# evaluate the results
pred_and_labels=fitted_churn_model.evaluate(test_churn)

In [37]:
pred_and_labels.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[26.0,8939.61,0.0...|    0|[7.04718416182429...|[0.99913090110730...|       0.0|
|[29.0,5900.78,1.0...|    0|[4.45047672052666...|[0.98846168586645...|       0.0|
|[29.0,9378.24,0.0...|    0|[5.29195829129182...|[0.99499329652909...|       0.0|
|[29.0,10203.18,1....|    0|[4.11128759390150...|[0.98387753185788...|       0.0|
|[29.0,11274.46,1....|    0|[4.81174191762636...|[0.99193194382876...|       0.0|
|[29.0,12711.15,0....|    0|[5.99329300319553...|[0.99751077864889...|       0.0|
|[30.0,7960.64,1.0...|    1|[3.22531121844357...|[0.96177575125051...|       0.0|
|[30.0,12788.37,0....|    0|[2.80655711524919...|[0.94302913144465...|       0.0|
|[31.0,7073.61,0.0...|    0|[3.51969348046131...|[0.97124294419240...|       0.0|
|[31.0,9574.89,0

In [39]:
churn_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction',labelCol='churn')

In [41]:
auc = churn_eval.evaluate(pred_and_labels.predictions)

In [42]:
auc
#0.769

0.7695512820512821

### Predict on new data

In [45]:
final_lr_model = lr_churn.fit(final_data)

In [48]:
new_customers = spark.read.csv('new_customers.csv', inferSchema=True, header=True)

In [49]:
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [50]:
test_new_customers = assembler.transform(new_customers)

In [51]:
test_new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- features: vector (nullable = true)



In [52]:
final_results= final_lr_model.transform(test_new_customers)

In [56]:
final_results.select('Company','prediction').show()
# +----------------+----------+
#|         Company|prediction|
#+----------------+----------+
#|        King Ltd|       0.0|
#|   Cannon-Benson|       1.0|
#|Barron-Robertson|       1.0|
#|   Sexton-Golden|       1.0|
#|        Wood LLC|       0.0|
#|   Parks-Robbins|       1.0|
#+----------------+----------+

+----------------+----------+
|         Company|prediction|
+----------------+----------+
|        King Ltd|       0.0|
|   Cannon-Benson|       1.0|
|Barron-Robertson|       1.0|
|   Sexton-Golden|       1.0|
|        Wood LLC|       0.0|
|   Parks-Robbins|       1.0|
+----------------+----------+



In [57]:
test_new_customers.describe().show()

+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+
|summary|        Names|               Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|         Company|
+-------+-------------+------------------+-----------------+------------------+-----------------+------------------+--------------------+----------------+
|  count|            6|                 6|                6|                 6|                6|                 6|                   6|               6|
|   mean|         null|35.166666666666664|7607.156666666667|0.8333333333333334|6.808333333333334|12.333333333333334|                null|            null|
| stddev|         null| 15.71517313511584|4346.008232825459| 0.408248290463863|3.708737880555414|3.3862466931200785|                null|            null|
|    min|Andrew Mccall|              22.0|            100.0|          