# Spark churn - Karl Merisalu

### Libraries

In [2]:
import pandas as pd
import numpy as np

# python library (API) for Spark
import pyspark

### Initiate Spark session

In [3]:
# make a connection with a cluster
spark = pyspark.sql.SparkSession.builder.master('local[*]').appName('churn').getOrCreate() # how many cores are we using - * means all cores

In [4]:
spark

UP: Running in memory, which is faster than hadoop which runs on disk.

In [None]:
# RDD - core building blocks below

# resilient - immutable once chunks are created
# distributed - partitioned
# dataset - holds data

In [5]:
# checking out building blocks

sc = spark.sparkContext

sc

UP: Same as  before  ut no session

In [6]:
rdd = sc.textFile('agency_churn.csv')

rdd

agency_churn.csv MapPartitionsRDD[1] at textFile at <unknown>:0

In [8]:
type(rdd)

pyspark.rdd.RDD

In [9]:
rdd.take(2)

['Names,Age,Total_Purchase,Account_Manager,Years,Onboard_date,Location,Company,Churn',
 'Cameron Williams,42,11066.8,0,7.22,2013-08-30 7:00:40,"10265 Elizabeth Mission Barkerburgh, AK 89518",Harvey LLC,1']

In [21]:
# dataframe - same as RDD in terms of properties but has column structure or a schema
df = spark.read.csv('agency_churn.csv', header = True, inferSchema = True)

In [22]:
df

DataFrame[Names: string, Age: int, Total_Purchase: double, Account_Manager: int, Years: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int]

In [23]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [24]:
df.show(3)

+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|           Names|Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|             Company|Churn|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|Cameron Williams| 42|       11066.8|              0| 7.22|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|   Kevin Mueller| 41|      11916.22|              0|  6.5|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|     Eric Lozano| 38|      12884.75|              0| 6.67|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
+----------------+---+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
only showing top 3 rows



In [25]:
df.printSchema() # 

root
 |-- Names: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [27]:
df.dtypes

[('Names', 'string'),
 ('Age', 'int'),
 ('Total_Purchase', 'double'),
 ('Account_Manager', 'int'),
 ('Years', 'double'),
 ('Onboard_date', 'timestamp'),
 ('Location', 'string'),
 ('Company', 'string'),
 ('Churn', 'int')]

### Find unique values

In [29]:
# select a column
df.select('Age', 'Total_Purchase').show()

+----+--------------+
| Age|Total_Purchase|
+----+--------------+
|  42|       11066.8|
|  41|      11916.22|
|  38|      12884.75|
|  42|       8010.76|
|  37|       9191.58|
|  48|      10356.02|
|null|      11331.58|
|  32|       9885.12|
|  43|       14062.6|
|  40|       8066.94|
|  30|      11575.37|
|  45|       8771.02|
|  45|       8988.67|
|  40|       8283.32|
|  41|       6569.87|
|  38|      10494.82|
|null|       8213.41|
|  43|      11226.88|
|  53|       5515.09|
|  46|        8046.4|
+----+--------------+
only showing top 20 rows



In [31]:
# show me the unique values that Age can take
df.select('Age').distinct().show()

+----+
| Age|
+----+
|  31|
|  65|
|  53|
|  34|
|  28|
|  26|
|  27|
|  44|
|  22|
|  47|
|null|
|  52|
|  40|
|  54|
|  48|
|  41|
|  43|
|  37|
|  35|
|  55|
+----+
only showing top 20 rows



In [32]:
#number of unique - nunique()
df.select('Age').distinct().count()

37

### describe - summary stats

In [33]:
df.describe().show()

+-------+-------------+-----------------+------------------+-------------------+-----------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|    Total_Purchase|    Account_Manager|            Years|            Location|             Company|              Churn|
+-------+-------------+-----------------+------------------+-------------------+-----------------+--------------------+--------------------+-------------------+
|  count|          900|              898|               899|                898|              900|                 900|                 899|                900|
|   mean|         null|41.81069042316258|10064.888242491663| 0.4799554565701559| 5.27315555555555|                null|                null|0.16666666666666666|
| stddev|         null| 6.13303075073418| 2409.188637458988|0.49987645989755647|1.274449013194616|                null|                null| 0.3728852122772358|
|    min|   Aaron King|           

In [34]:
df.select('Names', 'Age', 'Total_Purchase').describe().show()

+-------+-------------+-----------------+------------------+
|summary|        Names|              Age|    Total_Purchase|
+-------+-------------+-----------------+------------------+
|  count|          900|              898|               899|
|   mean|         null|41.81069042316258|10064.888242491663|
| stddev|         null| 6.13303075073418| 2409.188637458988|
|    min|   Aaron King|               22|             100.0|
|    max|Zachary Walsh|               65|          18026.01|
+-------+-------------+-----------------+------------------+



