In [1]:
# import libraries
import numpy as np
import re
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.classification import GBTClassifier, LogisticRegression, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, VectorIndexer, VectorAssembler, CountVectorizer, StringIndexer, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import time

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1562298471208_0001,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Get (or create) the Spark session used throughout the notebook
spark = SparkSession.builder.appName("Sparkify").getOrCreate()

# Load the full Sparkify event log (JSON) and preview the first record
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
data = spark.read.json(event_data)
data.head()

VBox()

Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042')

In [3]:
# Total number of event records in the dataset as loaded (before any cleaning)
data.count()

VBox()

26259199

In [3]:
# Remove events with no user attached: rows whose firstName or lastName is null
# (presumably logged-out / guest traffic -- TODO confirm).
# NOTE: this overwrites `data`, so every later cell works on the cleaned frame.
data = data.dropna(subset=['firstName', 'lastName'])

VBox()

In [5]:
# Count the null values in every column (one output column per input column)
null_counts = [count(when(isnull(name), 1)).alias(name) for name in data.columns]
data.agg(*null_counts).show()

VBox()

+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
| artist|auth|firstName|gender|itemInSession|lastName| length|level|location|method|page|registration|sessionId|   song|status| ts|userAgent|userId|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
|4630448|   0|        0|     0|            0|       0|4630448|    0|       0|     0|   0|           0|        0|4630448|     0|  0|        0|     0|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+

In [6]:
# Number of records remaining after dropping rows without a user name
data.count()

VBox()

25480720

In [7]:
# Number of users: distinct (firstName, lastName, userId) combinations
data.select('firstName', 'lastName', 'userId').distinct().count()

VBox()

22277

### Churn and Feature Engineering

In [19]:
# Two churn labels per user, each 1 if the user ever visited the page:
#   churn_Downgrade -> 'Submit Downgrade'
#   churn_Cancel    -> 'Cancellation Confirmation'
pages_visited = collect_set('page')
churn = (
    data.groupby('userId')
    .agg(
        array_contains(pages_visited, 'Submit Downgrade').alias('churn_Downgrade'),
        array_contains(pages_visited, 'Cancellation Confirmation').alias('churn_Cancel'),
    )
    .select(
        'userId',
        col('churn_Downgrade').cast(IntegerType()),
        col('churn_Cancel').cast(IntegerType()),
    )
)
churn.show(5)

VBox()

+-------+---------------+------------+
| userId|churn_Downgrade|churn_Cancel|
+-------+---------------+------------+
|1002185|              0|           0|
|1057724|              0|           0|
|1083324|              1|           0|
|1114507|              0|           0|
|1178731|              0|           0|
+-------+---------------+------------+
only showing top 5 rows

In [9]:
# Cross-tabulate the two churn flags: number of users in each (cancel, downgrade) combination
churn.crosstab('churn_Cancel', 'churn_Downgrade').show()

VBox()

+----------------------------+-----+----+
|churn_Cancel_churn_Downgrade|    0|   1|
+----------------------------+-----+----+
|                           1| 3860|1143|
|                           0|13314|3960|
+----------------------------+-----+----+

#### Session length, Number of items per session and Number of songs per session

In [4]:
# Per-user session statistics, built in two aggregation passes:
#   1) one row per (userId, sessionId): session length in seconds,
#      number of logged items, and number of 'NextSong' items
#   2) one row per userId: mean and sample standard deviation of each
#      of those per-session values
# max/min/sum below are the pyspark.sql.functions versions pulled in by the
# wildcard import, not the Python builtins.
per_session = data.groupby('userId', 'sessionId').agg(
    ((max('ts') - min('ts')) / 1000).alias('session_length'),
    count('sessionId').alias('n_items'),
    sum((col('page') == 'NextSong').cast(IntegerType())).alias('n_songs'),
)
per_user = per_session.groupby('userId').agg(
    avg('session_length').alias('session_len_avg'),
    stddev_samp('session_length').alias('session_len_std'),
    avg('n_items').alias('n_items_avg'),
    stddev_samp('n_items').alias('n_items_std'),
    avg('n_songs').alias('n_songs_session_avg'),
    stddev_samp('n_songs').alias('n_songs_session_std'),
)
# Replace missing statistics with 0 (presumably the std-dev of users with a
# single session -- TODO confirm)
session_length_items_songs = per_user.fillna(0)
session_length_items_songs.show(5)

