# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster is included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

# Notation

In this notebook we refer to the mini_sparkify_event_data.json as the total dataset

In [1]:
# import libraries
from pyspark.sql import SparkSession, Window
from pyspark import SparkConf, SparkContext

from pyspark.sql.functions import udf, col, lit, min as Fmin, max as Fmax, sum as Fsum, expr
from pyspark.sql.types import IntegerType, DateType, FloatType, DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

import numpy as np
import pandas as pd

import datetime

In [2]:
# create a Spark session

configure = SparkConf() \
    .setAppName("ml_pipelines") \
    .setMaster("local") \
    .set('spark.executor.memory', '6g') \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.instances', '1') \
    .set('spark.driver.cores', '2')

spark = SparkSession.builder \
    .master("local") \
    .appName("Sparkify") \
    .config(conf=configure) \
    .getOrCreate()

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [3]:
df = spark.read.json('mini_sparkify_event_data.json')
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [4]:
df.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [5]:
# Before checking for null values, lets get a sense of the size of the dataset

dataset_size = df.count()
print(dataset_size)

286500


In [6]:
# Check for null values in userId and sessionId
print( df.where(col("userId").isNull()).count() )
print( df.where(col("sessionId").isNull()).count() )

0
0


In [7]:
#Check for empty strings in userId and sessionId
print( df.where(col("userId") == "").count() )
print( df.where(col("sessionId") == "").count() )

8346
0


There are 8346 records where the userId is null. This is 2.9% of the total dataset ( mini_sparkify.json ). Lets remove these rows

In [8]:
df = df.filter(df.userId != "")

Nulls in other columns

In [9]:

for c in df.columns:
    null_count_c = df.where(col(c).isNull()).count()
    proportion_null = null_count_c/dataset_size
    print( c + ' : ' + str(null_count_c) + ' : ' + str(proportion_null))

artist : 50046 : 0.1746806282722513
auth : 0 : 0.0
firstName : 0 : 0.0
gender : 0 : 0.0
itemInSession : 0 : 0.0
lastName : 0 : 0.0
length : 50046 : 0.1746806282722513
level : 0 : 0.0
location : 0 : 0.0
method : 0 : 0.0
page : 0 : 0.0
registration : 0 : 0.0
sessionId : 0 : 0.0
song : 50046 : 0.1746806282722513
status : 0 : 0.0
ts : 0 : 0.0
userAgent : 0 : 0.0
userId : 0 : 0.0


Only artist, length and song seem to have nulls and also to the exact same extent. The values are likely null for page visits which dont involve a song like the home page etc.

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

# Basic understanding of data

In [10]:
# Lets look at 2 rows

