# Sparkify Project - Part 2

If you didn't read the first part, you should! This one picks up where that one left off and switches to the full 12GB dataset. We will first verify our assumptions from the first notebook, then run the feature engineering code and train a few basic models to see how they perform. To start, we'll create the Spark session and import some libraries.

```python
# Starter code
from pyspark.sql import SparkSession
```

```
Starting Spark application
SparkSession available as 'spark'.
```


```python
import pyspark.sql.functions as sf
import pyspark.sql.types as st

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier,
)
```

```python
# Create the Spark session (getOrCreate() returns the session the kernel already started)
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()
```

Next, read in the full dataset from the public S3 bucket!

```python
# Read in full sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
df_raw = spark.read.json(event_data)
```

Next, check the head and the schema to confirm they look like the subset's:

```python
df_raw.head()
```

```
Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042')
```

```python
df_raw.printSchema()
```

```
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
```

## Exploration

```python
df_raw.count()
```

```
26259199
```

Holy cow, that's a big increase from the subset, although we expected it.

```python
df_raw.select([sf.count(sf.when(sf.isnull(c), c)).alias(c) for c in df_raw.columns]).show()
```

```
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
| artist|auth|firstName|gender|itemInSession|lastName| length|level|location|method|page|registration|sessionId|   song|status| ts|userAgent|userId|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
|5408927|   0|   778479|778479|            0|  778479|5408927|    0|  778479|     0|   0|      778479|        0|5408927|     0|  0|   778479|     0|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
```
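The null-count one-liner relies on two details of `pyspark.sql.functions`: `sf.when(cond, value)` without an `.otherwise()` evaluates to null whenever `cond` is false, and `sf.count` only counts non-null values. The string `c` passed as the value is treated as a literal (the column name), not as a column reference, so it is non-null exactly when the original value is null. A plain-Python sketch of the same logic, over a few made-up rows standing in for events:

```python
from typing import Any, Dict, List, Optional


def when(cond: bool, value: Any) -> Optional[Any]:
    """Mimic sf.when(cond, value) without .otherwise(): None when cond is false."""
    return value if cond else None


def null_counts(rows: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count nulls per column the way the Spark one-liner does."""
    counts = {}
    for c in rows[0]:
        # The marker is the column *name* c (a literal), so it is non-null
        # exactly when the row's value for c is null.
        markers = [when(row[c] is None, c) for row in rows]
        # sf.count skips nulls, so this counts the rows where c was null.
        counts[c] = sum(1 for m in markers if m is not None)
    return counts


# Made-up rows for illustration only
rows = [
    {"userId": "1749042", "gender": "M", "artist": "Popol Vuh"},
    {"userId": "1749042", "gender": "M", "artist": None},
    {"userId": "1261737", "gender": None, "artist": None},
]
print(null_counts(rows))  # {'userId': 0, 'gender': 1, 'artist': 2}
```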

The missing values follow the same pattern as in the subset: `firstName`, `gender`, `lastName`, `location`, `registration` and `userAgent` are each null in exactly 778,479 rows. Let's check for the `""` user like before.

```python
df_raw.filter(df_raw.userId == "").count()
```

```
0
```

Hmmmm, that's weird, there aren't any. Let's see which user shows up instead:

```python
df_raw.where(sf.isnull(df_raw.registration)).select('userId').distinct().show()
```

```
+-------+
| userId|
+-------+
|1261737|
+-------+
```

```python
df_raw.where(sf.isnull(df_raw.gender)).select('userId').distinct().show()
```

```
+-------+
| userId|
+-------+
|1261737|
+-------+
```

Wow, looks like `1261737` is the null user instead of the empty string! Its row count matches the 778,479 missing values exactly:

```python
# df doesn't exist yet at this point, so filter the raw dataframe
df_raw.filter(df_raw.userId == "1261737").count()
```

```
778479
```

```python
df_raw.where(sf.isnull(df_raw.registration)).select('sessionId').distinct().show()
```

```
+---------+
|sessionId|
+---------+
|    23116|
|    33760|
|    30428|
|    35323|
|    35484|
|    13248|
|    38878|
|    38543|
|    27919|
|    48763|
|    52001|
|    17048|
|    52051|
|    48899|
|    47492|
|    49983|
|    52611|
|    50049|
|    50329|
|    50287|
+---------+
only showing top 20 rows
```

These rows are spread over many different sessions, so the problem isn't tied to specific sessions.

## Cleaning

We will use the same functions from the first notebook.

```python
def convert_ms(x):
    """Converts a timestamp in milliseconds to whole seconds"""
    if x is None:
        return None

    return x // 1000

convert_ms_udf = sf.udf(convert_ms, st.LongType())
```
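The event data stores `ts` and `registration` as milliseconds since the Unix epoch, while Spark interprets a number cast to `timestamp` as seconds, hence the integer division by 1000. A quick plain-Python check of the conversion on the `ts` value of the first row from `df_raw.head()` above, interpreted as UTC:

```python
from datetime import datetime, timezone


def convert_ms(x):
    """Convert milliseconds since the epoch to whole seconds (None passes through)."""
    if x is None:
        return None
    return x // 1000


# ts of the first row returned by df_raw.head() above
ts_ms = 1538352001000
seconds = convert_ms(ts_ms)
print(datetime.fromtimestamp(seconds, tz=timezone.utc))  # 2018-10-01 00:00:01+00:00
```

This matches the earliest `timestamp` reported later in the notebook. Inside Spark, a built-in expression such as `(sf.col('ts') / 1000).cast('timestamp')` should do the same job without the overhead of a Python UDF, though I haven't tested it on this dataset.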

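```python
def clean_df(df_raw):
    """
    Takes in a raw events dataframe, drops the null user and adds
    timestamp columns for events and registrations.
    """
    # userId is a string column, so compare against a string
    df = df_raw.filter(df_raw.userId != "1261737")

    # Spark casts numbers to timestamps as seconds, hence the ms -> s conversion
    df = df.withColumn('timestamp', convert_ms_udf(df.ts).cast('timestamp'))
    df = df.withColumn('registration_ts', convert_ms_udf(df.registration).cast('timestamp'))

    return df
```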

```python
# After reading in df_raw, running this cell reproduces the cleaned data from the exploration
df = clean_df(df_raw)
```

```python
df.select([sf.count(sf.when(sf.isnull(c), c)).alias(c) for c in df_raw.columns]).show()
```

```
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
| artist|auth|firstName|gender|itemInSession|lastName| length|level|location|method|page|registration|sessionId|   song|status| ts|userAgent|userId|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
|4630448|   0|        0|     0|            0|       0|4630448|    0|       0|     0|   0|           0|        0|4630448|     0|  0|        0|     0|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
```

Next, the number of distinct users and sessions in the full data. There are still plenty of both: 22,277 users across 223,096 sessions.

```python
df.agg(sf.countDistinct('userId'), sf.countDistinct('sessionId')).show()
```

```
+----------------------+-------------------------+
|count(DISTINCT userId)|count(DISTINCT sessionId)|
+----------------------+-------------------------+
|                 22277|                   223096|
+----------------------+-------------------------+
```

The event timestamps span a range similar to the subset's, October 1 to December 1, 2018, while the registrations span a wider range, as we would expect from a larger user group.

```python
df.agg(sf.min('timestamp'), sf.max('timestamp')).show()
```

```
+-------------------+-------------------+
|     min(timestamp)|     max(timestamp)|
+-------------------+-------------------+
|2018-10-01 00:00:01|2018-12-01 00:00:02|
+-------------------+-------------------+
```

```python
df.agg(sf.min('registration_ts'), sf.max('registration_ts')).show()
```

```
+--------------------+--------------------+
|min(registration_ts)|max(registration_ts)|
+--------------------+--------------------+
| 2017-10-14 22:05:25| 2018-12-03 07:23:42|
+--------------------+--------------------+
```

Interestingly, the last registration (2018-12-03) comes after the last event (2018-12-01). That seems like a possible data error, but it probably won't be a big deal.

## Feature Engineering

```python
def get_feature_df(df):
    """Takes in a cleaned event dataframe and returns a feature dataframe"""

    # Column names for the vector
    vector_cols = []

    # Get session counts
    session_counts = df.groupby('userId').agg(sf.countDistinct('sessionId').alias('session_count'))
    vector_cols.append('session_count')

    # Get pages and ignore events
    pages = df.select('page').distinct().sort('page')
    pages_list = [r.page for r in pages.collect()]
    drop_events = ['Cancel']

    # Get event counts
    feat_df = df.groupby('userId').pivot('page', pages_list).count()
    feat_df = feat_df.withColumnRenamed('Cancellation Confirmation', 'label')
    feat_df = feat_df.drop(*drop_events).fillna(0)

    feat_df = feat_df.join(session_counts, on='userId')

    # Normalize by session counts
    ignore_cols = {'userId', 'session_count', 'label'}
    remaining_cols = sorted(list(set(feat_df.columns) - ignore_cols))
    for column in remaining_cols:
        feat_df = feat_df.withColumn(column, sf.col(column) / feat_df.session_count)
    vector_cols.extend(remaining_cols)

    # Get account ages
    max_timestamp = df.agg(sf.max('timestamp')).first()[0]
    account_ages = df.select('userId',
                             sf.datediff(sf.lit(max_timestamp), df.registration_ts).alias('account_age')).distinct()

    vector_cols.append('account_age')
    feat_df = feat_df.join(account_ages, on='userId')

    # Get weekly song counts
    week_counts = df.where(df.page=='NextSong') \
                .groupby('userId', sf.date_trunc('week', 'timestamp').cast('date').alias('week')).count() \
                .groupby('userId').pivot('week').sum().fillna(0)

    vector_cols.extend(week_counts.columns[1:])
    feat_df = feat_df.join(week_counts, on='userId')

    # Get genders
    genders = df.select('userId', sf.when(sf.col('gender')=='F', 0).otherwise(1).alias('genders')).distinct()

    vector_cols.append('genders')
    feat_df = feat_df.join(genders, on='userId')

    # Assemble the vector
    assembler = VectorAssembler(inputCols=vector_cols, outputCol='features')

    return assembler.transform(feat_df)
```
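Stripped of Spark, the two main feature groups boil down to: count each page per user and divide by that user's number of distinct sessions, and count `NextSong` events per calendar week, where `date_trunc('week', ...)` truncates to the Monday that starts the week. A plain-Python sketch on a handful of invented events; the tuple layout and the helper names `week_start` and `user_features` are illustrative, not part of the notebook:

```python
from collections import Counter, defaultdict
from datetime import date, timedelta


def week_start(d: date) -> date:
    """Monday of the week containing d (what date_trunc('week', ...) returns)."""
    return d - timedelta(days=d.weekday())


def user_features(events):
    """events: iterable of (user_id, session_id, page, event_date) tuples."""
    sessions = defaultdict(set)
    page_counts = defaultdict(Counter)
    week_songs = defaultdict(Counter)

    for user, session, page, day in events:
        sessions[user].add(session)
        page_counts[user][page] += 1
        if page == "NextSong":
            week_songs[user][week_start(day)] += 1

    features = {}
    for user in sessions:
        n_sessions = len(sessions[user])
        features[user] = {
            "session_count": n_sessions,
            # Page counts normalized by the number of distinct sessions
            "per_session": {p: c / n_sessions for p, c in page_counts[user].items()},
            "weekly_songs": dict(week_songs[user]),
        }
    return features


# Invented events for one user across two sessions
events = [
    ("u1", 1, "NextSong", date(2018, 10, 1)),
    ("u1", 1, "NextSong", date(2018, 10, 3)),
    ("u1", 2, "Thumbs Up", date(2018, 10, 9)),
    ("u1", 2, "NextSong", date(2018, 10, 9)),
]
f = user_features(events)["u1"]
print(f["session_count"])  # 2
print(f["per_session"])    # {'NextSong': 1.5, 'Thumbs Up': 0.5}
print(f["weekly_songs"])   # {datetime.date(2018, 10, 1): 2, datetime.date(2018, 10, 8): 1}
```

Pages a user never visited simply don't appear in this sketch, whereas the Spark version's `fillna(0)` fills them with zeros.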

Now we build the feature dataframe and check that everything looks right.

```python
feature_df = get_feature_df(df)
```

```python
feature_df.printSchema()
```

```
root
 |-- userId: string (nullable = true)
 |-- About: double (nullable = true)
 |-- Add Friend: double (nullable = true)
 |-- Add to Playlist: double (nullable = true)
 |-- label: long (nullable = true)
 |-- Downgrade: double (nullable = true)
 |-- Error: double (nullable = true)
 |-- Help: double (nullable = true)
 |-- Home: double (nullable = true)
 |-- Logout: double (nullable = true)
 |-- NextSong: double (nullable = true)
 |-- Roll Advert: double (nullable = true)
 |-- Save Settings: double (nullable = true)
 |-- Settings: double (nullable = true)
 |-- Submit Downgrade: double (nullable = true)
 |-- Submit Upgrade: double (nullable = true)
 |-- Thumbs Down: double (nullable = true)
 |-- Thumbs Up: double (nullable = true)
 |-- Upgrade: double (nullable = true)
 |-- session_count: long (nullable = false)
 |-- account_age: integer (nullable = true)
 |-- 2018-10-01: long (nullable = true)
 |-- 2018-10-08: long (nullable = true)
 |-- 2018-10-15: long (nullable = true)
 |-- 2018-10-22
```

```python
feature_df.persist()
```

```
DataFrame[userId: string, About: double, Add Friend: double, Add to Playlist: double, label: bigint, Downgrade: double, Error: double, Help: double, Home: double, Logout: double, NextSong: double, Roll Advert: double, Save Settings: double, Settings: double, Submit Downgrade: double, Submit Upgrade: double, Thumbs Down: double, Thumbs Up: double, Upgrade: double, session_count: bigint, account_age: int, 2018-10-01: bigint, 2018-10-08: bigint, 2018-10-15: bigint, 2018-10-22: bigint, 2018-10-29: bigint, 2018-11-05: bigint, 2018-11-12: bigint, 2018-11-19: bigint, 2018-11-26: bigint, genders: int, features: vector]
```

```python
feature_df.head(1)
```

```
[Row(userId=u'1000280', About=0.0, Add Friend=0.6363636363636364, Add to Playlist=1.1363636363636365, label=1, Downgrade=0.13636363636363635, Error=0.13636363636363635, Help=0.36363636363636365, Home=2.0, Logout=0.6818181818181818, NextSong=46.45454545454545, Roll Advert=3.3636363636363638, Save Settings=0.045454545454545456, Settings=0.4090909090909091, Submit Downgrade=0.045454545454545456, Submit Upgrade=0.045454545454545456, Thumbs Down=1.5, Thumbs Up=2.409090909090909, Upgrade=0.4090909090909091, session_count=22, account_age=95, 2018-10-01=308, 2018-10-08=34, 2018-10-15=291, 2018-10-22=57, 2018-10-29=187, 2018-11-05=38, 2018-11-12=107, 2018-11-19=0, 2018-11-26=0, genders=1, features=DenseVector([22.0, 0.0, 0.6364, 1.1364, 0.1364, 0.1364, 0.3636, 2.0, 0.6818, 46.4545, 3.3636, 0.0455, 0.4091, 0.0455, 0.0455, 1.5, 2.4091, 0.4091, 95.0, 308.0, 34.0, 291.0, 57.0, 187.0, 38.0, 107.0, 0.0, 0.0, 1.0]))]
```

Next let's check the number of users in each class:

```python
feature_df.groupby('label').count().show()
```

```
+-----+-----+
|label|count|
+-----+-----+
|    0|17259|
|    1| 5002|
+-----+-----+
```

Roughly 22% of users churned (5,002 of 22,261): not perfectly balanced, but a decent balance after all!
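With 17,259 non-churned and 5,002 churned users in the table above, a trivial model that always predicts "no churn" already scores about 78% accuracy. The arithmetic, as a small plain-Python check:

```python
# Class counts from feature_df.groupby('label').count() above
counts = {0: 17259, 1: 5002}
total = sum(counts.values())

churn_rate = counts[1] / total
# Accuracy of a trivial model that always predicts the majority class (no churn)
majority_baseline = max(counts.values()) / total

print(total)                        # 22261
print(round(churn_rate, 3))         # 0.225
print(round(majority_baseline, 3))  # 0.775
```

This is why accuracy alone can look flattering on this data, and why precision, recall and F1 are reported alongside it below.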

## Modeling

First we'll split the data into training and test sets, then we'll run through a few different models to see which is the most effective.

```python
train, test = feature_df.randomSplit([0.8, 0.2], seed=42)
```

I want to start with a logistic regression just to get a feel for the API, then move on to tree-based models. Luckily, decision trees don't require feature scaling, and Spark's `LogisticRegression` standardizes features internally by default (`standardization=True`), so the feature vector should be fine in its current, unscaled state. First we will create the model, then build a parameter grid.

```python
# elasticNetParam=0 means any regularization applied is pure L2
lr = LogisticRegression(maxIter=10, regParam=0.0, elasticNetParam=0)
```

```python
paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.0, 0.1]).build()
```

It's really advantageous to use the `CrossValidator` here, because it lets us try out a variety of parameters and automatically select the best ones using k-fold cross-validation on the training set, without touching the test set.

```python
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)
```
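Under the hood, `CrossValidator` splits the training data into `numFolds` folds, fits every parameter combination on all but one fold, scores it on the held-out fold, averages those scores into `avgMetrics`, and finally refits the best combination on the whole training set. A minimal plain-Python sketch of that loop; the one-feature threshold "model", its `fit`/`score` helpers and the accuracy metric are made-up stand-ins, and Spark assigns rows to folds randomly rather than with the shuffled slicing used here:

```python
import random
from statistics import mean


def k_fold_indices(n, k, seed=42):
    """Shuffle row indices and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[i::k] for i in range(k)]


def fit(rows, threshold):
    """Stand-in 'training': the model is just the threshold parameter."""
    return threshold


def score(model, rows):
    """Accuracy of predicting label 1 when the feature exceeds the threshold."""
    return mean(1.0 if (x > model) == bool(y) else 0.0 for x, y in rows)


def cross_validate(rows, param_grid, k=3):
    folds = k_fold_indices(len(rows), k)
    avg_metrics = []
    for params in param_grid:
        fold_scores = []
        for i, held_out in enumerate(folds):
            # Train on every fold except the held-out one
            train_idx = [j for f, fold in enumerate(folds) if f != i for j in fold]
            model = fit([rows[j] for j in train_idx], **params)
            fold_scores.append(score(model, [rows[j] for j in held_out]))
        avg_metrics.append(mean(fold_scores))
    best = max(range(len(param_grid)), key=avg_metrics.__getitem__)
    # Refit the winning parameters on all of the training rows
    best_model = fit(rows, **param_grid[best])
    return best_model, avg_metrics


# Toy data: label is 1 exactly when the feature is above 5
rows = [(x, int(x > 5)) for x in range(10)]
grid = [{"threshold": 2}, {"threshold": 5}, {"threshold": 8}]
best_model, avg_metrics = cross_validate(rows, grid)
print(best_model)  # 5
```

Each extra grid point costs `numFolds` additional fits, so grid size is the main lever on training time.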

Then, train on the training dataset.

```python
train_lr = crossval.fit(train)
```

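`avgMetrics` holds, for each parameter combination in `paramGrid`, the area under the ROC curve averaged over the 3 folds. Area under ROC is the default metric of `BinaryClassificationEvaluator`, and a great one for evaluating binary classification. Here `regParam=0.0` scores slightly better than `regParam=0.1`. Mostly I just print it to make sure it's working.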

```python
train_lr.avgMetrics
```

```
[0.8724927340700368, 0.8581148562376506]
```
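Area under the ROC curve has a handy interpretation: it is the probability that a randomly chosen churned user gets a higher score than a randomly chosen non-churned user, with ties counting half. A brute-force plain-Python sketch of that definition (fine for toy inputs, quadratic in the number of rows); Spark's `BinaryClassificationEvaluator` instead integrates the ROC curve built from the `rawPrediction` column, which should give the same value or something very close to it:

```python
def roc_auc(labels, scores):
    """P(score of random positive > score of random negative); ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Made-up labels and model scores
labels = [1, 1, 1, 0, 0]
scores = [0.9, 0.8, 0.4, 0.7, 0.3]
print(roc_auc(labels, scores))  # 0.8333333333333334
```

A value of 0.5 means the ranking is no better than chance and 1.0 means every churned user outranks every non-churned one, which puts the 0.87 above in perspective.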

Finally, generate predictions for the test set. `train_lr.transform` uses the best model found during cross-validation.

```python
results_lr = train_lr.transform(test)
```

We can define this helper to print a few evaluation metrics computed from the predictions DataFrame.

```python
def get_metrics(res):
    """Prints accuracy, precision, recall and F1-score for a predictions
    DataFrame with `label` and `prediction` columns."""
    total = res.count()

    # Confusion-matrix cells (each .count() runs a separate Spark job)
    tp = res.where((res.label == 1) & (res.prediction == 1)).count()
    tn = res.where((res.label == 0) & (res.prediction == 0)).count()
    fp = res.where((res.label == 0) & (res.prediction == 1)).count()
    fn = res.where((res.label == 1) & (res.prediction == 0)).count()

    accuracy = float(tp + tn) / total
    # Guard against division by zero when there are no predicted or actual positives
    precision = float(tp) / (tp + fp) if (tp + fp) else 0.0
    recall = float(tp) / (tp + fn) if (tp + fn) else 0.0
    f1 = 2.0 * precision * recall / (precision + recall) if (precision + recall) else 0.0

    print('Accuracy: ', round(accuracy, 2))
    print('Precision: ', round(precision, 2))
    print('Recall: ', round(recall, 2))
    print('F1-Score: ', round(f1, 2))
```
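The formulas inside `get_metrics` are easy to sanity-check away from Spark. A plain-Python version (the name `binary_metrics` is just for illustration) on a made-up list of eight labels and predictions:

```python
def binary_metrics(labels, predictions):
    """Accuracy, precision, recall and F1 for 0/1 labels and predictions."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)

    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    # F1 is the harmonic mean of precision and recall
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return accuracy, precision, recall, f1


labels = [1, 1, 1, 0, 0, 0, 0, 1]
predictions = [1, 0, 1, 0, 0, 1, 0, 0]
acc, prec, rec, f1 = binary_metrics(labels, predictions)
print(round(acc, 3), round(prec, 3), round(rec, 3), round(f1, 3))  # 0.625 0.667 0.5 0.571
```

On the Spark side, `res.groupby('label', 'prediction').count().collect()` should return all four confusion-matrix cells in a single pass instead of the five separate jobs in `get_metrics`.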

```python
get_metrics(results_lr)
```

```
('Accuracy: ', 0.85)
('Precision: ', 0.78)
('Recall: ', 0.46)
('F1-Score: ', 0.58)
```

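Looks like the logistic regression model actually performed fairly well! Not great, but not terrible either: precision is decent, but a recall of 0.46 means it misses more than half of the churned users. Next we can move on to a basic decision tree.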

```python
dt = DecisionTreeClassifier()
```

```python
paramGrid = ParamGridBuilder() \
                .addGrid(dt.maxDepth, [3, 5, 7]) \
                .addGrid(dt.minInstancesPerNode, [1, 3, 5]) \
                .build()
```
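`ParamGridBuilder` takes the Cartesian product of every list passed to `addGrid`, so this grid holds 3 × 3 = 9 parameter combinations. With `numFolds=3`, cross-validation fits 9 × 3 = 27 trees and then one more to refit the best combination on the full training set. The same bookkeeping with `itertools.product` (the dictionary keys just mirror the Spark parameter names, and this ordering is not guaranteed to match the order of Spark's `avgMetrics`):

```python
from itertools import product

max_depths = [3, 5, 7]
min_instances = [1, 3, 5]
num_folds = 3

# Every combination of the two parameter lists, like ParamGridBuilder.build()
grid = [
    {"maxDepth": d, "minInstancesPerNode": m}
    for d, m in product(max_depths, min_instances)
]

# One fit per (combination, fold), plus a final refit of the best combination
total_fits = len(grid) * num_folds + 1

print(len(grid))   # 9
print(total_fits)  # 28
```

The same count applies to the random forest grid further down, where each fit trains a whole forest (20 trees by default).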

```python
dt_crossval = CrossValidator(estimator=dt,
                             estimatorParamMaps=paramGrid,
                             evaluator=BinaryClassificationEvaluator(),
                             numFolds=3)
```

```python
model_dt = dt_crossval.fit(train)
```

```python
model_dt.avgMetrics
```

```
[0.8273880001860734, 0.8273880001860734, 0.8273880001860734, 0.8291456054470742, 0.8266169100477181, 0.7852490410095025, 0.803507766675378, 0.7921751221437958, 0.7865720684865891]
```

```python
results_dt = model_dt.transform(test)
```

```python
get_metrics(results_dt)
```

```
('Accuracy: ', 0.9)
('Precision: ', 0.8)
('Recall: ', 0.71)
('F1-Score: ', 0.76)
```

Well, the decision tree actually performed quite well! An F1-score of 0.76 is respectable, and an accuracy of 0.9 is well above the roughly 0.78 we'd get by always predicting "no churn". Next, let's see how a random forest does.

```python
rf = RandomForestClassifier()
```

```python
paramGrid = ParamGridBuilder() \
                .addGrid(rf.maxDepth, [3, 5, 7]) \
                .addGrid(rf.minInstancesPerNode, [1, 3, 5]) \
                .build()
```

```python
rf_crossval = CrossValidator(estimator=rf,
                             estimatorParamMaps=paramGrid,
                             evaluator=BinaryClassificationEvaluator(),
                             numFolds=3)
```

```python
model_rf = rf_crossval.fit(train)
```

```python
model_rf.avgMetrics
```

```
[0.904789115085872, 0.90478895477181, 0.9047911787971201, 0.9169346891040968, 0.9173023225091668, 0.916875486452861, 0.9254453792776469, 0.9257542682113924, 0.9249778436312617]
```

```python
results_rf = model_rf.transform(test)
```

```python
get_metrics(results_rf)
```

```
('Accuracy: ', 0.91)
('Precision: ', 0.91)
('Recall: ', 0.69)
('F1-Score: ', 0.79)
```

The random forest performed a bit better than the decision tree! Precision jumped from 0.80 to 0.91 while recall stayed about the same (0.69 vs 0.71), giving an F1-score of 0.79, which is actually quite good. It seems our features work well for predicting user churn in this format.

## Conclusion

To sum it up, we found and engineered a useful set of features, tried a few different models, and ended up with a fairly well-performing model with an F1-score of 0.79. We tried a few alternatives, even though the goal was a proof of concept rather than the highest possible performance. The random forest ended up being the highest-performing model, which isn't too surprising compared to a single decision tree and logistic regression.

This method has proven to be pretty effective and can clearly distinguish between churned and non-churned users when given the entire timeframe of data. In a production service, it could be used to offer some sort of promotion or treatment to users identified as likely to churn.

I think both the hardest and the most interesting part of this project was identifying and extracting features from the events data. It's much less intuitive than many of the projects we had in the class, so it provided a nice challenge. My main goal for this project was to learn the basics of PySpark, and I think I accomplished that nicely. I have a few other interesting ideas that I didn't have time to investigate in this project.

## Improvements

In this project I found a lot of interesting similarities to the recommendation engines section of the nanodegree, even though it wasn't implemented in the same way. A really interesting improvement would be to treat this as a time-range problem rather than a random-split problem: train on a few weeks of data and test whether you can predict the users who churn in the following few weeks. This would match how we'd actually like to use the model in the real world much more closely. It does present a bunch of additional challenges and might require some custom algorithms or evaluators, which makes it a longer-term project.
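One way the time-range setup could look: pick a cutoff date, build features only from events before it, and label a user as churned if their `Cancellation Confirmation` event falls within a fixed window after the cutoff. Users who already cancelled before the cutoff are dropped, since there is nothing left to predict for them. A plain-Python sketch of the labelling step; the `time_split` helper, the cutoff, the 14-day window and the event tuples are all hypothetical:

```python
from datetime import date, timedelta


def time_split(events, cutoff, horizon_days=14):
    """Split (user_id, page, event_date) events around a cutoff date.

    Returns (history, labels): events before the cutoff for feature building,
    and a 0/1 churn label per user for the window [cutoff, cutoff + horizon).
    """
    window_end = cutoff + timedelta(days=horizon_days)
    cancelled_before = {u for u, page, d in events
                        if page == "Cancellation Confirmation" and d < cutoff}

    # Only pre-cutoff events from users who were still active at the cutoff
    history = [e for e in events if e[2] < cutoff and e[0] not in cancelled_before]
    active_users = {u for u, _, _ in history}

    churned = {u for u, page, d in events
               if page == "Cancellation Confirmation" and cutoff <= d < window_end}
    labels = {u: int(u in churned) for u in active_users}
    return history, labels


events = [
    ("a", "NextSong", date(2018, 10, 2)),
    ("a", "Cancellation Confirmation", date(2018, 11, 5)),
    ("b", "NextSong", date(2018, 10, 20)),
    ("c", "Cancellation Confirmation", date(2018, 10, 10)),
]
history, labels = time_split(events, cutoff=date(2018, 11, 1))
print(sorted(labels.items()))  # [('a', 1), ('b', 0)]
```

Training at one cutoff and evaluating at a later one would then mimic scoring users in production.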

Another obvious extension would be to look for `Submit Downgrade` events instead of cancellation events, i.e. to track users who are going to downgrade from paid to free. This would require a different set of features, likely more difficult ones, since each user's history would have to be separated into periods by level. It would also benefit from the time-range approach mentioned above; combining the two would be very interesting.

That about wraps up my project, thanks for reading!