## Testing Churn Classification on Full Dataset

In [1]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.stat import ChiSquareTest
from pyspark.sql.functions import split, array, concat, desc, min, max, udf, sum, count, avg, col, when, isnull, isnan, expr, regexp_extract
from pyspark.ml.feature import Bucketizer
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql import Window
from pyspark.sql import SQLContext
from pyspark.ml.feature import OneHotEncoder, StringIndexer, CountVectorizer
from pyspark.sql import functions as F
import pyspark.sql.types as T
from datetime import datetime

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, StandardScaler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import numpy as np


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,application_1566641608962_0007,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# create a Spark session
spark = SparkSession \
        .builder \
        .appName("Predict Customer Churn") \
        .getOrCreate()

spark.conf.set('spark.sql.pivotMaxValues', u'50000')

In [3]:
# event_data = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"
# event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
event_data = "s3n://test-ai-hops/sparkify_event_data.json"
raw_dataset = spark.read.json(event_data)
raw_dataset.head()

Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042')

## Prepare Features

In [4]:
raw_dataset = raw_dataset.persist()
raw_dataset.createOrReplaceTempView("sparkify_event_data")

In [5]:
df = raw_dataset
print("all rows: ", df.count())
df = df.dropna(subset=["userId", "sessionId", "page", "ts", "registration"])
df = df.filter(df["userId"]!="")
df = df.filter(df["page"]!="")
df = df.persist()
print("after cleaning: ", df.count())

('all rows: ', 26259199)
('after cleaning: ', 25480720)

In [6]:
df.take(1)

[Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042')]

## 2.2 Convert datetime

In [7]:

hour_millis = 60*60*1000
day_millis = 24*60*60*1000
week_millis = 7*24*60*60*1000

to_spark_time = udf(lambda x : datetime.utcfromtimestamp(x/1000).strftime('%Y-%m-%d %H:%M:%S'))
to_day = udf(lambda x : int(x/day_millis))
to_week = udf(lambda x : int(x/week_millis))

df = df.withColumn('timestamp', to_spark_time('ts'))
df = df.withColumn('day', to_day('ts'))
df = df.withColumn('week', to_week('ts'))
df = df.withColumn('registration_day', to_day('registration'))
df = df.persist()


In [8]:
df.take(1)

[Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042', timestamp=u'2018-10-01 00:00:01', day=u'17805', week=u'2543', registration_day=u'17751')]

## 3 Define Churn

In [9]:
user_window = Window.partitionBy("userId").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
churn_func = udf(lambda v :1 if v =='Cancellation Confirmation' else 0, IntegerType())
df = df.withColumn("churn", churn_func("page")) \
       .withColumn("churn", sum("churn").over(user_window))
user_churn = df.select("userId", "churn").distinct().persist()

### 3.2 Define location and state

In [10]:
user_location = df.select("userId", "location").distinct()
split_col = split(user_location['location'], ',')
user_city = user_location.withColumn('location_city', split_col.getItem(0)).select("userId", "location_city").distinct()
user_city = user_city.withColumn("location_city", F.regexp_replace('location_city', '[\.\s]', '-')).persist()
user_state = user_location.withColumn('location_state', split_col.getItem(1)).select("userId", "location_state").distinct().persist()

In [11]:
churn_states = user_state.join(user_churn, "userId", "outer").persist()
churn_cities = user_city.join(user_churn, "userId", "outer").persist()

### 3.3 User OS

In [12]:
user_agent = df.select("userId", "userAgent").distinct()
# Mozilla/5.0 (Windows NT 6.3; WOW64; rv:31.0)
user_os = user_agent.withColumn("user_os", regexp_extract(col('userAgent'), r'Mozilla/5.0 \(([\w\s\.\/]*);', 1))
user_os = user_os.select("userId", "user_os")
user_os = user_os.withColumn("user_os", F.regexp_replace('user_os', '[\.\s]', '-'))
user_os = user_os.withColumn('user_os', when(col('user_os') != '', col('user_os')).otherwise("Unknown")).persist()

## 3.4 Page views (min, max, avg)

In [16]:
pages_to_drop = ['Cancel', 'Cancellation Confirmation', 'Downgrade', 'Submit Downgrade', 'Submit Upgrade', 'Upgrade']
pages = df.select("userId", "page", "day")
pages = pages.filter(~pages['page'].isin(pages_to_drop))

page_view_pivot = \
    pages.groupby("userId", "day") \
      .pivot("page")

page_view_counts = page_view_pivot.count().fillna(0)
page_view_count_avg_per_day = page_view_counts.groupby("userId").avg().persist()
page_view_count_min_per_day = page_view_counts.groupby("userId").min().persist()
page_view_count_max_per_day = page_view_counts.groupby("userId").max().persist()

## 3.5 Gender

In [17]:
user_gender = df.select("userId", "gender").distinct().persist()

## 3.6 User session stats with service per week

In [18]:
sessions_per_week = \
    df.select("userId", "sessionId", "week")\
      .distinct() \
      .groupby("userId", "week") \
      .agg({'sessionId':'count'}) \
      .groupby("userId") \
      .agg(F.min('count(sessionId)'), F.max('count(sessionId)'), F.avg('count(sessionId)')).fillna(0).persist()

## 3.7 Length

In [19]:
user_length = df.select("userId", "length").groupby("userId").agg({"length":"sum"}).fillna(0).persist()

## 3.8 Time Since Registration

In [20]:
time_since_registration = \
    df.select("userId", "ts", "registration")\
      .groupby("userId") \
      .agg(((F.max("ts") - F.avg("registration")) / 1000).alias("active_time")) \
      .persist()

## 3.9 Items in session

In [21]:
items_in_session = \
    df.select("userId", "itemInSession")\
      .groupby("userId") \
      .agg(F.min('itemInSession'), F.max('itemInSession'), F.avg('itemInSession')) \
      .fillna(0).persist()

## 4.1 Making dataset out of features

In [22]:
clf_df = user_churn

In [23]:
# user_os
clf_df = clf_df.join(user_os, "userId", "outer" ).persist()

In [24]:
# pageview counts per day
page_view_count_avg_per_day = page_view_count_avg_per_day.select("userId", "avg(About)", "avg(Error)", "avg(NextSong)", "avg(Roll Advert)", "avg(Thumbs Down)", "avg(Thumbs Up)").persist()
page_view_count_min_per_day = page_view_count_min_per_day.select("userId", "min(Add Friend)", "min(Add to Playlist)", "min(NextSong)", "min(Roll Advert)", "min(Thumbs Down)", "min(Thumbs Up)").persist()
page_view_count_max_per_day = page_view_count_max_per_day.select("userId", "max(About)", "max(Add Friend)", "max(Add to Playlist)", "max(NextSong)", "max(Roll Advert)", "max(Thumbs Down)").persist()


In [25]:
clf_df = clf_df.join(page_view_count_avg_per_day, "userId", "outer" ).persist()
clf_df = clf_df.join(page_view_count_min_per_day, "userId", "outer" ).persist()
clf_df = clf_df.join(page_view_count_max_per_day, "userId", "outer" ).persist()

In [26]:
items_in_session = items_in_session.select("userId", "min(itemInSession)", "max(itemInSession)")

In [27]:
clf_df = clf_df.join(items_in_session, "userId", "outer").persist()

In [28]:
sessions_per_week = sessions_per_week.select("userId", "min(count(sessionId))", "max(count(sessionId))", "avg(count(sessionId))")

In [29]:
clf_df = clf_df.join(sessions_per_week, "userId", "outer").persist()

In [30]:
# user_state
clf_df = clf_df.join(user_state, "userId", "outer" ).persist()

In [31]:
# user_city
clf_df = clf_df.join(user_city, "userId", "outer" ).persist()

In [32]:
# user_length
clf_df = clf_df.join(user_length, "userId", "outer" ).persist()

In [33]:
clf_df = clf_df.join(time_since_registration, "userId", "outer" ).persist()

In [34]:
clf_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- churn: long (nullable = true)
 |-- user_os: string (nullable = true)
 |-- avg(About): double (nullable = true)
 |-- avg(Error): double (nullable = true)
 |-- avg(NextSong): double (nullable = true)
 |-- avg(Roll Advert): double (nullable = true)
 |-- avg(Thumbs Down): double (nullable = true)
 |-- avg(Thumbs Up): double (nullable = true)
 |-- min(Add Friend): long (nullable = true)
 |-- min(Add to Playlist): long (nullable = true)
 |-- min(NextSong): long (nullable = true)
 |-- min(Roll Advert): long (nullable = true)
 |-- min(Thumbs Down): long (nullable = true)
 |-- min(Thumbs Up): long (nullable = true)
 |-- max(About): long (nullable = true)
 |-- max(Add Friend): long (nullable = true)
 |-- max(Add to Playlist): long (nullable = true)
 |-- max(NextSong): long (nullable = true)
 |-- max(Roll Advert): long (nullable = true)
 |-- max(Thumbs Down): long (nullable = true)
 |-- min(itemInSession): long (nullable = true)
 |-- max(itemInSessi

In [35]:
for coln in clf_df.columns:
    missing_count = clf_df.filter((isnan(clf_df[coln])) | (clf_df[coln].isNull()) | (clf_df[coln] == "")).count()
    print("Column {} has {} missing values.".format(coln, missing_count))

Column userId has 0 missing values.
Column churn has 0 missing values.
Column user_os has 0 missing values.
Column avg(About) has 0 missing values.
Column avg(Error) has 0 missing values.
Column avg(NextSong) has 0 missing values.
Column avg(Roll Advert) has 0 missing values.
Column avg(Thumbs Down) has 0 missing values.
Column avg(Thumbs Up) has 0 missing values.
Column min(Add Friend) has 0 missing values.
Column min(Add to Playlist) has 0 missing values.
Column min(NextSong) has 0 missing values.
Column min(Roll Advert) has 0 missing values.
Column min(Thumbs Down) has 0 missing values.
Column min(Thumbs Up) has 0 missing values.
Column max(About) has 0 missing values.
Column max(Add Friend) has 0 missing values.
Column max(Add to Playlist) has 0 missing values.
Column max(NextSong) has 0 missing values.
Column max(Roll Advert) has 0 missing values.
Column max(Thumbs Down) has 0 missing values.
Column min(itemInSession) has 0 missing values.
Column max(itemInSession) has 0 missing v

In [41]:
dataset = clf_df.drop("userId").fillna(0).persist()

In [42]:
mapping_list = [['churn', 'churn'], 
                   ['user_os', 'user_os'],
                   ['location_state', 'location_state'],
                   ['location_city', 'location_city'],
                   ['sum(length)', 'length_sum'],
                   ['avg(About)', 'page_view_about_avg'],
                   ['avg(Error)', 'page_view_error_avg'],
                   ['avg(NextSong)', 'page_view_next_song_avg'],
                   ['avg(Roll Advert)', 'page_view_roll_advert_avg'],
                   ['avg(Thumbs Down)', 'page_view_thumbs_down_avg'],
                   ['avg(Thumbs Up)', 'page_view_thumbs_up_avg'],
                   ['min(Add Friend)', 'page_view_add_friend_min'],
                   ['min(Add to Playlist)', 'page_view_add_to_playlist_min'],
                   ['min(NextSong)', 'page_view_next_song_min'],
                   ['min(Roll Advert)', 'page_view_roll_advert_min'],
                   ['min(Thumbs Down)', 'page_view_thumbs_down_min'],
                   ['min(Thumbs Up)', 'page_view_thumbs_up_min'],
                   ['max(About)', 'page_view_about_max'],
                   ['max(Add Friend)', 'page_view_add_friend_max'],
                   ['max(Add to Playlist)', 'page_view_add_to_playlist_max'],
                   ['max(NextSong)', 'page_view_next_song_max'],
                   ['max(Roll Advert)', 'page_view_roll_advert_max'],
                   ['max(Thumbs Down)', 'page_view_thumbs_down_max'],
                   ['min(itemInSession)', 'item_in_session_min'],
                   ['max(itemInSession)', 'item_in_session_max'],
                   ['min(count(sessionId))', 'session_count_min'],
                   ['avg(count(sessionId))', 'session_count_avg'],
                   ['max(count(sessionId))', 'session_count_max']]
mapping = {m[0]:m[1] for m in mapping_list}
dataset = dataset.select([F.col(c).alias(mapping.get(c, c)) for c in dataset.columns])

In [43]:
dataset.createOrReplaceTempView("input_dataset")

In [44]:
dataset.printSchema()

root
 |-- churn: long (nullable = true)
 |-- user_os: string (nullable = true)
 |-- page_view_about_avg: double (nullable = false)
 |-- page_view_error_avg: double (nullable = false)
 |-- page_view_next_song_avg: double (nullable = false)
 |-- page_view_roll_advert_avg: double (nullable = false)
 |-- page_view_thumbs_down_avg: double (nullable = false)
 |-- page_view_thumbs_up_avg: double (nullable = false)
 |-- page_view_add_friend_min: long (nullable = true)
 |-- page_view_add_to_playlist_min: long (nullable = true)
 |-- page_view_next_song_min: long (nullable = true)
 |-- page_view_roll_advert_min: long (nullable = true)
 |-- page_view_thumbs_down_min: long (nullable = true)
 |-- page_view_thumbs_up_min: long (nullable = true)
 |-- page_view_about_max: long (nullable = true)
 |-- page_view_add_friend_max: long (nullable = true)
 |-- page_view_add_to_playlist_max: long (nullable = true)
 |-- page_view_next_song_max: long (nullable = true)
 |-- page_view_roll_advert_max: long (nullabl

In [45]:
dataset = dataset.withColumn("length_sum", F.log(dataset["length_sum"] + 1))
dataset = dataset.withColumn("active_time", F.log(dataset["active_time"] + 1))
dataset = dataset.persist()
dataset.createOrReplaceTempView("input_dataset")


## 5 Modelling

In [46]:
def evaluate_and_get_f1score(context, cv_model, data):
    results = cv_model.transform(data)
    prediction_evaluator = MulticlassClassificationEvaluator(predictionCol='prediction')
    print("  evaluation on ", context, "...")
    f1score = prediction_evaluator.evaluate(results, {prediction_evaluator.metricName: "f1"})
    print("  precision", prediction_evaluator.evaluate(results, {prediction_evaluator.metricName: "weightedPrecision"}))
    print("  recall", prediction_evaluator.evaluate(results, {prediction_evaluator.metricName: "weightedRecall"}))
    print("  accuracy", prediction_evaluator.evaluate(results, {prediction_evaluator.metricName: "accuracy"}))
    print("  f1score", f1score)
    print("---------")
    return f1score

def model_selection_metrics(classifier_factory, train_set, validation_set):
    num_folds = 3
    f1scores = []
    for i in range(10, 15):
        seed = np.power(2, i)
        print("with new seed:")
        multi_class_evaluator = MulticlassClassificationEvaluator(metricName='f1')
        param_grid = ParamGridBuilder().build()
        cv = CrossValidator(estimator=classifier_factory(seed), evaluator=multi_class_evaluator, estimatorParamMaps=param_grid, numFolds=num_folds)
        print("  training...")
        cv_model = cv.fit(train_set)
        evaluate_and_get_f1score("training", cv_model, train_set)
        f1score = evaluate_and_get_f1score("validation", cv_model, validation_set)
        f1scores.append(f1score)
    print("avg f1: ", np.mean(f1scores))

In [47]:
dataset = dataset.dropna()

In [48]:
numeric_columns = [
"page_view_about_avg", 
"page_view_error_avg",
"page_view_next_song_avg",
"page_view_roll_advert_avg",
"page_view_thumbs_down_avg",
"page_view_thumbs_up_avg",
"page_view_add_friend_min",
"page_view_add_to_playlist_min",
"page_view_next_song_min",
"page_view_roll_advert_min",
"page_view_thumbs_down_min",
"page_view_thumbs_up_min",
"page_view_about_max",
"page_view_add_friend_max",
"page_view_add_to_playlist_max",
"page_view_next_song_max",
"page_view_roll_advert_max",
"page_view_thumbs_down_max",
"item_in_session_min",
"item_in_session_max",
"length_sum",
"session_count_min",
"active_time"]

assembler = VectorAssembler(inputCols = numeric_columns, outputCol = 'numeric_features')
dataset = assembler.transform(dataset)



In [49]:
scaler = StandardScaler(inputCol='numeric_features', outputCol='scaled_features', withStd=True)
scale_model = scaler.fit(dataset)
dataset = scale_model.transform(dataset)

features_df = dataset

In [50]:
features_df.take(1)

[Row(churn=1, user_os=u'Windows-NT-5-1', page_view_about_avg=0.0, page_view_error_avg=0.14285714285714285, page_view_next_song_avg=48.666666666666664, page_view_roll_advert_avg=3.5238095238095237, page_view_thumbs_down_avg=1.5714285714285714, page_view_thumbs_up_avg=2.5238095238095237, page_view_add_friend_min=0, page_view_add_to_playlist_min=0, page_view_next_song_min=0, page_view_roll_advert_min=0, page_view_thumbs_down_min=0, page_view_thumbs_up_min=0, page_view_about_max=0, page_view_add_friend_max=3, page_view_add_to_playlist_max=4, page_view_next_song_max=170, page_view_roll_advert_max=11, page_view_thumbs_down_max=5, item_in_session_min=0, item_in_session_max=219, session_count_min=1, session_count_max=5, session_count_avg=3.142857142857143, location_state=u' OH', location_city=u'Findlay', length_sum=12.465937239422685, active_time=15.714485870664598, numeric_features=SparseVector(23, {1: 0.1429, 2: 48.6667, 3: 3.5238, 4: 1.5714, 5: 2.5238, 13: 3.0, 14: 4.0, 15: 170.0, 16: 11.0,

In [51]:
churn_dataset = features_df.select(col('scaled_features').alias('features'), 
                                   col('churn').alias('label')).persist()

In [52]:
train, test = churn_dataset.randomSplit([0.8, 0.2], seed=42)
train = train.persist()
test = test.persist()
train, valid = train.randomSplit([0.8, 0.2], seed=42)
train = train.persist()
valid = valid.persist()

## 5.2 Model parameters tuning on full dataset

In [53]:
num_folds = 5
model = RandomForestClassifier()
multi_class_evaluator = MulticlassClassificationEvaluator(metricName='f1')
param_grid = ParamGridBuilder() \
    .addGrid(model.maxDepth, [3, 5])\
    .addGrid(model.numTrees,[10, 15, 20, 25, 30])\
    .build()
# param_grid = ParamGridBuilder().build()

cv = CrossValidator(estimator=model, evaluator=multi_class_evaluator, estimatorParamMaps=param_grid, numFolds=num_folds)
print("training...")
cv_model = cv.fit(train)
evaluate_and_get_f1score("training", cv_model, train)
evaluate_and_get_f1score("validation", cv_model, valid)


training...
('  evaluation on ', 'training', '...')
('  precision', 0.8283302473207421)
('  recall', 0.8337907254943354)
('  accuracy', 0.8337907254943354)
('  f1score', 0.8080159153464045)
---------
('  evaluation on ', 'validation', '...')
('  precision', 0.8354104701766067)
('  recall', 0.8393109291160689)
('  accuracy', 0.8393109291160689)
('  f1score', 0.814618420175854)
---------
0.814618420175854

In [54]:
best_model = cv_model.bestModel
best_params = best_model.extractParamMap()

In [55]:
for param, value in best_params.items():
    print(param.name, ": ", value)

('subsamplingRate', ': ', 1.0)
('predictionCol', ': ', 'prediction')
('seed', ': ', -4140900678877021401)
('rawPredictionCol', ': ', 'rawPrediction')
('probabilityCol', ': ', 'probability')
('featuresCol', ': ', 'features')
('minInstancesPerNode', ': ', 1)
('impurity', ': ', 'gini')
('featureSubsetStrategy', ': ', 'auto')
('cacheNodeIds', ': ', False)
('maxDepth', ': ', 5)
('maxMemoryInMB', ': ', 256)
('numTrees', ': ', 20)
('checkpointInterval', ': ', 10)
('labelCol', ': ', 'label')
('maxBins', ': ', 32)
('minInfoGain', ': ', 0.0)

In [56]:
features_importance_list = list(zip(assembler.getInputCols(), best_model.featureImportances.values))
features_importance_list = sorted(features_importance_list, key=lambda pair: pair[1])
features_importance_list.reverse()
for feature, importance in features_importance_list:
    print(feature, ": ", importance)

('active_time', ': ', 0.5391434854428258)
('session_count_min', ': ', 0.14610714929219407)
('page_view_roll_advert_avg', ': ', 0.07360173499155157)
('page_view_roll_advert_max', ': ', 0.05808957341877924)
('page_view_thumbs_down_avg', ': ', 0.03183212491600829)
('page_view_next_song_avg', ': ', 0.023661223550430585)
('page_view_next_song_min', ': ', 0.022156787935501975)
('length_sum', ': ', 0.01996390291604185)
('page_view_roll_advert_min', ': ', 0.01714643793030387)
('page_view_thumbs_down_max', ': ', 0.015245902287995583)
('item_in_session_max', ': ', 0.014249017569289011)
('page_view_next_song_max', ': ', 0.008496615290304469)
('page_view_add_friend_min', ': ', 0.0058259861068556875)
('page_view_add_to_playlist_max', ': ', 0.005020921193396526)
('page_view_error_avg', ': ', 0.003927837496274965)
('page_view_thumbs_up_avg', ': ', 0.0033456328016264756)
('page_view_add_to_playlist_min', ': ', 0.00280802322233704)
('page_view_add_friend_max', ': ', 0.0026040423091284975)
('page_view_a

## 5.3 Evaluation on test

In [57]:
evaluate_and_get_f1score("test", best_model, test)

('  evaluation on ', 'test', '...')
('  precision', 0.8282483758049413)
('  recall', 0.8355525965379494)
('  accuracy', 0.8355525965379494)
('  f1score', 0.8124129861525997)
---------
0.8124129861525997