VBox()

+-------+------------------+------------------+----------------+------------------+-------------------+-------------------+
| userId|   session_len_avg|   session_len_std|     n_items_avg|       n_items_std|n_songs_session_avg|n_songs_session_std|
+-------+------------------+------------------+----------------+------------------+-------------------+-------------------+
|1633577|18230.282608695652|23023.776841115112|89.8695652173913|110.27493090668173|  74.69565217391305|  93.21846969237073|
|1875484|          14910.24|15733.354360720412|           73.36| 72.69908298367089|              59.96|  61.87115644627955|
|1797408|         20318.875| 19303.54166770095|          98.875| 92.71093067522656|               83.0|   78.6774003976576|
|1829495|           16541.2|11109.655989273475|            78.4| 54.59212397406791|               66.4|  46.04128582044598|
|1770964|         13706.625|11684.373963403716|         71.1875| 58.07950728670713|            55.0625|  46.34431824218945|
+-------

#### Thumbs Up, Thumbs Down, Add Friend, Roll Advert, Add to Playlist, Submit Upgrade, Error

In [5]:
## For each user, compute the share of all of their page events taken by each action.

# (page value in the log, name of the resulting ratio column)
# NOTE: 'add_palylist_ratio' is misspelled on purpose -- later cells refer to
# the column by exactly this name.
page_ratio_columns = [
    ('Thumbs Up', 'thumb_up_ratio'),
    ('Thumbs Down', 'thumb_down_ratio'),
    ('Add Friend', 'add_friend_ratio'),
    ('Roll Advert', 'roll_advert_ratio'),
    ('Add to Playlist', 'add_palylist_ratio'),
    ('Submit Upgrade', 'submit_upgrade_ratio'),
    ('Error', 'error_ratio'),
]

portions = data.groupby('userId').agg(*[
    (sum((col('page') == page_name).cast(IntegerType())) / count('page')).alias(ratio_name)
    for page_name, ratio_name in page_ratio_columns
])
portions.show(5, truncate=False)

VBox()

+-------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+---------------------+
|userId |thumb_up_ratio      |thumb_down_ratio    |add_friend_ratio    |roll_advert_ratio    |add_palylist_ratio  |submit_upgrade_ratio|error_ratio          |
+-------+--------------------+--------------------+--------------------+---------------------+--------------------+--------------------+---------------------+
|1331962|0.04126547455295736 |0.011004126547455296|0.005502063273727648|0.001375515818431912 |0.023383768913342505|0.0                 |0.001375515818431912 |
|1178731|0.046445497630331754|0.008530805687203791|0.011058451816745656|0.0022116903633491313|0.026540284360189573|0.0                 |0.0022116903633491313|
|1528396|0.01948051948051948 |0.01948051948051948 |0.0                 |0.0                  |0.012987012987012988|0.0                 |0.0                  |
|1002185|0.04423076923076923 |0.00673076923076

#### Gender, Device and transaction status

In [6]:
## Device of user
def DeviceType(s):
    """Classify a browser user-agent string into a coarse device family.

    Inputs:
        s: user-agent string; None or any other non-string value is accepted
    Output:
        One of 'Mac', 'iPhone', 'iPad', 'X11', 'Windows', or the string
        'None' when nothing matches or the input is not a string.

    Matching is case-insensitive. When several markers are present the
    precedence is macintosh > iphone > ipad > x11 > windows nt/compatible.
    """
    try:
        s = s.lower()
    except AttributeError:
        # Missing user agent (None) or another value without .lower().
        # Only AttributeError is caught so that unrelated errors
        # (KeyboardInterrupt, SystemExit, ...) are not silently swallowed.
        return 'None'

    # Ordered from highest to lowest precedence; the first hit wins.
    markers = (
        (('macintosh',), 'Mac'),
        (('iphone',), 'iPhone'),
        (('ipad',), 'iPad'),
        (('x11',), 'X11'),
        (('windows nt', 'compatible'), 'Windows'),
    )
    for keywords, family in markers:
        for keyword in keywords:
            if keyword in s:
                return family
    return 'None'
# Spark UDF wrapper around DeviceType (returns a string column)
device = udf(DeviceType, StringType())

