In [113]:
import findspark
findspark.init('/home/shashank/spark-2.3.2-bin-hadoop2.7')

In [114]:
import pyspark

In [115]:
from pyspark.sql import SparkSession

In [116]:
spark = SparkSession.builder.appName('LR').getOrCreate()

In [117]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [118]:
data = spark.read.csv('customer_churn.csv', inferSchema=True, header=True)

In [119]:
data.createOrReplaceTempView('data')

In [120]:
spark.sql("FROM data SELECT *")

DataFrame[Names: string, Age: double, Total_Purchase: double, Account_Manager: int, Years: double, Num_Sites: double, Onboard_date: timestamp, Location: string, Company: string, Churn: int]

In [122]:
spark.sql("FROM data SELECT Churn, COUNT(Names)  GROUP BY Churn").show()

+-----+------------+
|Churn|count(Names)|
+-----+------------+
|    1|         150|
|    0|         750|
+-----+------------+



In [123]:
spark.sql("FROM data SELECT Company, COUNT(Names)  GROUP BY Company").show()

+--------------------+------------+
|             Company|count(Names)|
+--------------------+------------+
|Miller, Johnson a...|           1|
|Hunter, Reyes and...|           1|
|          Obrien PLC|           1|
|            Soto PLC|           2|
|            Todd LLC|           1|
|Smith, Marshall a...|           1|
|           Smith PLC|           1|
|          Hall Group|           1|
|Freeman, Lam and ...|           1|
|       Smith-Carroll|           1|
|Hall, Hernandez a...|           1|
|          Cannon Inc|           1|
|        White-Dennis|           1|
|Wilson, Collins a...|           1|
|Jennings, Gates a...|           1|
|     Campbell-Willis|           1|
|    Martinez-Roberts|           1|
|        Robinson PLC|           1|
|          Barton Inc|           1|
|Hernandez, Middle...|           1|
+--------------------+------------+
only showing top 20 rows



In [124]:
spark.sql("FROM data SELECT *").show()

+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|              Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|
+-------------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+
|   Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|
|      Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|
|        Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|    1|
|      Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|    1|
|     

In [125]:
#convert string to date subset to relevant cols
data2 = spark.sql("FROM data SELECT MONTH(TO_DATE(CAST(UNIX_TIMESTAMP(Onboard_date, 'MM/dd/yyyy') AS TIMESTAMP))) AS Month, \
            YEAR(TO_DATE(CAST(UNIX_TIMESTAMP(Onboard_date, 'MM/dd/yyyy') AS TIMESTAMP))) AS Year, \
          Age, Total_Purchase, Account_Manager, Years as Num_Years, Num_Sites, \
          SUBSTRING(SPLIT(Location, ',')[1], 0, 3) AS State, Churn")

In [126]:
data2.createOrReplaceTempView('data2')

In [127]:
data2.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Num_Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [128]:
data2.show()

+-----+----+----+--------------+---------------+---------+---------+-----+-----+
|Month|Year| Age|Total_Purchase|Account_Manager|Num_Years|Num_Sites|State|Churn|
+-----+----+----+--------------+---------------+---------+---------+-----+-----+
|    8|2013|42.0|       11066.8|              0|     7.22|      8.0|   AK|    1|
|    8|2013|41.0|      11916.22|              0|      6.5|     11.0|   RI|    1|
|    6|2016|38.0|      12884.75|              0|     6.67|     12.0|   DE|    1|
|    4|2014|42.0|       8010.76|              0|     6.71|     10.0|   WY|    1|
|    1|2016|37.0|       9191.58|              0|     5.56|      9.0|   MH|    1|
|    3|2009|48.0|      10356.02|              0|     5.12|      8.0|   PR|    1|
|   12|2016|44.0|      11331.58|              1|     5.23|     11.0|   IA|    1|
|    3|2006|32.0|       9885.12|              1|     6.92|      9.0|   FM|    1|
|    9|2011|43.0|       14062.6|              1|     5.46|     11.0|   MA|    1|
|    3|2006|40.0|       8066

