# AWS EMR Notebook for Sparkify churn prediction

In [14]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DecimalType, FloatType, ByteType, ShortType, BinaryType, BooleanType
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler, Normalizer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, BinaryLogisticRegressionSummary, GBTClassifier, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from time import time

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
5,application_1613171401400_0006,pyspark,idle,Link,Link,✔


In [3]:
# ---- Configuration ----------------------------------------------------------
# S3 locations of the full event log and of the smaller "mini" sample file.
S3_DATA_URI = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
S3_DATA_SAMPLE_URI = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"

# Spark application name.
SPARK_APP_NAME = "Sparkify"

# Column names used by the feature pipeline and the models.
LABEL_ALIAS = "label"
RAW_FEATURES_ALIAS = "raw_features"
FEATURES_ALIAS = "features"

# Train/test proportions and the random seed used for the split and the models.
TRAIN_TEST_SPLIT_RATE = [0.7, 0.3]
SEED = 2137

# Only events within this many days of a user's last activity are kept.
DAYS_ACTIVITY_LIMIT = 30

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Read the full JSON event log from S3 and display its number of rows.
df = spark.read.format("json").load(S3_DATA_URI)
df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

26259199

In [5]:
# Exclude events with an empty userId (logged-out users)
df = df.filter(df["userId"] != "")

# Flag churn events: 1 for "Cancellation Confirmation", 0 for every other page
# (a null page also gives 0). A native column expression replaces the former
# Python UDF, so Spark no longer ships every row to a Python worker.
df = df.withColumn(
    "cancelled",
    F.when(col("page") == "Cancellation Confirmation", 1).otherwise(0).cast(ByteType()),
)

# Spread the flag over all of a user's events. The window is partitioned by
# userId without an ordering, so its frame is the whole partition and the max
# is 1 for every user who has a cancellation event. (The analysis assumes that
# "Cancellation Confirmation" is the last event of every user who has one.)
windowval = Window.partitionBy("userId")
df = df.withColumn("phaseCancelled", F.max("cancelled").over(windowval).cast(ByteType()))

# Drop the cancellation pages (the label was derived from them above) and the
# Home page. Rows with a null page are dropped too, as with the former `!=` filters.
df = df.filter(~col("page").isin("Cancellation Confirmation", "Cancel", "Home"))

# Convert millisecond epoch timestamps to 'yyyy-MM-dd HH:mm:ss' strings
df = df.withColumn('datetime', F.from_unixtime(col("ts") / 1000))
df = df.withColumn('datetimeReg', F.from_unixtime(col("registration") / 1000))

# Number of days between registration and each event
df = df.withColumn('daysFromReg', F.datediff(col('datetime'), col('datetimeReg')).cast(ShortType()))
# Latest event per user; the fixed-width datetime strings sort chronologically
df = df.withColumn('lastActivity', F.max('datetime').over(windowval))

# Keep only the events from the last DAYS_ACTIVITY_LIMIT days of each user's activity
df = df.withColumn('daysFromLastActivity', F.datediff(col('lastActivity'), col('datetime')).cast(ShortType()))
df = df.filter(df.daysFromLastActivity < DAYS_ACTIVITY_LIMIT)

# Encode gender: 1 for "M", 0 otherwise (including null), without a Python UDF
df = df.withColumn("isMale", F.when(col("gender") == "M", 1).otherwise(0).cast(ByteType()))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Show the first processed event as a one-element list of Rows.
df.limit(1).collect()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[Row(artist='Alejandro Sanz', auth='Logged In', firstName='Frank', gender='M', itemInSession=3, lastName='Warren', length=277.44608, level='free', location='Findlay, OH', method='PUT', page='NextSong', registration=1535470939000, sessionId=21396, song='Te lo agradezco_ pero no', status=200, ts=1541107960000, userAgent='Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0', userId='1000280', cancelled=0, phaseCancelled=1, datetime='2018-11-01 21:32:40', datetimeReg='2018-08-28 15:42:19', daysFromReg=65, lastActivity='2018-11-13 22:58:59', daysFromLastActivity=12, isMale=1)]
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 42418)
Traceback (most recent call last):
  File "/usr/lib64/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib64/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address

In [7]:
# Per-user average of the highest itemInSession reached in each of the user's
# sessions, cast to ShortType.
avgItemsInSession = (
    df.groupby('userId', 'sessionId')
      .agg(F.max('itemInSession').alias('maxItemInSession'))
      .groupby('userId')
      .agg(F.avg('maxItemInSession').cast(ShortType()).alias('avgItemsInSession'))
)