# One row per user with:
#   gender     - first gender value seen for the user
#   device     - device family derived from the first userAgent seen
#   time       - days between the user's latest event and registration
#   status_404 - 1 if any of the user's events has HTTP status 404, else 0
gender_device_time_status = (
    data.groupby('userId')
    .agg(
        first('gender').alias('gender'),
        device(first('userAgent')).alias('device'),
        ((max('ts') - avg('registration')) / 1000 / 3600 / 24).alias('time'),
        array_contains(collect_set('status'), 404).cast(IntegerType()).alias('status_404'),
    )
)

gender_device_time_status.show(5)

VBox()

+-------+------+-------+------------------+----------+
| userId|gender| device|              time|status_404|
+-------+------+-------+------------------+----------+
|1002185|     F|Windows| 65.75105324074075|         1|
|1057724|     M|    Mac| 96.04454861111111|         1|
|1069552|     F|Windows|126.26884259259259|         1|
|1142513|     M|Windows| 117.1135300925926|         1|
|1178731|     F|Windows| 93.45488425925926|         1|
+-------+------+-------+------------------+----------+
only showing top 5 rows

#### Combine All Features into One Data Frame

In [20]:
# Assemble the per-user feature table by joining, on userId:
#   churn (labels), portions, session_length_items_songs, gender_device_time_status
# No encoding happens here; this cell only joins the frames.
df = churn
for feature_frame in (portions, session_length_items_songs, gender_device_time_status):
    df = df.join(feature_frame, on='userId')

VBox()

In [14]:
#### Categorical Features
# Cancellation churn broken down by gender
df.crosstab('churn_Cancel', 'gender').show()

VBox()

+-------------------+----+----+
|churn_Cancel_gender|   F|   M|
+-------------------+----+----+
|                  1|2347|2656|
|                  0|8279|8995|
+-------------------+----+----+

In [15]:
# Downgrade churn broken down by gender
df.crosstab('churn_Downgrade', 'gender').show()

VBox()

+----------------------+----+----+
|churn_Downgrade_gender|   F|   M|
+----------------------+----+----+
|                     1|2488|2615|
|                     0|8138|9036|
+----------------------+----+----+

In [16]:
# Cancellation churn vs. whether the user ever received a 404 status
df.crosstab('churn_Cancel', 'status_404').show()

VBox()

+-----------------------+----+----+
|churn_Cancel_status_404|   0|   1|
+-----------------------+----+----+
|                      1|2544|2459|
|                      0|8461|8813|
+-----------------------+----+----+

In [17]:
# Downgrade churn vs. whether the user ever received a 404 status
df.crosstab('churn_Downgrade', 'status_404').show()

VBox()

+--------------------------+----+----+
|churn_Downgrade_status_404|   0|   1|
+--------------------------+----+----+
|                         1|1368|3735|
|                         0|9637|7537|
+--------------------------+----+----+

In [18]:
# Cancellation churn broken down by device family.
# (The cell previously evaluated the bare `device` UDF with this call
# commented out, which does not reproduce the table recorded as its output.)
df.crosstab('churn_Cancel', 'device').show()

VBox()

+-------------------+----+-------+----+----+------+
|churn_Cancel_device| Mac|Windows| X11|iPad|iPhone|
+-------------------+----+-------+----+----+------+
|                  1|2042|   2404| 311|  62|   184|
|                  0|6883|   8425|1121| 249|   596|
+-------------------+----+-------+----+----+------+

In [19]:
# Downgrade churn broken down by device family
df.crosstab('churn_Downgrade', 'device').show()

VBox()

+----------------------+----+-------+----+----+------+
|churn_Downgrade_device| Mac|Windows| X11|iPad|iPhone|
+----------------------+----+-------+----+----+------+
|                     1|1987|   2503| 358|  78|   177|
|                     0|6938|   8326|1074| 233|   603|
+----------------------+----+-------+----+----+------+

In [20]:
## Continuous features
# Page-action ratios, then session-length statistics, then days since registration.
# ('add_palylist_ratio' matches the column name as it was created.)
params = [
    'thumb_up_ratio', 'thumb_down_ratio', 'add_friend_ratio', 'roll_advert_ratio',
    'add_palylist_ratio', 'submit_upgrade_ratio', 'error_ratio',
    'session_len_avg', 'session_len_std',
    'time',
]

