In [1]:
#Build a model that predicts whether or not a customer is going to churn
#1. IMport pyspark
#2. Import Data
#3. Transform Data
#4. Create Model
#5. Model Evaluation

In [2]:
import findspark
findspark.init('/home/ubuntu/spark-2.2.0-bin-hadoop2.7')

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('LogProj').getOrCreate()


In [87]:
import pandas as pd
import numpy as np

import os
os.getcwd()
data_folder = '/home/ubuntu/data/raw'
file_name = '/customer_churn.csv'
data = spark.read.csv(data_folder+file_name, header=True, inferSchema=True)

In [220]:
data.printSchema()
data.describe().show()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|           

In [62]:
for row in data.head(5):
    print(row[0:10])

('Cameron Williams', 42.0, 11066.8, 0, 7.22, 8.0, datetime.datetime(2013, 8, 30, 7, 0, 40), '10265 Elizabeth Mission Barkerburgh, AK 89518', 'Harvey LLC', 1)
('Kevin Mueller', 41.0, 11916.22, 0, 6.5, 11.0, datetime.datetime(2013, 8, 13, 0, 38, 46), '6157 Frank Gardens Suite 019 Carloshaven, RI 17756', 'Wilson PLC', 1)
('Eric Lozano', 38.0, 12884.75, 0, 6.67, 12.0, datetime.datetime(2016, 6, 29, 6, 20, 7), '1331 Keith Court Alyssahaven, DE 90114', 'Miller, Johnson and Wallace', 1)
('Phillip White', 42.0, 8010.76, 0, 6.71, 10.0, datetime.datetime(2014, 4, 22, 12, 43, 12), '13120 Daniel Mount Angelabury, WY 30645-4695', 'Smith Inc', 1)
('Cynthia Norton', 37.0, 9191.58, 0, 5.56, 9.0, datetime.datetime(2016, 1, 19, 15, 31, 15), '765 Tricia Row Karenshire, MH 71730', 'Love-Jones', 1)


# Feature Extraction

In [63]:
#Transform into Numerical Columns
#Location - from string using one hot
#Onboard_date - month
#Onboard_date - year

In [64]:
from pyspark.sql.functions import (year, month, hour)

new_data = data.withColumn(col=year('Onboard_date'),colName='Onboarding_year')
new_data = new_data.withColumn(col=month('Onboard_date'),colName='Onboarding_month')
new_data = new_data.withColumn(col=month('Onboard_date'),colName='Onboarding_month')
new_data = new_data.withColumn(col=hour('Onboard_date'),colName='Onboarding_hour')

for row in new_data.head(2):
    print(row[1:15])
    
new_data.show(2)
new_data.printSchema()


#date_year = year(col= 'Onboard_date')
#model = date_year.fit(data)
#df_transformed1 = model.transform(data)

(42.0, 11066.8, 0, 7.22, 8.0, datetime.datetime(2013, 8, 30, 7, 0, 40), '10265 Elizabeth Mission Barkerburgh, AK 89518', 'Harvey LLC', 1, 2013, 8, 7)
(41.0, 11916.22, 0, 6.5, 11.0, datetime.datetime(2013, 8, 13, 0, 38, 46), '6157 Frank Gardens Suite 019 Carloshaven, RI 17756', 'Wilson PLC', 1, 2013, 8, 0)
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------+-----+---------------+----------------+---------------+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|   Company|Churn|Onboarding_year|Onboarding_month|Onboarding_hour|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+----------+-----+---------------+----------------+---------------+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|Harvey LLC|    1|           2013|               

# Data Exploration

In [135]:
#Feature Correlations

from pyspark.sql.functions import corr, covar_pop
important_columns = ['Total_Purchase','Num_Sites', 'Years',
                  'Onboarding_year', 'Onboarding_month','Onboarding_hour']


corr_df = pd.DataFrame(columns=important_columns, index=important_columns)
for feature in important_columns:
    for feature2 in important_columns:
        #new_data.select(corr(feature,feature2)).show()
        corr_df[feature2][feature] = new_data.select(corr(feature,feature2)).head()[0]
