# Sparkify - Churn Prediction with PySpark


## 1. Intro

In [2]:
import datetime
import time
import numpy as np

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import udf, col, concat, count, lit, avg, lag, first, last, when
from pyspark.sql.functions import min as Fmin, max as Fmax, sum as Fsum, round as Fround

from pyspark.sql.types import IntegerType, DateType, TimestampType

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, Normalizer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, Evaluator

In [3]:
# Create a Spark Session
spark = SparkSession \
.builder \
.appName('Sparkify') \
.getOrCreate()

## 2. Data Understanding

In [4]:
# Load data and store it into a Spark dataframe

# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)

In [4]:
# Structure of the dataframe
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [5]:
# Shape of the smaller Sparkify dataframe
nrows = df.count()
ncols = len(df.dtypes)

In [6]:
print(nrows)
print(ncols)

26259199
18

In [7]:
# Obtain the number of distinct users in the full Sparkify dataset
df.select(['userId']).dropDuplicates().count()

22278

In [8]:
# Obtain the distribution of the interaction types in the analyzed dataset
df.groupby('page').count().sort('count', ascending = False).show(22)

+--------------------+--------+
|                page|   count|
+--------------------+--------+
|            NextSong|20850272|
|                Home| 1343102|
|           Thumbs Up| 1151465|
|     Add to Playlist|  597921|
|         Roll Advert|  385212|
|          Add Friend|  381664|
|               Login|  296350|
|              Logout|  296005|
|         Thumbs Down|  239212|
|           Downgrade|  184240|
|                Help|  155100|
|            Settings|  147074|
|               About|   92759|
|             Upgrade|   50507|
|       Save Settings|   29516|
|               Error|   25962|
|      Submit Upgrade|   15135|
|    Submit Downgrade|    6494|
|              Cancel|    5003|
|Cancellation Conf...|    5003|
|            Register|     802|
| Submit Registration|     401|
+--------------------+--------+

## 3. Feature Engineering and Exploratory Data Analysis

In [5]:
# Default observation start and end timestamps for users who haven't registered after October 1, or churned
obs_start_default = 1538352000000
obs_end_default = 1543622400000

In [6]:
# Lag the page column
windowsession = Window.partitionBy('sessionId').orderBy('ts')
df = df.withColumn("lagged_page", lag(df.page).over(windowsession))

In [7]:
# All the values calculated here are the same for all logs belonging to a given user. 
windowuser = Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Identify users that registered after the start of observation, and infer the start date accordingly
df = df.withColumn("beforefirstlog", first(col('lagged_page')).over(windowuser))
df = df.withColumn("firstlogtime", first(col('ts')).over(windowuser))
df = df.withColumn("obsstart", 
                   when(df.beforefirstlog == "Submit Registration", df.firstlogtime).otherwise(obs_start_default))

In [8]:
# Identify users that cancelled their service, i.e. users whose last log event is "Cancellation Confirmation", and
# obtain the corresponding end of the observation period. This is done on the original dataset
# so a single value, end of observation period, is copied to all rows belonging to a given userId.

df = df.withColumn("endstate", last(col('page')).over(windowuser))
df = df.withColumn("lastlogtime", last(col('ts')).over(windowuser))
df = df.withColumn("obsend", when(df.endstate == "Cancellation Confirmation", df.lastlogtime).otherwise(obs_end_default))

In [9]:
# For each log compute the time from the beginning of observation...
df = df.withColumn("timefromstart", col('ts')-col("obsstart"))
# ...and time before the end of observation
df = df.withColumn("timebeforeend", col('obsend')-col('ts'))

In [10]:
# Obtain user's last subscription level and add it to the original dataset
df = df.withColumn("lastlevel", last(col('level')).over(windowuser))

In [11]:
# Remove rows with missing userId 
df = df.where(df.userId != "")

In [12]:
# Remove rows with corrupted timestamp
df = df.where(df.ts < obs_end_default)

In [13]:
# Length of the estimation window for activity trend calculation
trend_est_days = 14
trend_est_hours = trend_est_days * 24
# In timestamp format
trend_est = trend_est_days * 24 * 60 * 60 * 1000

