This notebook follows `Sparkify.ipynb`.

In [1]:
# import libraries
from pyspark.sql import SparkSession, Window
from pyspark import SparkFiles
from pyspark.sql.functions import avg, col, concat, count, desc, \
asc, explode, lit, min, max, split, stddev, udf, isnan, when, rank, \
log, sqrt, cbrt, exp, countDistinct, monotonically_increasing_id
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType, FloatType, StringType
from pyspark.ml.stat import Correlation
from pyspark.sql.functions import abs as Fabs

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, \
LogisticRegressionModel, RandomForestClassifier, \
RandomForestClassificationModel, GBTClassifier, \
GBTClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, \
PCA, RegexTokenizer, Tokenizer, StandardScaler, StopWordsRemover, \
StringIndexer, VectorAssembler, MaxAbsScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from time import time
import re
import numpy as np
import datetime
import random
random.seed(42)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1548971932490_0003,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# import pandas as pd
# import matplotlib.pyplot as plt
# from pandas.plotting import scatter_matrix
# import seaborn as sns
# %matplotlib inline

VBox()

## 1. Data processing

### Load raw data

In [3]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

In [4]:
# Read in full sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

VBox()

Row(artist=u'Popol Vuh', auth=u'Logged In', firstName=u'Shlok', gender=u'M', itemInSession=278, lastName=u'Johnson', length=524.32934, level=u'paid', location=u'Dallas-Fort Worth-Arlington, TX', method=u'PUT', page=u'NextSong', registration=1533734541000, sessionId=22683, song=u'Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent=u'"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId=u'1749042')

### Drop missing values

In [5]:
df = df.dropna(how='any', subset=['userId', 'sessionId'])
df = df.filter(df['userId'] != "")

VBox()

In [6]:
# # Examine the number of missing values in each column
# print("Total number of rows in the dataset: {}.".format(df.count()))

# for coln in df.columns:
#     missing_count = df.filter((isnan(df[coln])) | (df[coln].isNull()) | (df[coln] == "")).count()
#     print("Column {} has {} missing values.".format(coln, missing_count))

VBox()

Total number of rows in the dataset: 26259199.
Column artist has 5408927 missing values.
Column auth has 0 missing values.
Column firstName has 778479 missing values.
Column gender has 778479 missing values.
Column itemInSession has 0 missing values.
Column lastName has 778479 missing values.
Column length has 5408927 missing values.
Column level has 0 missing values.
Column location has 778479 missing values.
Column method has 0 missing values.
Column page has 0 missing values.
Column registration has 778479 missing values.
Column sessionId has 0 missing values.
Column song has 5408927 missing values.
Column status has 0 missing values.
Column ts has 0 missing values.
Column userAgent has 778479 missing values.
Column userId has 0 missing values.

### Feature engineering

#### Create features

In [6]:
# Define churn
flag_churn_event = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
churn = df.withColumn("churned", flag_churn_event("page"))\
    .select(['userId', 'churned'])\
    .groupBy('userId').agg(max('churned').alias("churn"))

VBox()

In [20]:
# churn.count()

VBox()

22278

In [7]:
# Latest level
func_levels = udf(lambda x: 1 if x=="paid" else 0, IntegerType())
levels = df.select(['userId', 'level', 'ts'])\
    .orderBy(desc('ts'))\
    .dropDuplicates(['userId'])\
    .select(['userId', 'level'])\
    .withColumn('level', func_levels('level').cast(IntegerType()))

VBox()

In [8]:
# Time since registration in seconds, and gender
func_gender = udf(lambda x: 1 if x=="M" else (0 if x=="F" else -1), IntegerType())
time_gender = df.groupBy(['userId', 'gender'])\
    .agg(max('ts'), avg('registration'))\
    .withColumn('time_since_regi', (col('max(ts)')-col('avg(registration)'))/lit(1000))\
    .withColumn('gender', func_gender('gender'))

avg_time = time_gender.select(avg('time_since_regi')).collect()[0]['avg(time_since_regi)']
time_gender = time_gender.fillna(avg_time, subset=['time_since_regi'])\
    .drop('max(ts)').drop('avg(registration)')

VBox()

In [9]:
# # Number of artists, total length, number of sessions, number of songs, and number of page events
# # that the user has engaged
# engagement = df.groupBy('userId')\
#     .agg(countDistinct('artist').alias('num_artists_dist'), 
#          countDistinct('sessionId').alias('num_sessions'),
#          countDistinct('song').alias('num_songs_dist'),
#          count('song').alias('num_songs'),
#          count('page').alias('num_events'),
#          Fsum('length').alias('tot_length')
#         )

VBox()

In [10]:
# # Statistics of the number of songs per artist that the user has listened to
# per_artist = df.filter(~df['artist'].isNull())\
#     .groupBy(['userId', 'artist'])\
#     .agg(count('song').alias('num_songs'))\
#     .groupBy('userId')\
#     .agg(avg(col('num_songs')).alias('avg_songs_per_artist'),
#          stddev(col('num_songs')).alias('std_songs_per_artist')
#         )\
#     .fillna(0)

VBox()

In [9]:
# Statistics of the time spent sper session (float)
per_session = df.groupBy(['userId', 'sessionId'])\
    .agg(max('ts'), min('ts'), count('song').alias('num_songs'))\
    .withColumn('time', (col('max(ts)')-col('min(ts)'))/lit(1000))\
    .groupBy('userId')\
    .agg(stddev(col('time')), 
         avg(col('time')),
         stddev(col('num_songs')).alias('std_songs_per_session'),
         avg(col('num_songs')).alias('avg_songs_per_session')
        )\
    .withColumnRenamed('stddev_samp(time)', 'std_time_per_session')\
    .withColumnRenamed('avg(time)', 'avg_time_per_session')\
    .fillna(0)

VBox()

In [10]:
# for coln in ['avg_time_per_session', 'std_time_per_session']:
#     missing_count = per_session.filter(
#         (isnan(per_session[coln])) | (per_session[coln].isNull()) | (per_session[coln] == "")
#     ).count()
#     print("Column {} has {} missing values.".format(coln, missing_count))

VBox()

In [11]:
# Calculate usage fraction of each userAgent
window = Window.partitionBy("userId").rowsBetween(Window.unboundedPreceding,Window.unboundedFollowing)
func_agent_device = udf(
    lambda x: "user_agent_"+x.split('(')[1].replace(";", " ").split(" ")[0] if '(' in str(x) else 'user_agent_none', 
    StringType()
)

df = df.fillna('', subset=['userAgent'])
agents = df.withColumn("userAgent", func_agent_device(col("userAgent")))
agents = agents.groupBy(["userId", 'userAgent'])\
    .agg(count("userAgent").alias("user_agent_usage_count"))\
    .withColumn('total', Fsum(col('user_agent_usage_count')).over(window))\
    .withColumn('user_agent_usage', col('user_agent_usage_count')/col('total'))\
    .groupBy("userId").pivot("userAgent").sum("user_agent_usage")\
    .fillna(0)\
    .drop('user_agent_none')

VBox()

In [12]:
# Count usages of each page event by each user
pages_to_exclude = ['Cancel', 'Downgrade', 'Cancellation Confirmation', 'Upgrade', 
                    'Submit Registration', 'Login', 'Register']
func_pages = udf(lambda x: "page_"+x.replace(" ", "_").lower())
pages = df.filter(~df['page'].isin(pages_to_exclude))\
    .withColumn("page", func_pages(df["page"]))\
    .groupBy(['userId']).pivot("page").agg(count('page'))\
    .fillna(0)
pages = pages.withColumn("page_up_down_ratio", pages["page_thumbs_up"]/(pages['page_thumbs_down']+0.1))

VBox()

In [13]:
# Calculate the fraction of each page event by each user
pages = pages.withColumn(
    'total', sum(pages[coln] for coln in pages.columns if coln not in ['userId', 'page_up_down_ratio'])
)
for coln in pages.columns:
    if coln not in ['userId', 'total', 'page_up_down_ratio']:
        new_col_name = coln[0:5]+'frac_'+coln[5:]
        pages = pages.withColumn(new_col_name, pages[coln] / pages['total'])
pages = pages.drop('total')

VBox()

In [14]:
# Find the user's geographical division based on the first listed location state
path = "s3://aws-emr-resources-986775742469-us-east-2/region.csv"
region = spark.read.csv(path, header=True)

func_locations = udf(lambda x: x.split(', ')[1].split('-')[0] if x!='' else 'none')
func_location_names = udf(lambda x: "location_"+x.replace(" ", "_").lower())
locations = df.fillna('', subset=['location'])\
    .withColumn('location', func_locations(col('location')))\
    .select(['userId', 'location']).dropDuplicates(subset=['userId'])
locations = locations.join(region, locations['location']==region['State Code'], how='left')\
    .select(['userId', col("Division").alias("location")]).fillna('none')\
    .withColumn('location', func_location_names('location'))
locations = locations.groupBy('userId').pivot('location').agg(count('location')).fillna(0).drop('location_none')

VBox()

### Check correlation

In [39]:
# # Join features together
# dataset2 = churn.join(levels, ['userId'])\
#     .join(time_gender, ['userId'])\
#     .join(engagement, ['userId'])\
#     .join(per_artist, ['userId'])\
#     .join(per_session, ['userId'])\
#     .join(agents, ['userId'])\
#     .join(pages, ['userId'])\
#     .join(locations, ['userId'])

# dataset2.head()

VBox()

Row(userId=u'1000280', churn=1, level=0, gender=1, max(ts)=1542149985000, avg(registration)=1535470939000.0, time_since_regi=6679046.0, num_artists_dist=767, num_sessions=22, num_songs_dist=945, num_songs=1022, num_events=1317, tot_length=259349.89726000006, max_songs_per_artist=11, avg_songs_per_artist=1.332464146023468, std_songs_per_artist=1.018254818633842, std_time_per_session=10370.460836274826, avg_time_per_session=11694.363636363636, std_songs_per_session=41.453045465381415, avg_songs_per_session=46.45454545454545, user_agent_Macintosh=0.0, user_agent_Windows=1.0, user_agent_X11=0.0, user_agent_compatible=0.0, user_agent_iPad=0.0, user_agent_iPhone=0.0, user_agent_none=0.0, page_about=0, page_add_friend=14, page_add_to_playlist=25, page_error=3, page_help=8, page_home=44, page_login=0, page_logout=15, page_nextsong=1022, page_register=0, page_roll_advert=74, page_save_settings=1, page_settings=9, page_submit_downgrade=1, page_submit_registration=0, page_submit_upgrade=1, page_t

In [40]:
# # Write data
# path2 = "s3://aws-emr-resources-986775742469-us-east-2/processed2.json"
# dataset2.write.format('json').save(path2)

VBox()

In [41]:
# # Read in processed data
# path2 = "s3://aws-emr-resources-986775742469-us-east-2/processed2.json"
# dataset2 = spark.read.json(path2)
# dataset2.head()

VBox()

Row(avg(registration)=1525261059000.0, avg_songs_per_artist=1.0863636363636364, avg_songs_per_session=59.75, avg_time_per_session=15133.25, churn=1, gender=0, level=1, location_east_north_central=0, location_east_south_central=0, location_middle_atlantic=0, location_mountain=0, location_new_england=0, location_none=0, location_pacific=0, location_south_atlantic=0, location_west_north_central=0, location_west_south_central=1, max(ts)=1540247149000, max_songs_per_artist=5, num_artists_dist=220, num_events=299, num_sessions=4, num_songs=239, num_songs_dist=233, page_about=0, page_add_friend=9, page_add_to_playlist=4, page_error=0, page_frac_about=0.0, page_frac_add_friend=0.03103448275862069, page_frac_add_to_playlist=0.013793103448275862, page_frac_error=0.0, page_frac_help=0.006896551724137931, page_frac_home=0.041379310344827586, page_frac_login=0.0, page_frac_logout=0.017241379310344827, page_frac_nextsong=0.8241379310344827, page_frac_register=0.0, page_frac_roll_advert=0.0, page_fra

In [89]:
# features_to_exclude = ['userId', 'churn', 'user_agent_none', 'location_none', 'page_frac_login',
#                        'page_frac_register', 'page_frac_submit_registration', 'page_login', 
#                        'page_register', 'page_submit_registration', 'avg(registration)', 'max(ts)']
# features_to_keep = [coln for coln in dataset2.columns if coln not in features_to_exclude]
# feature_cols2 = dataset2.select(features_to_keep).columns
# assembler = VectorAssembler(inputCols=feature_cols2, outputCol="features")
# dataset_vect = assembler.transform(dataset2.select(features_to_keep))
# dataset_corr = Correlation.corr(dataset_vect, "features")
# spearmanCorr = dataset_corr.collect()[0][0]

VBox()

In [99]:
# # Find correlated features that have absolute correlation >= 0.5
# spearmanCorrList = [[float(ar) for ar in arr] for arr in spearmanCorr.toArray()]
# corr_matrix = spark.createDataFrame(spearmanCorrList, schema=feature_cols2)
# correlated_cols = []
# for coln in corr_matrix.columns:
#     correlated = corr_matrix.filter(Fabs(corr_matrix[coln])>=0.5).count()
#     if correlated > 1:
#         correlated_cols.append(coln)

VBox()

In [103]:
# assembler_corr = VectorAssembler(inputCols=correlated_cols, outputCol="features")
# dataset_vect_corr = assembler_corr.transform(dataset2.select(correlated_cols))
# dataset_corr_corr = Correlation.corr(dataset_vect_corr, "features")
# spearmanCorr_corr = dataset_corr_corr.collect()[0][0]

VBox()

In [106]:
# # Find features to be removed, i.e. feature that has >0.8 correlation with any remaining feature
# spearmanCorrList_corr = [[float(ar) for ar in arr] for arr in spearmanCorr_corr.toArray()]
# corr = spark.createDataFrame(spearmanCorrList_corr, schema=correlated_cols)
# corr = corr.withColumn("id", monotonically_increasing_id())

VBox()

In [133]:
# cols_to_remove = []
# counter = 0
# for coln in corr.drop('id').columns:
#     ind = corr.select('id').collect()[counter]['id']
#     corr_ind = corr.select(coln).where(corr['id']>=ind)
#     counter += 1
#     if corr_ind.filter(Fabs(corr_ind[coln]) >= 0.8).count() > 0:
#         cols_to_remove.append(coln)
# print("Highly correlated features that should be removed:\n\n{}\n\n".format(cols_to_remove))
# cols_to_keep = [coln for coln in dataset2.columns if coln not in cols_to_remove]
# print("Features to keep:\n\n{}".format(cols_to_keep))

VBox()

Highly correlated features that should be removed:

['avg_songs_per_artist', 'avg_songs_per_session', 'avg_time_per_session', 'level', 'max_songs_per_artist', 'num_artists_dist', 'num_events', 'num_sessions', 'num_songs', 'num_songs_dist', 'page_about', 'page_add_friend', 'page_add_to_playlist', 'page_error', 'page_frac_home', 'page_frac_nextsong', 'page_frac_roll_advert', 'page_help', 'page_home', 'page_logout', 'page_nextsong', 'page_roll_advert', 'page_save_settings', 'page_settings', 'page_submit_downgrade', 'page_submit_upgrade', 'page_thumbs_down', 'page_thumbs_up', 'std_songs_per_artist', 'std_songs_per_session', 'std_time_per_session', 'tot_length', 'user_agent_Macintosh', 'user_agent_Windows']


Features to keep:

['avg(registration)', 'churn', 'gender', 'location_east_north_central', 'location_east_south_central', 'location_middle_atlantic', 'location_mountain', 'location_new_england', 'location_none', 'location_pacific', 'location_south_atlantic', 'location_west_north_centra

#### Compile and transform features

In [15]:
# Join features together
dataset = churn.join(levels, ['userId'])\
    .join(time_gender, ['userId'])\
    .join(per_session, ['userId'])\
    .join(agents, ['userId'])\
    .join(pages, ['userId'])\
    .join(locations, ['userId'])

VBox()

In [16]:
# Drop the highly correlated features
cols_to_keep = [
    'userId', 'churn', 'level', 'time_since_regi', 'gender', 'std_time_per_session', 
    'user_agent_Macintosh', 'user_agent_Windows', 'user_agent_X11', 
    'user_agent_compatible', 'user_agent_iPad', 'user_agent_iPhone', 'page_about', 
    'page_error', 'page_roll_advert', 'page_save_settings', 'page_submit_downgrade', 
    'page_submit_upgrade', 'page_thumbs_down', 'page_thumbs_up', 'page_frac_about', 
    'page_frac_add_friend', 'page_frac_add_to_playlist', 'page_frac_error', 
    'page_frac_help', 'page_frac_home', 'page_frac_logout', 'page_frac_nextsong', 
    'page_frac_roll_advert', 'page_frac_save_settings', 'page_frac_settings', 
    'page_frac_submit_downgrade', 'page_frac_submit_upgrade', 'page_frac_thumbs_down', 
    'page_frac_thumbs_up', 'location_east_north_central', 'location_east_south_central', 
    'location_middle_atlantic', 'location_mountain', 'location_new_england', 
    'location_pacific', 'location_south_atlantic', 'location_west_north_central', 
    'location_west_south_central', 'page_up_down_ratio'
]
dataset = dataset.select(cols_to_keep)

VBox()

In [17]:
# Feature transformation
to_sqrt = ['std_time_per_session']
to_log = [
    'page_about', 'page_error', 'page_roll_advert', 'page_save_settings', 
    'page_thumbs_down', 'page_frac_about', 'page_frac_add_friend', 
    'page_frac_add_to_playlist', 'page_frac_error', 'page_frac_help', 
    'page_frac_home', 'page_frac_logout', 'page_frac_roll_advert', 
    'page_frac_save_settings', 'page_frac_settings', 
    'page_frac_submit_downgrade', 'page_frac_submit_upgrade', 
    'page_frac_thumbs_down', 'page_settings', 'page_thumbs_up',
    'page_up_down_ratio'
]
col_names = [
    coln for coln in dataset.columns if 
    ('churn' not in coln) and 
    ('level' not in coln) and
    ('userId' not in coln) and
    ('gender' not in coln) and
    ('user_agent_' not in coln) and
    ('location_' not in coln) and
    ('_submit_' not in coln)
]
for col_name in col_names:
    if col_name in to_sqrt:
        dataset = dataset.withColumn(col_name, sqrt(dataset[col_name]+1))
    elif col_name in to_log:
        dataset = dataset.withColumn(col_name, log(dataset[col_name]+1))

VBox()

#### Sanity check

In [4]:
print("Total number of rows in the dataset: {}.".format(dataset.count()))

VBox()

Total number of rows in the dataset: 22278.

In [5]:
dataset.columns

VBox()

['churn', 'gender', 'level', 'location_east_north_central', 'location_east_south_central', 'location_middle_atlantic', 'location_mountain', 'location_new_england', 'location_pacific', 'location_south_atlantic', 'location_west_north_central', 'location_west_south_central', 'page_about', 'page_error', 'page_frac_about', 'page_frac_add_friend', 'page_frac_add_to_playlist', 'page_frac_error', 'page_frac_help', 'page_frac_home', 'page_frac_logout', 'page_frac_nextsong', 'page_frac_roll_advert', 'page_frac_save_settings', 'page_frac_settings', 'page_frac_submit_downgrade', 'page_frac_submit_upgrade', 'page_frac_thumbs_down', 'page_frac_thumbs_up', 'page_roll_advert', 'page_save_settings', 'page_submit_downgrade', 'page_submit_upgrade', 'page_thumbs_down', 'page_thumbs_up', 'page_up_down_ratio', 'std_time_per_session', 'time_since_regi', 'userId', 'user_agent_Macintosh', 'user_agent_Windows', 'user_agent_X11', 'user_agent_compatible', 'user_agent_iPad', 'user_agent_iPhone']

In [6]:
# Examine the number of missing values in each column
for coln in dataset.columns:
    missing_count = dataset.filter((isnan(dataset[coln])) | (dataset[coln].isNull()) | (dataset[coln] == "")).count()
    print("Column {} has {} missing values.".format(coln, missing_count))

VBox()

Column churn has 0 missing values.
Column gender has 0 missing values.
Column level has 0 missing values.
Column location_east_north_central has 0 missing values.
Column location_east_south_central has 0 missing values.
Column location_middle_atlantic has 0 missing values.
Column location_mountain has 0 missing values.
Column location_new_england has 0 missing values.
Column location_pacific has 0 missing values.
Column location_south_atlantic has 0 missing values.
Column location_west_north_central has 0 missing values.
Column location_west_south_central has 0 missing values.
Column page_about has 0 missing values.
Column page_error has 0 missing values.
Column page_frac_about has 0 missing values.
Column page_frac_add_friend has 0 missing values.
Column page_frac_add_to_playlist has 0 missing values.
Column page_frac_error has 0 missing values.
Column page_frac_help has 0 missing values.
Column page_frac_home has 0 missing values.
Column page_frac_logout has 0 missing values.
Column 

In [7]:
dataset.printSchema()

VBox()

root
 |-- churn: long (nullable = true)
 |-- gender: long (nullable = true)
 |-- level: long (nullable = true)
 |-- location_east_north_central: long (nullable = true)
 |-- location_east_south_central: long (nullable = true)
 |-- location_middle_atlantic: long (nullable = true)
 |-- location_mountain: long (nullable = true)
 |-- location_new_england: long (nullable = true)
 |-- location_pacific: long (nullable = true)
 |-- location_south_atlantic: long (nullable = true)
 |-- location_west_north_central: long (nullable = true)
 |-- location_west_south_central: long (nullable = true)
 |-- page_about: double (nullable = true)
 |-- page_error: double (nullable = true)
 |-- page_frac_about: double (nullable = true)
 |-- page_frac_add_friend: double (nullable = true)
 |-- page_frac_add_to_playlist: double (nullable = true)
 |-- page_frac_error: double (nullable = true)
 |-- page_frac_help: double (nullable = true)
 |-- page_frac_home: double (nullable = true)
 |-- page_frac_logout: double (n

### Save processed data

In [18]:
path = "s3://aws-emr-resources-986775742469-us-east-2/processed.json"
# dataset.write.format('json').save(path)
dataset.write.format('json').mode('overwrite').save(path)

VBox()

In [None]:
# Other syntax found online
# df.write.parquet("s3a://bucket-name/shri/test.parquet",mode="overwrite")

# dataset.write.format("org.apache.spark.sql.json")\
#     .mode(SaveMode.Append).save("hdfs://localhost:9000/sampletext.txt");

# df.write.mode('append').json(yourtargetpath)

## 2. Machine learning

In [2]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

VBox()

In [3]:
# Read in processed data
path = "s3://aws-emr-resources-986775742469-us-east-2/processed.json"
dataset = spark.read.json(path)
dataset.head()

VBox()

Row(churn=1, gender=0, level=1, location_east_north_central=0, location_east_south_central=0, location_middle_atlantic=0, location_mountain=0, location_new_england=0, location_pacific=0, location_south_atlantic=0, location_west_north_central=0, location_west_south_central=1, page_about=0.0, page_error=0.0, page_frac_about=0.0, page_frac_add_friend=0.030562650410166668, page_frac_add_to_playlist=0.013698844358161927, page_frac_error=0.0, page_frac_help=0.0068728792877620504, page_frac_home=0.04054609439435001, page_frac_logout=0.01709443335930004, page_frac_nextsong=0.8241379310344827, page_frac_roll_advert=0.0, page_frac_save_settings=0.0034423441909726986, page_frac_settings=0.0034423441909726986, page_frac_submit_downgrade=0.0, page_frac_submit_upgrade=0.0, page_frac_thumbs_down=0.013698844358161927, page_frac_thumbs_up=0.04482758620689655, page_roll_advert=0.0, page_save_settings=0.6931471805599453, page_submit_downgrade=0, page_submit_upgrade=0, page_thumbs_down=1.6094379124341003,

### Train-test split

In [4]:
# Rename churn column into label
dataset = dataset.withColumn('label', dataset['churn'].cast('float')).drop('churn') #important to have float type

# Feature columns to be converted into vector
feature_cols = dataset.drop('label').drop('userId').columns

VBox()

In [5]:
# Train-test split
train, test = dataset.drop('userId').randomSplit([0.8, 0.2], seed=42)

VBox()

### Pipeline and functions

In [6]:
def buildCV(classifier, paramGrid):
    '''
    Build a cross validation pipeline
    
    INPUT
    classifier: untrained machine learning classifier
    paramGrid: a grid of parameters to search over
    
    OUTPUT
    crossval: cross validator
    '''
    # Configure an ML pipeline
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="rawFeatures")
    scaler = MaxAbsScaler(inputCol="rawFeatures", outputCol="scaledFeatures")
#     scaler = MinMaxScaler(inputCol="rawFeatures", outputCol="scaledFeatures")
#     scaler = StandardScaler(inputCol="rawFeatures", outputCol="scaledFeatures", withStd=True, withMean=True)
    pipeline = Pipeline(stages=[assembler, scaler, classifier])

    # Cross validation
    crossval = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=paramGrid,
        evaluator=MulticlassClassificationEvaluator(metricName='f1'),
        numFolds=3
    )
    return crossval

VBox()

In [7]:
def trainModel(classifier, train, paramGrid):
    '''
    Train the machine learning model
    
    INPUT
    classifier: untrained machine learning classifier
    paramGrid: a grid of parameters to search over
    train (Spark dataframe): training dataset
    
    OUTPUT
    model: trained machine learning model
    training_time (float): training time
    '''
    crossval = buildCV(classifier, paramGrid)
    start = time()
    model = crossval.fit(train)
    end = time()
    training_time = end - start
    return model, training_time

VBox()

In [8]:
def evaluateModel(model, data, prob=False):
    '''
    Evaluate model performance
    
    INPUT
    model: trained machine learning model
    data (Spark dataframe): either training set or testing set
    
    OUTPUT
    evalMetrics (dict): disctionary of evaluation metrics
    '''
    # Make prediction
    start = time()
    pred = model.transform(data)
    if prob:
        evaluator = MulticlassClassificationEvaluator(predictionCol="probability", labelCol="label")
    else:
        evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
    end = time()
    prediction_time = end - start
    
    # Calculate metrics
    evalMetrics = {}
    evalMetrics["precision"] = evaluator.evaluate(pred, {evaluator.metricName: "weightedPrecision"})
    evalMetrics["recall"] = evaluator.evaluate(pred, {evaluator.metricName: "weightedRecall"})
    evalMetrics["f1"] = evaluator.evaluate(pred, {evaluator.metricName: "f1"})
    evalMetrics["accuracy"] = evaluator.evaluate(pred, {evaluator.metricName: "accuracy"})
    evalMetrics['confusion_matrix'] = pred.groupby("label").pivot("prediction").count()
    evalMetrics['prediction_time'] = prediction_time
    
    return evalMetrics

def evaluateTrainTest(model, train, test, training_time):
    '''
    Evaluate model performance on both training and testing sets
    
    INPUT
    model: trained machine learning model
    train: training set
    test: testing set
    training_time (float): training time
    
    OUTPUT
    evalMetrics (dict): disctionary of evaluation metrics, 
        compiled from training, testing metrics
    summary (Spark dataframe): table of evaluation metrics
    '''
    # Evaluate model performance
    evalMetricsTraining = evaluateModel(model, train)
    evalMetricsTesting = evaluateModel(model, test)

    # Compile metrics
    evalMetrics = {}
    evalMetrics['train_time'] = training_time
    evalMetrics['f1_train'] = evalMetricsTraining['f1']
    evalMetrics['acc_train'] = evalMetricsTraining['accuracy']
    evalMetrics['f1_test'] = evalMetricsTesting['f1']
    evalMetrics['acc_test'] = evalMetricsTesting['accuracy']
    evalMetrics['pred_time'] = evalMetricsTraining['prediction_time'] \
        + evalMetricsTesting['prediction_time']

    # Summarize metrics into a Spark dataframe
    metrics_to_display = {
        k:round(v, 4) for k,v in evalMetrics.items() if ('confusion_matrix' not in k)
    }
    summary = spark.createDataFrame([list(metrics_to_display.values())], list(metrics_to_display.keys()))
    
    return evalMetrics, summary

VBox()

In [9]:
# Lump all steps together
def trainAndEval(classifier, train, test, paramGrid):
    '''
    Train and evaluate model performance on both training and testing sets
    
    INPUT
    classifier: untrained machine learning classifier
    train: training set
    test: testing set
    paramGrid: a grid of parameters to search over
    
    OUTPUT
    evalMetrics (dict): disctionary of evaluation metrics, 
        compiled from training, testing metrics
    summary (Spark dataframe): table of evaluation metrics
    model: trained machine learning model
    '''
    # Train the model
    model, training_time = trainModel(classifier, train, paramGrid)
    
    # Evaluate model performance
    evalMetricsTraining = evaluateModel(model, train)
    evalMetricsTesting = evaluateModel(model, test)

    # Compile metrics
    evalMetrics = {}
    evalMetrics['train_time'] = training_time
    evalMetrics['f1_train'] = evalMetricsTraining['f1']
    evalMetrics['acc_train'] = evalMetricsTraining['accuracy']
    evalMetrics['f1_test'] = evalMetricsTesting['f1']
    evalMetrics['acc_test'] = evalMetricsTesting['accuracy']
    evalMetrics['pred_time'] = evalMetricsTraining['prediction_time'] \
        + evalMetricsTesting['prediction_time']

    # Summarize metrics into a Spark dataframe
    metrics_to_display = {
        k:round(v, 4) for k,v in evalMetrics.items() if ('confusion_matrix' not in k)
    }
    summary = spark.createDataFrame([list(metrics_to_display.values())], list(metrics_to_display.keys()))
    
    return evalMetrics, summary, model

VBox()

### Hyperparameter tuning

In [29]:
# Check later
# spark.executor.heartbeatInterval

VBox()

#### Grid search

In [12]:
# Classifier
classifier = GBTClassifier(labelCol="label", featuresCol="scaledFeatures")

# Construct a grid of parameters to search over
# paramGrid = ParamGridBuilder()\
#     .addGrid(classifier.maxIter, [20, 50, 100, 200])
#     .addGrid(classifier.regParam, [0.1, 0.01]) \
#     .addGrid(classifier.fitIntercept, [False, True])\
#     .addGrid(classifier.elasticNetParam, [0.0, 0.5, 1.0])\
#     .build()

# paramGrid = ParamGridBuilder() \
#     .addGrid(classifier.maxDepth,[5, 10]) \
#     .addGrid(classifier.numTrees, [20, 50, 100, 200]) \
#     .addGrid(classifier.minInstancesPerNode, [1, 5, 10, 50]) \
#     .addGrid(classifier.subsamplingRate, [0.4, 0.7, 1.0]) \
#     .build()

# paramGrid = ParamGridBuilder()\
#     .addGrid(classifier.featureSubsetStrategy, ['auto', 'all', 'onethird', 'sqrt'])\
#     .addGrid(classifier.maxDepth,[5, 10, 20])\
#     .addGrid(classifier.maxIter, [20, 50, 100])\
#     .addGrid(classifier.minInstancesPerNode, [1, 5, 10, 50])\
#     .addGrid(classifier.subsamplingRate, [0.4, 0.7, 1.0])\
#     .build()

paramGrid = ParamGridBuilder()\
    .addGrid(classifier.maxDepth,[5, 10])\
    .addGrid(classifier.maxIter, [20, 50])\
    .build()

# Train the model
model, training_time = trainModel(classifier, train, paramGrid)

# Evaluate model performance
evalMetrics, summary = evaluateTrainTest(model, train, test, training_time)

# Show metrics
print("Best model:")
summary.drop('train_time').drop('pred_time').show()

VBox()

Best model:
+-------+---------+--------+--------+
|f1_test|acc_train|acc_test|f1_train|
+-------+---------+--------+--------+
| 0.8229|   0.8515|  0.8387|  0.8362|
+-------+---------+--------+--------+

In [13]:
# Best model and parameters
bestModel = model.bestModel
bestCLModel = bestModel.stages[2]
bestParams = bestCLModel.extractParamMap()

# Feature importance
feature_ind = bestCLModel.featureImportances.indices.tolist()
feature_name = [feature_cols[ind] for ind in feature_ind]
feature_coef = bestCLModel.featureImportances.values.tolist()
print(feature_name)
print()
print(feature_coef)

VBox()

['gender', 'level', 'location_east_north_central', 'location_east_south_central', 'location_middle_atlantic', 'location_mountain', 'location_new_england', 'location_pacific', 'location_south_atlantic', 'location_west_north_central', 'location_west_south_central', 'page_about', 'page_error', 'page_frac_about', 'page_frac_add_friend', 'page_frac_add_to_playlist', 'page_frac_error', 'page_frac_help', 'page_frac_home', 'page_frac_logout', 'page_frac_nextsong', 'page_frac_roll_advert', 'page_frac_save_settings', 'page_frac_settings', 'page_frac_submit_downgrade', 'page_frac_submit_upgrade', 'page_frac_thumbs_down', 'page_frac_thumbs_up', 'page_roll_advert', 'page_save_settings', 'page_submit_upgrade', 'page_thumbs_down', 'page_thumbs_up', 'page_up_down_ratio', 'std_time_per_session', 'time_since_regi', 'user_agent_Macintosh', 'user_agent_Windows', 'user_agent_X11', 'user_agent_compatible', 'user_agent_iPad']
()
[0.00011934404163344246, 0.05998992893742344, 0.00826396386802028, 0.00132816504