df.take(2)

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30'),
 Row(artist='Five Iron Frenzy', auth='Logged In', firstName='Micah', gender='M', itemInSession=79, lastName='Long', length=236.09424, level='free', location='Boston-Cambridge-Newton, MA-NH', method='PUT', page='NextSong', registration=1538331630000, sessionId=8, song='Canada', status=200, ts=1538352180000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.103 Safari/537.36"', userId='9')]

In [11]:
# Timespan of the data

print( 'Min timestamp = ' + str( datetime.datetime.utcfromtimestamp( df.agg(Fmin(col("ts"))).collect()[0][0]/1000  ).strftime('%c') ) )
print( 'Max timestamp = ' + str( datetime.datetime.utcfromtimestamp( df.agg(Fmax(col("ts"))).collect()[0][0]/1000  ).strftime('%c') ) )

Min timestamp = Mon Oct  1 00:01:57 2018
Max timestamp = Mon Dec  3 01:11:16 2018


In UTC the data given is from 1-Oct-2018 to the early hours of 3-Dec-2018.

Effectively the data given is for a period of 2 months ( 63 days to be precise )

In [12]:
# Number of users

df.select("userId").dropDuplicates().count()

225

In [13]:
# Number of sessions

df.select("sessionId").dropDuplicates().count()

2312

In [14]:
# Distinct values taken by different categorical variables at the app level and their respective number of occurences in the data

# The following fields are excluded here since they are user activity specific. serId, sessionId, artist

categorical_variables = [ 'auth' , 'gender', 'level', 'method', 'page', 'status' ]

for cvar in categorical_variables:
    print(cvar)
#     df.select(cvar).dropDuplicates().sort(cvar).show(100, False)
    df.groupby(cvar).count().sort('count', ascending = False).show(100, False)

auth
+---------+------+
|auth     |count |
+---------+------+
|Logged In|278102|
|Cancelled|52    |
+---------+------+

gender
+------+------+
|gender|count |
+------+------+
|F     |154578|
|M     |123576|
+------+------+

level
+-----+------+
|level|count |
+-----+------+
|paid |222433|
|free |55721 |
+-----+------+

method
+------+------+
|method|count |
+------+------+
|PUT   |257818|
|GET   |20336 |
+------+------+

page
+-------------------------+------+
|page                     |count |
+-------------------------+------+
|NextSong                 |228108|
|Thumbs Up                |12551 |
|Home                     |10082 |
|Add to Playlist          |6526  |
|Add Friend               |4277  |
|Roll Advert              |3933  |
|Logout                   |3226  |
|Thumbs Down              |2546  |
|Downgrade                |2055  |
|Settings                 |1514  |
|Help                     |1454  |
|Upgrade                  |499   |
|About                    |495   |
|Save Sett

In the status field the 404's are interesting and could potentially have a correlation with users dropping out

Most of our events seem to be for listening to the next song

The GET requests are likely for Home page visits etc.

Most events are from paid users

auth=Cancelled seems to refer to users who have cancelled their subscription.

In [15]:
# Number of songs and artists

print( 'Num songs: ' + str(df.select("song").drop_duplicates().count()))
print( 'Num artists: ' + str(df.select("artist").drop_duplicates().count()) )

Num songs: 58481
Num artists: 17656


# Defining churn

We define churn as a user cancelling their service. If we see a "Cancellation Confirmation" event for a user, then we would consider that user to have churned

Given our definition of churn, downgrades of service from paid to free tier could be potential indicators of churn too.

In [16]:
churn = udf(lambda x: int(x == 'Cancellation Confirmation'), IntegerType())
downgrade_churn = udf(lambda x: int(x == 'Submit Downgrade'), IntegerType())

df = df.withColumn("downgraded", downgrade_churn("page")).withColumn("cancelled", churn("page"))

In [17]:
# Lets compute the number of users who downgraded or cancelled

# We first compute the status of each user
signum = udf(lambda x: int(x>0), IntegerType())
# The signum function is used since there are users who have multiple downgrade events.

user_status = df.select(["userId", "downgraded", "cancelled"]) \
    .groupby("userId").sum() \
    .withColumnRenamed("sum(downgraded)" , "sum_downgraded") \
    .withColumnRenamed("sum(cancelled)" , "sum_cancelled") \

user_status = user_status \
    .withColumn('downgraded', signum(col("sum_downgraded"))) \
    .withColumn('cancelled', signum(col("sum_cancelled"))) \
    .drop('sum_downgraded').drop('sum_cancelled')

In [18]:
user_status.show(10)

+------+----------+---------+
|userId|downgraded|cancelled|
+------+----------+---------+
|100010|         0|        0|
|200002|         0|        0|
|   125|         0|        1|
|    51|         0|        1|
|   124|         0|        0|
|     7|         0|        0|
|    54|         1|        1|
|    15|         0|        0|
|   155|         0|        0|
|   132|         0|        0|
+------+----------+---------+
only showing top 10 rows



Its interesting that userId 54 both downgraded and cancelled. Some users seem to have directly cancelled without downgrading.

In [19]:
user_status.agg({ 'downgraded' : 'sum', 'cancelled': 'sum'}).show()

+---------------+--------------+
|sum(downgraded)|sum(cancelled)|
+---------------+--------------+
|             49|            52|
+---------------+--------------+



We have 49 users who downgraded and 52 who cancelled

One must keep in mind that the total number of users is 225. This means 21.7% of users downgraded and 23% cancelled. These numbers are quite high, hence its imperative that we identify these users upfront, address their needs and incentivise them to stay on the platform.

In [20]:
# Of the users who cancelled whats the split of tier of service ( paid/free )

df.filter(df.page == 'Cancellation Confirmation').select(["userId", "level"]).drop_duplicates().groupby("level").count().show()

+-----+-----+
|level|count|
+-----+-----+
| free|   21|
| paid|   31|
+-----+-----+



Out of the 52 who cancelled, 21 are free and 31 are paid users

In [21]:
# How many users cancelled without downgrading

user_status.where(col("cancelled") == 1).groupby("downgraded").count().show()

+----------+-----+
|downgraded|count|
+----------+-----+
|         1|    9|
|         0|   43|
+----------+-----+



Of the 52 users who cancelled, 43 of them directly cancelled and only 9 of them downgraded before cancelling

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [22]:
# Marking the time and phase of churn for each user
# This will help us find metrics before and after churn for users


window = Window().partitionBy("userId").orderBy("ts").rangeBetween(Window.unboundedPreceding, 0)
df = df.withColumn("churn_phase", Fsum("cancelled").over(window)).withColumn("downgrade_phase", Fsum("downgraded").over(window))

In [23]:
df.agg( Fmax(col("downgrade_phase")) ).show()

# This is happening since there are users who downgraded multiple times. These users also upgraded multiple times. Hence the data here is not erroneous

+--------------------+
|max(downgrade_phase)|
+--------------------+
|                   3|
+--------------------+



In [24]:
df.where(df.page == "NextSong").select(["userId", "level", "ts"]).groupby( [ "level", "userId" ] ).count().groupby("level").mean().show()

+-----+------------------+
|level|        avg(count)|
+-----+------------------+
| free|215.33846153846153|
| paid|1134.8597560975609|
+-----+------------------+



In [25]:
# Number of songs per session between paid and free

#Lets first compute average number of songs per session for each user

user_avg_songs_per_session = df.where(df.page == "NextSong") \
    .select(["userId", "level", "sessionId"]) \
    .groupby( [ "level", "userId", "sessionId" ] ).count() \
    .withColumnRenamed("count", "numSongsPerSession") \
    .groupby( [ "level", "userId" ] ).agg( { "numSongsPerSession" : "avg"}) \
    .withColumnRenamed("avg(numSongsPerSession)", "avgSongsPerSession")

# Now lets average across all users for free and paid respectively
user_avg_songs_per_session.groupby("level").mean() \
    .withColumnRenamed("avg(avgSongsPerSession)", "avgSongsPerSession") \
    .show()

+-----+------------------+
|level|avgSongsPerSession|
+-----+------------------+
| free| 31.28982093144803|
| paid|102.62525766710827|
+-----+------------------+



That is a stark difference. Since ads are played in the free tier, people might listen to lesser songs. Which also means if people dial down their usage due to some reason, then they might have a strong reason to hang on to the subscription. It is possible that people for whom the avg number of songs per session decreases beyond a point, there is a good chance of churn

In [26]:
# Number of songs per day across free and paid

# We first convert the timestamp to date ( in UTC ) and add it to the dataframe

ts_date = udf(lambda x: datetime.datetime.utcfromtimestamp(x/1000), DateType() )
df = df.withColumn("date", ts_date(col("ts")))

#Compute number of songs per day per user

user_avg_songs_per_day = df.where(df.page == "NextSong") \
    .select(["userId", "level", "date"]) \
    .groupby( [ "level", "userId", "date" ] ).count() \
    .withColumnRenamed("count", "numSongsPerDay") \
    .groupby( [ "level", "userId" ] ).agg( { "numSongsPerDay" : "avg"}) \
    .withColumnRenamed("avg(numSongsPerDay)", "avgSongsPerDay")

# Now lets average across all users for free and paid respectively
user_avg_songs_per_day.groupby("level").mean() \
    .withColumnRenamed("avg(avgSongsPerDay)", "avgSongsPerDay") \
    .show()

+-----+------------------+
|level|    avgSongsPerDay|
+-----+------------------+
| free|32.318292019819815|
| paid| 87.08996179115817|
+-----+------------------+



In [27]:
# Number of 404s per user

user_pagenotfound = df.filter(df.status == 404).select("userId", "status", "ts").groupby("userId").count().withColumnRenamed("count", "pagenotfound_instances")

user_pagenotfound.agg( Fmax(col("pagenotfound_instances")), Fmin(col("pagenotfound_instances"))  ).show()

+---------------------------+---------------------------+
|max(pagenotfound_instances)|min(pagenotfound_instances)|
+---------------------------+---------------------------+
|                          7|                          1|
+---------------------------+---------------------------+



7 404 errors in a span of 63 days seems a bit. Lets see what the distribution is like

In [28]:
# Distribution of 404 instances

user_pagenotfound.groupby("pagenotfound_instances").count().sort("pagenotfound_instances").show()

+----------------------+-----+
|pagenotfound_instances|count|
+----------------------+-----+
|                     1|   49|
|                     2|   32|
|                     3|   20|
|                     4|    8|
|                     5|    4|
|                     6|    1|
|                     7|    3|
+----------------------+-----+



Amongst the few users who do see 404 errors, most of them see it only once. The number quickly drops after 3 instances

In [29]:
# Number of errors per user

user_error_pages = df.filter(df.page == "Error").select("userId", "status", "ts").groupby("userId").count().withColumnRenamed("count", "error_instances")

user_error_pages.agg( Fmax(col("error_instances")), Fmin(col("error_instances"))  ).show()

+--------------------+--------------------+
|max(error_instances)|min(error_instances)|
+--------------------+--------------------+
|                   7|                   1|
+--------------------+--------------------+



In [30]:
user_error_pages.groupby("error_instances").count().sort("error_instances").show()

+---------------+-----+
|error_instances|count|
+---------------+-----+
|              1|   49|
|              2|   32|
|              3|   20|
|              4|    8|
|              5|    4|
|              6|    1|
|              7|    3|
+---------------+-----+



We see that error and 404 instances have the same distribution. Its very likely that 404s are equivalent to errors. One could prove this using queries, but I will choose to accept this for now.

In [31]:
# Thumbs up and thumbs down per user

user_thumbsup_perday = df.filter(df.page == "Thumbs Up") \
    .select("userId", "date") \
    .groupby( [ "userId", "date" ]) \
    .count() \
    .withColumnRenamed("count", "thumbsup") \
    .groupby("userId").mean() \
    .withColumnRenamed("avg(thumbsup)", "avg_thumbsup_per_day")
    
user_thumbsdown_perday = df.filter(df.page == "Thumbs Down") \
    .select("userId", "date") \
    .groupby( [ "userId", "date" ]) \
    .count() \
    .withColumnRenamed("count", "thumbsdown") \
    .groupby("userId").mean() \
    .withColumnRenamed("avg(thumbsdown)", "avg_thumbsdown_per_day")

We can combine user_avg_songs_per_day with user_thumbsup_perday and user_thumbsdown_perday to compute proportion of songs the user thumbs up's or down's. We will do this when we combine all the features

In [32]:
# Number of help page visits

user_help_vists = df.filter(df.page == "Help").select("userId", "status", "ts").groupby("userId").count().withColumnRenamed("count", "help_visits")

user_help_vists.agg( Fmax(col("help_visits")), Fmin(col("help_visits"))  ).show()

+----------------+----------------+
|max(help_visits)|min(help_visits)|
+----------------+----------------+
|              46|               1|
+----------------+----------------+



In [33]:
user_help_vists.groupby("help_visits").count().sort("help_visits").show(50)

+-----------+-----+
|help_visits|count|
+-----------+-----+
|          1|   34|
|          2|   19|
|          3|   15|
|          4|   11|
|          5|   13|
|          6|   11|
|          7|   18|
|          8|    7|
|          9|   14|
|         10|    7|
|         11|    2|
|         12|    7|
|         13|    5|
|         14|    2|
|         15|    5|
|         16|    4|
|         17|    2|
|         18|    2|
|         19|    3|
|         20|    1|
|         23|    2|
|         24|    2|
|         27|    1|
|         28|    1|
|         30|    1|
|         34|    1|
|         40|    1|
|         46|    1|
+-----------+-----+



The distribution of help visits is rather skewed towards lower numbers. People who have higher number of help page visits might have a higher propensity to churn

In [34]:
# Number of friend referrals

user_num_friends = df.select(["userId", "page"]) \
    .filter(df.page == "Add Friend") \
    .groupby("userId") \
    .count() \
    .withColumnRenamed("count", "numFriendAdditions")

In [35]:
# Abstract the data fetching from disk and basic cleanup into a function

def obtain_data(filepath):
    '''
    Given a file path:
    
    1. Read the events json file
    2. Remove records with null or empty string userId's and sessionId's
    '''
    df = spark.read.json(filepath=filepath)
    df = df.where( (df.userId.isNotNull()) & (df.sessionId.isNotNull()) & (df.userId != "") & (df.sessionId != "") )
    return df    

# Combine all the feature engineering and put in a single function

def feature_engineering(events_df):
    '''
    Given an events spark data frame, compute the necessary features for
    predicting churn using machine learning algorithms
    
    Inputs:
        events_df: Spark data frame with the structure        
         |-- artist: string (nullable = true)
         |-- auth: string (nullable = true)
         |-- firstName: string (nullable = true)
         |-- gender: string (nullable = true)
         |-- itemInSession: long (nullable = true)
         |-- lastName: string (nullable = true)
         |-- length: double (nullable = true)
         |-- level: string (nullable = true)
         |-- location: string (nullable = true)
         |-- method: string (nullable = true)
         |-- page: string (nullable = true)
         |-- registration: long (nullable = true)
         |-- sessionId: long (nullable = true)
         |-- song: string (nullable = true)
         |-- status: long (nullable = true)
         |-- ts: long (nullable = true)
         |-- userAgent: string (nullable = true)
         |-- userId: string (nullable = true)            
    
    Output:
        df_feature: Spark data frame with the following structure
        root
         |-- userId: string (nullable = true)
         |-- downgraded: integer (nullable = true)
         |-- cancelled: integer (nullable = true)
         |-- avgSongsPerSession: double (nullable = true)
         |-- avgSongsPerDay: double (nullable = true)
         |-- avg_thumbsup_per_day: double (nullable = true)
         |-- avg_thumbsup_per_day: double (nullable = true)
         |-- error_instances: long (nullable = false)
         |-- help_visits: long (nullable = false)
         |-- numFriendAdditions: long (nullable = false)
    '''
    
    churn = udf(lambda x: int(x == 'Cancellation Confirmation'), IntegerType())
    downgrade_churn = udf(lambda x: int(x == 'Submit Downgrade'), IntegerType())    
    signum = udf(lambda x: int(x>0), IntegerType())
    ts_date = udf(lambda x: datetime.datetime.utcfromtimestamp(x/1000), DateType() )
    proportion = udf(lambda x,y : x/y, FloatType())
    
    ## Base transformations to input df
    events_df = events_df.withColumn("date", ts_date(col("ts")))       
       
    ## User level features    
    
    # Status of each user ( target variable )
    user_status = events_df.select(["userId", "downgraded", "cancelled"]) \
        .groupby("userId").sum() \
        .withColumnRenamed("sum(downgraded)" , "sum_downgraded") \
        .withColumnRenamed("sum(cancelled)" , "sum_cancelled") \

    user_status = user_status \
        .withColumn('downgraded', signum(col("sum_downgraded"))) \
        .withColumn('cancelled', signum(col("sum_cancelled"))) \
        .drop('sum_downgraded').drop('sum_cancelled')
    # The signum function is used since there are users who have multiple downgrade events.
        
    #Number of songs per session per user
    user_avg_songs_per_session = events_df.where(events_df.page == "NextSong") \
        .select(["userId", "sessionId"]) \
        .groupby( [ "userId", "sessionId" ] ).count() \
        .withColumnRenamed("count", "numSongsPerSession") \
        .groupby( [ "userId" ] ).agg( { "numSongsPerSession" : "avg"}) \
        .withColumnRenamed("avg(numSongsPerSession)", "avgSongsPerSession")
    
    #Number of songs per day per user
    user_avg_songs_per_day = events_df.where(events_df.page == "NextSong") \
        .select(["userId", "date"]) \
        .groupby( [ "userId", "date" ] ).count() \
        .withColumnRenamed("count", "numSongsPerDay") \
        .groupby( [ "userId" ] ).agg( { "numSongsPerDay" : "avg"}) \
        .withColumnRenamed("avg(numSongsPerDay)", "avgSongsPerDay")
    
    #Number of thumbs ups per day per user
    user_thumbsup_perday = events_df.filter(events_df.page == "Thumbs Up") \
        .select("userId", "date") \
        .groupby( [ "userId", "date" ]) \
        .count() \
        .withColumnRenamed("count", "thumbsup") \
        .groupby("userId").mean() \
        .withColumnRenamed("avg(thumbsup)", "avg_thumbsup_per_day")
    
    #Number of thumbs down per day per user
    user_thumbsdown_perday = events_df.filter(events_df.page == "Thumbs Down") \
        .select("userId", "date") \
        .groupby( [ "userId", "date" ]) \
        .count() \
        .withColumnRenamed("count", "thumbsdown") \
        .groupby("userId").mean() \
        .withColumnRenamed("avg(thumbsdown)", "avg_thumbsdown_per_day")    

    #Number of error pages encountered by user
    user_error_pages = events_df.filter(events_df.page == "Error") \
        .select("userId", "status", "ts") \
        .groupby("userId") \
        .count() \
        .withColumnRenamed("count", "error_instances")
    
    # Number of help page visits
    user_help_vists = events_df.filter(events_df.page == "Help") \
        .select("userId", "status", "ts") \
        .groupby("userId") \
        .count() \
        .withColumnRenamed("count", "help_visits")
    
    # Number of friend additions per user
    user_num_friends = events_df.select(["userId", "page"]) \
        .filter(df.page == "Add Friend") \
        .groupby("userId") \
        .count() \
        .withColumnRenamed("count", "numFriendAdditions")
    
    df_features = user_status \
        .join(user_avg_songs_per_session, on = "userId" , how = 'left') \
        .join(user_avg_songs_per_day, on = "userId", how = 'left') \
        .join(user_thumbsup_perday, on = "userId", how = 'left') \
        .join(user_thumbsdown_perday, on = "userId", how = 'left') \
        .join(user_error_pages, on = "userId", how = 'left') \
        .join(user_help_vists, on = "userId", how = 'left') \
        .join(user_num_friends, on = "userId", how = 'left')
    
    # Renaming the target column to label and casting it to DoubleType
    df_features = df_features.withColumn( "label", df_features.cancelled.cast(DoubleType())).drop("cancelled")
    
    return df_features

In [36]:
df_features = feature_engineering(df)

In [37]:
df_features.show(10)

+------+----------+------------------+------------------+--------------------+----------------------+---------------+-----------+------------------+-----+
|userId|downgraded|avgSongsPerSession|    avgSongsPerDay|avg_thumbsup_per_day|avg_thumbsdown_per_day|error_instances|help_visits|numFriendAdditions|label|
+------+----------+------------------+------------------+--------------------+----------------------+---------------+-----------+------------------+-----+
|100010|         0|39.285714285714285|39.285714285714285|  2.8333333333333335|                  1.25|           null|          2|                 4|  0.0|
|200002|         0|              64.5|55.285714285714285|                 3.0|                   3.0|           null|          2|                 4|  0.0|
|   125|         0|               8.0|               8.0|                null|                  null|           null|       null|              null|  1.0|
|    51|         0|             211.1| 162.3846153846154|   8.33333333

In [38]:
df_features.printSchema()

root
 |-- userId: string (nullable = true)
 |-- downgraded: integer (nullable = true)
 |-- avgSongsPerSession: double (nullable = true)
 |-- avgSongsPerDay: double (nullable = true)
 |-- avg_thumbsup_per_day: double (nullable = true)
 |-- avg_thumbsdown_per_day: double (nullable = true)
 |-- error_instances: long (nullable = true)
 |-- help_visits: long (nullable = true)
 |-- numFriendAdditions: long (nullable = true)
 |-- label: double (nullable = true)



In [39]:
def feature_scaling(df_features):
    feature_cols = df_features.drop('userId', 'label').columns
    #Replace nulls with zero for columns in feature_cols
    df_features = df_features.fillna(0, subset = feature_cols )
    
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="FeatureVector")
    df_features = assembler.transform(df_features)
    
    scaler = StandardScaler(withMean = True, inputCol = "FeatureVector",  outputCol = "ScaledFeatureVector")
    scalerModel = scaler.fit(df_features)
    df_features = scalerModel.transform(df_features)
    df_features_scaled = df_features.select(["userId", "label", "ScaledFeatureVector"])    
    
    return df_features_scaled