### Check the shape or dimensions of data

In [37]:
# number of rows
df.count(), len(df.columns)

(900, 9)

In [36]:
#number of cols
len(df.columns)

9

### Subset by row (filtering)

In [38]:
df.filter(df['Age'] > 30)

DataFrame[Names: string, Age: int, Total_Purchase: double, Account_Manager: int, Years: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int]

In [39]:
df.filter(df['Age'] > 30).count()

868

In [40]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [43]:
# & = and, | = or

df.filter((df['Age'] > 30) & (df['Churn'] == 1)).count()

143

In [44]:
df.filter(df['Names']. startswith('Eric')).show()

+------------+----+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
|       Names| Age|Total_Purchase|Account_Manager|Years|       Onboard_date|            Location|             Company|Churn|
+------------+----+--------------+---------------+-----+-------------------+--------------------+--------------------+-----+
| Eric Lozano|  38|      12884.75|              0| 6.67|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
| Eric Butler|null|      11331.58|           null| 5.23|2016-12-05 03:35:43|4846 Savannah Roa...|   Reynolds-Sheppard|    1|
|Erica Flores|  38|       9624.18|              0| 5.53|2008-12-22 06:09:17|421 Kevin Shoal G...|           Carey Ltd|    1|
| Eric Kelley|  43|      10271.19|              1| 5.31|2011-07-19 10:28:53|98472 Meghan Mall...|Santos, Hoffman a...|    1|
|  Eric Smith|  37|      11416.56|              0| 6.06|2006-04-03 04:29:08|166 Julie Vista A...|       Thomas-Curtis|    0|


In [46]:
df.select('Names', 'Age').filter(df['Names'].cointains('Eric')).show()

TypeError: 'Column' object is not callable

In [48]:
df.select('Names', 'Age').filter(df['Age'].isNull()).show()

+-------------+----+
|        Names| Age|
+-------------+----+
|  Eric Butler|null|
|Doris Wilkins|null|
+-------------+----+



In [76]:
df = df.dropna()

In [77]:
df.count()

896

In [None]:
#drop
# df.dropna(how = 'any', subset = ['Age', 'Names'])

In [None]:
# fill na's in  Age with 30
# df.fillna(30, subset = ['Age'])

In [78]:
df.filter(df['Onboard_date'].between('2013-08-30','2013-08-13')).show()

+-----+---+--------------+---------------+-----+------------+--------+-------+-----+
|Names|Age|Total_Purchase|Account_Manager|Years|Onboard_date|Location|Company|Churn|
+-----+---+--------------+---------------+-----+------------+--------+-------+-----+
+-----+---+--------------+---------------+-----+------------+--------+-------+-----+



In [79]:
df.select('Onboard_date').show(2)

+-------------------+
|       Onboard_date|
+-------------------+
|2013-08-30 07:00:40|
|2013-08-13 00:38:46|
+-------------------+
only showing top 2 rows



In [80]:
# filter using sql
df.createOrReplaceTempView('churn_table')

In [81]:
spark.sql('SELECT Names, Age FROM churn_table LIMIT 2').show()

+----------------+---+
|           Names|Age|
+----------------+---+
|Cameron Williams| 42|
|   Kevin Mueller| 41|
+----------------+---+



### Group by

In [82]:
df.groupBy('Churn').mean().show(4)

+-----+------------------+-------------------+--------------------+------------------+----------+
|Churn|          avg(Age)|avg(Total_Purchase)|avg(Account_Manager)|        avg(Years)|avg(Churn)|
+-----+------------------+-------------------+--------------------+------------------+----------+
|    1|42.986301369863014| 10202.051575342472|  0.5547945205479452| 5.893698630136986|       1.0|
|    0| 41.58133333333333| 10036.952853333332|  0.4653333333333333|5.1510666666666625|       0.0|
+-----+------------------+-------------------+--------------------+------------------+----------+



In [83]:
df.groupBy('Churn').mean().toPandas()

Unnamed: 0,Churn,avg(Age),avg(Total_Purchase),avg(Account_Manager),avg(Years),avg(Churn)
0,1,42.986301,10202.051575,0.554795,5.893699,1.0
1,0,41.581333,10036.952853,0.465333,5.151067,0.0


### Machine learning
predict churn

In [84]:
df.columns

['Names',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Years',
 'Onboard_date',
 'Location',
 'Company',
 'Churn']

In [85]:
from pyspark.ml.feature import VectorAssembler