VBox()

In [21]:
# Mean of every continuous feature, split by cancellation churn (0 / 1)
mean_per_feature = {feature: 'mean' for feature in params}
selected = df.select(*(params + ['churn_Cancel']))
selected.groupby('churn_Cancel').agg(mean_per_feature).collect()

VBox()

[Row(churn_Cancel=1, avg(thumb_up_ratio)=0.03884181695625863, avg(add_friend_ratio)=0.014475170310004592, avg(roll_advert_ratio)=0.03729404184393977, avg(add_palylist_ratio)=0.021833674471137175, avg(thumb_down_ratio)=0.011992995161779773, avg(submit_upgrade_ratio)=0.0011073805918634905, avg(session_len_avg)=17222.53430207806, avg(time)=63.718817195793605, avg(session_len_std)=15859.129901157647, avg(error_ratio)=0.0010104320777228081), Row(churn_Cancel=0, avg(thumb_up_ratio)=0.04426469342324222, avg(add_friend_ratio)=0.015147195222619793, avg(roll_advert_ratio)=0.029078504439365126, avg(add_palylist_ratio)=0.022549195283939533, avg(thumb_down_ratio)=0.009611730964541678, avg(submit_upgrade_ratio)=0.0008096641362242619, avg(session_len_avg)=16391.743286512923, avg(time)=89.08563652583202, avg(session_len_std)=15906.165752162457, avg(error_ratio)=0.001021583859221606)]

In [22]:
# Mean of every continuous feature, split by downgrade churn (0 / 1)
mean_per_feature = {feature: 'mean' for feature in params}
selected = df.select(*(params + ['churn_Downgrade']))
selected.groupby('churn_Downgrade').agg(mean_per_feature).collect()

VBox()

[Row(churn_Downgrade=1, avg(thumb_up_ratio)=0.04200537339626784, avg(add_friend_ratio)=0.014954732202469102, avg(roll_advert_ratio)=0.022614039230351966, avg(add_palylist_ratio)=0.022902345636838274, avg(thumb_down_ratio)=0.010473076061888603, avg(submit_upgrade_ratio)=0.0010578505759255895, avg(session_len_avg)=18895.018488125126, avg(time)=88.99539589321101, avg(session_len_std)=20174.979011201605, avg(error_ratio)=0.0009847229626781869), Row(churn_Downgrade=0, avg(thumb_up_ratio)=0.04335626551671669, avg(add_friend_ratio)=0.015008613538330455, avg(roll_advert_ratio)=0.033392612952040185, avg(add_palylist_ratio)=0.02223582176133022, avg(thumb_down_ratio)=0.010049486918135534, avg(submit_upgrade_ratio)=0.0008226477175545974, avg(session_len_avg)=15889.951863259479, avg(time)=81.72277993098417, avg(session_len_std)=14624.048923033899, avg(error_ratio)=0.0010292878764699219)]

### Modeling

In [8]:
# Columns fed to the model, in the order they are packed into the feature
# vector: ten numeric columns, then the encoded gender, the 404 flag and the
# encoded device. The per-session item/song count statistics computed earlier
# are not included.
numeric_cols = [
    'thumb_up_ratio', 'thumb_down_ratio', 'add_friend_ratio', 'roll_advert_ratio',
    'add_palylist_ratio', 'submit_upgrade_ratio', 'error_ratio',
    'session_len_avg', 'session_len_std', 'time',
]
inputcols = numeric_cols + ['gendervec', 'status_404', 'devicevec']

# String categories -> numeric indices -> one-hot vectors
gender_indexer = StringIndexer(inputCol='gender', outputCol='genderIndex')
device_indexer = StringIndexer(inputCol='device', outputCol='deviceIndex')
encoder = OneHotEncoderEstimator(
    inputCols=['genderIndex', 'deviceIndex'],
    outputCols=['gendervec', 'devicevec'],
)

# Pack everything into a single 'features' vector column
assembler = VectorAssembler(inputCols=inputcols, outputCol='features')

pipeline = Pipeline(stages=[gender_indexer, device_indexer, encoder, assembler])

VBox()