In [129]:
#Convert location to state
spark.sql("FROM data2 SELECT State , Count(Churn) GROUP BY State ").show() #74 Null

+-----+------------+
|State|count(Churn)|
+-----+------------+
|   ME|          17|
|   WA|          15|
|   AL|          17|
|   NM|          13|
|   MI|          15|
|   MH|          16|
|   HI|          13|
|   VT|           9|
|   MO|           8|
|   NE|          12|
|   PW|          11|
| null|          74|
|   RI|          14|
|   Bo|          33|
|   NH|          15|
|   AK|          16|
|   AR|          17|
|   PR|          15|
|   AZ|          14|
|   WV|          18|
+-----+------------+
only showing top 20 rows



In [130]:
data3 = spark.sql ("FROM data2 SELECT Month, Year, Age, Total_Purchase, Account_Manager, Num_Years, Num_Sites, \
                        COALESCE(State, 'Missing') AS State, Churn")

In [131]:
#Index String
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, VectorIndexer

In [132]:
State_index = StringIndexer(inputCol='State', outputCol='State_ind')

In [133]:
#One Hot encoding
State_encoder = OneHotEncoder(inputCol='State_ind', outputCol='StateVec')
Month_encoder = OneHotEncoder(inputCol='Month', outputCol='MonthVec')
Year_encoder = OneHotEncoder(inputCol='Year', outputCol='YearVec')

In [134]:
assembler = VectorAssembler(inputCols=['MonthVec', 'YearVec', 'StateVec', 'Age', 'Total_Purchase', 'Account_Manager', \
                                      'Num_Years', 'Num_Sites'], outputCol = 'features')

In [135]:
#LR Object
lr_model = LogisticRegression(featuresCol='features', labelCol='Churn')

In [136]:
from pyspark.ml import Pipeline

In [137]:
pipeline = Pipeline(stages = [State_index, State_encoder, Month_encoder, Year_encoder, assembler, lr_model])

In [138]:
seed = 42
train, test = data3.randomSplit([0.7, 0.3])

In [139]:
fit_model = pipeline.fit(train)
results = fit_model.transform(test)
eval_results = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Churn')
eval_results.evaluate(results)

0.6462462462462462

# Try 2

In [179]:
data2.show()

+-----+----+----+--------------+---------------+---------+---------+-----+-----+
|Month|Year| Age|Total_Purchase|Account_Manager|Num_Years|Num_Sites|State|Churn|
+-----+----+----+--------------+---------------+---------+---------+-----+-----+
|    8|2013|42.0|       11066.8|              0|     7.22|      8.0|   AK|    1|
|    8|2013|41.0|      11916.22|              0|      6.5|     11.0|   RI|    1|
|    6|2016|38.0|      12884.75|              0|     6.67|     12.0|   DE|    1|
|    4|2014|42.0|       8010.76|              0|     6.71|     10.0|   WY|    1|
|    1|2016|37.0|       9191.58|              0|     5.56|      9.0|   MH|    1|
|    3|2009|48.0|      10356.02|              0|     5.12|      8.0|   PR|    1|
|   12|2016|44.0|      11331.58|              1|     5.23|     11.0|   IA|    1|
|    3|2006|32.0|       9885.12|              1|     6.92|      9.0|   FM|    1|
|    9|2011|43.0|       14062.6|              1|     5.46|     11.0|   MA|    1|
|    3|2006|40.0|       8066

In [180]:
cols = data2.select(['Month',
 'Year',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Num_Years',
 'Num_Sites',
 'State',
 'Churn'])

In [181]:
data4 = cols.na.drop()

In [182]:
seed = 42
train2, test2 = data4.randomSplit([0.7, 0.3])

