In [1]:
# Starter code
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, count, when, col, desc, udf, col, sort_array, asc, avg, lit, countDistinct
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml import Pipeline

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1585230776778_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

## Remove missing values in the userId column

In [5]:
#remove rows with a missing userId
df_clean = df.filter(df.userId != "")
df_clean.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

26259199

## Feature Engineering

We will build the churn variable thanks to value "Cancellation Confirmation" of the page variable.

#### 0. initialize the dataset

In [6]:
df_model = df_clean.drop("artist", "auth", "firstName", "itemInSession", "lastName", "location", "method",
                        "registration", "song", "userAgent", "top_churn")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1. pivot the page column and keep the selected features

In [7]:
df_model = df_model.groupBy("userId").pivot("page").count().fillna(0)
df_model = df_model.drop("About", "Cancel", "Login", "Submit Registration", "Register", "Save Settings", 
                         "Downgrade", "Roll Advert", "Settings", "Upgrade", "Thumbs Down",
                        "Submit Downgrade", "Submit Upgrade", "Error")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 2. add lenght per user

In [8]:
length_per_user = df_clean.filter(df_clean.page=="NextSong")\
    .groupBy(df_clean.userId)\
    .sum("length")

df_model = df_model.join(length_per_user, "userId", "left")\
    .withColumnRenamed("sum(length)", "length")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 3. rename Cancellation Confirmation into label

In [9]:
df_model = df_model.withColumnRenamed("Cancellation Confirmation", "label")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4. gender

In [10]:
#gender : 1=F, 0=M
user_gender = df_clean.select("userId", "gender") \
    .dropDuplicates() \
    .sort("userId") \
    .replace(["F", "M"], ["0", "1"], "gender")

user_gender = user_gender.select("userId", user_gender.gender.cast("int"))

df_model = df_model.join(user_gender, on="userId", how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 5. level

In [11]:
#level : 0=free, 1=paid
#because of users who change level, we select the last observation of each user for the level variable
#it might be interesting to know whether changing of level can explain the cancellation but it is not this project's goal
user_level = df_clean.groupby("userId", "level") \
    .agg({"ts": "max"}) \
    .sort("userId", "level") \
    .withColumnRenamed("max(ts)", "levelTime")

user_last_obs_time = user_level.groupby("userId") \
    .agg({"levelTime": "max"}) \
    .sort("userId") \
    .withColumnRenamed("max(levelTime)", "LastTime")

user_last_obs = user_last_obs_time.join(user_level, ["userId"]) 

user_last_obs = user_last_obs.filter(user_last_obs.LastTime==user_last_obs.levelTime) \
    .select("userId", "level") \
    .sort("userId") \
    .replace(["free", "paid"], ["0", "1"], "level")

user_last_obs = user_last_obs.select("userId", user_last_obs.level.cast("int"))

df_model = df_model.join(user_last_obs, on="userId", how="left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4. date variables

In [12]:
#number of days since registration
user_last_ts = df_clean.groupBy("userId").max("ts").sort("userId")
user_reg_ts = df_clean.select("userId", "registration").dropDuplicates().sort("userId")

ndays_since_reg = user_reg_ts.join(user_last_ts, user_reg_ts.userId==user_last_ts.userId) \
    .select(user_reg_ts.userId, ((user_last_ts["max(ts)"]-user_reg_ts["registration"])/(1000*60*60*24)).alias("nDays")) \
    .sort("userId")

df_model = df_model.join(ndays_since_reg, "userId", "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
#number of days as paid user
first_day_paid_user = df_clean.filter(df_clean.level=="paid") \
    .groupBy("userId") \
    .min("ts")
last_day_paid_user = df_clean.filter(df_clean.level=="paid") \
    .groupBy("userId") \
    .max("ts")

ndays_paid_user = first_day_paid_user.join(last_day_paid_user, "userId") \
    .select(first_day_paid_user.userId, ((last_day_paid_user["max(ts)"]-first_day_paid_user["min(ts)"])/(1000*60*60*24)).alias("ndays_paid"))

df_model = df_model.join(ndays_paid_user, "userId", "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
#number of days as free user
first_day_free_user = df_clean.filter(df_clean.level=="free") \
    .groupBy("userId") \
    .min("ts")
last_day_free_user = df_clean.filter(df_clean.level=="free") \
    .groupBy("userId") \
    .max("ts")

ndays_free_user = first_day_free_user.join(last_day_free_user, "userId") \
    .select(first_day_free_user.userId, ((last_day_free_user["max(ts)"]-first_day_free_user["min(ts)"])/(1000*60*60*24)).alias("ndays_free"))

df_model = df_model.join(ndays_free_user, "userId", "left")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 5. fill empty values with 0

In [15]:
df_model = df_model.fillna(0)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
df_model.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- userId: string (nullable = true)
 |-- Add Friend: long (nullable = true)
 |-- Add to Playlist: long (nullable = true)
 |-- label: long (nullable = true)
 |-- Help: long (nullable = true)
 |-- Home: long (nullable = true)
 |-- Logout: long (nullable = true)
 |-- NextSong: long (nullable = true)
 |-- Thumbs Up: long (nullable = true)
 |-- length: double (nullable = false)
 |-- gender: integer (nullable = true)
 |-- level: integer (nullable = true)
 |-- nDays: double (nullable = false)
 |-- ndays_paid: double (nullable = false)
 |-- ndays_free: double (nullable = false)

## Modeling

In [17]:
#define the features and the target of the models
feature_columns = df_model.drop("userId", "label").columns

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

#scale the data
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True)

#model to be trained : random forest classifier
rf = RandomForestClassifier(featuresCol="scaledFeatures", maxDepth=5, impurity="gini")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
#buil pipeline
rf_pipeline = Pipeline(stages=[assembler, scaler, rf])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
#split into train and validation sets
train_set, valid_set = df_model.randomSplit([0.8, 0.2], seed=42)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
#how to evaluate the performance of a model : F1-score and confusion matrix
def modelEvaluation(model, valid_data):
    """
    Evaluate the f1-score of a model
    
    INPUT
    model : pipeline object
    valid_data : the validation dataset
    
    OUTPUT
    f1-score
    accuracy score
    confusion matrix
    """
    #predictions on the validation dataset
    preds = model.transform(valid_data)
    
    #f1-score and accuracy
    f1_score = MulticlassClassificationEvaluator(metricName="f1")
    
    #confusion matrix
    confusion_matrix = preds.groupby("label").pivot("prediction").count()
    
    return f1_score.evaluate(preds), confusion_matrix

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
rf_model = rf_pipeline.fit(train_set)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
rf_f1_score, rf_confusion_matrix = modelEvaluation(rf_model, valid_set)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
print("f1-score for the random forest classifier : {}".format(rf_f1_score))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

f1-score for the random forest classifier : 0.8182675336620095

In [28]:
print("confusion matrix for the random forest classifier :")
rf_confusion_matrix.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

confusion matrix for the random forest classifier :
+-----+----+---+
|label| 0.0|1.0|
+-----+----+---+
|    0|3383|110|
|    1| 615|404|
+-----+----+---+