In [14]:
# Aggregation by userId
df_user = df.groupby('userId')\
.agg(
     # User-level features
     first(when(col('lastlevel') == 'paid', 1).otherwise(0)).alias('lastlevel'),
     first(when(col('gender') == "F", 1).otherwise(0)).alias('gender'),
     first(col('obsstart')).alias('obsstart'),
     first(col('obsend')).alias('obsend'),
     first(col('endstate')).alias('endstate'),
     #first(col('location')).alias('location'),
     #first(col('userAgent')).alias('userAgent')
     #first(col('registration')).alias('registration'),
     
     # Aggregated activity statistics
     count(col('page')).alias('nact'),
     Fsum(when(col('page') == "NextSong", 1).otherwise(0)).alias("nsongs"),
     Fsum(when(col('page') == "Thumbs Up", 1).otherwise(0)).alias("ntbup"),
     Fsum(when(col('page') == "Thumbs Down", 1).otherwise(0)).alias("ntbdown"),
     Fsum(when(col('page') == "Add Friend", 1).otherwise(0)).alias("nfriend"),
     Fsum(when(col('page') == "Add to Playlist", 1).otherwise(0)).alias("nplaylist"),     
     Fsum(when(col('page') == "Submit Downgrade", 1).otherwise(0)).alias("ndgrade"),
     Fsum(when(col('page') == "Submit Upgrade", 1).otherwise(0)).alias("nugrade"),
     Fsum(when(col('page') == "Home", 1).otherwise(0)).alias("nhome"),
     Fsum(when(col('page') == "Roll Advert", 1).otherwise(0)).alias("nadvert"),
     Fsum(when(col('page') == "Help", 1).otherwise(0)).alias("nhelp"),
     Fsum(when(col('page') == "Settings", 1).otherwise(0)).alias("nsettings"),
     Fsum(when(col('page') == "Error", 1).otherwise(0)).alias("nerror"),
     
     # Aggregated activity statistics in different periods 
     Fsum(when(col('timebeforeend') < trend_est, 1).otherwise(0)).alias("nact_recent"),
     Fsum(when(col('timefromstart') < trend_est, 1).otherwise(0)).alias("nact_oldest"),
     Fsum(when((col('page') == "NextSong") & (col('timebeforeend') < trend_est), 1).otherwise(0)).alias("nsongs_recent"),
     Fsum(when((col('page') == "NextSong") & (col('timefromstart') < trend_est), 1).otherwise(0)).alias("nsongs_oldest") )

In [15]:
# Calculation of the defined features that will be used for identifying churned users
# The first added column, 'obshours', represents the length of the user-specific observation period in hours. The column 
# is not one of the features but is used to calculate all aggregated statistics per hour
df_user = df_user.withColumn('obshours', (col('obsend') - col('obsstart'))/1000/3600)\
.withColumn('nact_perh', col('nact') / col('obshours'))\
.withColumn('nsongs_perh', col('nsongs') / col('obshours'))\
.withColumn('ntbup_perh', col('ntbup') / col('obshours'))\
.withColumn('ntbdown_perh', col('ntbdown') / col('obshours'))\
.withColumn('nfriend_perh', col('nfriend') / col('obshours'))\
.withColumn('nplaylist_perh', col('nplaylist') / col('obshours'))\
.withColumn('nhome_perh', col('nhome') / col('obshours'))\
.withColumn('nadvert_perh', col('nadvert') / col('obshours'))\
.withColumn('nerror_perh', col('nerror') / col('obshours'))\
.withColumn('upgradedowngrade', col('nugrade') + col('ndgrade'))\
.withColumn('songratio', col('nsongs') / col('nact'))\
.withColumn('positiveratio', (col('ntbup') + col('nfriend') + col('nplaylist')) / col('nact'))\
.withColumn('negativeratio', (col('ntbdown') + col('nhelp') + col('nerror') + col('nsettings')) / col('nact'))\
.withColumn('updownratio', col('ntbup') / (col('ntbdown') + 0.1))\
.withColumn('nact_recent_perh', col('nact_recent') / trend_est_hours)\
.withColumn('nact_oldest_perh', col('nact_oldest') / trend_est_hours)\
.withColumn('nsongs_recent_perh', col('nsongs_recent') / trend_est_hours)\
.withColumn('nsongs_oldest_perh', col('nsongs_oldest') / trend_est_hours)\
.withColumn('trend_act', (col('nact_recent_perh') - col('nact_oldest_perh')) / col('obshours'))\
.withColumn('trend_songs', (col('nsongs_recent_perh') - col('nsongs_oldest_perh')) / col('obshours'))
#.withColumn('timesincereg', (col('obsstart') - col('registration'))/1000/3600)\

In [16]:
# Calculation of user's average number of items per session
session_avgnitems = df.groupby(['userId', 'sessionId'])\
.agg(Fmax(col('itemInSession')).alias('nitems'))\
.groupby('userId').agg(avg(col('nitems')).alias('avgsessionitems'))

In [17]:
# Calculation of user's average session length
session_avglength = df.groupby(['userId', 'sessionId'])\
.agg(Fmin(col('ts')).alias('startsession'), Fmax(col('ts')).alias('endsession'))\
.groupby('userId').agg(avg(col('endsession')-col('startsession')).alias('avgsessionlength'))