In [21]:
# Split the per-user table into roughly 75% train / 25% test; the seed is fixed for repeatability
df_train, df_test = df.randomSplit([0.75, 0.25], seed=12)

VBox()

In [22]:
# Fit the preprocessing pipeline on the training split only, then apply the
# fitted stages to both splits, keeping just the labels and the feature vector.
datatransform = pipeline.fit(df_train)
model_columns = ['churn_Cancel', 'churn_Downgrade', 'features']
train_DF = datatransform.transform(df_train).select(*model_columns)
test_DF = datatransform.transform(df_test).select(*model_columns)

VBox()

In [26]:
# Number of rows (users) in the training and test sets
train_DF.count(), test_DF.count()

VBox()

(16733, 5544)

In [32]:
def performance(dataset, label, prediction_label):
    """Calculate classification performance metrics.

    The four confusion-matrix cells are counted with one filter + count
    per cell on `dataset`.

    Inputs:
        dataset: dataframe of predictions by a classification model
        label: column name of the true value (1 = positive, 0 = negative)
        prediction_label: column name of the predicted value (1.0 / 0.0)
    Output:
        (metrics, score): metrics: list of classification performance metric names
                    score: corresponding scores, in the same order

    A ratio whose denominator is zero (e.g. precision when nothing was
    predicted positive) is reported as 0.0 instead of raising
    ZeroDivisionError.
    """

    def _cell(actual, predicted):
        # Float count, so that `/` below is a true division even on Python 2,
        # where int / int truncates.
        sql = '{:s}={:d} and {:s}={:.1f}'.format(label, actual, prediction_label, predicted)
        return float(dataset.filter(sql).count())

    def _ratio(numerator, denominator):
        return numerator / denominator if denominator > 0 else 0.0

    tp = _cell(1, 1.0)
    fn = _cell(1, 0.0)
    tn = _cell(0, 0.0)
    fp = _cell(0, 1.0)

    recall = _ratio(tp, tp + fn)
    precision = _ratio(tp, tp + fp)
    f1 = _ratio(2 * recall * precision, recall + precision)
    accuracy = _ratio(tp + tn, tp + fn + fp + tn)

    return (['f1', 'Precision', 'Recall', 'Accuracy'], [f1, precision, recall, accuracy])

VBox()

In [49]:
def performance(dataset, label, prediction_label):
    """Calculate classification performance metrics.

    Only the label and prediction columns are collected to the driver and
    the confusion matrix is tallied locally.

    Inputs:
        dataset: dataframe of predictions by a classification model
        label: column name of the true value (1 = positive, anything else = negative)
        prediction_label: column name of the predicted value (1 = positive)
    Output:
        (metrics, score): metrics: list of classification performance metric names
                    score: corresponding scores, in the same order

    Each confusion-matrix cell is counted explicitly, so the result is still
    correct when some (label, prediction) combination never occurs. A ratio
    whose denominator is zero is reported as 0.0 instead of raising
    ZeroDivisionError.
    """
    rows = dataset.select(label, prediction_label).collect()

    # Floats, so that `/` is a true division on Python 2 as well.
    tp = fp = fn = tn = 0.0
    for row in rows:
        actual_positive = row[label] == 1
        predicted_positive = row[prediction_label] == 1
        if actual_positive and predicted_positive:
            tp += 1
        elif actual_positive:
            fn += 1
        elif predicted_positive:
            fp += 1
        else:
            tn += 1

    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    if (recall + precision) > 0:
        f1 = 2 * recall * precision / (recall + precision)
    else:
        f1 = 0.0
    total = tp + fn + fp + tn
    accuracy = (tp + tn) / total if total > 0 else 0.0

    return (['f1', 'Precision', 'Recall', 'Accuracy'], [f1, precision, recall, accuracy])

VBox()

### GBT Classifier

In [28]:
# Evaluator that scores a prediction dataframe by its F1 metric

class Fevaluator:
    """Minimal evaluator object exposing `evaluate` and `isLargerBetter`.

    labelCol: name of the true-label column
    predictionCol: name of the predicted-label column
    """

    def __init__(self, labelCol='label', predictionCol='prediction'):
        self.label, self.prediction = labelCol, predictionCol

    def evaluate(self, dataset):
        """Return the first score reported by `performance` (the f1 score)."""
        scores = performance(dataset, self.label, self.prediction)[1]
        return scores[0]

    def isLargerBetter(self):
        """A higher score means a better model."""
        return True