In [40]:
df_features_scaled = feature_scaling(df_features)

In [41]:
df_features_scaled.persist()

DataFrame[userId: string, label: double, ScaledFeatureVector: vector]

In [42]:
df_features_scaled.show(10, False)

+------+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|label|ScaledFeatureVector                                                                                                                                              |
+------+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|100010|0.0  |[-0.5264710031632485,-0.7392637500378801,-0.783710508690406,-0.7321501459351433,-0.36390803366118396,-0.7605559300887401,-0.6161090451316099,-0.7292340618073143]|
|200002|0.0  |[-0.5264710031632485,-0.14759257929335814,-0.2848591481704853,-0.6465812558359041,1.8888671790512386,-0.7605559300887401,-0.6161090451316099,-0.7292340618073143]|
|125   |1.0  |[-0.5264710031632485,-1.473405316004171,-1.7591430797070366,-2.1868212776222102,-1.9730331855986287,-

In [43]:
df_features_scaled.count()

# We have a row for each user in our dataset

225

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

## Train test split

In [44]:
train, validation = df_features_scaled.randomSplit([0.70, 0.30], seed = 42)
# test, validation = rest.randomSplit([0.50, 0.50], seed = 42)

Lets see the distribution of churned an non-churned users in both the training and validation sets

In [45]:
train.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|  127|
|  1.0|   36|
+-----+-----+



In [46]:
validation.groupby("label").count().show()

+-----+-----+
|label|count|
+-----+-----+
|  0.0|   46|
|  1.0|   16|
+-----+-----+



In both datasets about 22-25% of the users have churned

## Functions for find the best threshold and evaluating a given model

In [47]:
def find_best_threshold_by_f1_score(preds, labelCol = 'label', probabilityCol = 'probability', steps = 1000):
    '''
        Given prediction probabilities by a classiification model, find the best threshold that maximizes fscore
        
        Input:
            preds - Spark dataframe with the following schema            
             |-- userId: string (nullable = true)
             |-- label: double (nullable = true)
             |-- ScaledFeatureVector: vector (nullable = true)
             |-- rawPrediction: vector (nullable = true)
             |-- probability: vector (nullable = true)
             |-- prediction: double (nullable = false)
        
        Output:
            best_threshold - Best threshold in [0,1] that maximizes fscore
            best_fscore - Best fscore achieved using the best_threshold
    '''
    labels = preds[labelCol]
    probability = preds[probabilityCol]
    fscores = []
    thresholds = np.linspace(0,1,steps)
    
    for thresh in thresholds:
        pred_labels = probability.apply(lambda x: np.float64(x > thresh))
        fscores.append( f1_score(labels, pred_labels) )
    
    return ( thresholds[ np.argmax(fscores) ], np.max(fscores) )

In [48]:
def evaluate_model(model, train, validation):
    '''
    Input:
        ML model after fitting
        
        Given a model we find the best threshold that gives the best f-score on the valdation data set
        Then we take this best threshold and apply that on the model predictions of the test data set.
        On this set set of predictions on the test set we compute precision, recall, f-score etc.
        
        We also compute area under the ROC curve for train, validation and test datasets
    
    Output:
        Metrics:
            AUC for train, validation and test            
            Best threshold based on validation
            FScore using best threshold on validation, test            
    '''
    
    model_preds_train = model.transform(train)
    model_preds_valid = model.transform(validation)
    
    evaluator_roc = BinaryClassificationEvaluator(metricName = 'areaUnderROC')
    auc_train = evaluator_roc.evaluate(model_preds_train)
    auc_valid = evaluator_roc.evaluate(model_preds_valid)
    
    model_preds_train_df = model_preds_train.select( [ 'userId', 'label', 'probability', 'prediction' ] ).toPandas()
    
    model_train_fscore = f1_score( model_preds_train_df.label, model_preds_train_df.prediction)
    model_train_precision = precision_score( model_preds_train_df.label, model_preds_train_df.prediction)
    model_train_recall = recall_score( model_preds_train_df.label, model_preds_train_df.prediction)
    model_train_accuracy = accuracy_score( model_preds_train_df.label, model_preds_train_df.prediction)
    
    model_preds_valid_df = model_preds_valid.select( [ 'userId', 'label', 'probability', 'prediction' ] ).toPandas()  
    
    model_valid_fscore = f1_score( model_preds_valid_df.label, model_preds_valid_df.prediction)
    model_valid_precision = precision_score( model_preds_valid_df.label, model_preds_valid_df.prediction)
    model_valid_recall = recall_score( model_preds_valid_df.label, model_preds_valid_df.prediction)
    model_valid_accuracy = accuracy_score( model_preds_valid_df.label, model_preds_valid_df.prediction)    
    
    metrics = {
        'train_auc' : auc_train,
        'valid_auc' : auc_valid,        
        'train_precision' : model_train_precision,
        'train_recall' : model_train_recall,
        'train_fscore' : model_train_fscore,
        'train_accuracy': model_train_accuracy,        
        'valid_precision' : model_valid_precision,
        'valid_recall' : model_valid_recall,
        'valid_fscore' : model_valid_fscore,
        'valid_accuracy': model_valid_accuracy
    }
    
    return metrics  

## Experimenting with different classifiers

### Random forest classifier

In [49]:
%%time
# Random forest classifier
rf = RandomForestClassifier(
        featuresCol = "ScaledFeatureVector", 
        labelCol = 'label', 
        maxMemoryInMB = 1000, 
        seed = 42
    )
rf_model = rf.fit(train)

CPU times: user 172 ms, sys: 56 ms, total: 228 ms
Wall time: 20.6 s


In [50]:
rf_model.transform(train).printSchema()

root
 |-- userId: string (nullable = true)
 |-- label: double (nullable = true)
 |-- ScaledFeatureVector: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [51]:
rf_model_metrics = evaluate_model(rf_model, train, validation)

In [52]:
rf_model_metrics

{'train_auc': 0.9857830271216097,
 'valid_auc': 0.6671195652173915,
 'train_precision': 1.0,
 'train_recall': 0.4166666666666667,
 'train_fscore': 0.5882352941176471,
 'train_accuracy': 0.8711656441717791,
 'valid_precision': 1.0,
 'valid_recall': 0.125,
 'valid_fscore': 0.2222222222222222,
 'valid_accuracy': 0.7741935483870968}

### Gradient boosted tree classifier

In [54]:
%%time
# Gradient boosted trees classifier
gbt = GBTClassifier(
        featuresCol = "ScaledFeatureVector", 
        labelCol = 'label', 
        maxMemoryInMB = 1000, 
        seed = 42,
    )
gbt_model = gbt.fit(train)

CPU times: user 2.41 s, sys: 724 ms, total: 3.14 s
Wall time: 2min 25s


In [55]:
gbt_model_metrics = evaluate_model(gbt_model, train, validation)

In [56]:
gbt_model_metrics

{'train_auc': 0.9997812773403325,
 'valid_auc': 0.7472826086956519,
 'train_precision': 1.0,
 'train_recall': 0.9722222222222222,
 'train_fscore': 0.9859154929577464,
 'train_accuracy': 0.9938650306748467,
 'valid_precision': 0.6,
 'valid_recall': 0.375,
 'valid_fscore': 0.4615384615384615,
 'valid_accuracy': 0.7741935483870968}

### Logistic regression

In [58]:
%%time

lr = LogisticRegression(featuresCol = "ScaledFeatureVector")
lr_model = lr.fit(train)

CPU times: user 500 ms, sys: 128 ms, total: 628 ms
Wall time: 19.6 s


In [59]:
lr_model_metrics = evaluate_model(lr_model, train, validation)

In [60]:
lr_model_metrics

{'train_auc': 0.8018372703412077,
 'valid_auc': 0.6480978260869564,
 'train_precision': 0.625,
 'train_recall': 0.2777777777777778,
 'train_fscore': 0.3846153846153846,
 'train_accuracy': 0.803680981595092,
 'valid_precision': 0.5,
 'valid_recall': 0.125,
 'valid_fscore': 0.2,
 'valid_accuracy': 0.7419354838709677}

In all these models we can observe heavy overfitting. The train AUC's are very close to 1 while the validation and test AUCs are closer to 0.5

Lets tune the parameters of the models and try to reduce the overfitting using cross validation

### Random forest classifier with hyper parameter tuning

In [61]:
%%time
# Random forest with cross validation
rf_paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [5,10,20]) \
    .addGrid( rf.minInstancesPerNode, [5, 10, 20] ) \
    .build()

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=rf_paramGrid,
                          evaluator=BinaryClassificationEvaluator(metricName = 'areaUnderROC'),
                          numFolds=3)
