# **Part 1: Preparation** 

## 1.1_Configure workspaces

In [1]:
# import libraries
from pyspark.sql import SparkSession
# Create spark session
spark = SparkSession \
        .builder \
        .appName("Sparkify") \
        .getOrCreate()

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1638450968632_0009,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%configure -f
{
    "conf": {
              "spark.driver.memory": "10000M",
              "spark.pyspark.python": "python3",
              "spark.pyspark.virtualenv.enabled": "true",
              "spark.pyspark.virtualenv.type":"native",
              "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1638450968632_0010,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1638450968632_0010,pyspark,idle,Link,Link,✔


In [3]:
spark.sparkContext.getConf().get('spark.driver.memory')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'10000M'

## 1.2_import packages

In [4]:
sc.install_pypi_package("pandas==0.25.1")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==0.25.1
  Using cached pandas-0.25.1-cp36-cp36m-manylinux1_x86_64.whl (10.5 MB)
Collecting python-dateutil>=2.6.1
  Using cached python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.2

In [5]:
sc.install_pypi_package("matplotlib==3.2.2")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting matplotlib==3.2.2
  Using cached matplotlib-3.2.2-cp36-cp36m-manylinux1_x86_64.whl (12.4 MB)
Collecting cycler>=0.10
  Using cached cycler-0.11.0-py3-none-any.whl (6.4 kB)
Collecting kiwisolver>=1.0.1
  Using cached kiwisolver-1.3.1-cp36-cp36m-manylinux1_x86_64.whl (1.1 MB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Using cached pyparsing-3.0.6-py3-none-any.whl (97 kB)
Installing collected packages: pyparsing, kiwisolver, cycler, matplotlib
Successfully installed cycler-0.11.0 kiwisolver-1.3.1 matplotlib-3.2.2 pyparsing-3.0.6

In [6]:
import numpy as np
import pandas as pd
# import seaborn as sns
import matplotlib.pyplot as plt

from time import time

from pyspark.sql import Window
from pyspark.sql.functions import udf, col, desc, when, lit
from pyspark.sql.functions import min as Fmin, max as Fmax, sum as Fsum, avg, count as Fcount, first, last
from pyspark.sql.types import IntegerType, DoubleType, StringType

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, MaxAbsScaler, \
                               StandardScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
                                  MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 1.3_load full dataset

In [7]:
# Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df = spark.read.json(event_data)
df.show(n = 3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent| userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|  Popol Vuh|Logged In|    Shlok|     M|          278| Johnson|524.32934| paid|Dallas-Fort Worth...|   PUT|NextSong|1533734541000|    22683|Ich mache einen S...|   200|1538352001000|"Mozilla/5.0 (Win...|1749042|
|Los Bunkers|Logged In|  Vianney|     F|            9|  Miller|238.39302| paid|San Francisco-Oak...|   PUT|NextSong|1537500318000|    20836|         MiÃ

In [8]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [9]:
# Re-order columns in df
df = df.select('userId', 'registration', 'firstName', 'lastName', 'gender', 
               'location', 'userAgent', 'level', 'ts', 'sessionId', 
               'page', 'auth', 'itemInSession', 'method', 'status',
               'song', 'length', 'artist')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
df.head()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(userId='1749042', registration=1533734541000, firstName='Shlok', lastName='Johnson', gender='M', location='Dallas-Fort Worth-Arlington, TX', userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', level='paid', ts=1538352001000, sessionId=22683, page='NextSong', auth='Logged In', itemInSession=278, method='PUT', status=200, song='Ich mache einen Spiegel - Dream Part 4', length=524.32934, artist='Popol Vuh')

# **Part 2: Data Preprocessing**

## 2.1_data cleaning

In [11]:
# Keep the logs that have the non-empty strings and were recorded before December.
df_clean = df.filter( (df["userId"] != "") & (df["ts"] < 1543622400000) )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
num_user = df_clean.select("userId").dropDuplicates().count()

print(f'After cleaning, {df_clean.count()} logs left')
print(f'After cleaning, {num_user} unique users left')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

After cleaning, 26259197 logs left
After cleaning, 22278 unique users left

## 2.2_data wrangling

In [12]:
# function for extreacting the OS string
# These code can't run with pyspark==2.4.3

# get_agent_operation_system = udf(lambda x: x.split('(')[1].replace(";", " ").split(" ")[0])

# # Replace the original 'userAgent' column with a column having the OS info only
# df_clean_wrangle = df_clean.withColumn("userAgent", get_agent_operation_system("userAgent"))
# df_clean_wrangle = df_clean_wrangle.withColumnRenamed("userAgent", "userAgent_OS")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
df_clean_wrangle = df_clean

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 2.3_one-hot encoding_1 (churn, gender, last level)

In [14]:
# Define churned users who submitted the confirmation of cancellation.
# 0 and 1 represent non-churned users and churned users respectively.
churn_user_list = df_clean.filter(df_clean.page == "Cancellation Confirmation").select("userId").dropDuplicates()
churn_user_list = [ churn_user['userId'] for churn_user in churn_user_list.collect() ]

get_churn_user = udf(lambda x: 1 if x in churn_user_list else 0, IntegerType())

df_one_hot = df_clean_wrangle.withColumn("churn_bool", get_churn_user('userId'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# One-hot encode the gender column.
# 0 and 1 represent female and male respectively.
get_gender = udf(lambda x: 1.0 if x == 'M' else 0.0, DoubleType())

df_one_hot = df_one_hot.withColumn("gender", get_gender(df_one_hot.gender))

df_one_hot = df_one_hot.withColumnRenamed("gender", "gender_bool")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# One-hot encode users’ last subscription level within the observation period.
# 0 and 1 represent the free plan and the paid plan respectively.
window_user = Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
get_level = udf(lambda x: 1.0 if x == 'paid' else 0.0, DoubleType())

df_one_hot = df_one_hot.withColumn("level", get_level(last('level').over(window_user)))
df_one_hot = df_one_hot.withColumnRenamed("level", "last_level_bool")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 2.4_proportion of churned users and non-churned users

In [17]:
df_count_proportion = df_one_hot.select("userId", "churn_bool").dropDuplicates()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
num_churn = df_count_proportion.filter( df_count_proportion.churn_bool == 1 ).count()
num_non_churn = num_user - num_churn

proportion_churn = num_churn / num_user
proportion_non_churn = num_non_churn / num_user

print(f'Number of churned users: {num_churn}')
print(f'Number of non-churned users: {num_non_churn}')
print(f'Propostion of churned users: {proportion_churn:.3f}')
print(f'Propostion of non-churned users: {proportion_non_churn:.3f}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of churned users: 5003
Number of non-churned users: 17275
Propostion of churned users: 0.225
Propostion of non-churned users: 0.775

## 2.5_Feature Engineering

In [17]:
# Default observation start and end timestamps 
# for users who have registered before October 1 and had not churned
observation_start_default = 1538352000000 # 01/10/2018 00:00:00
observation_end_default = 1543622400000 # 30/11/2018 23:59:59

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Obtain the observation window of users and the corresponding duration in days.
window_user = Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df_one_hot = df_one_hot.withColumn("observation_start", \
                                   when(df_one_hot.registration > observation_start_default, df_one_hot.registration)\
                                   .otherwise(observation_start_default))

df_one_hot = df_one_hot.withColumn("last_log_ts", last(col('ts')).over(window_user))

df_one_hot = df_one_hot.withColumn("observation_end", \
                                   when(df_one_hot.churn_bool == 1, df_one_hot.last_log_ts)\
                                   .otherwise(observation_end_default))

df_one_hot = df_one_hot.withColumn("observation_duration_day", \
                                   (df_one_hot.observation_end - df_one_hot.observation_start) / 1000 / 3600 / 24)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Aggregation by userId
df_aggregation = \
df_one_hot\
.groupby('userId')\
.agg(
      # Aggregated user-level features
      first(col('churn_bool')).alias('label'),
      first(col('gender_bool')).alias('gender_bool'),
      first(col('last_level_bool')).alias('last_level_bool'),
      first(col('observation_duration_day')).alias('observation_duration_day'),
      
      # Aggregated statistics about interactions
      Fcount(col('page')).alias('interaction_num'),
      Fsum(when(col('page') == "NextSong", 1).otherwise(0)).alias("song_num"),
      Fsum(when(col('page') == "Thumbs Up", 1).otherwise(0)).alias("thump_up_num"),
      Fsum(when(col('page') == "Thumbs Down", 1).otherwise(0)).alias("thump_down_num"),
      Fsum(when(col('page') == "Add Friend", 1).otherwise(0)).alias("add_friend_num"),
      Fsum(when(col('page') == "Add to Playlist", 1).otherwise(0)).alias("add_playlist_num"),
      Fsum(when(col('page') == "Submit Downgrade", 1).otherwise(0)).alias("submit_downgrade_num"),
      Fsum(when(col('page') == "Submit Upgrade", 1).otherwise(0)).alias("submit_upgrade_num"),
      Fsum(when(col('page') == "Home", 1).otherwise(0)).alias("home_per_num"),
      Fsum(when(col('page') == "Roll Advert", 1).otherwise(0)).alias("advertisement_num"),
      Fsum(when(col('page') == "Help", 1).otherwise(0)).alias("help_num"),
      Fsum(when(col('page') == "Settings", 1).otherwise(0)).alias("setting_num"),
      Fsum(when(col('page') == "Error", 1).otherwise(0)).alias("error_num"),
      
      # Aggregated statistics about interactions within a particular period
      Fsum(when(col('ts') < col('observation_start') + (14 * 24 * 3600 * 1000), 1).otherwise(0)).alias("interaction_num_first_14_day"),
      Fsum(when(col('ts') > col('observation_end') - (14 * 24 * 3600 * 1000), 1).otherwise(0)).alias("interaction_num_last_14_day"),
      Fsum(when( (col('page') == "NextSong") & (col('ts') < col('observation_start') + (14 * 24 * 3600 * 1000)), 1 ).otherwise(0)).alias("song_num_first_14_day"),
      Fsum(when( (col('page') == "NextSong") & (col('ts') > col('observation_end') - (14 * 24 * 3600 * 1000)), 1 ).otherwise(0)).alias("song_num_last_14_day"),
     
      # Aggregated statistics of users' "age"
      ((Fmax('ts') - first('registration')) / 1000 / 3600 / 24).alias("day_since_registration")
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Obtain features for the model training
# Convert the above aggregated statistics into specific formats (per unit of time, ratio etc.)
df_engineered_feature = \
df_aggregation\
.withColumn('interaction_per_day', col('interaction_num') / col('observation_duration_day'))\
.withColumn('song_per_day', col('song_num') / col('observation_duration_day'))\
.withColumn('thump_up_per_day', col('thump_up_num') / col('observation_duration_day'))\
.withColumn('thump_down_per_day', col('thump_down_num') / col('observation_duration_day'))\
.withColumn('add_friend_per_day', col('add_friend_num') / col('observation_duration_day'))\
.withColumn('add_playlist_per_day', col('add_playlist_num') / col('observation_duration_day'))\
.withColumn('home_per_day', col('home_per_num') / col('observation_duration_day'))\
.withColumn('advertisement_per_day', col('advertisement_num') / col('observation_duration_day'))\
.withColumn('help_per_day', col('help_num') / col('observation_duration_day'))\
.withColumn('setting_per_day', col('setting_num') / col('observation_duration_day'))\
.withColumn('error_per_day', col('error_num') / col('observation_duration_day'))\
.withColumn('song_ratio', col('song_num') / col('interaction_num'))\
.withColumn( 'positive_ratio', (col('thump_up_num') + col('add_friend_num') + col('add_playlist_num')) / col('interaction_num') )\
.withColumn( 'negative_ratio', (col('thump_down_num') + col('help_num') + col('error_num')) / col('interaction_num') )\
.withColumn( 'thumb_up_down_ratio', col('thump_up_num') / (col('thump_down_num') + 0.0001) )\
.withColumn('plan_change_num', col('submit_upgrade_num') + col('submit_downgrade_num'))\
.withColumn( 'trend_interaction', (col('interaction_num_last_14_day') - col('interaction_num_first_14_day')) / 14 / col('observation_duration_day') )\
.withColumn( 'trend_song', (col('song_num_last_14_day') - col('song_num_first_14_day')) / 14 / col('observation_duration_day') )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Filter out the intermediate information
df_engineered_feature = \
df_engineered_feature.select(df_engineered_feature.columns[0:4] + df_engineered_feature.columns[22:])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Obtain features for the model training
# Calculation of user's average item in a session and average session length
interaction_pattern_1 = \
df.groupby(['userId', 'sessionId'])\
  .agg(
       Fmax(col('itemInSession')).alias('item_in_session'), 
       ( Fmax(col('ts')) - Fmin(col('ts')) ).alias('session_length')     
  )\
 .groupby('userId')\
  .agg(
       avg(col('item_in_session')).alias('average_session_item'),
       avg(col('session_length')).alias('average_session_length')
  )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Obtain features for the model training
# Calculation of user's average number of songs played between two consecutive home visits
windowhome = (Window.partitionBy('userId').orderBy('ts').rangeBetween(Window.unboundedPreceding, 0))

df_dummy = df.withColumn("home_phase", Fsum(when(df.page == "Home", 1).otherwise(0)).over(windowhome))

interaction_pattern_2 = \
df_dummy.groupby(['userId', 'home_phase'])\
        .agg(
             Fsum(when(col('page') == "NextSong", 1).otherwise(0)).alias('song_between_home')
        )\
        .groupby('userId')\
          .agg(
               avg(col('song_between_home')).alias('average_song')
          )

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Join the dataframe of engineered features together
df_engineered_feature = \
df_engineered_feature\
.join(interaction_pattern_1, on = 'userId')\
.join(interaction_pattern_2, on = 'userId')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
df_engineered_feature.show(n = 3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+-----------+---------------+----------------------+-------------------+------------------+-------------------+-------------------+-------------------+--------------------+------------------+---------------------+-------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+---------------+--------------------+--------------------+--------------------+----------------------+------------------+
| userId|label|gender_bool|last_level_bool|day_since_registration|interaction_per_day|      song_per_day|   thump_up_per_day| thump_down_per_day| add_friend_per_day|add_playlist_per_day|      home_per_day|advertisement_per_day|       help_per_day|    setting_per_day|      error_per_day|        song_ratio|     positive_ratio|      negative_ratio|thumb_up_down_ratio|plan_change_num|   trend_interaction|          trend_song|average_session_item|average_session_length|      average_song|
+-------+-----+-------

## 2.6_one-hot encoding_2 (userAgent_OS)

In [26]:
df_engineered_feature_3 = df_engineered_feature

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 2.7_check & drop highly correlated features

In [27]:
# Convert the spark dataframe having engineered features to pandas dataframe
df_pd_engineered_feature = df_engineered_feature_3.toPandas()
df_pd_engineered_feature

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

        userId  label  ...  average_session_length  average_song
0      1000280      1  ...            1.169436e+07     22.711111
1      1002185      0  ...            2.608435e+07     29.147541
2      1017805      0  ...            2.113667e+07     13.888889
3      1030587      0  ...            3.294364e+07     22.646154
4      1033297      0  ...            1.182100e+07     14.750000
...        ...    ...  ...                     ...           ...
22273  1930015      0  ...            1.837746e+07     22.023256
22274  1955029      0  ...            1.220356e+07     22.850000
22275  1960992      0  ...            5.644750e+06     15.500000
22276  1977992      0  ...            3.913200e+07     17.200000
22277  1981978      1  ...            5.799000e+06     11.692308

[22278 rows x 26 columns]

In [28]:
# Determine the features to be removed (correlation >= 0.7)
column_to_remove = []
counter = 0

# Calculate the correlation between engineered features
correlation_matrix = df_pd_engineered_feature.drop(['userId', 'label'], axis = 1).corr()
columns = correlation_matrix.columns.tolist()
columns.reverse()

for coln in columns:

  counter += 1

  if abs(correlation_matrix[coln].iloc[:-counter]).max() >= 0.7:
      column_to_remove.append(coln)  

print(f"Highly correlated features that should be removed:\n\n{column_to_remove}\n\n")

column_to_keep = df_pd_engineered_feature.columns.drop(column_to_remove).tolist()
print(f"Features to keep:\n\n{column_to_keep}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Highly correlated features that should be removed:

['average_session_length', 'trend_song', 'help_per_day', 'home_per_day', 'add_playlist_per_day', 'add_friend_per_day', 'thump_up_per_day']


Features to keep:

['userId', 'label', 'gender_bool', 'last_level_bool', 'day_since_registration', 'interaction_per_day', 'song_per_day', 'thump_down_per_day', 'advertisement_per_day', 'setting_per_day', 'error_per_day', 'song_ratio', 'positive_ratio', 'negative_ratio', 'thumb_up_down_ratio', 'plan_change_num', 'trend_interaction', 'average_session_item', 'average_song']

In [29]:
# a user did not use the service at all after registration
# it can be just removed due to the null values in engineered feature columns
print(df_pd_engineered_feature[df_pd_engineered_feature.isnull().any(axis = 1)])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

       userId  label  ...  average_session_length  average_song
2410  1261737      0  ...            3.272018e+08           0.0

[1 rows x 26 columns]

In [30]:
df_pd_engineered_feature.shape

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(22278, 26)

In [31]:
df_pd_engineered_feature = df_pd_engineered_feature.dropna(how = 'any', axis = 0)
df_pd_engineered_feature.shape

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(22277, 26)

# **Part 3: Modeling** 

## 3.1_train-test split

In [32]:
dataset = spark.createDataFrame(df_pd_engineered_feature[column_to_keep])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
dataset.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- userId: string (nullable = true)
 |-- label: long (nullable = true)
 |-- gender_bool: double (nullable = true)
 |-- last_level_bool: double (nullable = true)
 |-- day_since_registration: double (nullable = true)
 |-- interaction_per_day: double (nullable = true)
 |-- song_per_day: double (nullable = true)
 |-- thump_down_per_day: double (nullable = true)
 |-- advertisement_per_day: double (nullable = true)
 |-- setting_per_day: double (nullable = true)
 |-- error_per_day: double (nullable = true)
 |-- song_ratio: double (nullable = true)
 |-- positive_ratio: double (nullable = true)
 |-- negative_ratio: double (nullable = true)
 |-- thumb_up_down_ratio: double (nullable = true)
 |-- plan_change_num: long (nullable = true)
 |-- trend_interaction: double (nullable = true)
 |-- average_session_item: double (nullable = true)
 |-- average_song: double (nullable = true)

In [60]:
# Train-test split
churn_train, churn_test = \
dataset.drop('userId').filter( dataset.label == 1 ).randomSplit([0.755, 0.245], seed = 40)

non_churn_train, non_churn_test = \
dataset.drop('userId').filter( dataset.label == 0 ).randomSplit([0.75, 0.25], seed = 40)

train_set = \
churn_train.union(non_churn_train)

test_set = \
churn_test.union(non_churn_test)

# Ensure that proportions of churned users in the train set and the test set 
# are similar
proportion_of_churned_train = churn_train.count() / train_set.count()
proportion_of_churned_test = churn_test.count() / test_set.count()

print(f'Number of churned users in train set: {churn_train.count()}', \
      f'\nNumber of churned users in test set: {churn_test.count()}')

print(f'\nNumber of non-churned users in train set: {non_churn_train.count()}', \
      f'\nNumber of non-churned users in test set: {non_churn_test.count()}')

print(f'\nproportion of churned users in the train set: {proportion_of_churned_train:.3f}', \
      f'\nproportion of churned users in the test set: {proportion_of_churned_test:.3f}')

print(f'\nOriginal dataset: {dataset.count()} rows', \
      f'\nTrain set and validation set: {train_set.count()} rows', \
      f'\nTest set: {test_set.count()} rows', \
      f'\nTrain:Test = {train_set.count() / dataset.count():.3f}:{test_set.count() / dataset.count():.3f}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Number of churned users in train set: 3738 
Number of churned users in test set: 1265

Number of non-churned users in train set: 12955 
Number of non-churned users in test set: 4319

proportion of churned users in the train set: 0.224 
proportion of churned users in the test set: 0.227

Original dataset: 22277 rows 
Train set and validation set: 16693 rows 
Test set: 5584 rows 
Train:Test = 0.749:0.251

## 3.2_pipeline functions [ train_and_eval(), train_model(), build_CV(), evaluate_model() ]

In [61]:
def build_CV(classifier, paramGrid, feature_cols):
    '''
    Build a pipeline with or without the cross validation
    
    INPUT:
      classifier - a untrained machine learning classifier supported by Spark
      paramGrid (list) - a list of hyperparameters to search over
    
    OUTPUT:
      crossval (Pipeline/CrossValidator) - Pipeline or cross validator object
    '''
    # Configure a ML pipeline

    # it vectorizes the input features
    assembler = VectorAssembler(inputCols = feature_cols, \
                                outputCol = "rawFeatures")

    # it standardizes the numerical features
    scaler = StandardScaler(inputCol = "rawFeatures", \
                            outputCol = "scaledFeatures",\
                            withStd = True, \
                            withMean = True)

    # define a pipeline object
    pipeline = Pipeline(stages = [assembler, scaler, classifier])

    if paramGrid[0]:
        # define an object for cross validation
        crossval = CrossValidator( 
                                  estimator = pipeline,
                                  estimatorParamMaps = paramGrid,
                                  evaluator = BinaryClassificationEvaluator(metricName = 'areaUnderPR'),
                                  numFolds = 3,
                                  seed = 40
        )

    else:
        crossval = pipeline

    return crossval

def train_model(classifier, train, paramGrid, feature_cols):
    '''
    Fit pipeline or cross validator that have a machine learning classifier
    
    INPUT:
      classifier - untrained machine learning classifier
      paramGrid (list) - a list of hyperparameters to search over
      train (Spark dataframe) - train set
    
    OUTPUT:
      model (PipelineModel / CrossValidatorModel) - a trained machine learning model
      training_time (float) - training time
    '''
    cross_val = build_CV(classifier, paramGrid, feature_cols) # obtain the object to be trained
    start = time()
    model = cross_val.fit(train) # train the model with the train set
    end = time()
    training_time = end - start

    return model, training_time

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [62]:
def evaluate_model(model, data, prob = False):
    '''
    Evaluate the trained model
    
    INPUT:
      model (PipelineModel / CrossValidatorModel) - a trained machine learning model
      data (Spark dataframe) - training set or testing set
    
    OUTPUT:
      evalMetrics (dict): disctionary of evaluation metrics
    '''
    start = time()
    pred = model.transform(data) # make prediction

    # define evaluators
    if prob:
        evaluator = MulticlassClassificationEvaluator(predictionCol = "probability", labelCol = "label")
    else:
        evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction", labelCol = "label")

    evaluator_areaUnderPR = BinaryClassificationEvaluator(metricName = 'areaUnderPR')

    end = time()
    prediction_time = end - start
    
    # Calculate metrics
    evalMetrics = {}
    evalMetrics["areaUnderPR"] = evaluator_areaUnderPR.evaluate(pred)
    evalMetrics["f1"] = evaluator.evaluate(pred, {evaluator.metricName: "f1"})
    evalMetrics['confusion_matrix'] = pred.groupby("label").pivot("prediction").count()
    evalMetrics['prediction_time'] = prediction_time
    
    return evalMetrics

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [63]:
# lump all steps together
def train_and_eval(classifier, train, test, paramGrid, feature_cols):
    '''
    Train and evaluate model performance on both training and testing sets
    
    INPUT:
      classifier - a untrained machine learning classifier
      train (Spark dataframe) - train set
      test (Spark dataframe) - test set
      paramGrid (list) - a list of hyperparameters to search over
    
    OUTPUT:
      evalMetrics (dict) - dictionary of evaluation metrics, 
      summary (Spark dataframe) - table of evaluation metrics
      model (PipelineModel / CrossValidatorModel) - trained machine learning model
    '''
    # train the model
    model, training_time = train_model(classifier, train, paramGrid, feature_cols)
    
    # evaluate model performance
    eval_Metrics_Training = evaluate_model(model, train)
    eval_Metrics_Testing = evaluate_model(model, test)

    # compile metrics
    evalMetrics = {}
    evalMetrics['PR_train'] = eval_Metrics_Training['areaUnderPR']
    evalMetrics['f1_train'] = eval_Metrics_Training['f1']
    evalMetrics['PR_test'] = eval_Metrics_Testing['areaUnderPR']
    evalMetrics['f1_test'] = eval_Metrics_Testing['f1']

    evalMetrics['confusion_matrix_test'] = eval_Metrics_Testing['confusion_matrix']

    evalMetrics['train_time'] = training_time
    evalMetrics['pred_time'] = eval_Metrics_Training['prediction_time'] \
                               + eval_Metrics_Testing['prediction_time']

    # convert metrics into a Spark dataframe
    metrics_to_display = \
    {
        k:round(v, 4) for k, v in evalMetrics.items() if ('confusion_matrix' not in k)
    }
    
    summary = spark.createDataFrame(pd.DataFrame([metrics_to_display], columns = metrics_to_display.keys()))
    
    return evalMetrics, summary, model

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.3_train logistic regression with default hyperparameters

### 3.2.3.0_determine input futures

In [64]:
# Define input feature columns for the model training
feature_cols = dataset.drop('label', 'userId').columns
               
feature_cols

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['gender_bool', 'last_level_bool', 'day_since_registration', 'interaction_per_day', 'song_per_day', 'thump_down_per_day', 'advertisement_per_day', 'setting_per_day', 'error_per_day', 'song_ratio', 'positive_ratio', 'negative_ratio', 'thumb_up_down_ratio', 'plan_change_num', 'trend_interaction', 'average_session_item', 'average_song']

### 3.2.3.1_evaluation functions [ start_train(), print_coefficient_importance() ]

In [65]:
def return_coefficient_importance(model, 
                                 is_coeff = True, 
                                 feature_name = feature_cols):
  '''
  Return the feature coefficients or importance of the trained model
  
  INPUT:
    model (PipelineModel / CrossValidatorModel) - a trained machine learning model
    is_coeff (bool) - tell the function if it can find feature coefficients 
                     from the trained model
  
  OUTPUT:
    df_pd_coefficient_importance (pandas dataframe) - dataframe of feature 
                                                      coefficients or importance 
  '''
  # get the values of feature coefficients or importances
  coeff_or_feature_importance = \
  model.stages[-1].coefficients if is_coeff else model.stages[-1].featureImportances

  # convert the values of feature coefficients or importances into a list
  list_temp = [coeff_or_feature_importance[i] for i in range(len(coeff_or_feature_importance))]

  # convert the list of feature coefficients or importances into pandas dataframe
  df_pd_coefficient_importance = \
  pd.DataFrame({"feature": feature_name, "coefficient": list_temp})\
              .sort_values('coefficient', ascending = False)

  return df_pd_coefficient_importance

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [66]:
def start_train(classifier, 
                text,
                train = train_set,
                test = test_set,
                feature_cols = feature_cols,
                param_grid = ParamGridBuilder().build()):
  '''
  Starter function for training a classifier
  
  INPUT:
    classifier - a untrained machine learning classifier supported by Spark
    text (str) - name of the classifier
    train (Spark dataframe) - train set
    test (Spark dataframe) - test set
    feature_cols (list) - a list of features for training the model
    paramGrid (list) - a list of hyperparameters to search over
  
  OUTPUT:
    best_model - the trained model
    eval_metric (dict) - dictionary of evaluation metrics
    summary (Spark dataframe) - table of evaluation metrics
  '''
  # start training the input classifier
  eval_metric, summary, model = \
  train_and_eval(classifier, train, test, param_grid, feature_cols)

  if param_grid[0]:
      best_model = model.bestModel # save the best model

  else:
      best_model = model

  #sShow metrics
  print(text, ':')
  summary.show()

  return best_model, eval_metric, summary

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3.2.3.2_train logistic regression

In [67]:
# Train and evaluate the logistic regression
classifier_LR = \
LogisticRegression(labelCol = "label", 
                   featuresCol = "scaledFeatures")

best_model_LR, eval_metric_LR, summary_LR = \
start_train(classifier = classifier_LR,
            text = 'Logistic Regression (initial)')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression (initial) :
+--------+--------+-------+-------+----------+---------+
|PR_train|f1_train|PR_test|f1_test|train_time|pred_time|
+--------+--------+-------+-------+----------+---------+
|  0.7359|   0.845| 0.7339| 0.8418|     8.274|   0.1468|
+--------+--------+-------+-------+----------+---------+

## 3.4_refinement - logistic regression

### 3.4.1_grid search

In [68]:
# Refine the logistic regression model by grid searching
classifier_LR_grid = \
LogisticRegression(labelCol = "label", 
                   featuresCol = "scaledFeatures")

# define hyperparameters to search over
param_grid = \
ParamGridBuilder().addGrid(classifier_LR_grid.regParam, [0, 1e-6, 2e-6, 3e-6, 4e-6])\
                  .addGrid(classifier_LR_grid.elasticNetParam, [0])\
                  .addGrid(classifier_LR_grid.maxIter, [25, 50, 100])\
                  .build()

# train and evaluate the model
best_model_LR_grid, eval_metric_LR_grid, summary_LR_grid = \
start_train(classifier = classifier_LR_grid,
            text = 'Logistic Regression (grid search)',
            param_grid = param_grid)

best_model_stage_3 = best_model_LR_grid.stages[2]
print('Best combination of hyperparameters:')
print('elasticNetParam: ', best_model_stage_3.extractParamMap()[best_model_stage_3.elasticNetParam])
print('regParam: ', best_model_stage_3.extractParamMap()[best_model_stage_3.regParam])
print('maxIter: ', best_model_stage_3.extractParamMap()[best_model_stage_3.maxIter])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression (grid search) :
+--------+--------+-------+-------+----------+---------+
|PR_train|f1_train|PR_test|f1_test|train_time|pred_time|
+--------+--------+-------+-------+----------+---------+
|  0.7357|  0.8452| 0.7336| 0.8412|  218.1774|   0.1658|
+--------+--------+-------+-------+----------+---------+

Best combination of hyperparameters:
elasticNetParam:  0.0
regParam:  2e-06
maxIter:  50

In [69]:
is_L2 = best_model_stage_3.extractParamMap()[best_model_stage_3.elasticNetParam]
reg_param = best_model_stage_3.extractParamMap()[best_model_stage_3.regParam]
max_iter = best_model_stage_3.extractParamMap()[best_model_stage_3.maxIter]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3.4.2_remove less important features

In [70]:
# display the values of feature coefficients in descending order
df_coeff = return_coefficient_importance(model = best_model_LR_grid, is_coeff = True)
df_coeff

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                   feature  coefficient
3      interaction_per_day    15.932083
6    advertisement_per_day     2.234649
1          last_level_bool     0.571159
5       thump_down_per_day     0.488728
14       trend_interaction     0.265628
7          setting_per_day     0.259764
16            average_song     0.209835
11          negative_ratio     0.068993
12     thumb_up_down_ratio     0.066860
0              gender_bool    -0.014972
15    average_session_item    -0.022972
8            error_per_day    -0.254462
9               song_ratio    -0.336648
10          positive_ratio    -0.506109
2   day_since_registration    -0.636986
13         plan_change_num    -0.639716
4             song_per_day   -14.939554

In [71]:
# display the absolute values of feature coefficients in descending order
df_coeff['coefficient'] = abs(df_coeff['coefficient'])
df_coeff = df_coeff.sort_values(by = ['coefficient'], ascending = False).reset_index(drop = True)
df_coeff

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                   feature  coefficient
0      interaction_per_day    15.932083
1             song_per_day    14.939554
2    advertisement_per_day     2.234649
3          plan_change_num     0.639716
4   day_since_registration     0.636986
5          last_level_bool     0.571159
6           positive_ratio     0.506109
7       thump_down_per_day     0.488728
8               song_ratio     0.336648
9        trend_interaction     0.265628
10         setting_per_day     0.259764
11           error_per_day     0.254462
12            average_song     0.209835
13          negative_ratio     0.068993
14     thumb_up_down_ratio     0.066860
15    average_session_item     0.022972
16             gender_bool     0.014972

In [72]:
# Identify the model having highest PR_test and using less input features
# Then, identify the corresponding input features
classifier_LR_remove = \
LogisticRegression(labelCol = "label", 
                   featuresCol = "scaledFeatures")

# adopt the best combination of hyperparameters found by grid searching
classifier_LR_remove.setParams(elasticNetParam = is_L2, regParam = reg_param, maxIter = max_iter)

feature_col_reduced = feature_cols.copy()
coeff_list = df_coeff['feature'].tolist()

for i in range(len(coeff_list)):

  feature_col_reduced.remove(coeff_list[- 1 - i])

  # Train the model and evaluate performance
  best_model_LR_remove, eval_metric_LR_remove, summary_LR_remove = \
  start_train(classifier = classifier_LR_remove, \
              text = f"Logistic Regression_remove_{i} with {feature_col_reduced}",
              feature_cols = feature_col_reduced)
  
  if len(feature_col_reduced) == 1: break

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression_remove_0 with ['last_level_bool', 'day_since_registration', 'interaction_per_day', 'song_per_day', 'thump_down_per_day', 'advertisement_per_day', 'setting_per_day', 'error_per_day', 'song_ratio', 'positive_ratio', 'negative_ratio', 'thumb_up_down_ratio', 'plan_change_num', 'trend_interaction', 'average_session_item', 'average_song'] :
+--------+--------+-------+-------+----------+---------+
|PR_train|f1_train|PR_test|f1_test|train_time|pred_time|
+--------+--------+-------+-------+----------+---------+
|  0.7355|  0.8454|  0.734| 0.8412|    5.6599|   0.1879|
+--------+--------+-------+-------+----------+---------+

Logistic Regression_remove_1 with ['last_level_bool', 'day_since_registration', 'interaction_per_day', 'song_per_day', 'thump_down_per_day', 'advertisement_per_day', 'setting_per_day', 'error_per_day', 'song_ratio', 'positive_ratio', 'negative_ratio', 'thumb_up_down_ratio', 'plan_change_num', 'trend_interaction', 'average_song'] :
+--------+--------+-----

In [73]:
# best combination of input features
# from the model named Logistic Regression_remove_2
feature_col_reduced =  \
['last_level_bool', 'day_since_registration', 'interaction_per_day', 
 'song_per_day', 'thump_down_per_day', 'advertisement_per_day', 
 'setting_per_day', 'error_per_day', 'song_ratio', 
 'positive_ratio', 'negative_ratio', 'plan_change_num', 
 'trend_interaction', 'average_song']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [81]:
# Train and evaluate model 
# with the best combination of hyperparameters found by grid search
# without less important features
classifier_LR_remove = \
LogisticRegression(labelCol = "label", 
                   featuresCol = "scaledFeatures")

classifier_LR_remove.setParams(elasticNetParam = is_L2, regParam = reg_param, maxIter = max_iter)

best_model_LR_remove, eval_metric_LR_remove, summary_LR_remove = \
start_train(classifier = classifier_LR_remove,
            text = 'Logistic Regression (remove less important features)',
            feature_cols = feature_col_reduced)

print('The features remain:\n')
feature_col_reduced

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression (remove less important features) :
+--------+--------+-------+-------+----------+---------+
|PR_train|f1_train|PR_test|f1_test|train_time|pred_time|
+--------+--------+-------+-------+----------+---------+
|  0.7356|  0.8453| 0.7341| 0.8422|    4.0678|   0.1336|
+--------+--------+-------+-------+----------+---------+

The features remain:

['last_level_bool', 'day_since_registration', 'interaction_per_day', 'song_per_day', 'thump_down_per_day', 'advertisement_per_day', 'setting_per_day', 'error_per_day', 'song_ratio', 'positive_ratio', 'negative_ratio', 'plan_change_num', 'trend_interaction', 'average_song']

### 3.4.3_weighted classes

This measure did not work well for the full dataset

In [79]:
# Assign different class weights to class 0 and class 1
# cal_weight = udf(lambda x: 0.225 if x == 0 else 0.775, DoubleType())

# train_set_weighted = train_set.withColumn('class_weight', cal_weight('label'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [80]:
# Train and evaluate the model
# with the best combination of hyperparameters found by grid search
# without less important features
# with unequal class weights
# classifier_LR_weighted = \
# LogisticRegression(labelCol = "label", 
#                    featuresCol = "scaledFeatures",
#                    weightCol = 'class_weight')

# classifier_LR_weighted.setParams(elasticNetParam = is_L2, regParam = reg_param, maxIter = max_iter)

# best_model_LR_weighted, eval_metric_LR_weighted, summary_LR_weighted = \
# start_train(classifier = classifier_LR_weighted, \
#             text = 'Logistic Regression (classes with weights)',
#             feature_cols = feature_col_reduced,
#             train = train_set_weighted)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression (classes with weights) :
+--------+--------+-------+-------+----------+---------+
|PR_train|f1_train|PR_test|f1_test|train_time|pred_time|
+--------+--------+-------+-------+----------+---------+
|  0.7287|  0.8295| 0.7259| 0.8273|     4.338|   0.1421|
+--------+--------+-------+-------+----------+---------+

### 3.4.4_threshold

In [84]:
# Find the optimal threshold
train_summary_LR_remove = best_model_LR_remove.stages[-1].summary
f_measure = train_summary_LR_remove.fMeasureByThreshold
max_f_measure = f_measure.groupBy().max('F-Measure').select('max(F-Measure)').head()

best_threshold_LR_remove = \
f_measure.where(f_measure['F-Measure'] == max_f_measure['max(F-Measure)']) \
         .select('threshold').head()['threshold']

print(f'Optimal Thershold: {best_threshold_LR_remove}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Optimal Thershold: 0.2868668028009616

In [89]:
# Train and evaluate the model 
# with the best combination of hyperparameters found by grid search
# without less important features
# with the optimal threshold
classifier_LR_threshold = \
LogisticRegression(labelCol = "label", 
                   featuresCol = "scaledFeatures",
                   threshold = best_threshold_LR_remove)

classifier_LR_threshold.setParams(elasticNetParam = is_L2, regParam = reg_param, maxIter = max_iter)

best_model_LR_threshold, eval_metric_LR_threshold, summary_LR_threshold = \
start_train(classifier = classifier_LR_threshold, \
            text = 'Logistic Regression (optimal threshold)',
            feature_cols = feature_col_reduced)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression (optimal threshold) :
+--------+--------+-------+-------+----------+---------+
|PR_train|f1_train|PR_test|f1_test|train_time|pred_time|
+--------+--------+-------+-------+----------+---------+
|  0.7356|  0.8463| 0.7341|  0.847|    4.0846|   0.1498|
+--------+--------+-------+-------+----------+---------+

# **Part 4: Final Evaluation (full 12GB dataset)** 

In [90]:
# Define the classifer for final evaluation
classifier_LR_final = \
LogisticRegression(labelCol = "label", 
                   featuresCol = "scaledFeatures")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [91]:
classifier_LR_final.setParams(elasticNetParam = is_L2, regParam = reg_param, maxIter = max_iter)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

LogisticRegression_d9687dc09b7f

In [92]:
# input the more important features only
feature_col_reduced =  \
['last_level_bool', 'day_since_registration', 'interaction_per_day', 
 'song_per_day', 'thump_down_per_day', 'advertisement_per_day', 
 'setting_per_day', 'error_per_day', 'song_ratio', 
 'positive_ratio', 'negative_ratio', 'plan_change_num', 
 'trend_interaction', 'average_song']

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
# Adopt the optimal threshold for the train set, which is 0.559
classifier_LR_final.setParams(threshold = best_threshold_LR_remove);

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

LogisticRegression_d9687dc09b7f

In [95]:
# Train the model and evaluate performance

best_model_LR_final, eval_metric_LR_final, summary_LR_final = \
start_train(classifier = classifier_LR_final, \
            text = 'Logistic Regression (final)',
            feature_cols = feature_col_reduced)

# confusion matrix
print('Confusion matrix:')
eval_metric_LR_final['confusion_matrix_test'].show()


print('Setting for refinement:\n')

best_model_stage_3 = best_model_LR_final.stages[2]
print('   -Hyperparameters selected by grid searching:')
print('       elasticNetParam: ', best_model_stage_3.extractParamMap()[best_model_stage_3.elasticNetParam])
print('       regParam: ', best_model_stage_3.extractParamMap()[best_model_stage_3.regParam])
print('       maxIter: ', best_model_stage_3.extractParamMap()[best_model_stage_3.maxIter], '\n')

print(f'   -Optimal Threshold: {best_threshold_LR_remove:.3f}', '\n')

print('   -The features remain:')
for i in feature_col_reduced:
  print(f'      {i}')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Logistic Regression (final) :
+--------+--------+-------+-------+----------+---------+
|PR_train|f1_train|PR_test|f1_test|train_time|pred_time|
+--------+--------+-------+-------+----------+---------+
|  0.7356|  0.8463| 0.7341|  0.847|    4.7405|   0.1742|
+--------+--------+-------+-------+----------+---------+

Confusion matrix:
+-----+----+---+
|label| 0.0|1.0|
+-----+----+---+
|    0|3880|439|
|    1| 418|847|
+-----+----+---+

Setting for refinement:

   -Hyperparameters selected by grid searching:
       elasticNetParam:  0.0
       regParam:  2e-06
       maxIter:  50 

   -Optimal Threshold: 0.287 

   -The features remain:
      last_level_bool
      day_since_registration
      interaction_per_day
      song_per_day
      thump_down_per_day
      advertisement_per_day
      setting_per_day
      error_per_day
      song_ratio
      positive_ratio
      negative_ratio
      plan_change_num
      trend_interaction
      average_song