In [4]:
import ibmos2spark, os
# @hidden_cell

if os.environ.get('RUNTIME_ENV_LOCATION_TYPE') == 'external':
    endpoint_da32d840b0c6496da7e4b4ae965e36bf = 'https://s3.eu-geo.objectstorage.softlayer.net'
else:
    endpoint_da32d840b0c6496da7e4b4ae965e36bf = 'https://s3.eu-geo.objectstorage.service.networklayer.com'

credentials = {
    'endpoint': endpoint_da32d840b0c6496da7e4b4ae965e36bf,
    'service_id': 'iam-ServiceId-534b61b5-307d-474a-8ba4-deebac0187b5',
    'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
    'api_key': 'XiBbMTH05VLVP_M-y0DgAdgBNzIrmYT8Mmqr8kX51gzV'
}

configuration_name = 'os_da32d840b0c6496da7e4b4ae965e36bf_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')
# Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json

df = spark.read.json(cos.url('medium-sparkify-event-data.json', 'churnspark-donotdelete-pr-pty3wun15oklrj'))



In [5]:
# import libraries
import pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import *


from pyspark.ml.feature import VectorAssembler, StandardScaler
#from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import datetime

In [6]:
### delete empty userIds
df = df.dropna(how = 'any', subset = ['userId', 'sessionId'])
df = df.filter(df.userId != '')

### delete useless/sensitve information
### i.e. songs, names
df = df.select('auth', 'gender', 'itemInSession',\
     'level', 'location', 'method', 'page', 'registration',\
       'sessionId', 'status', 'ts', 'userAgent', 'userId')

In [7]:
# Users cancelled their paid accounts
flag_cancel_event = F.udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn('cancel', flag_cancel_event('page'))

windowval = Window.partitionBy('userId').orderBy(F.desc('ts')).rangeBetween(Window.unboundedPreceding, 0)

df = df.withColumn('churn', F.sum('cancel').over(windowval))

# Feature Engineering

In [8]:
### Use Frequency: based on sessions
df_freq = df.groupby('userId').agg({'ts': 'max',\
                                    'registration': 'max',\
                                    'sessionId':'count'})

df_freq = df_freq.withColumn('days', (F.col('max(ts)') - F.col('max(registration)'))/(1000*60*60*24))\
        .withColumn('freq', F.col('count(sessionId)')/F.col('days'))\
        .select('userId', 'freq')

In [9]:
# How many songs do users listen to on average between visiting the home page? 
ifhome = F.udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userId') \
    .orderBy(F.desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userId', 'page', 'ts') \
    .withColumn('homevisit', ifhome(F.col('page'))) \
    .withColumn('period', F.sum('homevisit').over(user_window))

df_songs = cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'})\
    .groupBy('userID') \
    .agg(F.avg('count(period)').alias("Songs"))

In [10]:
# time per session
df_time = df.groupby("userId", "sessionId")\
    .agg((F.max(df.ts)-F.min(df.ts)).alias("duration"))\
    .groupby("userId").agg(F.avg(F.col('duration')).alias("time_session"))


In [11]:
# percentage time in different page type 
time_page_each = df.groupby("userId")\
.pivot('page').agg(F.count('page'))\
.fillna(0)

cols = time_page_each.columns

time_page_total = df.groupby("userId")\
.agg(F.count('page'))

time_page_each = time_page_each.join(time_page_total, ['userId'], how='left')

for col in cols[1:]: # get rid of 'userId' in the cols
    time_page_each =  time_page_each\
        .withColumn(col, F.col(col)/F.col('count(page)'))

df_page = time_page_each.drop('count(page)')

In [12]:
# gender
df_gender = df.select('userId', 'gender').dropDuplicates()

int_gender = F.udf(lambda x: 1 if x == "F" else 0, IntegerType())

df_gender = df_gender.withColumn('gender', int_gender('gender'))

In [13]:
# if ever paid
df_level = df.select('userId', 'level').dropDuplicates()

int_level = F.udf(lambda x: 1 if x == "paid" else 0, IntegerType())

df_level = df_level.withColumn('level', int_level('level'))\
    .groupby('userId').agg(F.max('level').alias('if_paid'))

# Model

In [14]:
# churn
df_churn = df.select('userId', 'churn').dropDuplicates()

In [15]:
### Create full dataset
full_data = df_churn
feature_list = [df_freq, df_songs, df_time, df_page, df_gender, df_level]

for f in feature_list:
    full_data = full_data.join(f, ['userId'], how='left')

In [16]:
# vectorize the features
assembler = VectorAssembler(inputCols=full_data.columns[2:], outputCol="NumFeatures")
full_data = assembler.transform(full_data) 

# standarlize the features
scaler = StandardScaler(inputCol='NumFeatures', outputCol='features', withMean=True, withStd=True)
scalerModel = scaler.fit(full_data)
full_data = scalerModel.transform(full_data)

In [17]:
full_data = full_data.select(F.col('churn').alias("label"), 'features')

In [18]:
train, test = full_data.randomSplit([0.9, 0.1], seed=42)
train.persist()

DataFrame[label: bigint, features: vector]

In [19]:
lr = LogisticRegression(maxIter=10)

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.1, 0.3])\
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3)
cvModel = crossval.fit(train)

In [20]:
cvModel.avgMetrics

[0.9135136930091392,
 0.9351917142850934,
 0.9602657767482108,
 0.9648186183109246]

In [21]:
results = cvModel.transform(test)

In [24]:
evalutaor = BinaryClassificationEvaluator()
evalutaor.evaluate(results)

0.8277777777777777

In [27]:
cvModel.bestModel.coefficients

DenseVector([0.4053, 0.0, 0.2039, -0.03, -0.0087, -0.1134, 2.3632, 2.3632, 0.4971, -0.0193, 0.0289, -0.1068, -0.1357, -0.3546, 0.5034, -0.1207, 0.1394, -0.1241, -0.0, 0.1258, -0.1653, 0.0, 0.0, 0.5197])

The features are in the order of df_freq, df_songs, df_time, df_page, df_gender, df_level

In [32]:
from sklearn.metrics import confusion_matrix

cm = confusion_matrix(results.select('label').collect(), results.select('prediction').collect())
print(cm)
tn, fp, fn, tp = cm.ravel()
precision = tp / (tp + fp) 
recall = tp / (tp + fn)
f1 = 2*precision*recall/(precision+recall)
print('precision:', precision)
print('recall:', recall)
print('f1', f1)

[[30  0]
 [ 3  3]]
precision: 1.0
recall: 0.5
f1 0.6666666666666666