corr_df.head()


Unnamed: 0,Total_Purchase,Num_Sites,Years,Onboarding_year,Onboarding_month,Onboarding_hour
Total_Purchase,1.0,-0.00339,-0.00562317,-0.0246313,0.0448348,-0.00431726
Num_Sites,-0.00339,1.0,0.0516417,-0.0327771,0.00845673,-0.0450561
Years,-0.00562317,0.0516417,1.0,-0.0466239,0.0158553,-0.0167235
Onboarding_year,-0.0246313,-0.0327771,-0.0466239,1.0,-0.0390766,0.0511536
Onboarding_month,0.0448348,0.00845673,0.0158553,-0.0390766,1.0,-0.00356287


In [136]:
covar_df = pd.DataFrame(columns=important_columns, index=important_columns)
for feature in important_columns:
    for feature2 in important_columns:
        #new_data.select(corr(feature,feature2)).show()
        covar_df[feature2][feature] = new_data.select(covar_pop(feature,feature2)).head()[0]
covar_df.head()
    

Unnamed: 0,Total_Purchase,Num_Sites,Years,Onboarding_year,Onboarding_month,Onboarding_hour
Total_Purchase,5795120.0,-14.3944,-17.2422,-190.067,376.65,-72.295
Num_Sites,-14.3944,3.11118,0.116023,-0.18532,0.0520543,-0.552822
Years,-17.2422,0.116023,1.62242,-0.190361,0.0704771,-0.148176
Onboarding_year,-190.067,-0.18532,-0.190361,10.2749,-0.437116,1.1406
Onboarding_month,376.65,0.0520543,0.0704771,-0.437116,12.1782,-0.0864889


# DF Transformation to Spark format

In [235]:
from pyspark.ml.feature import VectorAssembler
important_columns = ['Total_Purchase','Num_Sites', 'Years',
                  'Onboarding_year', 'Onboarding_month','Onboarding_hour']
assembler = VectorAssembler(inputCols=important_columns, outputCol='features')

In [236]:
output = assembler.transform(new_data)
output.printSchema()
final_data = output.select('features', 'Churn')

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)
 |-- Onboarding_year: integer (nullable = true)
 |-- Onboarding_month: integer (nullable = true)
 |-- Onboarding_hour: integer (nullable = true)
 |-- features: vector (nullable = true)



# Test/Train Split

In [237]:
train_data, test_data = final_data.randomSplit([0.7,0.3])
train_data.describe().show()
test_data.describe().show()

+-------+-------------------+
|summary|              Churn|
+-------+-------------------+
|  count|                626|
|   mean|0.16613418530351437|
| stddev|0.37249868666277897|
|    min|                  0|
|    max|                  1|
+-------+-------------------+

+-------+------------------+
|summary|             Churn|
+-------+------------------+
|  count|               274|
|   mean|0.1678832116788321|
| stddev|0.3744464645429237|
|    min|                 0|
|    max|                 1|
+-------+------------------+



# Create LogReg Model

In [238]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='Churn', featuresCol='features')
lr_trained = lr.fit(train_data)

In [261]:
final_lr_trained = lr.fit(final_data)

# Evaluation

In [239]:
training_sum = lr_trained.summary
training_sum.predictions.describe().show()
test_results = lr_trained.evaluate(test_data)
#test_results.areaUnderROC
predictions = test_results.predictions

+-------+-------------------+------------------+
|summary|              Churn|        prediction|
+-------+-------------------+------------------+
|  count|                626|               626|
|   mean|0.16613418530351437|0.1182108626198083|
| stddev|0.37249868666277897|0.3231158211320125|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



In [241]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
pred_and_labels = lr_trained.evaluate(test_data)
pred_and_labels.predictions.show(15)

