# Udacity Data Science Nanodegree Capstone Project 
## Sparkify - Predicting Customer Churn

This notebook details my work on predicting customer churn for <b>Sparkify</b>, a fictitious music streaming service that offers both free and paid levels. <br>
I undertook this project as part of Udacity's Data Science Nanodegree.<p>
    
The goal of the project is to build one or more models that help Sparkify retain its customers. To accomplish this, we must be able to determine which users are in danger of cancelling the service and (optionally) which paid users are in danger of downgrading to the free version. <p>
    
Sparkify has provided an extremely rich 12GB dataset that logs every individual user action. <br>
A small subset (128MB) of this dataset was first examined with <b>PySpark</b> in order to familiarize ourselves with the data and develop our scripts. <br>
This notebook runs those scripts on a fully fledged Spark cluster against the entire dataset. <p>
    
I have chosen to frame this problem in the context of user sessions: the predictive model(s) would be triggered to generate new predictions after each user session ends, taking into account all user behavior up to that point. <br>
Customers in danger of churning (according to the model's predictions) could then be targeted with retention strategies.

In [1]:
from pyspark.sql import SparkSession


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1587474200544_0002,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.



In [2]:
sc.install_pypi_package("pandas")
sc.install_pypi_package("user-agents")
sc.install_pypi_package("ua-parser")
sc.install_pypi_package("matplotlib")
sc.install_pypi_package("pyarrow==0.14.1")
sc.install_pypi_package("scipy")


Collecting pandas
  Using cached pandas-1.0.3-cp36-cp36m-manylinux1_x86_64.whl (10.0 MB)
Collecting python-dateutil>=2.6.1
  Using cached python_dateutil-2.8.1-py2.py3-none-any.whl (227 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.3 python-dateutil-2.8.1

Processing /mnt/var/lib/livy/.cache/pip/wheels/ee/81/ab/61252f61e63504224d79f0e9d32953c424511e715026206a48/user_agents-2.1-py3-none-any.whl
Collecting ua-parser>=0.9.0
  Using cached ua_parser-0.10.0-py2.py3-none-any.whl (35 kB)
Installing collected packages: ua-parser, user-agents
Successfully installed ua-parser-0.10.0 user-agents-2.1


Collecting matplotlib
  Using cached matplotlib-3.2.1-cp36-cp36m-manylinux1_x86_64.whl (12.4 MB)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Using cached pyparsing-2.4.7-py2.py3-none-any.whl (67 kB)
Collecting kiwisolver>=1.0.1
  Using cached kiwisolver-1.2.0-cp36-cp36m-manylinux1_x86_64.whl (88 kB)
Collecting cycler>=0.10
  Using cached cycle

In [3]:
# import libraries
import pyspark
from pyspark.sql.types import StringType, IntegerType, StructType, StructField, BooleanType
from pyspark.sql import Window 

import pyspark.sql.functions as F
from pyspark.sql.functions import udf, desc, asc, concat, col, lit, when

from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.feature import VectorIndexer, VectorAssembler, Imputer, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression

from user_agents import parse
from scipy import stats

import datetime
import numpy as np
import pyarrow
import pandas as pd
import matplotlib.pyplot as plt


In [4]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .config("spark.executor.heartbeatInterval", "10000s")\
    .getOrCreate()

# Read in full sparkify dataset
json_path = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
user_log = spark.read.json(json_path)


In [5]:
user_log.printSchema()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

The output of some cells is saved as text below so that those cells don't have to be re-run every time.

Input:
`user_log.head()`

Output:
`Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')`


In [6]:
# check the shape of the dataframe
def check_shape(df):
    """Print the dimensions of a PySpark DataFrame"""
    print((df.count(), len(df.columns)))



Input:
`check_shape(user_log)`

Output:
`(26259199, 18)`

26 million rows!

In [7]:
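def check_nulls(df):
    """Count null values in each non-boolean column of a PySpark DataFrame.
        Args:
            df (PySpark DataFrame): data
        Returns:
            nulls_dict (dict): dictionary of column:num_nulls
    """
    # boolean columns are skipped; renamed so the list doesn't shadow this function
    cols_to_check = [c[0] for c in df.dtypes if c[1] != 'boolean']
    # F.count ignores nulls and when() is null unless the value is null,
    # so each aggregate counts the null values of one column
    pd_nulls = df\
        .select([F.count(when(col(c).isNull(), c)).alias(c) for c in cols_to_check])\
        .toPandas()\
        .T
    pd_nulls.columns = ["num"]
    nulls_dict = pd_nulls.num.to_dict()
    
    return nulls_dict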


Input: <br>
`nulls = check_nulls(user_log)`

`for k, v in nulls.items():
    if v > 0:
        print(k, v)
`

Unlike the smaller test dataset, the full dataset has no null userIds. It does, however, contain rows with null user information such as firstName.

Input: <br>
`user_log\
    .filter('firstName IS NULL')\
    .select('userId', 'firstName')\
    .take(5)`
    
Output:
`[Row(userId='1261737', firstName=None), Row(userId='1261737', firstName=None), Row(userId='1261737', firstName=None), Row(userId='1261737', firstName=None), Row(userId='1261737', firstName=None)]`



These rows all belong to the same userId. Let's check how many distinct userIds appear in rows with empty user info.

Input: <br>
`user_log\
    .filter('firstName IS NULL')\
    .select('userId')\
    .dropDuplicates()\
    .count()
`

Output: `1`

On the full dataset, only one distinct userId has a NULL firstName, so we keep only rows where firstName is not null to drop these invalid user records.

In [8]:
df = user_log.filter("firstName IS NOT NULL")


Create a new session ID column (`newSessionId`) plus the other derived per-event columns used later
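To make the session rule concrete, here is a minimal pure-Python sketch of the same idea for a single user's hits, assuming they are already sorted by `ts` and contain no duplicate timestamps. The helpers `session_end_flags` and `session_phases` and the constant `SESSION_GAP_MS` are hypothetical names for illustration only; the notebook itself does this with PySpark window functions.

```python
from typing import List, Optional

# hypothetical constant: more than 30 minutes of inactivity ends a session
SESSION_GAP_MS = 30 * 60 * 1000


def session_end_flags(ts: List[int], pages: List[str]) -> List[int]:
    """Flag the last hit of each session (1) for ONE user's hits, sorted by ts."""
    flags = []
    for i, (t, page) in enumerate(zip(ts, pages)):
        next_ts: Optional[int] = ts[i + 1] if i + 1 < len(ts) else None
        is_end = (
            next_ts is None                      # no next hit
            or (next_ts - t) > SESSION_GAP_MS    # next hit is more than 30 minutes away
            or page == "Logout"                  # explicit logout
        )
        flags.append(1 if is_end else 0)
    return flags


def session_phases(flags: List[int]) -> List[int]:
    """Running sum of the flags from the newest hit backwards.

    Every session gets its own number; 1 is the most recent session.
    Assumes no duplicate timestamps (Spark's default RANGE frame groups ties).
    """
    phases = [0] * len(flags)
    running = 0
    for i in range(len(flags) - 1, -1, -1):
        running += flags[i]
        phases[i] = running
    return phases


# example: hits at 0, 5, 50, 52 and 53 minutes
minute = 60 * 1000
ts = [0, 5 * minute, 50 * minute, 52 * minute, 53 * minute]
pages = ["Home", "NextSong", "NextSong", "Logout", "Home"]
flags = session_end_flags(ts, pages)   # [0, 1, 0, 1, 1]
phases = session_phases(flags)         # [3, 3, 2, 2, 1]
```

The gap of 45 minutes closes the first session, the `Logout` closes the second, and the final hit forms a third session of its own; `dense_rank` over `(userId, sessionPhase)` then turns these per-user numbers into global session IDs.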

In [9]:
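# Rebuild sessions per userId instead of relying on the original sessionId:
# - nextHit / nextHitDiffMin: timestamp of, and minutes until, the user's next hit (nextHit = 0 if none)
# - sessionEnd = 1 if there is no next hit, the next hit is > 30 min away, or page is 'Logout'
# - sessionPhase: running sum of sessionEnd from the newest hit backwards, so each of a
#   user's sessions gets its own number (1 = most recent session)
# - newSessionId: global session id via dense_rank over (userId, sessionPhase); this window
#   has no partitionBy, so Spark moves all rows into a single partition to compute it
# - cancelPhase / userCancelPhase: the same technique applied to 'Cancellation Confirmation'
# - songSkip = 1 if the next hit came at least 30 s before the song would have ended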

df = df \
    .withColumn("activityDay", F.to_date(F.from_unixtime(col('ts') / 1000)))\
    .withColumn("firstActivityDay", F.min(col("activityDay")).over(Window.partitionBy(col('userId'))))\
    .withColumn("daysSinceFirst", F.datediff(col("activityDay"), col("firstActivityDay")))\
    .withColumn('nextHit', 
                F.lead('ts', count=1, default=0)\
                    .over(Window\
                        .partitionBy('userId')\
                        .orderBy('ts')
                    )
    )\
    .withColumn('nextHitDiffMin',
                ((col('nextHit') - col('ts')) / 60 / 1000)
    )\
    .withColumn('nextHitDiff',
                (col('nextHit') - col('ts'))
    )\
    .withColumn('sessionEnd',
               F.when((col('nextHit') == 0) |\
                      (col('nextHitDiffMin') > 30) | \
                      (col('page') == 'Logout'), 1)\
                .otherwise(0)\
    )\
    .withColumn('sessionPhase',
               F.sum('sessionEnd')\
                    .over(Window\
                         .partitionBy('userId')\
                         .orderBy(F.desc('ts'))
                    )
    )\
    .withColumn('newSessionId',
               F.dense_rank().over(Window.orderBy(*['userId', 'sessionPhase']))
    )\
    .withColumn('cancelFlag', F.when((col('page') == 'Cancellation Confirmation'), 1).otherwise(0))\
    .withColumn('cancelPhase',
               F.sum('cancelFlag')\
                    .over(Window\
                         .partitionBy('userId')\
                         .orderBy(F.desc('ts'))
                    )
    )\
    .withColumn('userCancelPhase',
               F.dense_rank().over(Window.orderBy(*['userId', 'cancelPhase']))
    )\
    .withColumn('newItemInSession',
                F.row_number().over(Window\
                                    .partitionBy('newSessionId')\
                                    .orderBy('ts'))
    )\
    .drop('itemInSession', 'sessionId')\
    .withColumn("songSkip", \
                F.when((col('page') == 'NextSong') &\
                       ((col('length') - (col('nextHitDiff') / 1000)) >= 30) &\
                       (col('nextHit') != 0), 1).otherwise(0))
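
The `songSkip` rule above can be read as: a `NextSong` hit counts as a skip when the user's next hit arrives at least 30 seconds before the song's `length` has elapsed. A small pure-Python sketch of that rule, using the hypothetical helper `is_song_skip` and constant `SKIP_THRESHOLD_S` (neither is part of the notebook):

```python
from typing import Optional

# hypothetical constant: a song must be cut short by at least 30 seconds
SKIP_THRESHOLD_S = 30.0


def is_song_skip(page: str, length_s: Optional[float],
                 ts_ms: int, next_ts_ms: Optional[int]) -> int:
    """1 if a NextSong hit was followed by another hit at least 30 s before the song ended."""
    # no next hit (nextHit == 0 in the notebook) or no song length: not a skip
    if page != "NextSong" or length_s is None or next_ts_ms is None:
        return 0
    listened_s = (next_ts_ms - ts_ms) / 1000   # seconds until the next hit
    return 1 if (length_s - listened_s) >= SKIP_THRESHOLD_S else 0


# a 240 s song followed by another hit after 60 s counts as a skip
example = is_song_skip("NextSong", 240.0, 0, 60_000)   # 1
```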


In [10]:
# repartition to make aggregation easier: puts all records with the
# same value for the partition key into the same partition.
# repartition returns a new DataFrame, so the result must be reassigned
df = df.repartition('newSessionId')


DataFrame[artist: string, auth: string, firstName: string, gender: string, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, activityDay: date, firstActivityDay: date, daysSinceFirst: int, nextHit: bigint, nextHitDiffMin: double, nextHitDiff: bigint, sessionEnd: int, sessionPhase: bigint, newSessionId: int, cancelFlag: int, cancelPhase: bigint, userCancelPhase: int, newItemInSession: int, songSkip: int]

In [11]:
df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)


DataFrame[artist: string, auth: string, firstName: string, gender: string, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, activityDay: date, firstActivityDay: date, daysSinceFirst: int, nextHit: bigint, nextHitDiffMin: double, nextHitDiff: bigint, sessionEnd: int, sessionPhase: bigint, newSessionId: int, cancelFlag: int, cancelPhase: bigint, userCancelPhase: int, newItemInSession: int, songSkip: int]

In [12]:
df.take(1)


[Row(artist=None, auth='Logged In', firstName='Camren', gender='M', lastName='Walker', length=None, level='paid', location='New Haven-Milford, CT', method='GET', page='Home', registration=1531215008000, song=None, status=200, ts=1539882499000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000025', activityDay=datetime.date(2018, 10, 18), firstActivityDay=datetime.date(2018, 10, 2), daysSinceFirst=16, nextHit=1539882591000, nextHitDiffMin=1.5333333333333332, nextHitDiff=92000, sessionEnd=0, sessionPhase=1, newSessionId=1, cancelFlag=0, cancelPhase=1, userCancelPhase=1, newItemInSession=1, songSkip=0)]

In [13]:
df.printSchema()


root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- activityDay: date (nullable = true)
 |-- firstActivityDay: date (nullable = true)
 |-- daysSinceFirst: integer (nullable = true)
 |-- nextHit: long (nullable = true)
 |-- nextHitDiffMin: double (nullable = true)
 |-- nextHitDiff: long (nullable = true)
 |-- sessionEnd: integer (nullable = false)
 |-- sessionPhase: long (nullable = true)
 |-- newSessionId: integer (nullable = true)

Check for and remove outlier sessions (based on session length)

In [14]:
new_session_examine = df\
    .groupby("newSessionId", "userId")\
    .agg(
        F.max("newItemInSession").alias("itemsInSession"), 
        F.min("ts").alias("firstHit"),
        F.max("ts").alias("lastHit")
    )\
    .withColumn("sessionLengthMinutes", (col("lastHit") - col("firstHit")) / 1000 / 60)\
    .drop("firstHit", "lastHit")\
    .toPandas()


In [15]:
z_scores = np.abs(stats.zscore(new_session_examine.sessionLengthMinutes))
valid_mask = (z_scores < 4) 
invalid_sessions = new_session_examine.loc[~valid_mask]
invalid_sessions.head()


     newSessionId   userId  itemsInSession  sessionLengthMinutes
184           185  1000214             287           1000.366667
260           261  1000248             326           1143.316667
320           321  1000407             374           1311.650000
379           380  1000409             394           1402.000000
466           467  1000625             269            937.050000

In [16]:
valid_mask.mean()


0.9916259166494993
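
The filter keeps sessions whose length has an absolute z-score below 4, about 99.2% of sessions here. As a rough pure-Python sketch of what `stats.zscore` plus the threshold does (the helper `zscore_mask` and the toy data are hypothetical):

```python
import statistics
from typing import List


def zscore_mask(values: List[float], threshold: float = 4.0) -> List[bool]:
    """True for values whose absolute z-score is below `threshold`.

    Uses the population standard deviation (ddof=0), which is also the
    default of scipy.stats.zscore.
    """
    mean = statistics.mean(values)
    std = statistics.pstdev(values)
    if std == 0:
        # scipy would return NaN z-scores here; this sketch keeps every value
        return [True] * len(values)
    return [abs(v - mean) / std < threshold for v in values]


# 99 ordinary session lengths (minutes) and one extreme one
lengths = [10.0] * 99 + [1000.0]
mask = zscore_mask(lengths)
share_kept = sum(mask) / len(mask)   # 0.99
```

Note that the mean and standard deviation are themselves pulled up by the outliers, so a z-score cut-off is a fairly lenient filter on heavily skewed data like session lengths.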

In [17]:
invalid_sessions.describe()


        newSessionId  itemsInSession  sessionLengthMinutes
count    4634.000000     4634.000000           4634.000000
mean   274829.231334      300.600993           1083.150824
std    159860.572121       58.365331            206.722924
min       185.000000      182.000000            877.850000
25%    138112.750000      261.000000            934.237500
50%    274120.000000      284.000000           1019.225000
75%    412653.750000      322.000000           1162.762500
max    553094.000000      724.000000           2663.983333

In [18]:
new_session_examine.describe()


        newSessionId  itemsInSession  sessionLengthMinutes
count  553374.000000   553374.000000         553374.000000
mean   276687.500000       46.046110            156.277511
std    159745.458269       50.176928            180.387061
min         1.000000        1.000000              0.000000
25%    138344.250000       12.000000             34.033333
50%    276687.500000       30.000000             96.583333
75%    415030.750000       62.000000            213.266667
max    553374.000000      724.000000           2663.983333

In [19]:
invalid_sprk = spark.createDataFrame(invalid_sessions)
invalid_sprk = invalid_sprk.select('newSessionId')


In [20]:
df = df\
    .join(invalid_sprk, how='left_anti', on='newSessionId')
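
A `left_anti` join keeps only the rows of `df` whose `newSessionId` has no match in `invalid_sprk`, and it returns only `df`'s columns. A minimal pure-Python sketch of those semantics on a few toy rows (`left_anti_join` is a hypothetical helper):

```python
from typing import Dict, Hashable, Iterable, List


def left_anti_join(rows: List[Dict], right_keys: Iterable[Hashable], key: str) -> List[Dict]:
    """Keep the rows whose `key` value does NOT appear among `right_keys`."""
    excluded = set(right_keys)
    # only the left rows (and their columns) are returned; nothing from the right side
    return [row for row in rows if row[key] not in excluded]


events = [
    {"newSessionId": 1, "page": "Home"},
    {"newSessionId": 2, "page": "NextSong"},
    {"newSessionId": 2, "page": "Logout"},
    {"newSessionId": 3, "page": "NextSong"},
]
kept = left_anti_join(events, right_keys=[2], key="newSessionId")
```

Every hit of session 2 is dropped, and sessions 1 and 3 pass through unchanged.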


In [21]:
df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)


DataFrame[newSessionId: int, artist: string, auth: string, firstName: string, gender: string, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string, activityDay: date, firstActivityDay: date, daysSinceFirst: int, nextHit: bigint, nextHitDiffMin: double, nextHitDiff: bigint, sessionEnd: int, sessionPhase: bigint, cancelFlag: int, cancelPhase: bigint, userCancelPhase: int, newItemInSession: int, songSkip: int]

In [22]:
df.take(1)


[Row(newSessionId=26, artist=None, auth='Logged In', firstName='Camren', gender='M', lastName='Walker', length=None, level='paid', location='New Haven-Milford, CT', method='GET', page='Home', registration=1531215008000, song=None, status=200, ts=1539127622000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000025', activityDay=datetime.date(2018, 10, 9), firstActivityDay=datetime.date(2018, 10, 2), daysSinceFirst=7, nextHit=1539127651000, nextHitDiffMin=0.48333333333333334, nextHitDiff=29000, sessionEnd=0, sessionPhase=26, cancelFlag=0, cancelPhase=1, userCancelPhase=1, newItemInSession=1, songSkip=0)]

In [23]:
print(df.rdd.getNumPartitions())


200

#### Define churn and aggregate to user

In [24]:
cancellers = df\
    .groupby('userId', 'userCancelPhase')\
    .agg(F.max(col('cancelFlag')).alias('cancelFlag'))


In [25]:
cancellers.persist()
cancellers.show()


+-------+---------------+----------+
| userId|userCancelPhase|cancelFlag|
+-------+---------------+----------+
|1082829|           1847|         1|
|1085268|           1905|         0|
|1140963|           3139|         1|
|1155727|           3476|         0|
|1160560|           3592|         0|
|1238435|           5451|         1|
|1478842|          10693|         0|
|1612184|          13592|         0|
|1732870|          16361|         1|
|1759123|          16986|         1|
|1022020|            527|         0|
|1068533|           1542|         0|
|1140717|           3135|         0|
|1517931|          11533|         0|
|1547803|          12169|         0|
|1568489|          12621|         0|
|1647434|          14369|         0|
|1672693|          14972|         0|
|1677903|          15076|         0|
|1799099|          17892|         0|
+-------+---------------+----------+
only showing top 20 rows
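
Each `(userId, userCancelPhase)` group is labelled with the maximum `cancelFlag` it contains, so the group that ends with a `Cancellation Confirmation` hit is labelled 1 and everything else 0. The sketch below reproduces this in pure Python; as a simplification it keys groups on the per-user `cancelPhase` instead of the global `userCancelPhase` from `dense_rank`, and `churn_labels` is a hypothetical helper:

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def churn_labels(events: List[Tuple[str, int, str]]) -> Dict[Tuple[str, int], int]:
    """Map (userId, cancelPhase) -> cancelFlag for (userId, ts, page) events.

    cancelPhase is the running count of 'Cancellation Confirmation' hits from
    the newest hit backwards, and the label is the max cancelFlag per group.
    """
    by_user = defaultdict(list)
    for user_id, ts, page in events:
        by_user[user_id].append((ts, page))

    labels: Dict[Tuple[str, int], int] = {}
    for user_id, user_events in by_user.items():
        user_events.sort(reverse=True)   # newest first, like orderBy(F.desc('ts'))
        phase = 0
        for _, page in user_events:
            flag = 1 if page == "Cancellation Confirmation" else 0
            phase += flag
            group = (user_id, phase)
            labels[group] = max(labels.get(group, 0), flag)
    return labels


events = [
    ("a", 1, "Home"), ("a", 2, "NextSong"), ("a", 3, "Cancellation Confirmation"),
    ("b", 1, "Home"), ("b", 2, "NextSong"),
]
labels = churn_labels(events)   # {("a", 1): 1, ("b", 0): 0}
```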

In [26]:
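# note: F.first/F.last after a groupby are not guaranteed to follow ts order, so
# levelAtStart/levelAtEnd and authAtStart/authAtEnd may not be the true first/last values
user_agg = df\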
    .groupby("userId", "userCancelPhase")\
    .agg(F.first(col('registration')).alias('registration'),
         F.min(col('ts')).alias("observationStart"), 
         F.max(col('ts')).alias("observationEnd"),
         F.count(col('ts')).alias('activityCount'),
         F.sum(F.when(col("page") == "NextSong", 1).otherwise(0)).alias("songsListened"),
         F.sum(F.when(col("page") == "Add to Playlist", 1).otherwise(0)).alias("songsAdded"),
         F.sum(col('songSkip')).alias('songsSkipped'),
         F.approx_count_distinct(col('artist')).alias('numUniqueArtists'),
         F.approx_count_distinct(col('song')).alias('numUniqueSongs'),
         F.sum(F.when(col("page") == "Add Friend", 1).otherwise(0)).alias("friendsAdded"),
         F.sum(F.when(col("page") == "Thumbs Up", 1).otherwise(0)).alias("thumbsUp"),
         F.sum(F.when(col("page") == "Thumbs Down", 1).otherwise(0)).alias("thumbsDown"),
         F.sum(F.when(col("page") == "Error", 1).otherwise(0)).alias("errors"),
         F.sum(F.when(col("page") == "Help", 1).otherwise(0)).alias("helpVisits"),
         F.sum(F.when(col("page") == "Submit Upgrade", 1).otherwise(0)).alias("numUpgrades"),
         F.sum(F.when(col("page") == "Submit Downgrade", 1).otherwise(0)).alias("numDowngrades"),
         F.sum(F.when(col("page") == "Roll Advert", 1).otherwise(0)).alias("advertsPlayed"),
         F.approx_count_distinct(col('newSessionId')).alias('numSessions'),
         F.first(col('level')).alias("levelAtStart"),
         F.last(col('level')).alias("levelAtEnd"),
         F.first(col('auth')).alias("authAtStart"),
         F.last(col('auth')).alias("authAtEnd"),
         F.first(col('location')).alias('location'),
         F.first(col('gender')).alias("gender"),
         F.first(col('userAgent')).alias('userAgent')
        )\
    .withColumn("observedHours", F.ceil((col('observationEnd') - col("observationStart")) / 1000 / 60 / 60))\
    .withColumn("observedDays", F.ceil(col("observedHours") / 24))\
    .withColumn("songsPerDay", col("songsListened") / col("observedDays"))\
    .withColumn("uniqueArtistsPerDay", col("numUniqueArtists") / col("observedDays"))\
    .withColumn("skippedSongRatio", col("songsSkipped") / col("songsListened"))\
    .withColumn("uniqueSongRatio", col("numUniqueSongs") / col("songsListened"))\
    .withColumn("songToActivityRatio", col("songsListened") / col("activityCount"))\
    .withColumn("songsAddedPerDay", col("songsAdded") / col("observedDays"))\
    .withColumn("friendsAddedPerDay", col("friendsAdded") / col("observedDays"))\
    .withColumn("thumbsUpPerDay", col("thumbsUp") / col("observedDays"))\
    .withColumn("thumbsDownPerDay", col("thumbsDown") / col("observedDays"))\
    .withColumn("thumbsDownRatio", col("thumbsDown") / (col("thumbsDown") + col("thumbsUp")))\
    .withColumn("errorsPerDay", col("errors") / col("observedDays"))\
    .withColumn("helpVisitsPerDay", col("helpVisits") / col("observedDays"))\
    .withColumn("advertsPerDay", col("advertsPlayed") / col("observedDays"))\
    .withColumn("sessionsPerDay", col("numSessions") / col("observedDays"))\
    .withColumn('metroArea', F.split(col('location'), ', ').getItem(0))\
    .withColumn('state', F.split(col('location'), ', ').getItem(1))\
    .join(cancellers, on=['userId', 'userCancelPhase'])\
    .select(
        'userId',
        'numUpgrades',
        'numDowngrades',
        'levelAtStart',
        'gender',
        'userAgent',
        'songsPerDay',
        'uniqueArtistsPerDay',
        'skippedSongRatio',
        'songToActivityRatio',
        'uniqueSongRatio',
        'songsAddedPerDay',
        'friendsAddedPerDay',
        'thumbsUpPerDay',
        'thumbsDownPerDay',
        'thumbsDownRatio',
        'errorsPerDay',
        'helpVisitsPerDay',
        'advertsPerDay',
        'sessionsPerDay',
        'metroArea',
        'state',
        'cancelFlag'
    )
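
The per-day features divide counts by `observedDays`, which is the observed time span rounded up to whole hours and then to whole days. A user whose first and last hits share a timestamp therefore gets `observedDays = 0`, and with Spark's default (non-ANSI) settings a division by zero yields null. This is one likely source of the nulls in the per-day columns, and of those in `thumbsDownRatio` (users with no thumbs at all), reported further down, though that is an inference rather than something checked here. The pure-Python sketch below mirrors these calculations and the `location` split; `observed_days`, `safe_div` and `split_location` are hypothetical helpers.

```python
import math
from typing import Optional, Tuple


def observed_days(start_ms: int, end_ms: int) -> int:
    """Whole days observed: hours rounded up, then days rounded up (as in the notebook)."""
    hours = math.ceil((end_ms - start_ms) / 1000 / 60 / 60)
    return math.ceil(hours / 24)


def safe_div(num: float, den: float) -> Optional[float]:
    """Mimic Spark's default (non-ANSI) division, where x / 0 evaluates to null."""
    return None if den == 0 else num / den


def split_location(location: str) -> Tuple[str, Optional[str]]:
    """'Dallas-Fort Worth-Arlington, TX' -> ('Dallas-Fort Worth-Arlington', 'TX')."""
    parts = location.split(", ")
    state = parts[1] if len(parts) > 1 else None   # getItem(1) is null if missing
    return parts[0], state


hour = 3_600_000
days = observed_days(0, 25 * hour)          # 25 hours -> 2 days
songs_per_day = safe_div(28, days)          # 14.0
single_hit_days = observed_days(5, 5)       # 0
thumbs_down_ratio = safe_div(0, 0 + 0)      # None: no thumbs up or down
```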


In [27]:
# source: https://medium.com/@mahadir.ahmad/how-to-parse-billion-of-user-agents-using-pyspark-f57e680727e7

# Parse a user agent string with the user_agents library and return the
# fields expected by the UDF schema defined below
def parse_ua(ua_string):
    # parse library cannot parse None
    if ua_string is None:
        ua_string = ""

    parsed_string = parse(ua_string)

    output =  [
        parsed_string.device.model,
        parsed_string.os.family,
        parsed_string.browser.family,
        (parsed_string.is_mobile or parsed_string.is_tablet),
        ]
    # The schema fields are non-nullable, so a None value would cause a
    # NullPointerException; replace any None with 'Unknown'
    for i in range(len(output)):
        if output[i] is None:
            output[i] = 'Unknown'
    return output


# Wrap parse_ua in a PySpark UDF that returns a struct with the schema below
ua_parser_udf = F.udf(parse_ua, StructType([
            StructField("device_model", StringType(), False),
            StructField("os_family", StringType(), False),
            StructField("browser_family", StringType(), False),
            StructField("is_mobile_or_tablet", BooleanType(), False),
        ])) 


In [28]:
parsed_agent = user_agg\
    .select("userId", "userAgent")\
    .dropDuplicates()\
    .withColumn('parsed', ua_parser_udf('userAgent'))\
    .select( 
        "userId", 
        F.col("parsed.device_model").alias("deviceModel"),
        F.col("parsed.os_family").alias("osFamily"),
        F.col("parsed.browser_family").alias("browserFamily"),
        F.col("parsed.is_mobile_or_tablet").alias("isMobileOrTablet"),
    )


In [29]:
parsed_agent.persist(pyspark.StorageLevel.MEMORY_AND_DISK)


DataFrame[userId: string, deviceModel: string, osFamily: string, browserFamily: string, isMobileOrTablet: boolean]

In [30]:
parsed_agent.take(1)


[Row(userId='1039101', deviceModel='Unknown', osFamily='Windows', browserFamily='Firefox', isMobileOrTablet=False)]

In [31]:
user_agg = user_agg\
    .join(parsed_agent, on='userId')


#### Add user behavior trend/slope

Aggregate user behavior to the day level. `daysSinceFirst` is the number of days between an activity and the user's first observed activity day. We'll restrict the trend to the last week of observed user activity.
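
For each user, the trend is an ordinary least-squares line fitted to the daily interaction counts of the last 7 days of the observed span, with days without activity counted as 0 (this is what `np.polyfit(..., deg=1)` computes in the UDF below). Because x is `daysSinceFirst`, the intercept is the line's value at the user's first day, which is why intercepts can be large and negative. A pure-Python sketch of the same fit, with the hypothetical helper `last_week_trend`:

```python
from typing import Dict, Tuple


def last_week_trend(daily_counts: Dict[int, int], window: int = 7) -> Tuple[float, float]:
    """Least-squares (slope, intercept) over the last `window` days.

    daily_counts maps daysSinceFirst -> numInteractions; missing days count as 0.
    """
    last_day = max(daily_counts)
    if last_day + 1 <= window:
        return 0.001, 0.001   # same placeholder the notebook's UDF uses
    xs = list(range(last_day - window + 1, last_day + 1))
    ys = [daily_counts.get(x, 0) for x in xs]   # zero-fill inactive days
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept


# two interactions more each day over days 10..16
counts = {0: 5, 10: 10, 11: 12, 12: 14, 13: 16, 14: 18, 15: 20, 16: 22}
slope, intercept = last_week_trend(counts)   # slope 2.0, intercept -10.0
```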

In [32]:
user_behavior = df\
    .groupby('userId', 'daysSinceFirst')\
    .agg(F.count('*').alias('numInteractions'))\
    .orderBy('userId', 'daysSinceFirst')

user_behavior.orderBy(desc('userId')).show()


+-------+--------------+---------------+
| userId|daysSinceFirst|numInteractions|
+-------+--------------+---------------+
|1999996|            10|              2|
|1999996|            27|             27|
|1999996|             3|              2|
|1999996|            37|             51|
|1999996|            13|              9|
|1999996|            26|             65|
|1999996|            24|             45|
|1999996|            21|            135|
|1999996|            22|              5|
|1999996|            20|             13|
|1999996|            23|            116|
|1999996|            56|             11|
|1999996|            42|              8|
|1999996|            28|             34|
|1999996|             0|             18|
|1999996|            25|              4|
|1999908|            48|            155|
|1999908|            41|             20|
|1999908|            10|             46|
|1999908|            18|            156|
+-------+--------------+---------------+
only showing top

Define the trend-line UDF

In [33]:
# imports for the grouped-map pandas UDF (Spark 2.3/2.4 API)
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import pandas_udf, PandasUDFType

# schema of the DataFrame the UDF returns for each user
trend_schema = StructType([
    StructField("userId", StringType(), False),
    StructField("slope", DoubleType(), False),
    StructField("intercept", DoubleType(), False)
])

# GROUPED_MAP UDF: receives one pandas DataFrame per userId and must return
# a pandas DataFrame that follows trend_schema
@pandas_udf(trend_schema, PandasUDFType.GROUPED_MAP)
def fit_trend(pdf):
    userId = pdf.userId.iloc[0]
    day_idx = pdf.daysSinceFirst.values
    num = pdf.numInteractions.values

    # build a gap-free daily series: days without activity count as 0
    continuous_y = np.zeros(day_idx.max() + 1)
    continuous_x = np.arange(0, day_idx.max() + 1)
    continuous_y[day_idx] = num

    # a trend over the last 7 days needs more than 7 days of history;
    # otherwise fall back to a small placeholder value
    if len(continuous_y) <= 7:
        slope, intercept = 0.001, 0.001
    else:
        slope, intercept = np.polyfit(continuous_x[-7:], continuous_y[-7:], deg=1)

    result = pd.DataFrame([[userId, slope, intercept]], columns=["userId", "slope", "intercept"])

    return result


In [34]:
user_trend = user_behavior\
    .groupby('userId')\
    .apply(fit_trend)


In [35]:
user_trend.persist(pyspark.StorageLevel.MEMORY_AND_DISK)


DataFrame[userId: string, slope: double, intercept: double]

In [36]:
user_trend.show()


+-------+-------------------+-------------------+
| userId|              slope|          intercept|
+-------+-------------------+-------------------+
|1000280|  15.24999999999998| -588.4285714285705|
|1002185|   37.2857142857144|-2001.0000000000066|
|1017805| 14.678571428571384|-301.07142857142736|
|1030587|  24.64285714285716| -1082.714285714286|
|1033297| 16.464285714285698| -465.7857142857137|
|1057724| 26.285714285714278|-1378.2857142857138|
|1059049|0.10714285714285739| -3.500000000000009|
|1069552| 2.3571428571428417| -120.9285714285706|
|1071308|-1.3571428571428612|  92.71428571428588|
|1076191| 0.6428571428571428| -2.357142857142855|
|1083324|  16.46428571428572| -895.8571428571433|
|1102913|0.10714285714285637| -5.535714285714246|
|1114507|-10.285714285714263|  609.7142857142844|
|1133196|  9.892857142857135| -424.7142857142853|
|1142513| 13.142857142857078| -688.4285714285677|
|1151194| 32.285714285714185| -1666.428571428565|
|1156065|  2.071428571428571| -17.78571428571429|


In [37]:
user_agg = user_agg\
    .join(user_trend, on='userId')


In [38]:
user_agg.persist()
user_agg.take(2)


[Row(userId='1001725', numUpgrades=1, numDowngrades=0, levelAtStart='free', gender='M', userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', songsPerDay=14.0, uniqueArtistsPerDay=12.272727272727273, skippedSongRatio=0.17532467532467533, songToActivityRatio=0.8006932409012132, uniqueSongRatio=0.9004329004329005, songsAddedPerDay=0.30303030303030304, friendsAddedPerDay=0.3333333333333333, thumbsUpPerDay=0.8484848484848485, thumbsDownPerDay=0.2727272727272727, thumbsDownRatio=0.24324324324324326, errorsPerDay=0.0, helpVisitsPerDay=0.06060606060606061, advertsPerDay=0.45454545454545453, sessionsPerDay=0.45454545454545453, metroArea='Boise City', state='ID', cancelFlag=0, deviceModel='Unknown', osFamily='Windows', browserFamily='Chrome', isMobileOrTablet=False, slope=7.392857142857158, intercept=-204.53571428571473), Row(userId='1006591', numUpgrades=0, numDowngrades=0, levelAtStart='paid', gender='F', userAgent='Mozilla/5.0 (

Unpersist the DataFrames we no longer need, to free up memory.

In [39]:
df.unpersist()
parsed_agent.unpersist()
user_trend.unpersist()
cancellers.unpersist()

DataFrame[userId: string, userCancelPhase: int, cancelFlag: int]

In [40]:
nulls = check_nulls(user_agg)

In [41]:
null_cols = [k for k,v in nulls.items() if v > 0]
nulls

{'userId': 0, 'numUpgrades': 0, 'numDowngrades': 0, 'levelAtStart': 0, 'gender': 0, 'userAgent': 0, 'songsPerDay': 16, 'uniqueArtistsPerDay': 16, 'skippedSongRatio': 16, 'songToActivityRatio': 0, 'uniqueSongRatio': 16, 'songsAddedPerDay': 16, 'friendsAddedPerDay': 16, 'thumbsUpPerDay': 16, 'thumbsDownPerDay': 16, 'thumbsDownRatio': 408, 'errorsPerDay': 16, 'helpVisitsPerDay': 16, 'advertsPerDay': 16, 'sessionsPerDay': 16, 'metroArea': 0, 'state': 0, 'cancelFlag': 0, 'deviceModel': 0, 'osFamily': 0, 'browserFamily': 0, 'slope': 0, 'intercept': 0}

In [42]:
user_agg.printSchema()

root
 |-- userId: string (nullable = true)
 |-- numUpgrades: long (nullable = true)
 |-- numDowngrades: long (nullable = true)
 |-- levelAtStart: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- songsPerDay: double (nullable = true)
 |-- uniqueArtistsPerDay: double (nullable = true)
 |-- skippedSongRatio: double (nullable = true)
 |-- songToActivityRatio: double (nullable = true)
 |-- uniqueSongRatio: double (nullable = true)
 |-- songsAddedPerDay: double (nullable = true)
 |-- friendsAddedPerDay: double (nullable = true)
 |-- thumbsUpPerDay: double (nullable = true)
 |-- thumbsDownPerDay: double (nullable = true)
 |-- thumbsDownRatio: double (nullable = true)
 |-- errorsPerDay: double (nullable = true)
 |-- helpVisitsPerDay: double (nullable = true)
 |-- advertsPerDay: double (nullable = true)
 |-- sessionsPerDay: double (nullable = true)
 |-- metroArea: string (nullable = true)
 |-- state: string (nullable = true)
 |-- cance

In [43]:
# Imputer expects float/double input columns, so cast every column with nulls to double
for c in null_cols:
    user_agg = user_agg.withColumn(c, col(c).cast('double'))

# We treat boolean columns as categorical and one-hot encode them later, so cast them to string
for c, d in user_agg.dtypes:
    if d == 'boolean':
        user_agg = user_agg.withColumn(c, col(c).cast('string'))

In [44]:
final_dataset = user_agg.drop('userAgent', 'metroArea', 'state')
final_dataset.persist()
final_dataset.take(1)

[Row(userId='1001725', numUpgrades=1, numDowngrades=0, levelAtStart='free', gender='M', songsPerDay=14.0, uniqueArtistsPerDay=12.272727272727273, skippedSongRatio=0.17532467532467533, songToActivityRatio=0.8006932409012132, uniqueSongRatio=0.9004329004329005, songsAddedPerDay=0.30303030303030304, friendsAddedPerDay=0.3333333333333333, thumbsUpPerDay=0.8484848484848485, thumbsDownPerDay=0.2727272727272727, thumbsDownRatio=0.24324324324324326, errorsPerDay=0.0, helpVisitsPerDay=0.06060606060606061, advertsPerDay=0.45454545454545453, sessionsPerDay=0.45454545454545453, cancelFlag=0, deviceModel='Unknown', osFamily='Windows', browserFamily='Chrome', isMobileOrTablet='false', slope=7.392857142857158, intercept=-204.53571428571473)]

# Modeling

Here we'll be:
 - Splitting into train and test sets
 - Transforming features in a pipeline
     - String indexing and One Hot Encoding categorical columns
     - Imputing missing values in numeric columns with the median
     - Scaling numeric columns (left out of the pipeline below, since tree-based models are insensitive to feature scale)
 - Fitting a few models and tuning hyperparameters
 - Evaluating our models
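Spark's `randomSplit` does not cut the data at an exact 75/25 boundary: it normalises the weights and assigns rows to splits by random sampling, so on a small dataset the proportions are only approximate. The pure-Python function below is a minimal sketch of that idea (`random_split` is a hypothetical helper, and Spark's real implementation works partition by partition rather than over one list); it shows why fixing `SEED` makes the split reproducible while the split sizes still vary around the target.

```python
import random


def random_split(rows, weights, seed):
    """Sketch of a seeded weighted split: each row lands in one split at random.

    Weights are normalised to sum to 1. Each row draws one uniform number and
    goes to the first split whose cumulative weight exceeds it, so split sizes
    are only approximately proportional to the weights.
    """
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point shortfall in bounds[-1]
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(list(range(1000)), [0.75, 0.25], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 750 and 250
```

Because weights are normalised, `[3, 1]` and `[0.75, 0.25]` describe the same split.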

In [45]:
SEED = 42
(trainingData, testData) = final_dataset.randomSplit([0.75, 0.25], seed=SEED)

#### String Index and One Hot Encode categorical columns

In this stage we will transform our categorical columns into something machine learning algorithms can use. In PySpark, every feature must be combined into a single vector column with a VectorAssembler before model training, and the Assembler only accepts numeric, boolean, and vector columns; it will raise an error if we feed it string values.

Briefly, here is the idea behind what we're about to do. <br>
If there are `n` unique categories in a column, StringIndexer maps each string to a number from 0 to `n - 1`, ordered by decreasing frequency, so the most common value gets index 0.
This alone would be enough to let our categorical features pass through VectorAssembler; after all, we've turned strings into numbers. The trap is that the model will now treat the feature as having an inherent order (index 2 is "greater than" index 1). If the categories are not actually ordered, this will be a problem.
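To make the ordering trap concrete, here is a minimal pure-Python sketch of frequency-ordered string indexing. The `string_index` helper is hypothetical, not Spark's API, and breaking ties alphabetically is an assumption of the sketch. After indexing, `paid` becomes 0.0 and `free` becomes 1.0, so a model reading the raw index would treat `free` as "larger" than `paid`.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to 0.0 .. n-1, most frequent first.

    Ties are broken alphabetically (an assumption of this sketch).
    Indices are floats, like the *_Idx columns in the transformed data.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    mapping = {s: float(i) for i, s in enumerate(ordered)}
    return [mapping[v] for v in values], mapping


indices, mapping = string_index(["paid", "free", "paid", "paid", "free"])
print(mapping)  # {'paid': 0.0, 'free': 1.0}
print(indices)  # [0.0, 1.0, 0.0, 0.0, 1.0]
```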

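We solve this by One Hot Encoding the indexed columns.
In short, OHE creates a binary flag for each unique value in the categorical column. In Spark, these flags are packed into a single sparse vector column, and by default (`dropLast=True`) the last category gets no slot of its own, so `n` categories produce `n - 1` slots; this is why `gender_vec` appears as `SparseVector(1, {})` in the transformed training row further down.
<br> Beware! For fields with high cardinality (many unique values), this produces very wide feature vectors.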

For our dataset this shouldn't pose an issue. Most of our categorical features have low cardinality, and the `userAgent`, `metroArea`, and `state` fields were dropped from `final_dataset` above rather than encoded.
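The encoding step can be sketched in a few lines of pure Python. The `one_hot` helper and its `(size, {position: 1.0})` return value are illustrative stand-ins for Spark's `SparseVector`, not Spark's API. The sketch mirrors the default `dropLast=True` behaviour, which is why an index equal to `n - 1` encodes as an empty vector. The three calls reproduce the `levelAtStart_vec`, `gender_vec`, and `deviceModel_vec` values in the transformed training row shown later, assuming those columns have 2, 2, and 4 categories respectively (inferred from the vector sizes).

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode a category index as (size, {position: 1.0}), like a sparse vector.

    With drop_last=True the final category gets no slot of its own and is
    represented by the all-zeros vector, so n categories need n - 1 slots.
    """
    size = num_categories - 1 if drop_last else num_categories
    position = int(index)
    active = {position: 1.0} if position < size else {}
    return size, active


print(one_hot(0.0, 2))  # (1, {0: 1.0})  like levelAtStart_vec for 'paid'
print(one_hot(1.0, 2))  # (1, {})        like gender_vec for index 1.0
print(one_hot(1.0, 4))  # (3, {1: 1.0})  like deviceModel_vec for index 1.0
```

Dropping the last slot avoids a perfectly collinear set of flags, since the dropped category is implied whenever all other slots are zero.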


At this point we are defining the stages of our Pipeline - the actual transformation will take place later.

In [46]:
encode_cols = [c for c,d in final_dataset.dtypes if d == "string" and c != "userId"]
encode_cols

['levelAtStart', 'gender', 'deviceModel', 'osFamily', 'browserFamily', 'isMobileOrTablet']

In [47]:
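# For each categorical column, add a StringIndexer (string -> index)
# followed by a OneHotEncoderEstimator (index -> sparse binary vector).
# Note: OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0 renamed it OneHotEncoder.
encode_stages = []
for name in encode_cols:
    strIndexer = StringIndexer(inputCol=name, outputCol=f"{name}_Idx")
    oneHotEncoder = OneHotEncoderEstimator(
        inputCols=[f"{name}_Idx"],
        outputCols=[f"{name}_vec"]
    )
    encode_stages += [strIndexer, oneHotEncoder]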

#### Impute median for all numeric columns with NaNs
Using our previously defined list of columns with missing values (`null_cols`), we'll create the imputation stage of our pipeline.
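As a rough illustration, median imputation learns one fill value per column from the training data and then uses it to replace every missing entry. The helpers below are a hypothetical pure-Python sketch that treats both `None` and `NaN` as missing, as Spark's `Imputer` does for nulls and for its default `missingValue` of NaN. The sketch computes an exact median with `statistics.median`, whereas Spark's `Imputer` uses an approximate median, so results can differ slightly on large columns.

```python
import math
import statistics


def is_missing(v):
    """Treat both None (null) and NaN as missing."""
    return v is None or math.isnan(v)


def fit_median(values):
    """Learn the fill value: the median of the observed (non-missing) entries."""
    observed = [v for v in values if not is_missing(v)]
    return statistics.median(observed)


def apply_fill(values, fill):
    """Replace missing entries with the learned fill value."""
    return [fill if is_missing(v) else v for v in values]


train_col = [1.0, None, 3.0, float("nan"), 2.0]
test_col = [None, 5.0]
fill = fit_median(train_col)        # learned from the training data only
print(apply_fill(train_col, fill))  # [1.0, 2.0, 3.0, 2.0, 2.0]
print(apply_fill(test_col, fill))   # [2.0, 5.0]
```

This fit-then-apply split mirrors how `pipe_model` is fit on `trainingData` and then applied to both splits, so test rows are filled with training medians.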

In [48]:
imputer = Imputer(inputCols=null_cols, outputCols = [f"{c}_Impute" for c in null_cols], strategy="median")

#### Final transformed features list

In [49]:
exclude_cols = encode_cols + null_cols + ['userId', 'cancelFlag']
feats = [c for c in final_dataset.columns if c not in exclude_cols] + \
    [f'{c}_vec' for c in encode_cols] +\
    [f'{c}_Impute' for c in null_cols]

feats

['numUpgrades', 'numDowngrades', 'songToActivityRatio', 'slope', 'intercept', 'levelAtStart_vec', 'gender_vec', 'deviceModel_vec', 'osFamily_vec', 'browserFamily_vec', 'isMobileOrTablet_vec', 'songsPerDay_Impute', 'uniqueArtistsPerDay_Impute', 'skippedSongRatio_Impute', 'uniqueSongRatio_Impute', 'songsAddedPerDay_Impute', 'friendsAddedPerDay_Impute', 'thumbsUpPerDay_Impute', 'thumbsDownPerDay_Impute', 'thumbsDownRatio_Impute', 'errorsPerDay_Impute', 'helpVisitsPerDay_Impute', 'advertsPerDay_Impute', 'sessionsPerDay_Impute']

In [50]:
vec_assembler = VectorAssembler(inputCols=feats, outputCol="features")
all_stages = encode_stages + [imputer, vec_assembler]
all_stages

[StringIndexer_585094466080, OneHotEncoderEstimator_9e704bf9c42e, StringIndexer_caba29f9bb3d, OneHotEncoderEstimator_77457c833cc3, StringIndexer_820cb41b0dfa, OneHotEncoderEstimator_ece5ff1945f2, StringIndexer_9bca60a4e374, OneHotEncoderEstimator_b16422e1a9f2, StringIndexer_c71956c33da3, OneHotEncoderEstimator_bfc68566765b, StringIndexer_601138203e88, OneHotEncoderEstimator_6de9bf3e1a50, Imputer_5b7cd644583b, VectorAssembler_16ac820b9081]

Fit the preprocessing pipeline on the training data only, then use it to transform both the training and test sets.

In [51]:
pipe_model = Pipeline(stages=all_stages).fit(trainingData)

In [52]:
trainingTransformed = pipe_model.transform(trainingData)
testTransformed = pipe_model.transform(testData)
trainingTransformed.persist()
trainingTransformed.take(1)
testTransformed.persist()
testTransformed.take(1)

[Row(userId='1006591', numUpgrades=0, numDowngrades=0, levelAtStart='paid', gender='F', songsPerDay=7.5, uniqueArtistsPerDay=6.208333333333333, skippedSongRatio=0.20277777777777778, songToActivityRatio=0.8, uniqueSongRatio=0.9194444444444444, songsAddedPerDay=0.3125, friendsAddedPerDay=0.20833333333333334, thumbsUpPerDay=0.5625, thumbsDownPerDay=0.125, thumbsDownRatio=0.18181818181818182, errorsPerDay=0.0, helpVisitsPerDay=0.041666666666666664, advertsPerDay=0.0, sessionsPerDay=0.2708333333333333, cancelFlag=0, deviceModel='Mac', osFamily='Mac OS X', browserFamily='Firefox', isMobileOrTablet='false', slope=10.499999999999968, intercept=-445.4285714285698, levelAtStart_Idx=0.0, levelAtStart_vec=SparseVector(1, {0: 1.0}), gender_Idx=1.0, gender_vec=SparseVector(1, {}), deviceModel_Idx=1.0, deviceModel_vec=SparseVector(3, {1: 1.0}), osFamily_Idx=1.0, osFamily_vec=SparseVector(4, {1: 1.0}), browserFamily_Idx=1.0, browserFamily_vec=SparseVector(6, {1: 1.0}), isMobileOrTablet_Idx=0.0, isMobi

In [53]:
rf = RandomForestClassifier(labelCol="cancelFlag",
                            featuresCol="features", 
                            maxDepth=10, 
                            numTrees=100, 
                            seed=SEED)

In [54]:
model = rf.fit(trainingTransformed)
transformed = model.transform(trainingTransformed)

Examine feature importance
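The `ExtractFeatureImp` helper in the next cell reads the metadata Spark attaches to the assembled `features` column, `metadata["ml_attr"]["attrs"]`, which groups one `{"idx": ..., "name": ...}` entry per vector slot by attribute type (such as numeric or binary). Below is a pandas-free sketch of the same flatten, score, and sort logic, plus the cumulative share computed afterwards. The function name and the small `example_attrs` dictionary are hypothetical stand-ins for the real metadata, whose exact shape may vary between Spark versions.

```python
def extract_feature_importance(importances, attrs):
    """Pair each vector slot's name with its importance, sorted descending.

    `attrs` mimics metadata["ml_attr"]["attrs"]: a dict mapping an attribute
    type (e.g. "numeric", "binary") to a list of {"idx": int, "name": str}.
    Returns (name, score, cumulative_score) tuples.
    """
    entries = [attr for group in attrs.values() for attr in group]
    scored = sorted(
        ((a["name"], importances[a["idx"]]) for a in entries),
        key=lambda pair: pair[1],
        reverse=True,
    )
    result, running_total = [], 0.0
    for name, score in scored:
        running_total += score
        result.append((name, score, running_total))
    return result


# Hypothetical metadata for a 3-slot feature vector
example_attrs = {
    "numeric": [{"idx": 0, "name": "numUpgrades"}, {"idx": 1, "name": "slope"}],
    "binary": [{"idx": 2, "name": "levelAtStart_vec_paid"}],
}
ranked = extract_feature_importance([0.125, 0.5, 0.375], example_attrs)
for row in ranked:
    print(row)
# ('slope', 0.5, 0.5)
# ('levelAtStart_vec_paid', 0.375, 0.875)
# ('numUpgrades', 0.125, 1.0)
```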

In [55]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    """Extract feature importances from a fitted tree-based PySpark model.

    Args:
        featureImp (pyspark.ml.linalg.Vector): model.featureImportances from a fitted model
        dataset (pyspark.sql.DataFrame): DataFrame containing the assembled features column
        featuresCol (str): name of the features column

    Returns:
        varlist (pandas.DataFrame): feature index, name and importance score, sorted by descending score
    """
    list_extract = []
    # The features column metadata lists one attribute per vector slot, grouped by type (e.g. numeric, binary)
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return varlist.sort_values('score', ascending=False)

In [56]:
feats = ExtractFeatureImp(model.featureImportances, transformed, "features")
feats['cum_score'] = feats.score.cumsum()
feats.head(25)

    idx                        name     score  cum_score
16   32        advertsPerDay_Impute  0.143106   0.143106
6    22  uniqueArtistsPerDay_Impute  0.125876   0.268982
17   33       sessionsPerDay_Impute  0.080771   0.349754
12   28     thumbsDownPerDay_Impute  0.079868   0.429622
8    24      uniqueSongRatio_Impute  0.064311   0.493932
4     4                   intercept  0.058134   0.552066
5    21          songsPerDay_Impute  0.053053   0.605119
2     2         songToActivityRatio  0.051102   0.656221
10   26   friendsAddedPerDay_Impute  0.046639   0.702860
13   29      thumbsDownRatio_Impute  0.044710   0.747569
9    25     songsAddedPerDay_Impute  0.039187   0.786756
14   30         errorsPerDay_Impute  0.034227   0.820983
3     3                       slope  0.030302   0.851286
11   27       thumbsUpPerDay_Impute  0.030134   0.881420
15   31     helpVisitsPerDay_Impute  0.029743   0.911163
7    23     skippedSongRatio_Impute  0.027045   0.938207
1     1               numDowngr

In [57]:
evaluator = BinaryClassificationEvaluator(
    labelCol="cancelFlag", 
    rawPredictionCol="rawPrediction"
)

print(f"AUC on Train Set {evaluator.evaluate(transformed)}")

AUC on Train Set 0.9180030425368996

AUC on the training set is 0.92, which is quite high.
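`BinaryClassificationEvaluator` reports the area under the ROC curve by default. One way to read that number: it is the probability that a randomly chosen canceller receives a higher score than a randomly chosen non-canceller, so 0.5 is no better than chance and 1.0 is a perfect ranking. The pure-Python sketch below (a hypothetical `roc_auc` helper) computes it by counting correctly ordered pairs, with ties counted as half. Spark instead integrates the ROC curve built from the `rawPrediction` scores, which measures the same ranking quality.

```python
def roc_auc(scores, labels):
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


print(roc_auc([0.1, 0.4, 0.35, 0.8], [0, 0, 1, 1]))  # 0.75
```

The pairwise loop is quadratic, which is fine for illustration but not how a large-scale evaluator would compute it.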

In [58]:
test_transformed = model.transform(testTransformed)
print(f"AUC on Test Set {evaluator.evaluate(test_transformed)}")

AUC on Test Set 0.8470839043703369

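AUC on the test set is 0.85. This is quite good, although the drop from 0.92 on the training set suggests the random forest is overfitting somewhat.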

In [59]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="cancelFlag", 
                    featuresCol="features", 
                    maxDepth=4,
                    maxIter=20,
                    seed=SEED)
gbt_model = gbt.fit(trainingTransformed)
gbt_transformed = gbt_model.transform(trainingTransformed)
print(f"AUC on Train Set {evaluator.evaluate(gbt_transformed)}")
gbt_test = gbt_model.transform(testTransformed)
print(f"AUC on Test Set {evaluator.evaluate(gbt_test)}")
feats = ExtractFeatureImp(gbt_model.featureImportances, gbt_transformed, "features")
feats['cum_score'] = feats.score.cumsum()
feats.head(25)

AUC on Train Set 0.8834504739440706
AUC on Test Set 0.8579246611847112
    idx                             name     score  cum_score
4     4                        intercept  0.170356   0.170356
3     3                            slope  0.168904   0.339260
6    22       uniqueArtistsPerDay_Impute  0.120682   0.459942
16   32             advertsPerDay_Impute  0.072849   0.532791
8    24           uniqueSongRatio_Impute  0.072532   0.605323
14   30              errorsPerDay_Impute  0.062012   0.667335
2     2              songToActivityRatio  0.058119   0.725454
11   27            thumbsUpPerDay_Impute  0.052553   0.778006
17   33            sessionsPerDay_Impute  0.042264   0.820270
5    21               songsPerDay_Impute  0.028531   0.848801
18    5            levelAtStart_vec_paid  0.024934   0.873735
15   31          helpVisitsPerDay_Impute  0.024665   0.898400
0     0                      numUpgrades  0.019563   0.917963
12   28          thumbsDownPerDay_Impute  0.018820   0.936783

In [60]:
aupr_evaluator = BinaryClassificationEvaluator(
    labelCol="cancelFlag", 
    rawPredictionCol="rawPrediction",
    metricName="areaUnderPR"
)

paramGrid = ParamGridBuilder()\
    .addGrid(gbt.maxDepth, [4, 6, 8])\
    .addGrid(gbt.maxIter, [20])\
    .build()

cv = CrossValidator(
    estimator=gbt, 
    evaluator=aupr_evaluator, 
    estimatorParamMaps=paramGrid,
    numFolds=3,
)

cvModel = cv.fit(trainingTransformed)

In [61]:
list(zip(cvModel.avgMetrics, paramGrid))

[(0.7013155626173116, {Param(parent='GBTClassifier_5ad071f8f4b1', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4, Param(parent='GBTClassifier_5ad071f8f4b1', name='maxIter', doc='max number of iterations (>= 0).'): 20}), (0.7106374489088237, {Param(parent='GBTClassifier_5ad071f8f4b1', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 6, Param(parent='GBTClassifier_5ad071f8f4b1', name='maxIter', doc='max number of iterations (>= 0).'): 20}), (0.6874800984542297, {Param(parent='GBTClassifier_5ad071f8f4b1', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 8, Param(parent='GBTClassifier_5ad071f8f4b1', name='maxIter', doc='max number of iterations (>= 0).'): 20})]

In [63]:
perf = pd.DataFrame(paramGrid, index=cvModel.avgMetrics).reset_index()
perf.columns = ['AvgAUCPR', 'maxDepth', 'maxIter']
perf

   AvgAUCPR  maxDepth   maxIter
0  0.701316         4        20
1  0.710637         6        20
2  0.687480         8        20
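`CrossValidator` fits the estimator once per parameter combination and fold, evaluates each fit on the held-out fold, and averages the fold scores into `avgMetrics`. `bestModel` is then refit on all of `trainingTransformed` with the combination whose average is best (here `maxDepth=6`, with an average AUC-PR of about 0.711). The loop can be sketched in pure Python as follows. `cross_validate` and the `fit_and_score` callback are hypothetical stand-ins for the estimator/evaluator pair, and assigning folds with a seeded random draw is an assumption of this sketch.

```python
import random
from statistics import mean


def cross_validate(rows, param_grid, fit_and_score, num_folds=3, seed=42):
    """Return (avg_metrics, best_params) for a list of parameter dicts.

    fit_and_score(train_rows, valid_rows, params) -> float stands in for
    "fit the estimator with params, then evaluate it on the held-out fold".
    Larger scores are treated as better, as they are for areaUnderPR.
    """
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in rows]  # one fold id per row
    avg_metrics = []
    for params in param_grid:
        fold_scores = []
        for k in range(num_folds):
            train = [r for r, f in zip(rows, fold_of) if f != k]
            valid = [r for r, f in zip(rows, fold_of) if f == k]
            fold_scores.append(fit_and_score(train, valid, params))
        avg_metrics.append(mean(fold_scores))
    best_index = max(range(len(param_grid)), key=lambda i: avg_metrics[i])
    return avg_metrics, param_grid[best_index]


# Toy usage: a fake scorer whose score depends only on maxDepth
grid = [{"maxDepth": d, "maxIter": 20} for d in (4, 6, 8)]
fake_scores = {4: 0.70, 6: 0.71, 8: 0.69}
avg, best = cross_validate(
    list(range(100)), grid, lambda tr, va, p: fake_scores[p["maxDepth"]]
)
print(best)  # {'maxDepth': 6, 'maxIter': 20}
```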

In [64]:
new_gbt = cvModel.bestModel

In [65]:
new_train = new_gbt.transform(trainingTransformed)
new_test = new_gbt.transform(testTransformed)

In [66]:
aupr_evaluator.evaluate(new_train)

0.8189010938850245

In [67]:
aupr_evaluator.evaluate(new_test)

0.7063462215408329

In [69]:
evaluator.evaluate(new_train)

An error was encountered:
Invalid status code '404' from https://172.31.37.23:18888/sessions/1 with error payload: {"msg":"Session '1' not found."}