def days(i):
    """Convert a number of days ``i`` into seconds (``i * 86400``)."""
    seconds_per_day = 24 * 60 * 60  # == 86400, folded before multiplying by i
    return i * seconds_per_day

# One row per user: number of distinct sessions and the largest daysFromReg.
users_sessions = df.groupby('userId', 'isMale', 'phaseCancelled').agg(
    F.countDistinct('sessionId').alias('sessCount'),
    F.max('daysFromReg').cast(ShortType()).alias('daysFromReg'),
)

# One row per (user, page): number of events on that page.
users_events = df.groupby('userId', 'page').agg(F.count('sessionId').alias('pageCount'))

# Attach the page counts and avgItemsInSession to each user, then divide each
# page count by the user's number of sessions.
users_sessions = (
    users_sessions
    .join(users_events, on='userId', how='left')
    .join(avgItemsInSession, on='userId', how='left')
    .withColumn('pageCountPerSess', (col('pageCount') / col('sessCount')).cast(DecimalType(6, 2)))
)

# Pivot pages into columns: each page P yields "P_CPS" (events per session)
# and "P_C" (event count); remaining numeric nulls are filled with 0.
final_df = (
    users_sessions
    .select('phaseCancelled', 'userId', 'isMale', 'avgItemsInSession', 'daysFromReg',
            'page', 'sessCount', 'pageCountPerSess', 'pageCount')
    .groupby('phaseCancelled', 'userId', 'isMale', 'avgItemsInSession', 'daysFromReg', 'sessCount')
    .pivot('page')
    .agg(F.avg('pageCountPerSess').alias('CPS'), F.avg('pageCount').alias('C'))
    .fillna(0)
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Features fed to the models, in the order VectorAssembler concatenates them.
# Page features follow the "<page>_CPS" (events per session) / "<page>_C"
# (event count) naming produced by the pivot.
# Deliberately excluded: 'Add Friend_CPS', 'Add to Playlist_C', 'Logout_CPS',
# 'Roll Advert_C', 'Thumbs Up_C', 'Upgrade_C'.
inputCols = [
    'avgItemsInSession',
    'daysFromReg',
    'isMale',
    'sessCount',
    'About_CPS', 'About_C',
    'Add Friend_C',
    'Add to Playlist_CPS',
    'Downgrade_CPS', 'Downgrade_C',
    'Error_CPS', 'Error_C',
    'Help_CPS', 'Help_C',
    'Logout_C',
    'NextSong_CPS', 'NextSong_C',
    'Roll Advert_CPS',
    'Save Settings_CPS', 'Save Settings_C',
    'Settings_CPS', 'Settings_C',
    'Submit Downgrade_CPS', 'Submit Downgrade_C',
    'Submit Upgrade_CPS', 'Submit Upgrade_C',
    'Thumbs Down_CPS', 'Thumbs Down_C',
    'Thumbs Up_CPS',
    'Upgrade_CPS',
]

assembler = VectorAssembler(inputCols=inputCols, outputCol=RAW_FEATURES_ALIAS)
# NOTE(review): Normalizer rescales each row vector to unit p-norm (p=2 by
# default); it does not standardise individual features. Confirm this is the
# intended preprocessing.
normalizer = Normalizer(inputCol=RAW_FEATURES_ALIAS, outputCol=FEATURES_ALIAS)

# Write into a new frame instead of overwriting final_df: VectorAssembler
# rejects an output column that already exists, so overwriting made this cell
# fail when re-run.
features_df = normalizer.transform(assembler.transform(final_df))

# Cache the modelling dataset: train/test are re-evaluated by every fit,
# transform and count below. Caching avoids recomputing the whole pipeline each
# time and helps keep the random split stable across those evaluations.
dataset = features_df.select(col("phaseCancelled").alias(LABEL_ALIAS), col(FEATURES_ALIAS)).cache()
train, test = dataset.randomSplit(TRAIN_TEST_SPLIT_RATE, seed=SEED)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or 0.0 when the denominator is 0."""
    return numerator / denominator if denominator else 0.0


def classification_report(test_results, train_results, label=None):
    """Print binary-classification metrics for the train and test predictions.

    Args:
        test_results: DataFrame of test-set predictions with ``label`` and
            ``prediction`` columns (e.g. the output of ``model.transform``).
        train_results: the same for the training set.
        label: optional model name shown in the printed section headers.

    Returns:
        dict for the TEST set with the confusion-matrix counts (TP, FP, TN, FN),
        TOTAL, and the ACC, PR (precision), RC (recall) and FSCORE metrics.
        A metric whose denominator is zero is reported as 0.0 instead of
        raising ZeroDivisionError.
    """
    # (label, prediction) pair that defines each confusion-matrix cell.
    cells = {
        'TP': (1.0, 1.0),
        'FP': (0.0, 1.0),
        'TN': (0.0, 0.0),
        'FN': (1.0, 0.0),
    }

    start = time()
    report = {}
    for results, split_name in [(train_results, 'train'), (test_results, 'test')]:
        rows = [row.asDict() for row in results.groupby('label', 'prediction').count().collect()]

        # Cells with no matching (label, prediction) group stay at 0.
        report = {name: 0 for name in cells}
        for row in rows:
            for name, (label_val, pred_val) in cells.items():
                if row['label'] == label_val and row['prediction'] == pred_val:
                    report[name] = row['count']

        report['TOTAL'] = sum(report[name] for name in cells)
        report['ACC'] = _safe_ratio(report['TP'] + report['TN'], report['TOTAL'])
        report['PR'] = _safe_ratio(report['TP'], report['TP'] + report['FP'])
        report['RC'] = _safe_ratio(report['TP'], report['TP'] + report['FN'])
        report['FSCORE'] = _safe_ratio(2 * report['PR'] * report['RC'], report['PR'] + report['RC'])

        header = f'{label} - {split_name.upper()}' if label else split_name.upper()
        print(f'---- {header} ----')
        print('MEASURE\tVALUE')
        for key, val in report.items():
            print(f'{key}:\t{val:.2f}')
        print('')

    print(f'Eval time: {time() - start:.2f}s')
    # Only the last evaluated split (test) is returned, as before.
    return report


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
def train_and_predict(method, label=None):
    """Fit ``method`` on the global ``train`` split and report train/test metrics.

    Args:
        method: an unfitted estimator exposing ``fit`` (e.g. a Spark ML classifier).
        label: optional name printed in the report headers; defaults to the
            estimator's class name.

    Returns:
        ``(fitted_model, report)``, where ``report`` is the value returned by
        ``classification_report``.
    """
    model = method.fit(train)
    test_results = model.transform(test)
    train_results = model.transform(train)
    # Pass the model name explicitly as classification_report's third
    # (label) argument.
    report = classification_report(test_results, train_results, label or type(method).__name__)
    return model, report

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Class-balancing weights: each churned user (label == 1) gets weight
# #non-churned / #churned; each non-churned user gets 1.0.
churn_cnt = train.filter(col('label') == 1).count()
non_churn_cnt = train.count() - churn_cnt
non_churn_weight = 1.0
# Fall back to 1.0 if the split happens to contain no churned users.
churn_weight = non_churn_cnt / churn_cnt if churn_cnt else 1.0

# Native column expression instead of a Python UDF; the column stays FloatType.
train = train.withColumn(
    'weightCol',
    F.when(col('label') == 1, F.lit(churn_weight)).otherwise(F.lit(non_churn_weight)).cast(FloatType()),
)

lr = LogisticRegression(maxIter=40, regParam=0.0, weightCol='weightCol')
m = lr.fit(train)

test_results = m.transform(test)
train_results = m.transform(train)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Report for the weighted logistic regression fitted in the previous cell.
rep = classification_report(
    test_results=test_results,
    train_results=train_results,
    label='LogRegWeighted',
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---- TRAIN ----
MEASURE	VALUE
FN:	853.00
TN:	8251.00
TP:	2626.00
FP:	3785.00
TOTAL:	15515.00
ACC:	0.70
PR:	0.41
RC:	0.75
FSCORE:	0.53

---- TEST ----
MEASURE	VALUE
FN:	375.00
TN:	3559.00
TP:	1149.00
FP:	1668.00
TOTAL:	6751.00
ACC:	0.70
PR:	0.41
RC:	0.75
FSCORE:	0.53

Eval time: 254.73s

In [12]:
# Refit the weighted logistic regression with a larger iteration budget and
# time the fit plus evaluation.
start_time = time()
lr = LogisticRegression(maxIter=1000, regParam=0.0, weightCol='weightCol')
lr_model, lr_report = train_and_predict(lr)
print(f'Total time: {time() - start_time:.2f}s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Total time: 166.66s

**The weighted logistic regression model identified about three quarters of the users who churned (test recall ≈ 0.75) — not bad! However, it produces more false positives than true positives (precision = 0.41).**

In [76]:
gbt = GBTClassifier(featuresCol=FEATURES_ALIAS,
                    labelCol=LABEL_ALIAS,
                    predictionCol='prediction',
                    maxIter=50,
                    maxDepth=8,
                    maxBins=32,
                    minInstancesPerNode=20,
                    minInfoGain=0.0,
                    maxMemoryInMB=256,
                    cacheNodeIds=False,
                    checkpointInterval=10,
                    lossType='logistic',
                    seed=SEED,
                    subsamplingRate=0.7,
                    featureSubsetStrategy='auto')

# Keep the estimator, the fitted model and the report under separate names;
# `gbt, gbt = ...` bound the report dict to `gbt` and discarded the model.
gbt_model, gbt_report = train_and_predict(gbt)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-71:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 37445



---- TRAIN ----
MEASURE	VALUE
FN:	1268.00
TN:	11820.00
TP:	2211.00
FP:	216.00
TOTAL:	15515.00
ACC:	0.90
PR:	0.91
RC:	0.64
FSCORE:	0.75

---- TEST ----
MEASURE	VALUE
FN:	883.00
TN:	4882.00
TP:	641.00
FP:	345.00
TOTAL:	6751.00
ACC:	0.82
PR:	0.65
RC:	0.42
FSCORE:	0.51

Eval time: 447.33s

In [None]:
# Cross-validated random forest. `cv` was not defined anywhere in the notebook,
# so build it here from the already-imported tuning classes.
rf = RandomForestClassifier(labelCol=LABEL_ALIAS, featuresCol=FEATURES_ALIAS, seed=SEED)
# NOTE(review): grid values are a starting point; adjust to the time budget.
rf_param_grid = (ParamGridBuilder()
                 .addGrid(rf.numTrees, [20, 50])
                 .addGrid(rf.maxDepth, [5, 10])
                 .build())
# Area under the precision-recall curve, because churners are the minority class.
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=rf_param_grid,
                    evaluator=BinaryClassificationEvaluator(labelCol=LABEL_ALIAS, metricName='areaUnderPR'),
                    numFolds=3,
                    seed=SEED)

s = time()
cv_rf_model = cv.fit(train)
e = time()
delta = e-s
print(f'Total time: {delta:.2f}s')

In [23]:
s = time()
# Network layout: one input neuron per feature in inputCols, two hidden layers
# of 8 and 4 neurons, and 2 output neurons (label 0 / label 1).
n_input = len(inputCols)
layers = [n_input, 8, 4, 2]

# Multilayer perceptron classifier. No weightCol is set, so training ignores
# the class weights added to `train` earlier.
trainer = MultilayerPerceptronClassifier(featuresCol=FEATURES_ALIAS,
                                         labelCol=LABEL_ALIAS,
                                         maxIter=1500,
                                         layers=layers,
                                         blockSize=256,
                                         seed=SEED)

model = trainer.fit(train)
print('Model trained!')
# Score both splits; metrics are computed afterwards by classification_report.
test_result_nn = model.transform(test)
train_result_nn = model.transform(train)
e = time()
delta = e-s
print(f'Total time: {delta:.2f}s')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exception in thread cell_monitor-22:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/conda/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/conda/lib/python3.7/site-packages/awseditorssparkmonitoringwidget-1.0-py3.7.egg/awseditorssparkmonitoringwidget/cellmonitor.py", line 178, in cell_monitor
    job_binned_stages[job_id][stage_id] = all_stages[stage_id]
KeyError: 9992



Model trained!
Total time: 710.38s

In [24]:
# Report for the multilayer perceptron (the former name `res_rf` suggested a
# random forest).
nn_report = classification_report(test_result_nn, train_result_nn, 'NN')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

---- TRAIN ----
MEASURE	VALUE
FN:	1976.00
TN:	11482.00
TP:	1503.00
FP:	554.00
TOTAL:	15515.00
ACC:	0.84
PR:	0.73
RC:	0.43
FSCORE:	0.54

---- TEST ----
MEASURE	VALUE
FN:	886.00
TN:	4981.00
TP:	638.00
FP:	246.00
TOTAL:	6751.00
ACC:	0.83
PR:	0.72
RC:	0.42
FSCORE:	0.53

Eval time: 299.33s