In [183]:
fit_model2 = pipeline.fit(train2)
results2 = fit_model2.transform(test2)
eval_results2 = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='Churn')
eval_results2.evaluate(results2)

0.7134146341463415

We see that dropping NAs, instead of including them as "Missing" has a better prediction rate.

# CROSS VALIDATION

In [171]:
#Explore params
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [173]:
print(lr_model.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features, current: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: Churn)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The

In [177]:
#Create param grid for cross validation
paramGrid = (ParamGridBuilder().addGrid(lr_model.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr_model.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr_model.maxIter, [1, 5, 10])
             .build())

In [226]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=eval_results2, numFolds=5)


In [227]:
# Run cross validations
cvModel = cv.fit(train2)

In [228]:
predictions = cvModel.transform(test2)

In [236]:
eval_results2.evaluate(predictions)

0.6383825417201541

In [237]:
cvModel = cv.fit(train)
predictions = cvModel.transform(test)
eval_results.evaluate(predictions)

0.5750750750750752

Cross validated that dropping missing state observations is better

# Final Predictions

In [203]:
pred_data = spark.read.csv('new_customers.csv', inferSchema=True, header=True)

In [204]:
data4.head()

Row(Month=8, Year=2013, Age=42.0, Total_Purchase=11066.8, Account_Manager=0, Num_Years=7.22, Num_Sites=8.0, State=' AK', Churn=1)

In [205]:
pred_data.head()

Row(Names='Andrew Mccall', Age=37.0, Total_Purchase=9935.53, Account_Manager=1, Years=7.71, Num_Sites=8.0, Onboard_date=datetime.datetime(2011, 8, 29, 18, 37, 54), Location='38612 Johnny Stravenue Nataliebury, WI 15717-8316', Company='King Ltd')

In [206]:
pred_data.createOrReplaceTempView('pred_data')

In [208]:
pred_data2 = spark.sql("FROM pred_data SELECT Names, Age, Total_Purchase, Account_Manager, Years AS Num_Years, Num_Sites, \
                        MONTH(TO_DATE(CAST(UNIX_TIMESTAMP(Onboard_date, 'MM/dd/yyyy') AS TIMESTAMP))) AS Month, \
                       YEAR(TO_DATE(CAST(UNIX_TIMESTAMP(Onboard_date, 'MM/dd/yyyy') AS TIMESTAMP))) AS Year, \
                       SUBSTRING(SPLIT(Location, ',')[1], 0, 3) AS State")

In [209]:
pred_data2.show()

+--------------+----+--------------+---------------+---------+---------+-----+----+-----+
|         Names| Age|Total_Purchase|Account_Manager|Num_Years|Num_Sites|Month|Year|State|
+--------------+----+--------------+---------------+---------+---------+-----+----+-----+
| Andrew Mccall|37.0|       9935.53|              1|     7.71|      8.0|    8|2011|   WI|
|Michele Wright|23.0|       7526.94|              1|     9.28|     15.0|    7|2013|   Yo|
|  Jeremy Chang|65.0|         100.0|              1|      1.0|     15.0|   12|2006|   WY|
|Megan Ferguson|32.0|        6487.5|              0|      9.4|     14.0|   10|2016|   NC|
|  Taylor Young|32.0|      13147.71|              1|     10.0|      8.0|    3|2012| null|
| Jessica Drake|22.0|       8445.26|              1|     3.46|     14.0|    2|2011| null|
+--------------+----+--------------+---------------+---------+---------+-----+----+-----+



In [210]:
cols_pred_data = pred_data2.select(['Names','Month',
 'Year',
 'Age',
 'Total_Purchase',
 'Account_Manager',
 'Num_Years',
 'Num_Sites',
 'State'])

In [211]:
pred_data3 = cols_pred_data.na.drop()

In [212]:
final_fit = pipeline.fit(data4)
final_results = final_fit.transform(pred_data3)

In [213]:
final_results.createOrReplaceTempView('final_results')