In [86]:
df.dtypes

[('Names', 'string'),
 ('Age', 'int'),
 ('Total_Purchase', 'double'),
 ('Account_Manager', 'int'),
 ('Years', 'double'),
 ('Onboard_date', 'timestamp'),
 ('Location', 'string'),
 ('Company', 'string'),
 ('Churn', 'int')]

In [87]:
# avoiding dummy variables for now as onehotencoding in spark is more complicated. now using only numerical values

input_columns = ['Age', 'Total_Purchase', 'Account_Manager', 'Years']

In [88]:
assembler = VectorAssembler(inputCols = input_columns, outputCol = 'features') # outputcol maaking all inputs into one

assembler

VectorAssembler_10eb497b39b9

In [89]:
output = assembler.transform(df)

output # adding a new column features

DataFrame[Names: string, Age: int, Total_Purchase: double, Account_Manager: int, Years: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int, features: vector]

In [90]:
output.select('features').show(2)

+--------------------+
|            features|
+--------------------+
|[42.0,11066.8,0.0...|
|[41.0,11916.22,0....|
+--------------------+
only showing top 2 rows



UP: 1st person age = 42, total purchase 11k, etc..

In [91]:
final_data = output.select('features', 'Churn')

In [92]:
final_data.show(4)

+--------------------+-----+
|            features|Churn|
+--------------------+-----+
|[42.0,11066.8,0.0...|    1|
|[41.0,11916.22,0....|    1|
|[38.0,12884.75,0....|    1|
|[42.0,8010.76,0.0...|    1|
+--------------------+-----+
only showing top 4 rows



### Train/test split

In [93]:
train, test = final_data.randomSplit([0.7, 0.3], seed = 123)

In [94]:
train.count()

629

### Modelling - Decision Trees

In [115]:
# import ML algorithms

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier


In [96]:
tree_model = DecisionTreeClassifier(featuresCol = 'features',
                                   labelCol = 'Churn')

In [97]:
tree_model_fitted = tree_model.fit(train)

In [99]:
tree_model_fitted.featureImportances

SparseVector(4, {0: 0.1471, 1: 0.295, 2: 0.0262, 3: 0.5317})

In [116]:
# For random forest
rf_model = RandomForestClassifier(featuresCol = 'features',
                                 labelCol = 'Churn')

In [117]:
# for random forest
rf_model_fitted = rf_model.fit(train)

In [120]:
# for random forest, which features are most important according to RF

rf_model_fitted.featureImportances

SparseVector(4, {0: 0.3368, 1: 0.22, 2: 0.0439, 3: 0.3993})

In [100]:
tree_model_fitted

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_2d9710830552) of depth 5 with 27 nodes

### Evaluation

In [101]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [104]:
# make predictions on test set
predictions_tree = tree_model_fitted.transform(test)

predictions_tree.show()

+--------------------+-----+-------------+--------------------+----------+
|            features|Churn|rawPrediction|         probability|prediction|
+--------------------+-----+-------------+--------------------+----------+
|[25.0,9672.03,0.0...|    0| [134.0,27.0]|[0.83229813664596...|       0.0|
|[26.0,8787.39,1.0...|    1| [134.0,27.0]|[0.83229813664596...|       0.0|
|[26.0,8939.61,0.0...|    0|    [3.0,0.0]|           [1.0,0.0]|       0.0|
|[28.0,11204.23,0....|    0|    [3.0,0.0]|           [1.0,0.0]|       0.0|
|[29.0,5900.78,1.0...|    0| [134.0,27.0]|[0.83229813664596...|       0.0|
|[29.0,11274.46,1....|    0|    [3.0,0.0]|           [1.0,0.0]|       0.0|
|[29.0,12711.15,0....|    0|  [92.0,28.0]|[0.76666666666666...|       0.0|
|[29.0,13240.01,1....|    0|   [67.0,6.0]|[0.91780821917808...|       0.0|
|[30.0,10183.98,1....|    0|  [92.0,28.0]|[0.76666666666666...|       0.0|
|[30.0,12788.37,0....|    0|    [3.0,0.0]|           [1.0,0.0]|       0.0|
|[31.0,5304.6,0.0,...|   

In [124]:
churn_eval = MulticlassClassificationEvaluator(predictionCol='prediction', 
                                               labelCol = 'Churn',
                                               metricName = 'accuracy')

In [125]:
churn_eval.evaluate(predictions_tree)

0.8164794007490637

In [126]:
# for random forest

rf_model = RandomForestClassifier(featuresCol = 'features',
                                 labelCol = 'Churn')

In [None]:
churn_eval.evaluate(predictions_tree)