This notebook runs on an AWS EMR cluster. It loads the complete Sparkify event log dataset (12 GB) from an S3 bucket.

The methods used here come from our earlier work with a smaller version of this dataset, which was run on a local machine. The steps carried out in this notebook on the EMR cluster are:

- Load the full dataset
- Engineer features for modelling, based on the analysis done with the smaller dataset
- Vectorise and scale the required features
- Split the data into training, validation and test sets
- Train a Gradient Boosted Tree (GBT) classifier and evaluate it on the test set. Only the GBT classifier is used here, to confirm the results of our work with the smaller dataset.
- Evaluate metrics such as accuracy and F1 score
- Inspect the model's feature importances

In [1]:
from pyspark.sql import SparkSession    
from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf
from pyspark.sql.types import IntegerType, DateType
from pyspark.sql import Window
import datetime
from time import time
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Starting Spark application

SparkSession available as 'spark'.

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()


In [3]:
#load the complete log dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()


Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [4]:
def feature_engineer(df):
    #clean the dataset
    df = df.filter(df['userId'] != '')
    df = df.dropna(how = "any", subset = ["userId", "sessionId"])
    
    #define churn events
    flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
    flag_churn_event = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
    
    #mark downgrade and churn event on each activity
    df = df.withColumn("downgrade_event", flag_downgrade_event("page"))\
            .withColumn("cancellation_event", flag_churn_event("page"))
    
    #label churn and downgrade event upon users
    user_frame = Window.partitionBy('userId')
    df = df.withColumn('downgrade', max('downgrade_event').over(user_frame)) \
            .withColumn('churn', max('cancellation_event').over(user_frame))
    
    #engineer features for modelling
    
    #udfs for engineering columns 
    free = udf(lambda x: int(x=='free'), IntegerType())
    paid = udf(lambda x: int(x=='paid'), IntegerType())
    get_day = udf(lambda x: datetime.datetime.fromtimestamp(x/1000), DateType())
    
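    # generate features aggregated per user
    # note: count() counts every event row of the user (not only 'NextSong'
    # plays), so total_songs effectively measures total activity
    total_songs_listened = df \
        .select('userID','song') \
        .groupBy('userID') \
        .count() \
        .withColumnRenamed('count', 'total_songs')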
    
    thumbs_up_count = df \
        .select('userID','page') \
        .where(df.page == 'Thumbs Up') \
        .groupBy('userID') \
        .count() \
        .withColumnRenamed('count', 'thumbs_up_count') 

    thumbs_down_count = df \
        .select('userID','page') \
        .where(df.page == 'Thumbs Down') \
        .groupBy('userID') \
        .count() \
        .withColumnRenamed('count', 'thumbs_down_count')
    
    friends_added = df.filter(df.page=='Add Friend')\
        .select('userId', 'page')\
        .groupBy('userId').count()\
        .withColumnRenamed('count', 'friends_added_count')
    
    errors_encountered = df.filter(df.page=='Error')\
        .select('userId', 'page', 'ts', 'length')\
        .withColumn('date', get_day(col('ts')))\
        .groupBy('userId', 'date').agg({'page':'count'})\
        .groupBy('userId').mean()\
        .withColumnRenamed('avg(count(page))', 'errors_faced')
    
    customer_lifetime = df \
        .select('userId','registration','ts') \
        .withColumn('lifetime',(df.ts-df.registration)) \
        .groupBy('userId') \
        .agg({'lifetime':'max'}) \
        .withColumnRenamed('max(lifetime)','lifetime') \
        .select('userId', (col('lifetime')/1000/3600/24).alias('lifetime'))
    
    playlist_creation = df \
        .select('userID','page') \
        .where(df.page == 'Add to Playlist') \
        .groupBy('userID') \
        .count() \
        .withColumnRenamed('count', 'add_to_playlist')
    
    help_visits = df.filter(df.page=='Help')\
        .select('userId', 'page', 'ts', 'length')\
        .withColumn('date', get_day(col('ts')))\
        .groupBy('userId', 'date').agg({'page':'count'})\
        .groupBy('userId').mean()\
        .withColumnRenamed('avg(count(page))', 'help_visits')
    
    avg_songs_per_session = df.where('page == "NextSong"') \
        .groupby(['userId', 'sessionId']) \
        .count() \
        .groupby(['userId']) \
        .agg({'count':'avg'}) \
        .withColumnRenamed('avg(count)', 'avg_songs_per_session')
    
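    # note: dropDuplicates() keeps one row per (userId, level) pair, so a user
    # who has been on both the free and the paid tier gets two rows here
    user_level = df.select('userId', 'level')\
        .where((df.level=='free')|(df.level=='paid'))\
        .dropDuplicates()\
        .withColumn('free_tier', free('level'))\
        .withColumn('paid_tier', paid('level')).drop('level')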
    
    label = df \
        .select('userId', col('churn').alias('label'))\
        .dropDuplicates()
    
    # create new dataframe to be used for modelling
    # note: these are inner joins, so a user who lacks any one of these
    # features (for example, never visited the Error page) is dropped entirely
    data = total_songs_listened.join(label, on='userId')\
        .join(user_level, on='userId')\
        .join(avg_songs_per_session, on='userId')\
        .join(help_visits, on='userId')\
        .join(playlist_creation, on='userId')\
        .join(customer_lifetime, on='userId')\
        .join(errors_encountered, on='userId')\
        .join(friends_added, on='userId')\
        .join(thumbs_down_count, on='userId')\
        .join(thumbs_up_count, on='userId')\
        .drop('userId')
    
    return data
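The churn label is built in two steps: each event is flagged with 0 or 1, and the per-user maximum of that flag (computed with `max(...).over(Window.partitionBy('userId'))`) becomes the user's label. The plain-Python sketch below mirrors that logic on a hypothetical toy event list. Unlike the window function, which attaches the label to every event row, it returns one entry per user.

```python
from collections import defaultdict

# Hypothetical toy event log: (userId, page) pairs standing in for the
# 'userId' and 'page' columns of the Sparkify events.
events = [
    ("u1", "NextSong"),
    ("u1", "Submit Downgrade"),
    ("u2", "NextSong"),
    ("u2", "Cancellation Confirmation"),
    ("u3", "Thumbs Up"),
]

def label_users(events):
    """Return {userId: (downgrade, churn)} using a per-user max over event flags."""
    downgrade = defaultdict(int)
    churn = defaultdict(int)
    for user_id, page in events:
        # Per-event flags, like flag_downgrade_event / flag_churn_event.
        downgrade_event = 1 if page == "Submit Downgrade" else 0
        cancellation_event = 1 if page == "Cancellation Confirmation" else 0
        # Running max per user, like max(...) over a window partitioned by userId.
        downgrade[user_id] = max(downgrade[user_id], downgrade_event)
        churn[user_id] = max(churn[user_id], cancellation_event)
    return {user_id: (downgrade[user_id], churn[user_id]) for user_id in downgrade}

labels = label_users(events)
print(labels)
```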


In [5]:
df = feature_engineer(df)
df.take(2)


[Row(total_songs=1317, label=1, free_tier=1, paid_tier=0, avg_songs_per_session=48.666666666666664, help_visits=1.6, add_to_playlist=25, lifetime=77.30377314814815, errors_faced=1.0, friends_added_count=14, thumbs_down_count=33, thumbs_up_count=53), Row(total_songs=1317, label=1, free_tier=0, paid_tier=1, avg_songs_per_session=48.666666666666664, help_visits=1.6, add_to_playlist=25, lifetime=77.30377314814815, errors_faced=1.0, friends_added_count=14, thumbs_down_count=33, thumbs_up_count=53)]
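The two rows printed above have identical activity features but opposite `free_tier`/`paid_tier` values. This is consistent with `user_level` keeping one row per distinct (userId, level) pair: a user who has been on both tiers appears twice, and the two copies can end up in different splits. One possible fix, sketched below in plain Python on hypothetical data, is to collapse the flags to one row per user by taking the maximum of each flag. In PySpark this would correspond to a `groupBy('userId')` with `max` aggregations over the flag columns.

```python
# Hypothetical distinct (userId, level) pairs, as produced by user_level
# before the free/paid flag columns are added.
user_levels = [("u1", "free"), ("u1", "paid"), ("u2", "free")]

def collapse_levels(pairs):
    """Return {userId: (free_tier, paid_tier)} with one entry per user.

    Each flag is the max over the user's rows, so a user who was ever on the
    free tier gets free_tier=1 and a user who was ever paid gets paid_tier=1.
    """
    collapsed = {}
    for user_id, level in pairs:
        free, paid = collapsed.get(user_id, (0, 0))
        collapsed[user_id] = (max(free, int(level == "free")),
                              max(paid, int(level == "paid")))
    return collapsed

print(collapse_levels(user_levels))
```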

In [6]:
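def feature_scale(df):
    # assemble every column except the label into a single feature vector
    features = df.drop('label').columns
    assembler = VectorAssembler(inputCols=features, outputCol="NumFeatures")
    data = assembler.transform(df)
    
    # scale each feature to unit standard deviation (no mean-centering by default);
    # note: the scaler is fitted on the full dataset, before the train/test split
    scaler = StandardScaler(inputCol="NumFeatures", outputCol="features", withStd=True)
    scalerModel = scaler.fit(data)
    data = scalerModel.transform(data)
    
    return data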


In [None]:
data = feature_scale(df)
data.take(2)


[Row(total_songs=1317, label=1, free_tier=0, paid_tier=1, avg_songs_per_session=48.666666666666664, help_visits=1.6, add_to_playlist=25, lifetime=77.30377314814815, errors_faced=1.0, friends_added_count=14, thumbs_down_count=33, thumbs_up_count=53, NumFeatures=DenseVector([1317.0, 0.0, 1.0, 48.6667, 1.6, 25.0, 77.3038, 1.0, 14.0, 33.0, 53.0]), features=DenseVector([0.8445, 0.0, 2.0021, 1.3298, 3.9489, 0.6578, 1.9802, 4.3204, 0.573, 2.2174, 0.6815])), Row(total_songs=1317, label=1, free_tier=1, paid_tier=0, avg_songs_per_session=48.666666666666664, help_visits=1.6, add_to_playlist=25, lifetime=77.30377314814815, errors_faced=1.0, friends_added_count=14, thumbs_down_count=33, thumbs_up_count=53, NumFeatures=DenseVector([1317.0, 1.0, 0.0, 48.6667, 1.6, 25.0, 77.3038, 1.0, 14.0, 33.0, 53.0]), features=DenseVector([0.8445, 2.0021, 0.0, 1.3298, 3.9489, 0.6578, 1.9802, 4.3204, 0.573, 2.2174, 0.6815]))]
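`StandardScaler` with `withStd=True` and the default `withMean=False` divides each feature by its standard deviation without centering it, so zero values stay zero (compare `free_tier=0` in `NumFeatures` and its scaled value 0.0 above). In this notebook the scaler is fitted on the full dataset before the split, so test rows contribute to the scaling statistics. The plain-Python sketch below separates fitting (on training rows only) from transforming. It assumes the sample standard deviation with an n - 1 denominator, and the toy rows are hypothetical.

```python
import statistics

def fit_std(rows):
    """Per-column sample standard deviation (n - 1 denominator)."""
    return [statistics.stdev(column) for column in zip(*rows)]

def scale_rows(rows, stds):
    """Divide each value by its column's standard deviation, without centering."""
    # Assumption: zero-variance columns are mapped to 0.0 rather than dividing by zero.
    return [[x / s if s != 0 else 0.0 for x, s in zip(row, stds)] for row in rows]

# Hypothetical rows with two numeric features.
train_rows = [[2.0, 1.0], [4.0, 0.0], [6.0, 1.0]]
test_rows = [[3.0, 1.0]]

stds = fit_std(train_rows)                 # fit on the training rows only
scaled_test = scale_rows(test_rows, stds)  # reuse the training statistics
print(stds, scaled_test)
```

Since gradient-boosted trees split on feature thresholds, dividing a feature by a positive constant is not expected to change which splits the trees can make; scaling matters more for models such as `LogisticRegression`.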

In [None]:
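# roughly 60% training, 20% validation and 20% test data;
# the validation set is not used further in this notebook
train, rest = data.randomSplit([0.6, 0.4], seed=42)
validation, test = rest.randomSplit([0.5, 0.5], seed=42)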
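The two calls to `randomSplit` give roughly 60% training, 20% validation (0.4 × 0.5) and 20% test data. Because rows are assigned randomly, the split sizes are only approximate. The sketch below is a simplified plain-Python illustration of a weighted random split, not Spark's implementation: the weights are normalised into cumulative bounds, and a single uniform draw per row decides which split the row lands in.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to exactly one split using cumulative weight bounds."""
    total = sum(weights)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw per row in [0, 1)
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at 1.0.
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(10_000))
train, rest = random_split(rows, [0.6, 0.4], seed=42)
validation, test = random_split(rest, [0.5, 0.5], seed=42)
print(len(train), len(validation), len(test))
```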


In [10]:
gbt = GBTClassifier(maxIter=10,seed=42)
gbt_model = gbt.fit(train)
results_final = gbt_model.transform(test)


In [None]:
# the evaluator's "f1" metric is the per-class F1 score weighted by class frequency
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print('Metrics:')
print('Accuracy: {}'.format(evaluator.evaluate(results_final, {evaluator.metricName: "accuracy"})))
print('F-1 Score:{}'.format(evaluator.evaluate(results_final, {evaluator.metricName: "f1"})))


Metrics:
Accuracy: 0.8326756116811366
F-1 Score:0.8054970355804507
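`MulticlassClassificationEvaluator` reports accuracy as the fraction of correct predictions, and its `"f1"` metric as the per-class F1 score weighted by each class's share of the true labels. The sketch below computes both on hypothetical, imbalanced (label, prediction) pairs. It also shows that a baseline which always predicts "no churn" can still reach a high accuracy, which is why the F1 score is a useful companion metric.

```python
from collections import Counter

def accuracy(pairs):
    """Fraction of (label, prediction) pairs where the prediction is correct."""
    return sum(1 for y, p in pairs if y == p) / len(pairs)

def weighted_f1(pairs):
    """Per-class F1 averaged with weights equal to each class's share of true labels."""
    support = Counter(y for y, _ in pairs)
    total = len(pairs)
    score = 0.0
    for cls, count in support.items():
        tp = sum(1 for y, p in pairs if y == cls and p == cls)
        fp = sum(1 for y, p in pairs if y != cls and p == cls)
        fn = sum(1 for y, p in pairs if y == cls and p != cls)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        score += f1 * count / total
    return score

# Hypothetical results: 8 users who stayed (label 0) and 2 who churned (label 1);
# the model misses one of the two churners.
pairs = [(0, 0)] * 8 + [(1, 0), (1, 1)]
always_stay = [(y, 0) for y, _ in pairs]  # baseline that never predicts churn

print("model accuracy:", accuracy(pairs), "weighted F1:", weighted_f1(pairs))
print("baseline accuracy:", accuracy(always_stay), "weighted F1:", weighted_f1(always_stay))
```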

In [12]:
feature_ind = gbt_model.featureImportances.indices.tolist()
features = df.drop('label').columns
feature_name = [features[ind] for ind in feature_ind]
feature_coef = gbt_model.featureImportances.values.tolist()
print(feature_name)
print()
print(feature_coef)


['total_songs', 'avg_songs_per_session', 'help_visits', 'add_to_playlist', 'lifetime', 'errors_faced', 'friends_added_count', 'thumbs_down_count', 'thumbs_up_count']

[0.1102245511111617, 0.12269378173912349, 0.06958595730175013, 0.08590240367675563, 0.23165153605701336, 0.021539120434971665, 0.07819213119609128, 0.137730723894722, 0.14247979458841073]
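`featureImportances` is a sparse vector whose values are normalised to sum to 1, and features with zero importance (here `free_tier` and `paid_tier`) do not appear in it. Sorting the printed pairs makes the ranking easier to read; the two lists below are copied from the output above.

```python
# Copied from the printed output above.
feature_name = ['total_songs', 'avg_songs_per_session', 'help_visits', 'add_to_playlist',
                'lifetime', 'errors_faced', 'friends_added_count', 'thumbs_down_count',
                'thumbs_up_count']
feature_coef = [0.1102245511111617, 0.12269378173912349, 0.06958595730175013,
                0.08590240367675563, 0.23165153605701336, 0.021539120434971665,
                0.07819213119609128, 0.137730723894722, 0.14247979458841073]

# Pair each name with its importance and sort from most to least important.
ranking = sorted(zip(feature_name, feature_coef), key=lambda pair: pair[1], reverse=True)
for name, importance in ranking:
    print(f"{name:<24}{importance:.3f}")
```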

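Compared with the model trained on the smaller dataset, the accuracy and F1 score dropped when the model was trained on the full dataset. Even so, the model reaches about 83% accuracy and an F1 score of about 0.81 on the test set, which is a reasonable result for predicting whether a user will cancel their subscription.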

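The main difference from the smaller dataset is in the feature importances. Here, customer lifetime is clearly the most important feature (importance of about 0.23), which suggests that long-standing customers are less likely to churn. The thumbs up and thumbs down counts, the average number of songs per session and the total number of songs played also contribute noticeably to predicting churn, while the free_tier and paid_tier flags received zero importance (they do not appear in the list above).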

Using this model, a company could identify users who are likely to churn, contact them to understand their problems, and address them through segmented marketing.