rf_cv_model = crossval.fit(train)

CPU times: user 5.11 s, sys: 1.54 s, total: 6.65 s
Wall time: 5min 27s


In [62]:
rf_cv_model_metrics = evaluate_model(rf_cv_model, train, validation)

  'precision', 'predicted', average, warn_for)
  'precision', 'predicted', average, warn_for)


In [63]:
rf_cv_model_metrics

{'train_auc': 0.7725284339457572,
 'valid_auc': 0.688858695652174,
 'train_precision': 0.0,
 'train_recall': 0.0,
 'train_fscore': 0.0,
 'train_accuracy': 0.7791411042944786,
 'valid_precision': 0.0,
 'valid_recall': 0.0,
 'valid_fscore': 0.0,
 'valid_accuracy': 0.7419354838709677}

In [65]:
# lets see what parameters the best model took

rf_cv_model.bestModel.extractParamMap()

{Param(parent='RandomForestClassifier_dab8a114eff7', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='RandomForestClassifier_dab8a114eff7', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='RandomForestClassifier_dab8a114eff7', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'auto',
 Param(parent='RandomForestClassifier_dab8a114eff7', name='featuresCol', doc='features column name'): 'ScaledFeatureVector',
 Param(parent='RandomForestClassifier_dab8

### Gradient boosted tree classifier with hyper parameter tuning

In [66]:
%%time
# Gradient boosted trees with cross validation
gbt_paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [5,10]) \
    .addGrid(gbt.minInstancesPerNode, [5, 15] ) \
    .build()

gbt_crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=gbt_paramGrid,
                          evaluator=BinaryClassificationEvaluator(metricName = 'areaUnderROC'),
                          numFolds=3)

gbt_cv_model = gbt_crossval.fit(train)

CPU times: user 36.4 s, sys: 12.2 s, total: 48.5 s
Wall time: 35min 15s


In [67]:
gbt_cv_model_metrics = evaluate_model(gbt_cv_model, train, validation)

In [68]:
gbt_cv_model_metrics

{'train_auc': 0.9993438320209973,
 'valid_auc': 0.6168478260869564,
 'train_precision': 1.0,
 'train_recall': 0.9444444444444444,
 'train_fscore': 0.9714285714285714,
 'train_accuracy': 0.9877300613496932,
 'valid_precision': 0.5714285714285714,
 'valid_recall': 0.25,
 'valid_fscore': 0.34782608695652173,
 'valid_accuracy': 0.7580645161290323}

In [69]:
# Lets see what the best models parameters
gbt_cv_model.bestModel.extractParamMap()

{Param(parent='GBTClassifier_68f407fe1214', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees.'): False,
 Param(parent='GBTClassifier_68f407fe1214', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext'): 10,
 Param(parent='GBTClassifier_68f407fe1214', name='featureSubsetStrategy', doc='The number of features to consider for splits at each tree node. Supported options: auto, all, onethird, sqrt, log2, (0.0-1.0], [1-n].'): 'all',
 Param(parent='GBTClassifier_68f407fe1214', name='featuresCol', doc='features column name'): 'ScaledFeatureVector',
 Param(parent='GBTClassifier_68f407fe1214', name='labelCol', doc='label column 

### Logistic regression  with hyper parameter tuning

In [70]:
%%time
# Logistic regression with cross validation
lr_paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.0, 0.1, 0.5, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.05, 0.1] ) \
    .build()

lr_crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=lr_paramGrid,
                          evaluator=BinaryClassificationEvaluator(metricName = 'areaUnderROC'),
                          numFolds=3)

lr_cv_model = lr_crossval.fit(train)

CPU times: user 11.1 s, sys: 3.87 s, total: 15 s
Wall time: 7min 52s


In [71]:
lr_cv_model_metrics = evaluate_model(lr_cv_model, train, validation)

In [72]:
lr_cv_model_metrics

{'train_auc': 0.8018372703412077,
 'valid_auc': 0.6480978260869564,
 'train_precision': 0.625,
 'train_recall': 0.2777777777777778,
 'train_fscore': 0.3846153846153846,
 'train_accuracy': 0.803680981595092,
 'valid_precision': 0.5,
 'valid_recall': 0.125,
 'valid_fscore': 0.2,
 'valid_accuracy': 0.7419354838709677}

## Comparing different models

In [73]:
all_metrics = pd.DataFrame( { 
    'Base RF': rf_model_metrics, 
    'CV RF' : rf_cv_model_metrics, 
    'Base GBT' : gbt_model_metrics, 
    'CV GBT' : gbt_cv_model_metrics,
    'Base LR' : lr_model_metrics,
    'CV LR' : lr_cv_model_metrics
}).sort_index()
all_metrics

Unnamed: 0,Base RF,CV RF,Base GBT,CV GBT,Base LR,CV LR
train_accuracy,0.871166,0.779141,0.993865,0.98773,0.803681,0.803681
train_auc,0.985783,0.772528,0.999781,0.999344,0.801837,0.801837
train_fscore,0.588235,0.0,0.985915,0.971429,0.384615,0.384615
train_precision,1.0,0.0,1.0,1.0,0.625,0.625
train_recall,0.416667,0.0,0.972222,0.944444,0.277778,0.277778
valid_accuracy,0.774194,0.741935,0.774194,0.758065,0.741935,0.741935
valid_auc,0.66712,0.688859,0.747283,0.616848,0.648098,0.648098
valid_fscore,0.222222,0.0,0.461538,0.347826,0.2,0.2
valid_precision,1.0,0.0,0.6,0.571429,0.5,0.5
valid_recall,0.125,0.0,0.375,0.25,0.125,0.125


## Best model

The base GBT is the best performing model when comparing using validation fscore.

One common thing noticeable across all models is that they have high precision but rather low recall. In this context, it means if the model predicts a user will churn, then its very likely the user will churn indeed. Poor recall on the other hand if a user were to churn then the model may not predict that the user would churn.

## Potential improvements

The current model has high precision but poor recall. Increasing recall is certainly an area for improvement. High precision is useful since likely an organization will spend some money to incentivize the people whom the model predicts will leave. Hence having high precision will ensure the organization's capital is used effectively. However poor recall will imply many users will churn without the organization predicting upfront. This would lead to a loss in the user base and decreased revenue.

On the feature development part many more things could be done. Some potential feature additions are:

1. Current level of a user
<br>Is the user a paid or a free user. Given paid users seem to churn more, this could be an interesting feature
2. Has the user downgraded/upgraded in the past, how many times
3. Time since upgrade
4. Number of additions to playlist
5. Number of advertisement roll outs
6. Average time spent in a session