In [18]:
# Calculations to obtain user's average number of songs played between home visits
windowhome = (Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, 0))
df = df.withColumn("phasehome", Fsum(when(df.page == "Home", 1).otherwise(0)).over(windowhome))

songs_home = df.groupby(['userId', 'phasehome'])\
.agg(Fsum(when(col('page') == "NextSong", 1).otherwise(0)).alias('total'))\
.groupby('userId').agg(avg(col('total')).alias('avgsongs'))

In [19]:
# Adding the extra features to the df_user dataset with all other engineered features
df_user = df_user\
.join(session_avgnitems, on = 'userId')\
.join(session_avglength, on = 'userId')\
.join(songs_home, on = 'userId')

In [20]:
# Calculating the binary response variable
df_user = df_user.withColumn('label', when(df_user.endstate == "Cancellation Confirmation", 1).otherwise(0))

In [20]:
# Distribution of the binary response variable
df_user.groupby('label').count().show()

+-----+-----+
|label|count|
+-----+-----+
|    1| 5003|
|    0|17275|
+-----+-----+

In [21]:
# Schema of the transformed dataset
df_user.printSchema()

root
 |-- userId: string (nullable = true)
 |-- lastlevel: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- obsstart: long (nullable = true)
 |-- obsend: long (nullable = true)
 |-- endstate: string (nullable = true)
 |-- nact: long (nullable = false)
 |-- nsongs: long (nullable = true)
 |-- ntbup: long (nullable = true)
 |-- ntbdown: long (nullable = true)
 |-- nfriend: long (nullable = true)
 |-- nplaylist: long (nullable = true)
 |-- ndgrade: long (nullable = true)
 |-- nugrade: long (nullable = true)
 |-- nhome: long (nullable = true)
 |-- nadvert: long (nullable = true)
 |-- nhelp: long (nullable = true)
 |-- nsettings: long (nullable = true)
 |-- nerror: long (nullable = true)
 |-- nact_recent: long (nullable = true)
 |-- nact_oldest: long (nullable = true)
 |-- nsongs_recent: long (nullable = true)
 |-- nsongs_oldest: long (nullable = true)
 |-- obshours: double (nullable = true)
 |-- nact_perh: double (nullable = true)
 |-- nsongs_perh: double (nullable = t

## 4. Modelling and Evaluation

In [21]:
# Packing together multiple features  using VectorAssembler
numeric_columns = ['nsongs_perh', 'ntbup_perh','ntbdown_perh', 'nfriend_perh', 
'nadvert_perh', 'nerror_perh', 'upgradedowngrade', 'songratio', 'positiveratio','negativeratio', 
'updownratio', 'trend_songs', 'avgsessionitems','avgsongs']

numeric_assembler = VectorAssembler(inputCols = numeric_columns, outputCol = "numericvectorized")

# Standardizing all numerical features at the same time
scaler = StandardScaler(inputCol = "numericvectorized", outputCol = "numericscaled", withStd = True, withMean = True)
#scaler = Normalizer(inputCol="numericvectorized", outputCol="numericscaled")

# Adding the two binary features
binary_columns = ['lastlevel', 'gender']
total_assembler = VectorAssembler(inputCols = binary_columns + ["numericscaled"], outputCol = "features")

In [22]:
#Custom F1 score evaluator that we can use instead of BinaryClassificationEvaluator() in grid search
class F1score(Evaluator):

    def __init__(self, predictionCol = "prediction", labelCol="label"):
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset):
        
        # Calculate F1 score 
        tp = dataset.where((dataset.label == 1) & (dataset.prediction == 1)).count()
        fp = dataset.where((dataset.label == 0) & (dataset.prediction == 1)).count()
        tn = dataset.where((dataset.label == 0) & (dataset.prediction == 0)).count()
        fn = dataset.where((dataset.label == 1) & (dataset.prediction == 0)).count()
        
        # Add epsilon to prevent division by zero
        precision = tp / float(tp + fp + 0.00001)
        recall = tp / float(tp + fn + 0.00001)
        
        f1 = 2 * precision * recall / float(precision + recall + 0.00001)
        
        return f1

    def isLargerBetter(self):
        return True

In [23]:
# Split the data into training (plus validation) and test set 

train_plus_val, test = df_user.randomSplit([0.8, 0.2], seed = 9) 
#ntotal = df_user.count()
#ntrainval = train_plus_val.count()
#ntest = ntotal - ntrainval

In [25]:
print(ntotal)
print(ntrainval)
print(ntest)

22278
17799
4479