+--------------------+-----+--------------------+--------------------+----------+
|            features|Churn|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[3263.0,9.0,2.77,...|    0|[3.95641127730121...|[0.98122749947310...|       0.0|
|[3825.7,8.0,4.28,...|    0|[4.29474072311952...|[0.98654344111046...|       0.0|
|[4111.4,8.0,3.93,...|    0|[4.38964725858222...|[0.98774689677991...|       0.0|
|[4316.73,9.0,4.79...|    0|[2.81333693405567...|[0.94339228630554...|       0.0|
|[4492.44,9.0,6.43...|    0|[1.92253604480088...|[0.87242096881303...|       0.0|
|[5024.52,9.0,8.11...|    1|[1.1508963597262,...|[0.75967460262709...|       0.0|
|[5191.08,7.0,6.29...|    0|[4.93559820106940...|[0.99286511156548...|       0.0|
|[5192.38,11.0,4.8...|    1|[0.48574492493590...|[0.61910353436706...|       0.0|
|[5347.74,10.0,5.6...|    0|[1.08134308819845...|[0.74674806615457...|       0.0|
|[5387.75,9.0,6.

In [242]:
#create the evaluator
churn_eval = BinaryClassificationEvaluator(labelCol='Churn', rawPredictionCol='prediction')

#Now pass in the dataframe we just created
auc = churn_eval.evaluate(pred_and_labels.predictions)
auc


0.8258962623951182

# Test Against New Data

In [262]:
file_name = "/new_customers.csv"
new_customers = spark.read.csv(data_folder+file_name, header=True, inferSchema=True)

In [263]:
new_test = new_customers.withColumn(col=year('Onboard_date'),colName='Onboarding_year')
new_test = new_test.withColumn(col=month('Onboard_date'),colName='Onboarding_month')
new_test = new_test.withColumn(col=month('Onboard_date'),colName='Onboarding_month')
new_test = new_test.withColumn(col=hour('Onboard_date'),colName='Onboarding_hour')
new_test.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Onboarding_year',
 'Onboarding_month',
 'Onboarding_hour']

In [266]:
#Transform Data to Spark Format
important_columns = ['Total_Purchase','Num_Sites', 'Years',
                  'Onboarding_year', 'Onboarding_month','Onboarding_hour']
assembler = VectorAssembler(inputCols=important_columns, outputCol='features')
output = assembler.transform(new_test)
output.printSchema()
final_results = lr_trained.transform(output)
#output = output.withColumn(colName='Churn',col=(new_test['Years']*0))
#final_test_data = output.select('features', 'Churn')

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Onboarding_year: integer (nullable = true)
 |-- Onboarding_month: integer (nullable = true)
 |-- Onboarding_hour: integer (nullable = true)
 |-- features: vector (nullable = true)



In [277]:
final_results.select('Company','rawPrediction','probability','prediction').show()
final_results.columns

+----------------+--------------------+--------------------+----------+
|         Company|       rawPrediction|         probability|prediction|
+----------------+--------------------+--------------------+----------+
|        King Ltd|[2.36842530123786...|[0.91438766905519...|       0.0|
|   Cannon-Benson|[-7.1171122599380...|[8.10448275527764...|       1.0|
|Barron-Robertson|[-2.4965984285989...|[0.07609698712692...|       1.0|
|   Sexton-Golden|[-5.6051194277756...|[0.00366549585534...|       1.0|
|        Wood LLC|[1.10345072040642...|[0.75090610814763...|       0.0|
|   Parks-Robbins|[-3.3125390808024...|[0.03514352120712...|       1.0|
+----------------+--------------------+--------------------+----------+



['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Num_Sites',
 'Onboard_date',
 'Location',
 'Company',
 'Onboarding_year',
 'Onboarding_month',
 'Onboarding_hour',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [268]:
new_results = lr_trained.evaluate(final_test_data)
new_predictions = new_results.predictions

In [269]:
print(new_predictions.head(201)[0][4])


for pred in new_predictions.head(20):
    print(pred[4])

0.0
0.0
1.0
1.0
1.0
0.0
1.0