VBox()

In [23]:
# Train a gradient-boosted tree classifier for churn due to cancellation
# and report the wall-clock time taken
label = 'churn_Cancel'
start = time.time()
gbt = GBTClassifier(labelCol=label)
gbt_model = gbt.fit(train_DF)
end = time.time()
print('time = {:.3f}s'.format(end - start))

VBox()

time = 438.862s

In [48]:
# Score the held-out test set with the cancellation model, then compute
# f1 / precision / recall / accuracy against the 'churn_Cancel' label
gbt_pred = gbt_model.transform(test_DF)
performance(gbt_pred, 'churn_Cancel', 'prediction')

VBox()

(['f1', 'Precision', 'Recall', 'Accuracy'], [0.5274177467597208, 0.720708446866485, 0.4158805031446541, 0.829004329004329])

In [50]:
# Train the model for churn due to downgrade (with subsamplingRate=0.7)
# and report the wall-clock time taken
label = 'churn_Downgrade'
start = time.time()
gbt = GBTClassifier(labelCol=label, subsamplingRate=0.7)
gbt_model_downgrade = gbt.fit(train_DF)
end = time.time()
print('time = {:.3f}s'.format(end - start))

VBox()

time = 472.988s

In [55]:
# Score the held-out test set with the downgrade model, then compute
# f1 / precision / recall / accuracy against the 'churn_Downgrade' label
gbt_pred_downgrade = gbt_model_downgrade.transform(test_DF)
performance(gbt_pred_downgrade, 'churn_Downgrade', 'prediction')

VBox()

(['f1', 'Precision', 'Recall', 'Accuracy'], [0.5303964757709251, 0.59251968503937, 0.4800637958532695, 0.8077200577200577])

#### Feature Importance

In [56]:
# Readable names for the slots of the assembled feature vector, in assembler
# order. The one-hot vectors are presumably expanded to one 'gender' slot and
# four 'device*' slots (16 names in total, matching the 16 importances shown
# in the output) -- confirm if the categories change.
features = [
    'thumb_up_ratio', 'thumb_down_ratio', 'add_friend_ratio', 'roll_advert_ratio',
    'add_palylist_ratio', 'submit_upgrade_ratio', 'error_ratio',
    'session_len_avg', 'session_len_std', 'time',
    'gender', 'status_404',
    'device0', 'device1', 'device2', 'device3',
]

## churn due to cancellation
importance_cancel = dict(
    Feature=features,
    Importance=gbt_model.featureImportances.toArray(),
)

## churn due to downgrade
importance_downgrade = dict(
    Feature=features,
    Importance=gbt_model_downgrade.featureImportances.toArray(),
)

VBox()

In [57]:
# Feature importances of the cancellation-churn model
importance_cancel

VBox()

{'Importance': array([0.11156252, 0.10679758, 0.06009227, 0.15211177, 0.03097527,
       0.08795566, 0.04286932, 0.10515224, 0.09834546, 0.18061292,
       0.00132843, 0.00328822, 0.00262389, 0.00681254, 0.00340545,
       0.00606647]), 'Feature': ['thumb_up_ratio', 'thumb_down_ratio', 'add_friend_ratio', 'roll_advert_ratio', 'add_palylist_ratio', 'submit_upgrade_ratio', 'error_ratio', 'session_len_avg', 'session_len_std', 'time', 'gender', 'status_404', 'device0', 'device1', 'device2', 'device3']}

In [58]:
# Feature importances of the downgrade-churn model
importance_downgrade

VBox()

{'Importance': array([0.07873384, 0.09587231, 0.07568663, 0.21681246, 0.07383164,
       0.21422365, 0.05922658, 0.05971482, 0.06581646, 0.0407637 ,
       0.00100988, 0.00167394, 0.00261001, 0.00144628, 0.00886811,
       0.00370969]), 'Feature': ['thumb_up_ratio', 'thumb_down_ratio', 'add_friend_ratio', 'roll_advert_ratio', 'add_palylist_ratio', 'submit_upgrade_ratio', 'error_ratio', 'session_len_avg', 'session_len_std', 'time', 'gender', 'status_404', 'device0', 'device1', 'device2', 'device3']}