# Loading and Cleaning

First we load the data from a CSV file. I also display a single row below to make it easier to see what we're working with.

In [1]:
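# Import only the functions this notebook uses instead of shadowing builtins with *
from pyspark.sql.functions import when, datediff, monotonically_increasing_id

# Read the CSV with a header row and let Spark infer the column types
data = sqlContext.read.format('com.databricks.spark.csv') \
    .option("header", True) \
    .option("inferSchema", True) \
    .load('file:/home/cloudera/Big Data Project/final_with_churn.csv')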

In [2]:
data.limit(1).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn
0,++0wqjjQge1mBBe5r4ciHGKwtF/m322zkra7CK8I+Mw=,0,16,\N,\N,\N,\N,\N,\N,\N,30.0,99.0,99.0,20170306,20151107,\N,\N,\N,\N,0


Then we want to check the schema.

In [3]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- days_used: integer (nullable = true)
 |-- total_transactions: integer (nullable = true)
 |-- total_num_25: string (nullable = true)
 |-- total_num_50: string (nullable = true)
 |-- total_num_75: string (nullable = true)
 |-- total_num_985: string (nullable = true)
 |-- total_num_100: string (nullable = true)
 |-- avg_unique_songs: string (nullable = true)
 |-- avg_total_secs: string (nullable = true)
 |-- avg_plan_length: double (nullable = true)
 |-- avg_expected_plan_price: double (nullable = true)
 |-- avg_actual_plan_price: double (nullable = true)
 |-- max_expiration_date: integer (nullable = true)
 |-- min_transaction_date: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: string (nullable = true)
 |-- is_churn: integer (nullable = true)



Notice that some of the numeric columns came in as strings because of the \N placeholders for missing values. We need to clean these up next, in two steps:

1. Fill the \N's in the appropriate columns with 0, then convert the strings in these fields to floats.
    - these columns are: total_num_25, total_num_50, total_num_75, total_num_985, total_num_100, avg_unique_songs, avg_total_secs
2. Drop rows that aren't usable because they are missing profile data.
    - these columns are: city, age, gender, registered_via
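The two steps above can be sketched in plain Python (not Spark) to show what each rule does to a single record. This is only an illustration: the helper names `fill_usage` and `has_profile` and the two toy records are hypothetical, and only the column names and the \N marker come from the data.

```python
# Plain-Python illustration of the two cleaning rules; helper names are hypothetical.
MISSING = '\\N'  # the literal two-character marker: backslash followed by N

USAGE_COLS = ['total_num_25', 'total_num_50', 'total_num_75', 'total_num_985',
              'total_num_100', 'avg_unique_songs', 'avg_total_secs']
PROFILE_COLS = ['city', 'age', 'gender', 'registered_via']


def fill_usage(record):
    """Rule 1: replace the missing marker with 0 and convert usage values to float."""
    cleaned = dict(record)
    for col in USAGE_COLS:
        value = cleaned[col]
        cleaned[col] = 0.0 if value == MISSING else float(value)
    return cleaned


def has_profile(record):
    """Rule 2: keep a record only if none of its profile columns is missing."""
    return all(record[col] != MISSING for col in PROFILE_COLS)


# Two toy records: one with no usage and no profile data, one complete.
inactive = dict.fromkeys(USAGE_COLS + PROFILE_COLS, MISSING)
active = dict.fromkeys(USAGE_COLS, '12')
active.update({'city': '1', 'age': '27', 'gender': 'male', 'registered_via': '7'})

# Drop records without a profile, then fill and convert the usage values.
kept = [fill_usage(r) for r in (inactive, active) if has_profile(r)]
```

Only the complete record survives, with its usage values converted from the string '12' to the float 12.0.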


Let's also get a row count so we have some idea of proportionality moving forward.

In [4]:
data.select('user_id').count()

992931

Before starting step 1, let's check how many rows are missing values in these columns. Note that I assume that if one column is missing they are all missing, as they were all pulled from the same table.

In [5]:
data.select('total_num_25').groupby('total_num_25').count().sort('count', ascending = False).limit(5).show()

+------------+------+
|total_num_25| count|
+------------+------+
|          \N|123005|
|           0|  2684|
|          36|   845|
|          72|   776|
|          60|   751|
+------------+------+



So about 123k users are missing the values pulled from the usage data. Since these users were present in the other tables, I'm going to assume they registered but, for whatever reason, never actively used their accounts. We will therefore keep them, but fill these usage-related values with 0's in the next step.

In [6]:
#1
data = data.withColumn('total_num_25', \
                      when(data['total_num_25']=='\N', '0.0').otherwise(data['total_num_25']))

data = data.withColumn('total_num_50', \
                      when(data['total_num_50']=='\N', '0.0').otherwise(data['total_num_50']))

data = data.withColumn('total_num_75', \
                      when(data['total_num_75']=='\N', '0.0').otherwise(data['total_num_75']))

data = data.withColumn('total_num_985', \
                      when(data['total_num_985']=='\N', '0.0').otherwise(data['total_num_985']))

data = data.withColumn('total_num_100', \
                      when(data['total_num_100']=='\N', '0.0').otherwise(data['total_num_100']))

data = data.withColumn('avg_unique_songs', \
                      when(data['avg_unique_songs']=='\N', '0.0').otherwise(data['avg_unique_songs']))

data = data.withColumn('avg_total_secs', \
                      when(data['avg_total_secs']=='\N', '0.0').otherwise(data['avg_total_secs']))

Checking the first row to make sure the above corrections worked.

In [7]:
data.limit(1).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn
0,++0wqjjQge1mBBe5r4ciHGKwtF/m322zkra7CK8I+Mw=,0,16,0.0,0.0,0.0,0.0,0.0,0.0,0.0,30.0,99.0,99.0,20170306,20151107,\N,\N,\N,\N,0


Next we drop rows that are missing city, age, gender, or registered_via, as these are important attributes we want in our analysis. First let's see how many rows are missing these values; note that I assume that if city is missing, then gender, age, and registered_via are missing as well, since they were all pulled from the same table.

In [8]:
data.select('age').groupby('age').count().sort('count', ascending = False).limit(5).show()

+---+------+
|age| count|
+---+------+
|  0|433567|
| \N|296297|
| 27| 15344|
| 26| 14611|
| 25| 13778|
+---+------+



296,297 rows, or roughly 30% of the data, are missing these values. That's a substantial number, but since these attributes come from the customer table, I find it a bit odd that the customer table is missing this data while the payment table still has it. From earlier analysis, I could also see that many of these rows were missing usage data too. Since we are testing the capabilities of PySpark, I am going to make the executive decision to drop these rows; we will still have a viable model built on a large quantity of data without them, and we should get better insights from data that is less sparse.

In [9]:
data = data.filter(data.age != '\N')

In [10]:
data.limit(1).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn
0,++Bks8kE9oclzxZM3hcWs+qzsxuoXFeIE1+7pxKBCQg=,624,624,2408,312,264,360,18864,20.76923,6836.1167,30.0,99.0,99.0,20170331,20160730,1,0,,7,0


After the filter, gender is still blank for many rows, so we replace the empty string with a new value, 'Not_Provided', so our model can still use this column later.

In [11]:
data = data.withColumn('gender',when(
data['gender']=='', 'Not_Provided').otherwise(data['gender']))

data.select('gender').groupby('gender').count().sort('count', ascending = False).limit(5).show()

+------------+------+
|      gender| count|
+------------+------+
|Not_Provided|430471|
|        male|141045|
|      female|125118|
+------------+------+
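As a quick sanity check on the cleaning so far, the counts printed above can be reconciled with simple arithmetic. The sketch below only re-uses numbers copied from the earlier outputs; the variable names are my own.

```python
# Counts copied from the outputs above; variable names are illustrative.
total_rows = 992931        # row count before cleaning
missing_profile = 296297   # rows where age is \N (dropped by the filter)
missing_usage = 123005     # rows where total_num_25 is \N
gender_counts = {'Not_Provided': 430471, 'male': 141045, 'female': 125118}

# Rows remaining after the filter should equal the sum of the gender groups.
rows_after_filter = total_rows - missing_profile

# float() keeps the division exact under both Python 2 and Python 3.
share_dropped = missing_profile / float(total_rows)
share_missing_usage = missing_usage / float(total_rows)
share_gender_blank = gender_counts['Not_Provided'] / float(rows_after_filter)

print(rows_after_filter)            # 696634
print(sum(gender_counts.values()))  # 696634
```

The two totals agree, which suggests the three gender values shown are the only ones left after the filter. The shares work out to about 29.8% of rows dropped, 12.4% of the original rows missing usage data, and 61.8% of the remaining rows with no gender provided.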



The final step in our data munging process is to find the date difference between max_expiration_date and min_transaction_date.

Both columns were read as integers in yyyyMMdd form, so first we cast them to strings so they can be parsed as dates.

In [12]:
data = data.withColumn('max_expiration_date',data.max_expiration_date.cast("string"))
data = data.withColumn('min_transaction_date',data.min_transaction_date.cast("string"))

data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- days_used: integer (nullable = true)
 |-- total_transactions: integer (nullable = true)
 |-- total_num_25: string (nullable = true)
 |-- total_num_50: string (nullable = true)
 |-- total_num_75: string (nullable = true)
 |-- total_num_985: string (nullable = true)
 |-- total_num_100: string (nullable = true)
 |-- avg_unique_songs: string (nullable = true)
 |-- avg_total_secs: string (nullable = true)
 |-- avg_plan_length: double (nullable = true)
 |-- avg_expected_plan_price: double (nullable = true)
 |-- avg_actual_plan_price: double (nullable = true)
 |-- max_expiration_date: string (nullable = true)
 |-- min_transaction_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: string (nullable = true)
 |-- is_churn: integer (nullable = true)



In [13]:
data.registerTempTable('data')

data.select('max_expiration_date','min_transaction_date').limit(5).show()

+-------------------+--------------------+
|max_expiration_date|min_transaction_date|
+-------------------+--------------------+
|           20170331|            20160730|
|           20170311|            20150914|
|           20170316|            20150131|
|           20170331|            20150121|
|           20170305|            20150410|
+-------------------+--------------------+



Then we use sqlContext to run a SQL query that converts each string column to a date-type column. We also check that the conversion worked correctly.

In [14]:
#building the correct date vectors
max_exp_date = sqlContext.sql("""
    SELECT TO_DATE(CAST(UNIX_TIMESTAMP(max_expiration_date,'yyyyMMdd') AS TIMESTAMP)) AS max_exp_date
    FROM DATA
""")

min_trans_date = sqlContext.sql("""
    SELECT TO_DATE(CAST(UNIX_TIMESTAMP(min_transaction_date,'yyyyMMdd') AS TIMESTAMP)) AS min_trans_date
    FROM DATA
""")

In [15]:
max_exp_date.limit(5).show()

+------------+
|max_exp_date|
+------------+
|  2017-03-31|
|  2017-03-11|
|  2017-03-16|
|  2017-03-31|
|  2017-03-05|
+------------+



In [16]:
min_trans_date.limit(5).show()

+--------------+
|min_trans_date|
+--------------+
|    2016-07-30|
|    2015-09-14|
|    2015-01-31|
|    2015-01-21|
|    2015-04-10|
+--------------+



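Then we merge these new date columns back into our original data frame and calculate the date difference as a new column titled 'date_diff'. Note that joining on monotonically_increasing_id only lines the rows up if all three data frames have the same partitioning and row order. Spark does not guarantee this in general, but it appears to hold here, as the matching dates in the output below show.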

In [17]:
#merging these into the data dataFrame

data = data.withColumn('row_index', monotonically_increasing_id())
max_exp_date = max_exp_date.withColumn('row_index', monotonically_increasing_id())
min_trans_date = min_trans_date.withColumn('row_index', monotonically_increasing_id())

data = data.join(max_exp_date, on=['row_index'])
data = data.join(min_trans_date, on=['row_index']).drop('row_index')

In [18]:
#make date diff column
data = data.withColumn('date_diff', datediff(data.max_exp_date,data.min_trans_date))

In [19]:
data.limit(5).toPandas()

Unnamed: 0,user_id,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,...,max_expiration_date,min_transaction_date,city,age,gender,registered_via,is_churn,max_exp_date,min_trans_date,date_diff
0,+1ZgRw2ZlmD4Z1NCVo8lh4ECpNtG73bp/cECvhq4l8Q=,1071,1071,2961,1050,735,651,16212,16.745098,4050.471,...,20170311,20150802,1,0,Not_Provided,7,0,2017-03-11,2015-08-02,587
1,+NKVPkGwpoOWKQDdH3mtpaZGR5lx9fu5bOHixIRjsnI=,2610,2610,17385,3495,3270,2580,75045,18.718391,8045.0156,...,20170315,20151216,1,0,Not_Provided,7,0,2017-03-15,2015-12-16,455
2,+i2T+lq7TNR/gThVOEh6M3CdHEIgZhgeH1ENTjAgMyE=,6403,6403,39349,10260,6175,5966,79591,19.973293,3643.7224,...,20170319,20150920,1,0,Not_Provided,7,0,2017-03-19,2015-09-20,546
3,/67f8zgh70yyzqwntxaAuqqSrbibNC7KxG5rGBg4/hc=,130,130,390,55,80,65,1970,12.230769,4521.8457,...,20170331,20161029,1,0,Not_Provided,7,0,2017-03-31,2016-10-29,153
4,/UYp4Ued/yMVEf5OD13C9Hz8B/N78PBx13tglE3+gXA=,117,117,213,21,9,15,2409,19.564102,5146.8057,...,20170331,20161231,1,0,Not_Provided,7,0,2017-03-31,2016-12-31,90
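The date_diff values can be double-checked outside Spark with the standard library. The sketch below is a plain-Python equivalent of the parse-then-subtract logic, not the Spark code itself; the helper name `days_between` is hypothetical, and the input strings are the five rows shown above.

```python
from datetime import datetime


def days_between(max_expiration_date, min_transaction_date):
    """Parse two yyyyMMdd strings and return the number of days between them."""
    fmt = '%Y%m%d'
    end = datetime.strptime(max_expiration_date, fmt).date()
    start = datetime.strptime(min_transaction_date, fmt).date()
    return (end - start).days


# (max_expiration_date, min_transaction_date) for the five rows above
rows = [('20170311', '20150802'),
        ('20170315', '20151216'),
        ('20170319', '20150920'),
        ('20170331', '20161029'),
        ('20170331', '20161231')]

diffs = [days_between(max_date, min_date) for max_date, min_date in rows]
print(diffs)  # [587, 455, 546, 153, 90]
```

These match the date_diff column, which also confirms that the joined date columns landed on the right rows for this sample.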


# Model Prep and Pipeline Building

Next up we need to get our data into a format that can be fed to models. First, we select the columns we need and cast them to their appropriate data types now that the data has been cleaned up.

In [20]:
data.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- days_used: integer (nullable = true)
 |-- total_transactions: integer (nullable = true)
 |-- total_num_25: string (nullable = true)
 |-- total_num_50: string (nullable = true)
 |-- total_num_75: string (nullable = true)
 |-- total_num_985: string (nullable = true)
 |-- total_num_100: string (nullable = true)
 |-- avg_unique_songs: string (nullable = true)
 |-- avg_total_secs: string (nullable = true)
 |-- avg_plan_length: double (nullable = true)
 |-- avg_expected_plan_price: double (nullable = true)
 |-- avg_actual_plan_price: double (nullable = true)
 |-- max_expiration_date: string (nullable = true)
 |-- min_transaction_date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- age: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- registered_via: string (nullable = true)
 |-- is_churn: integer (nullable = true)
 |-- max_exp_date: date (nullable = true)
 |-- min_trans_date: date (nullable = true)
 |-- da

In [28]:
cols = data.select(data.days_used.cast('double'),
                   data.total_transactions.cast('double'),
                   data.total_num_25.cast('double'),
                   data.total_num_50.cast('double'),
                   data.total_num_75.cast('double'),
                   data.total_num_985.cast('double'),
                   data.total_num_100.cast('double'),
                   data.avg_unique_songs.cast('double'),
                   data.avg_total_secs.cast('double'),
                   data.avg_plan_length.cast('double'),
                   data.avg_expected_plan_price.cast('double'),
                   data.avg_actual_plan_price.cast('double'),
                   data.date_diff.cast('double'),
                   data.age.cast('double'),
                   data.city,
                   data.gender,
                   data.registered_via,
                   data.is_churn.cast('double')               
                  )

cols.limit(1).toPandas()

Unnamed: 0,days_used,total_transactions,total_num_25,total_num_50,total_num_75,total_num_985,total_num_100,avg_unique_songs,avg_total_secs,avg_plan_length,avg_expected_plan_price,avg_actual_plan_price,date_diff,age,city,gender,registered_via,is_churn
0,1071.0,1071.0,2961.0,1050.0,735.0,651.0,16212.0,16.745098,4050.471,30.0,99.04762,99.04762,587.0,0.0,1,Not_Provided,7,0.0


Indexing categorical variables, then encoding them

In [22]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)

#City, gender, registered via
#Creating Indexers
city_indexer = StringIndexer(inputCol='city', outputCol='cityIndex')
gender_indexer = StringIndexer(inputCol='gender', outputCol='genderIndex')
registered_via_indexer = StringIndexer(inputCol='registered_via', outputCol='registered_viaIndex')
churn_indexer = StringIndexer(inputCol='is_churn', outputCol='is_churnIndex')

#Creating Encoders
city_encoder = OneHotEncoder(inputCol='cityIndex', outputCol='cityVec')
gender_encoder = OneHotEncoder(inputCol='genderIndex', outputCol='genderVec')
registered_via_encoder = OneHotEncoder(inputCol='registered_viaIndex', outputCol='registered_viaVec')
churn_encoder = OneHotEncoder(inputCol='is_churnIndex', outputCol='is_churnVec')
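
To show what indexing and then encoding does to a categorical column, here is a minimal plain-Python sketch. It assumes the default settings of the Spark transformers (StringIndexer assigns index 0 to the most frequent label, and OneHotEncoder drops the last category). The function names, the alphabetical tie-break, and the toy column are my own assumptions, not part of Spark.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent first.

    Ties are broken alphabetically here; that tie-break is an assumption
    of this sketch, not documented Spark behaviour.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot_drop_last(index, num_labels):
    """Encode an index as a vector of length num_labels - 1.

    The last index maps to the all-zeros vector, mirroring dropLast=True.
    """
    vec = [0.0] * (num_labels - 1)
    if index < num_labels - 1:
        vec[index] = 1.0
    return vec


# Toy column with the same frequency ordering as the gender counts above.
genders = ['Not_Provided'] * 4 + ['male'] * 2 + ['female']
index = fit_string_index(genders)
encoded = {label: one_hot_drop_last(i, len(index)) for label, i in index.items()}
```

Under these assumptions 'Not_Provided' gets index 0 and vector [1.0, 0.0], 'male' gets index 1 and [0.0, 1.0], and 'female' gets index 2 and the all-zeros vector, so a three-category column contributes two features to the assembled vector.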


Building Assembler

In [23]:
assembler = VectorAssembler(inputCols=['days_used','total_transactions','total_num_25','total_num_50','total_num_75',
                                      'total_num_985','total_num_100','avg_unique_songs','avg_total_secs','avg_plan_length',
                                      'avg_expected_plan_price','avg_actual_plan_price','date_diff','age','cityVec','genderVec',
                                      'registered_viaVec'], outputCol='features')

# Logistic Regression

Importing and setting up logistic regression

In [29]:
from pyspark.ml.classification import LogisticRegression
log_reg = LogisticRegression(labelCol='is_churn')

Pipeline Time!

In [30]:
from pyspark.ml import Pipeline

pipe = Pipeline(stages=[city_indexer,
                       gender_indexer,
                       registered_via_indexer,
                       city_encoder,
                       gender_encoder,
                       registered_via_encoder,
                       assembler,
                       log_reg])

Splitting the data 70 / 30 into train / test

In [31]:
train_data, test_data = cols.randomSplit([.7,.3])

Fitting the logistic regression pipeline to our training data


In [34]:
fit = pipe.fit(train_data)

#3 minutes 40 seconds

In [35]:
results = fit.transform(test_data)



# Evaluating our Logistic Regression

In [36]:
log_reg_model = fit.stages[-1]

Printing out the coefficients for our variables.

In [37]:
print(log_reg_model.coefficients)

[-2.76981774646e-06,-2.77028020459e-06,2.40386775847e-08,1.5243337119e-07,-7.29547587911e-08,-1.16132044599e-07,-3.24917953939e-08,0.00129359986098,3.28902230099e-17,0.00153692421394,0.000920840544739,0.000815303931357,-0.000113107106972,0.000706489031535,-0.077814969204,0.0380074112638,0.0418986711783,0.0474049215087,0.0576303332431,0.056205335389,0.0878063887289,0.0215013542602,0.0734439640239,0.0381708330171,0.0630016420282,0.048244453277,0.10650677688,0.0363828611859,0.00991158409444,0.074281895514,0.0432528166236,-0.00524575779075,0.0136111436158,0.0567259207853,-0.082177963689,0.0673396541995,-0.191555320707,0.0707969001526,0.220590090054,0.326696198719]


Evaluating our metrics (AUC and area under precision recall)

In [38]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
data_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='is_churn')
# The evaluator's default metric is areaUnderROC
auc = data_eval.evaluate(results)
print(auc)

0.834534232775


In [39]:
data_eval.setMetricName('areaUnderPR')

areaUnderPrecisionRecall = data_eval.evaluate(results)

print(areaUnderPrecisionRecall)

0.0399069265069
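
For intuition about the first metric, the area under the ROC curve can be read as the probability that a randomly chosen churner gets a higher score than a randomly chosen non-churner. The sketch below computes that quantity directly on a tiny made-up example. Spark derives the area from the ROC curve rather than by enumerating pairs, so this is an illustration of the interpretation, not of the evaluator's implementation; the function name and toy data are hypothetical.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs in which the positive scores higher.

    Tied pairs count as half a win.
    """
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy data: two non-churners (0) and two churners (1) with model scores.
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(toy_labels, toy_scores))  # 0.75
```

Three of the four churner/non-churner pairs are ranked correctly in the toy data, giving 0.75. Read the same way, an AUC of about 0.83 means the model ranks a churner above a non-churner in roughly 83% of such pairs.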


# Random Forest

Next, we will do the same thing we did for logistic regression with Random Forests.

In [56]:
from pyspark.ml.classification import RandomForestClassifier

We aligned the parameters with our other platforms so that we compare similar trees.

In [57]:
rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol='is_churnIndex')

Notice that for the RF pipeline we have to use churn_indexer. The random forest classifier in Spark ML requires a categorical target variable to be run through a StringIndexer in order to work.

In [58]:
rf_pipe = Pipeline(stages=[city_indexer,
                           gender_indexer,
                           registered_via_indexer,
                           churn_indexer,
                           city_encoder,
                           gender_encoder,
                           registered_via_encoder,
                           # churn_encoder,
                           assembler,
                           rf])

Fitting the RF to our training data.

In [65]:
rf_fit = rf_pipe.fit(train_data)

#takes 5 minutes to run

Running the model on our testing data for our RF

In [62]:
rf_results = rf_fit.transform(test_data)

# Evaluating our Random Forest

In [63]:
rf_data_eval = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='is_churnIndex')

rf_auc = rf_data_eval.evaluate(rf_results)

print(rf_auc)

0.861153257612


In [64]:
rf_data_eval.setMetricName('areaUnderPR')

rf_areaUnderPrecisionRecall = rf_data_eval.evaluate(rf_results)

print(rf_areaUnderPrecisionRecall)

0.0596329188101


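All in all, our random forest produced better results than our logistic regression: an AUC of 0.861 versus 0.835, and an area under the precision-recall curve of 0.060 versus 0.040. It did take longer to fit (about 5 minutes versus 3 minutes 40 seconds), but not by enough to make a practical difference.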