In [31]:
# Defining three different pipelines with three different classifiers, all with default parameters

# Pipeline with logistic regression 
#lr =  LogisticRegression()
#pipeline_lr = Pipeline(stages = [numeric_assembler, scaler, total_assembler, lr])

# Pipeline with random forest classifier
#rf = RandomForestClassifier()
#pipeline_rf = Pipeline(stages = [numeric_assembler, scaler, total_assembler, rf])

## Model Gradient Boost Tree Classifier 1

In [32]:
# Perform the grid search by fitting the grid search object
gb1 = GBTClassifier(maxDepth = 5, maxIter = 20)
pipeline_gb1 = Pipeline(stages = [numeric_assembler, scaler, total_assembler, gb1])

start = time.time()
model_gb1 = pipeline_gb1.fit(train_plus_val)
end = time.time()
print('Time spent for training:')
print(round(end-start))

Time spent for training:
2510.0

In [33]:
# Display feature importances
importances = model_gb1.stages[-1].featureImportances
importances_list = [importances[i] for i in range(len(importances))]
names = binary_columns + numeric_columns
print(names)
print(importances_list)

['lastlevel', 'gender', 'nsongs_perh', 'ntbup_perh', 'ntbdown_perh', 'nfriend_perh', 'nadvert_perh', 'nerror_perh', 'upgradedowngrade', 'songratio', 'positiveratio', 'negativeratio', 'updownratio', 'trend_songs', 'avgsessionitems', 'avgsongs']
[0.01774133563300751, 0.0007286232039209089, 0.06766983986664434, 0.0032197386588725114, 0.0914501514035554, 0.029716406544868613, 0.09803824638788945, 0.35941186881524695, 0.0221361649739096, 0.04981996403917844, 0.04519212423399973, 0.00869449171132609, 0.020951283491423053, 0.15103637455925087, 0.013870367363126513, 0.020323019113779733]

## Evaluation

In [34]:
# Obtain predictions on the test set
results_gb1 = model_gb1.transform(test)

In [35]:
# AUC score on the test set
auc_evaluator = BinaryClassificationEvaluator()
auc_gb1 = auc_evaluator.evaluate(results_gb1)
print('Model gb1 AUC score:')
print(auc_gb1)

Model gb1 AUC score:
0.94108963301

In [36]:
# F1 score on the test set
f1_evaluator = F1score()
f1score_gb1 = f1_evaluator._evaluate(results_gb1)
print('Model gb1 F1 score:')
print(f1score_gb1)

Model gb1 F1 score:
0.733178914983

## Model Gradient Boost Tree Classifier 2

In [24]:
# Perform the grid search by fitting the grid search object
gb4 = GBTClassifier(maxDepth = 5, maxIter = 200)
pipeline_gb4 = Pipeline(stages = [numeric_assembler, scaler, total_assembler, gb4])

start = time.time()
model_gb4 = pipeline_gb4.fit(train_plus_val)
end = time.time()
print('Time spent for training:')
print(round(end-start))

Time spent for training:
5861.0

In [28]:
# Display feature importances
importances = model_gb4.stages[-1].featureImportances
importances_list = [importances[i] for i in range(len(importances))]
names = binary_columns + numeric_columns
print(names)
print(importances_list)

['lastlevel', 'gender', 'nsongs_perh', 'ntbup_perh', 'ntbdown_perh', 'nfriend_perh', 'nadvert_perh', 'nerror_perh', 'upgradedowngrade', 'songratio', 'positiveratio', 'negativeratio', 'updownratio', 'trend_songs', 'avgsessionitems', 'avgsongs']
[0.013795342305289867, 0.003419130795754195, 0.05192724418392687, 0.023604481206826188, 0.18329355168409747, 0.060501586172062846, 0.0974098898402886, 0.2110401549499677, 0.02286163968506689, 0.04730318423769473, 0.04232860002653574, 0.043054343894025765, 0.036773157537074976, 0.07858267078570629, 0.046534988450475566, 0.03757003424520641]

## Evaluation

In [25]:
# Obtain predictions on the test set
results_gb4 = model_gb4.transform(test)

In [26]:
# AUC score on the test set
auc_evaluator = BinaryClassificationEvaluator()
auc_gb4 = auc_evaluator.evaluate(results_gb4)
print('Model gb4 AUC score:')
print(auc_gb4)

Model gb4 AUC score:
0.980815674238

In [27]:
# F1 score on the test set
f1_evaluator = F1score()
f1score_gb4 = f1_evaluator._evaluate(results_gb4)
print('Model gb4 F1 score:')
print(f1score_gb4)

Model gb4 F1 score:
0.855347479183