# Ex2: Logistic Regression Consulting Project

## Binary Customer Churn

A marketing agency has many customers that use its service to produce ads for their websites. The agency has noticed a significant amount of churn among these clients. Account managers are currently assigned more or less at random, and the agency wants you to create a machine learning model that predicts which customers will churn (stop buying the service), so that account managers can be assigned to the customers most at risk. Some historical data is available. Create a classification model that predicts whether or not a customer churned. The company can then apply it to incoming data for future customers to predict who will churn and assign them an account manager.

The data is saved as customer_churn.csv. Here are the fields and their definitions:

    Names: Name of the latest contact at the company
    Age: Customer age
    Total_Purchase: Total ads purchased
    Account_Manager: Binary, 0 = no manager, 1 = account manager assigned
    Years: Total years as a customer
    Num_Sites: Number of websites that use the service
    Onboard_date: Date on which the latest contact was onboarded
    Location: Client HQ address
    Company: Name of the client company
    
Once you've created the model and evaluated it, test out the model on some new data (you can think of this almost like a hold-out set) that your client has provided, saved under new_customers.csv. The client wants to know which customers are most likely to churn given this data (they don't have the label yet).

# Initialize SparkSession

In [1]:
import findspark
findspark.init('/Users/khoatrinh/Spark/spark-3.5.1-bin-hadoop3')

In [2]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

In [3]:
sc = SparkContext()
sc


In [4]:
spark = SparkSession(sc)
spark 

# Read data 

In [5]:
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from datetime import datetime
from pyspark.sql import types
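# Import only what is used; a wildcard import would shadow Python builtins such as sum, min and max
from pyspark.sql.functions import col, log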
from pyspark.sql.types import DateType
from pyspark.ml.feature import Binarizer, Bucketizer, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel 
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [6]:
customer_churn = spark.read.csv('customer_churn.csv', header = True, inferSchema = True)
customer_churn.show(3)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
only showing top 3 rows



In [7]:
customer_churn.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [8]:
customer_churn.count()

900

In [9]:
customer_churn = customer_churn.dropna()
customer_churn.count()

900

In [10]:
customer_churn.describe().show(truncate = False)


+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+---------------------------------------------------+-------------------------+-------------------+
|summary|Names        |Age              |Total_Purchase   |Account_Manager   |Years            |Num_Sites         |Location                                           |Company                  |Churn              |
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+---------------------------------------------------+-------------------------+-------------------+
|count  |900          |900              |900              |900               |900              |900               |900                                                |900                      |900                |
|mean   |NULL         |41.81666666666667|10062.82403333334|0.4811111111111111|5.27315555555555 |8.587777777777777 |NULL                         

In [11]:
customer_churn.createOrReplaceTempView('customer_churn')

In [12]:
query = '''SELECT COUNT(DISTINCT Location) FROM customer_churn'''
spark.sql(query).show()

+------------------------+
|count(DISTINCT Location)|
+------------------------+
|                     900|
+------------------------+



In [13]:
query = '''SELECT COUNT(DISTINCT Company) FROM customer_churn'''
spark.sql(query).show()

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                    873|
+-----------------------+



In [14]:
# Drop the free-text identifier columns (Location has 900 distinct values in 900 rows)
df = customer_churn.drop('Names', 'Location')
df.show(3)

+----+--------------+---------------+-----+---------+-------------------+--------------------+-----+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|             Company|Churn|
+----+--------------+---------------+-----+---------+-------------------+--------------------+-----+
|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|          Harvey LLC|    1|
|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|          Wilson PLC|    1|
|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|Miller, Johnson a...|    1|
+----+--------------+---------------+-----+---------+-------------------+--------------------+-----+
only showing top 3 rows



In [15]:
df = df.withColumn('log_Total_Purchase', log(col('Total_Purchase')))


In [16]:
# Calculate mean and std for filtering outliers
mean_val = df.agg({'log_Total_Purchase': "mean"}).collect()[0][0]
stddev_val = df.agg({'log_Total_Purchase': "stddev"}).collect()[0][0]

print('Mean: ', mean_val)
print('Stddev: ', stddev_val)

Mean:  9.181747690263084
Stddev:  0.2986488999263224


In [17]:
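# Bounds at 3 standard deviations either side of the mean
lo = mean_val - 3 * stddev_val
hi = mean_val + 3 * stddev_val
# Keep only the rows inside the bounds; everything else is treated as an outlier
df_sub = df.where((df['log_Total_Purchase'] > lo) & (df['log_Total_Purchase'] < hi))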
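
The 3-standard-deviation rule used in this filter can be checked by hand. The sketch below is plain Python (no Spark) and plugs in the mean and standard deviation printed in cell 16. The helper name `within_bounds` is made up for illustration. It also tests a purchase total of 100.0, a value that appears in new_customers.csv.

```python
import math

# Values printed by the notebook for log_Total_Purchase (cell 16).
mean_val = 9.181747690263084
stddev_val = 0.2986488999263224

# Same bounds as the Spark filter: mean +/- 3 standard deviations.
lo = mean_val - 3 * stddev_val
hi = mean_val + 3 * stddev_val


def within_bounds(total_purchase):
    """Hypothetical helper: True if log(total_purchase) passes the outlier filter."""
    return lo < math.log(total_purchase) < hi


# Bounds translated back to the original Total_Purchase scale.
print('kept range: {:.0f} to {:.0f}'.format(math.exp(lo), math.exp(hi)))
print(within_bounds(11066.8))  # first row of customer_churn.csv
print(within_bounds(100.0))    # a value seen in new_customers.csv
```

Translating the bounds back with `math.exp` makes the filter easier to explain to the client, since it is then expressed in the same units as Total_Purchase.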

In [18]:
df_sub.describe().show()

+-------+------------------+------------------+-------------------+------------------+------------------+--------------------+-------------------+-------------------+
|summary|               Age|    Total_Purchase|    Account_Manager|             Years|         Num_Sites|             Company|              Churn| log_Total_Purchase|
+-------+------------------+------------------+-------------------+------------------+------------------+--------------------+-------------------+-------------------+
|  count|               895|               895|                895|               895|               895|                 895|                895|                895|
|   mean|41.815642458100555|10102.777988826821|0.48044692737430167| 5.279318435754184| 8.588826815642458|                NULL|0.16759776536312848|  9.191290188135513|
| stddev| 6.126060242946418|2352.6813239307294|0.49989688112158687|1.2734358235480647|1.7636296554821884|                NULL|0.37371754637596827|0.24851609510340106

In [19]:
# df_sub = df_sub.withColumn('Account_Manager', df_sub.Account_Manager.cast("string"))
# df_sub = df_sub.withColumn('Churn', df_sub.Churn.cast("string"))

In [20]:
df_sub.printSchema()

root
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- log_Total_Purchase: double (nullable = true)



In [52]:
assembler = VectorAssembler(inputCols= ['Age', 'log_Total_Purchase','Account_Manager', 'Years', 'Num_Sites'],
                            outputCol = 'features')
data_pre = assembler.transform(df_sub)

In [53]:
data_pre.show(2)

+----+--------------+---------------+-----+---------+-------------------+----------+-----+------------------+--------------------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|   Company|Churn|log_Total_Purchase|            features|
+----+--------------+---------------+-----+---------+-------------------+----------+-----+------------------+--------------------+
|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|Harvey LLC|    1| 9.311704914356662|[42.0,9.311704914...|
|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|Wilson PLC|    1| 9.385655776234074|[41.0,9.385655776...|
+----+--------------+---------------+-----+---------+-------------------+----------+-----+------------------+--------------------+
only showing top 2 rows



In [54]:
data_pre.select('features', 'Churn').show(3, False)

+--------------------------------------+-----+
|features                              |Churn|
+--------------------------------------+-----+
|[42.0,9.311704914356662,0.0,7.22,8.0] |1    |
|[41.0,9.385655776234074,0.0,6.5,11.0] |1    |
|[38.0,9.463799720492338,0.0,6.67,12.0]|1    |
+--------------------------------------+-----+
only showing top 3 rows



In [55]:
final_data = data_pre.select('features', 'Churn')
final_data.count()

895

In [56]:
final_data = final_data.na.drop()
final_data.count()

895

In [57]:
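# 70/30 train/test split. No seed is passed, so the split and the metrics below change from run to run.
train_data, test_data = final_data.randomSplit([0.7,0.3])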

In [58]:
logistic = LogisticRegression(
    featuresCol = 'features', labelCol = 'Churn', predictionCol = 'prediction'
)

logisticModel = logistic.fit(train_data)
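
For intuition about what the fitted model computes: binary logistic regression turns a weighted sum of the features into a probability with the sigmoid function. A minimal plain-Python sketch follows. The weights and intercept are invented placeholders, not the coefficients of `logisticModel` (the fitted values are exposed as `logisticModel.coefficients` and `logisticModel.intercept`).

```python
import math


def sigmoid(z):
    """Map any real number to the interval (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def churn_probability(features, weights, intercept):
    """P(Churn = 1) for one feature vector under a binary logistic model."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return sigmoid(z)


# Feature order matches the VectorAssembler:
# Age, log_Total_Purchase, Account_Manager, Years, Num_Sites
features = [42.0, 9.311704914356662, 0.0, 7.22, 8.0]

# Placeholder parameters for illustration only (NOT the fitted values).
weights = [0.05, -0.5, 0.3, 0.5, 1.0]
intercept = -15.0

p = churn_probability(features, weights, intercept)
print('P(churn) = {:.3f}'.format(p))
# A row is labeled as churn when the probability exceeds 0.5.
print('prediction =', 1.0 if p > 0.5 else 0.0)
```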

In [59]:
test_model = logisticModel.transform(test_data)
test_model.groupBy('Churn', 'prediction').count().show()

+-----+----------+-----+
|Churn|prediction|count|
+-----+----------+-----+
|    1|       0.0|   22|
|    0|       0.0|  218|
|    1|       1.0|   27|
|    0|       1.0|    7|
+-----+----------+-----+



In [62]:
# Calculate the elements of the confusion matrix
TN = test_model.filter('prediction = 0 AND Churn = prediction').count()
TP = test_model.filter('prediction = 1 AND Churn = prediction').count()
FN = test_model.filter('prediction = 0 AND Churn != prediction').count()
FP = test_model.filter('prediction = 1 AND Churn != prediction').count()
# Calculate precision and recall
precision = TP / (TP + FP)
recall = TP / (TP + FN)
print('precision = {:.2f}\nrecall    = {:.2f}'.format(precision, recall))
acc = (TP+TN)/(TP+TN+FP+FN)
print('accuracy = {:.2f}'.format(acc))


precision = 0.79
recall    = 0.55
accuracy = 0.89
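
The counts in the `groupBy('Churn', 'prediction')` table are enough to reproduce these numbers by hand and to add two reference points the notebook does not compute: the F1 score, and the accuracy of a trivial model that always predicts "no churn". A sketch in plain Python, with the four counts copied from that table:

```python
# Confusion-matrix counts copied from the groupBy('Churn', 'prediction') output.
TP = 27   # Churn = 1, prediction = 1
FN = 22   # Churn = 1, prediction = 0
FP = 7    # Churn = 0, prediction = 1
TN = 218  # Churn = 0, prediction = 0

total = TP + TN + FP + FN
precision = TP / (TP + FP)
recall = TP / (TP + FN)
accuracy = (TP + TN) / total
f1 = 2 * precision * recall / (precision + recall)

# Accuracy of always predicting the majority class (Churn = 0).
baseline_accuracy = (TN + FP) / total

print('precision = {:.2f}'.format(precision))
print('recall    = {:.2f}'.format(recall))
print('accuracy  = {:.2f}'.format(accuracy))
print('f1        = {:.2f}'.format(f1))
print('baseline  = {:.2f}'.format(baseline_accuracy))
```

Because most test rows did not churn, accuracy alone flatters the model. Recall is the number to watch for this use case: 22 of the 49 actual churners in the test set are missed at the default threshold.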


In [67]:
test_model = test_model.withColumn('label', test_model.Churn)

In [68]:
multi_evaluator = MulticlassClassificationEvaluator()

acc_ = multi_evaluator.evaluate(test_model, {multi_evaluator.metricName : 'accuracy'})
acc_

0.8941605839416058

In [69]:
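binary_evaluator = BinaryClassificationEvaluator()
# Use the binary evaluator's own metricName param (the original passed multi_evaluator.metricName)
auc = binary_evaluator.evaluate(test_model, {binary_evaluator.metricName : 'areaUnderROC'})
auc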

0.9241723356009068
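
Area under the ROC curve can be read as the probability that a randomly chosen churner receives a higher score than a randomly chosen non-churner. The sketch below computes it that way in plain Python on a small made-up sample. The labels and scores are illustrative and are not taken from `test_model`, and `pairwise_auc` is a hypothetical helper, not part of PySpark.

```python
def pairwise_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError('need at least one positive and one negative example')
    correct = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                correct += 1.0
            elif p == n:
                correct += 0.5
    return correct / (len(pos) * len(neg))


# Made-up example: 1 = churned, score = predicted churn probability.
labels = [1, 1, 0, 1, 0, 0]
scores = [0.9, 0.8, 0.7, 0.6, 0.3, 0.2]

auc_toy = pairwise_auc(labels, scores)
print('toy AUC = {:.3f}'.format(auc_toy))
```

Unlike accuracy, this measure does not depend on the 0.5 decision threshold, which is why it can be high even when recall at that threshold is modest.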

In [70]:
logisticModel.save('logisticModel_customer_churn')

                                                                                

In [71]:
logisticModel2 = LogisticRegressionModel.load('logisticModel_customer_churn')

# Test on new data 

In [73]:
new_data = spark.read.csv('new_customers.csv', header = True, inferSchema = True)
new_data.show(3)

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+
only showing top 3 rows



In [75]:
new_data = new_data.withColumn('log_Total_Purchase', log(col('Total_Purchase')))
new_data_pre = assembler.transform(new_data)

In [77]:
new_data_pre.show(3)

+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+------------------+--------------------+
|         Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|         Company|log_Total_Purchase|            features|
+--------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------------+------------------+--------------------+
| Andrew Mccall|37.0|       9935.53|              1| 7.71|      8.0|2011-08-29 18:37:54|38612 Johnny Stra...|        King Ltd|  9.20387250031693|[37.0,9.203872500...|
|Michele Wright|23.0|       7526.94|              1| 9.28|     15.0|2013-07-22 18:19:54|21083 Nicole Junc...|   Cannon-Benson| 8.926243863699453|[23.0,8.926243863...|
|  Jeremy Chang|65.0|         100.0|              1|  1.0|     15.0|2006-12-11 07:48:13|085 Austin Views ...|Barron-Robertson| 4.605170185988092|[65.0,4.605170185...

In [80]:
unlabeled_data = new_data_pre.select('features')
predictions = logisticModel2.transform(unlabeled_data)
predictions.select('features', 'probability', 'prediction').show(5, False)

+--------------------------------------+-----------------------------------------+----------+
|features                              |probability                              |prediction|
+--------------------------------------+-----------------------------------------+----------+
|[37.0,9.20387250031693,1.0,7.71,8.0]  |[0.9062513301918502,0.09374866980814978] |0.0       |
|[23.0,8.926243863699453,1.0,9.28,15.0]|[0.003849241423576753,0.9961507585764232]|1.0       |
|[65.0,4.605170185988092,1.0,1.0,15.0] |[0.01726223230510628,0.9827377676948937] |1.0       |
|[32.0,8.777632527474145,0.0,9.4,14.0] |[0.009708056459548352,0.9902919435404517]|1.0       |
|[32.0,9.484002877954307,1.0,10.0,8.0] |[0.8048534344953365,0.19514656550466347] |0.0       |
+--------------------------------------+-----------------------------------------+----------+
only showing top 5 rows
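
Because the client asked which customers are most likely to churn, it can help to sort by churn probability instead of looking only at the 0/1 prediction. The second element of each `probability` vector is P(Churn = 1). A plain-Python sketch using the five rows displayed above follows. The three names come from the `new_data.show(3)` output; rows 4 and 5 are labeled by position because their names are not shown.

```python
# (customer, [P(Churn = 0), P(Churn = 1)]) copied from the output above.
rows = [
    ('Andrew Mccall',  [0.9062513301918502, 0.09374866980814978]),
    ('Michele Wright', [0.003849241423576753, 0.9961507585764232]),
    ('Jeremy Chang',   [0.01726223230510628, 0.9827377676948937]),
    ('row 4',          [0.009708056459548352, 0.9902919435404517]),
    ('row 5',          [0.8048534344953365, 0.19514656550466347]),
]

THRESHOLD = 0.5  # matches the 0/1 predictions shown in the table

# Sort by churn probability, highest risk first.
ranked = sorted(rows, key=lambda r: r[1][1], reverse=True)

for name, prob in ranked:
    flag = 'at risk' if prob[1] > THRESHOLD else 'ok'
    print('{:<15} P(churn) = {:.3f}  {}'.format(name, prob[1], flag))

# Names of the customers flagged at the chosen threshold, in risk order.
at_risk = [name for name, prob in ranked if prob[1] > THRESHOLD]
```

In Spark itself, a similar ordering could probably be obtained by extracting the second probability with `pyspark.ml.functions.vector_to_array` and calling `orderBy` on it. Note that Jeremy Chang's Total_Purchase of 100.0 is far below the range kept in the training data, so that prediction is an extrapolation and deserves extra caution.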

