In [1]:
# findspark.init() is called before the pyspark import below; presumably it makes the
# local Spark installation importable — confirm for this environment.
import findspark
findspark.init()
from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session running locally with master 'local[4]'.
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName('churn-classifier')
    .getOrCreate()
)

In [3]:
# Load the labelled churn data: the first line is a header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('../data/customer_churn.csv')
)

### Data Exploration

In [4]:
# Inspect the inferred column names and types.
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [5]:
# Class balance check: count churned (Churn == 1) and retained (Churn == 0) customers.
# The data set is imbalanced.
churn_flag = df['Churn']
pos_cnt = df.where(churn_flag == 1).count()
neg_cnt = df.where(churn_flag == 0).count()
print('pos: ', pos_cnt, 'neg: ', neg_cnt)

pos:  150 neg:  750


In [6]:
# Print the first record as "column: value" pairs, one per line.
first_row = df.take(1)[0]
for name, value in zip(df.columns, first_row):
    print(f'{name}: {value!s}')

Names: Cameron Williams
Age: 42.0
Total_Purchase: 11066.8
Account_Manager: 0
Years: 7.22
Num_Sites: 8.0
Onboard_date: 2013-08-30 07:00:40
Location: 10265 Elizabeth Mission Barkerburgh, AK 89518
Company: Harvey LLC
Churn: 1


### Data Cleansing

In [7]:
# Keep four numeric columns plus the Churn label; the remaining columns are left out.
kept_columns = ['Age', 'Total_Purchase', 'Years', 'Num_Sites', 'Churn']
my_cols = df.select(*kept_columns)

In [8]:
# Drop rows containing nulls in the selected columns (dropna() is an alias of na.drop()).
without_na = my_cols.dropna()

In [9]:
# Row count after dropping nulls, for comparison with the class counts printed earlier.
without_na.count()

900

In [10]:
# The count is still 900 (150 churned + 750 retained), so na.drop() removed no rows.

### Create Feature Vector

In [11]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer)

In [12]:
# Pack the four numeric predictors into a single vector column named 'features'.
assembler = VectorAssembler(
    outputCol='features',
    inputCols=['Age', 'Total_Purchase', 'Years', 'Num_Sites'],
)

In [13]:
# Build the feature vector from the null-free frame produced by na.drop().
# Assembling `my_cols` directly would bypass the cleansing step, and VectorAssembler's
# default handleInvalid='error' raises as soon as a null value reaches it.
final_data = assembler.transform(without_na)

### Train / Test Sets

In [14]:
# 70/30 train/test split. The seed is fixed so the split, and therefore every metric
# reported further down, is reproducible under Restart & Run All.
train, test = final_data.randomSplit([0.7, 0.3], seed=42)

### Train Logistic Regression

In [15]:
from pyspark.ml.classification import LogisticRegression

In [16]:
# Unfitted logistic-regression estimator: 'features' is the input vector, 'Churn' the label.
log_reg = LogisticRegression(
    labelCol='Churn',
    featuresCol='features',
)

In [17]:
# Fit the estimator on the training split only; the test split is held out for evaluation.
fit_log_reg = log_reg.fit(train)

### Evaluate Logistic Regression

In [18]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [19]:
# Binary-classification evaluator for the 'Churn' label. The raw-prediction column and the
# metric are spelled out here; both are the library defaults, so behavior is unchanged.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol='rawPrediction',
    labelCol='Churn',
    metricName='areaUnderROC',
)

In [20]:
# Evaluate the fitted model on the held-out test split. Despite the name, `pred_labels`
# is not the predictions themselves: later cells read the per-row output from its
# `.predictions` attribute.
pred_labels = fit_log_reg.evaluate(test)

In [21]:
# Preview the test-set rows together with the model's rawPrediction, probability and prediction columns.
pred_labels.predictions.show()

+----+--------------+-----+---------+-----+--------------------+--------------------+--------------------+----------+
| Age|Total_Purchase|Years|Num_Sites|Churn|            features|       rawPrediction|         probability|prediction|
+----+--------------+-----+---------+-----+--------------------+--------------------+--------------------+----------+
|22.0|      11254.38| 4.96|      8.0|    0|[22.0,11254.38,4....|[4.75267893569920...|[0.99144526605447...|       0.0|
|25.0|       9672.03| 5.49|      8.0|    0|[25.0,9672.03,5.4...|[4.30748677406267...|[0.98671160590182...|       0.0|
|28.0|      11128.95| 5.12|      8.0|    0|[28.0,11128.95,5....|[4.37006516427899...|[0.98750761764998...|       0.0|
|28.0|      11245.38| 6.72|      8.0|    0|[28.0,11245.38,6....|[3.44424048595928...|[0.96905891465598...|       0.0|
|29.0|       5900.78| 5.56|      8.0|    0|[29.0,5900.78,5.5...|[4.08906402590924...|[0.98352119252748...|       0.0|
|29.0|      13255.05| 4.89|      8.0|    0|[29.0,13255.0

In [22]:
# Reduce the test-set predictions to a single evaluation metric.
results = evaluator.evaluate(pred_labels.predictions)

In [23]:
# Display the metric value computed by the evaluator.
results

0.8984906462585042

### Predict on New Data

In [25]:
# Refit the same estimator on all labelled rows (the frame from before the train/test split)
# so the model used on new customers has seen every available example.
final_lr_model = log_reg.fit(final_data)

In [26]:
# Load the unlabelled customers to score, with the same CSV options as the training data.
new_customers = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('../data/new_customers.csv')
)

In [27]:
# Check the schema of the new data; it has the same columns as the training file except Churn.
new_customers.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)



In [28]:
# Reuse the same assembler so new customers get an identically built 'features' vector.
new_customers_df = assembler.transform(new_customers)

In [30]:
# Peek at one assembled row to confirm the 'features' column was added.
new_customers_df.take(1)

[Row(Names='Andrew Mccall', Age=37.0, Total_Purchase=9935.53, Account_Manager=1, Years=7.71, Num_Sites=8.0, Onboard_date=datetime.datetime(2011, 8, 29, 18, 37, 54), Location='38612 Johnny Stravenue Nataliebury, WI 15717-8316', Company='King Ltd', features=DenseVector([37.0, 9935.53, 7.71, 8.0]))]

In [31]:
# Score the new customers with the model that was fitted on all labelled data.
final_results = final_lr_model.transform(new_customers_df)

In [None]:
# NOTE(review): this rebinds the name `round` in the notebook namespace, shadowing Python's
# built-in round(). It is not used by any cell visible here, and the cell has no execution
# count, so it may be leftover from an experiment.
from pyspark.sql.functions import round

In [37]:
# Show each new customer's predicted class next to the probability vector
# (show() truncates long cell values by default, hence the trailing "..." in the output).
final_results.select('Names', 'Age', 'prediction', 'probability').show()

+--------------+----+----------+--------------------+
|         Names| Age|prediction|         probability|
+--------------+----+----------+--------------------+
| Andrew Mccall|37.0|       0.0|[0.91821394450559...|
|Michele Wright|23.0|       1.0|[0.00249574827398...|
|  Jeremy Chang|65.0|       1.0|[0.03129415226295...|
|Megan Ferguson|32.0|       1.0|[0.00475889563977...|
|  Taylor Young|32.0|       0.0|[0.78319303188952...|
| Jessica Drake|22.0|       1.0|[0.19177446955641...|
+--------------+----+----------+--------------------+

