In [2]:
# OPTIONAL: Load the "autoreload" extension so that code can change
%load_ext autoreload

# # OPTIONAL: always reload modules so that as you change code in src, it gets loaded
%autoreload 2

# Imports

In [3]:

from pyspark.sql import SparkSession
from ydata_profiling import ProfileReport

from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier, LinearSVC
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

# Create Spark and Import Data

In [4]:
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

## Read in full sparkify dataset
event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
raw_df = spark.read.json(event_data)
raw_df.head()

Row(artist='Popol Vuh', auth='Logged In', firstName='Shlok', gender='M', itemInSession=278, lastName='Johnson', length=524.32934, level='paid', location='Dallas-Fort Worth-Arlington, TX', method='PUT', page='NextSong', registration=1533734541000, sessionId=22683, song='Ich mache einen Spiegel - Dream Part 4', status=200, ts=1538352001000, userAgent='"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1749042')

In [5]:
df = raw_df.persist()

In [6]:
df.show(3)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent| userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|  Popol Vuh|Logged In|    Shlok|     M|          278| Johnson|524.32934| paid|Dallas-Fort Worth...|   PUT|NextSong|1533734541000|    22683|Ich mache einen S...|   200|1538352001000|"Mozilla/5.0 (Win...|1749042|
|Los Bunkers|Logged In|  Vianney|     F|            9|  Miller|238.39302| paid|San Francisco-Oak...|   PUT|NextSong|1537500318000|    20836|         MiÃ

# Data Exploration

## Initial overview using ydata-profiling library

Schema information - taken from https://www.kaggle.com/code/yukinagae/sparkify-project-churn-prediction

artist: Artist name (ex. Daft Punk) \
auth: User authentication status (ex. Logged) \
firstName: User first name (ex. Colin) \
gender: Gender (ex. F or M) \
itemInSession: Item count in a session (ex. 52) \
lastName: User last name (ex. Freeman) \
length: Length of song (ex. 223.60771) \
level: User plan (ex. paid) \
location: User's location (ex. Bakersfield) \
method: HTTP method (ex. PUT) \
page: Page name (ex. NextSong) \
registration: Registration timestamp (unix timestamp) (ex. 1538173362000) \
sessionId: Session ID (ex. 29) \
song: Song (ex. Harder Better Faster Stronger) \
status: HTTP status (ex. 200) \
ts: Event timestamp(unix timestamp) (ex. 1538352676000) \
userAgent: User's browser agent (ex. Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0) \
userId: User ID (ex. 30) \

In [7]:
#the unique pages are:
df.select("page").distinct().show(23, truncate=False)

+-------------------------+
|page                     |
+-------------------------+
|Cancel                   |
|Submit Downgrade         |
|Thumbs Down              |
|Home                     |
|Downgrade                |
|Roll Advert              |
|Logout                   |
|Save Settings            |
|Cancellation Confirmation|
|About                    |
|Submit Registration      |
|Settings                 |
|Login                    |
|Register                 |
|Add to Playlist          |
|Add Friend               |
|NextSong                 |
|Thumbs Up                |
|Help                     |
|Upgrade                  |
|Error                    |
|Submit Upgrade           |
+-------------------------+



## Key Observations from the ydata-profile report:

 - 26_259_199 rows
 - 18 columns
 - 14_710_249

- no duplicate rows

  ### Columns:

  - artist - missing values, to be expected when the interaction doesn't involve a song (e.g. logging in)
  - firstName - missing 778_479 values (note originally thought was 14_562_194 values but ydata report is confusing / bugged - see below)
  - gender - missing 778_479 (3%) of values
  - lastName - missing 778_479 values (note originally thought was 12_948_838 values but ydata report is confusing / bugged - see below)
  - level - 2 distinct values (paid / free), no missing - the majority (78%) are paying users
  - location - has missing values. Most popular locations seem to be in USA
  - page - 22 distinct values, none missing. Most visited page is "NextSong"
  - registration - no missing values, but some at 0 which will need to be checked / cleaned up
  - status - none missing, 3 distinct values: 200 (OK), 307 (Temporary Redirect), 404 (NOT FOUND)
  - ts - none missing, don't have the same "zeros" issue as registrations
  - userId - none missing, but one very common value (1261737). More checks might be required (e.g. for empty string)


 side note for the negotiating team - our most played artists are Kings of Leon and Coldplay so don't lose the rights to play their songs!

In [8]:
# Understand the many missing values in "firstName" / "lastName" column
# Code inspired by: https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/

df_missing_first_or_last_name = df.where(
    (f.col("firstName")=="") |
    (f.col("firstName").isNull()) |
    (f.col("lastName")=="") |
    (f.col("lastName").isNull())
    )
df_missing_first_or_last_name.show()

+------+----------+---------+------+-------------+--------+------+-----+--------+------+-----+------------+---------+----+------+-------------+---------+-------+
|artist|      auth|firstName|gender|itemInSession|lastName|length|level|location|method| page|registration|sessionId|song|status|           ts|userAgent| userId|
+------+----------+---------+------+-------------+--------+------+-----+--------+------+-----+------------+---------+----+------+-------------+---------+-------+
|  null|Logged Out|     null|  null|           87|    null|  null| paid|    null|   GET| Home|        null|     8615|null|   200|1538352008000|     null|1261737|
|  null|Logged Out|     null|  null|            0|    null|  null| free|    null|   PUT|Login|        null|     7433|null|   307|1538352041000|     null|1261737|
|  null|Logged Out|     null|  null|            4|    null|  null| free|    null|   GET| Home|        null|    25003|null|   200|1538352182000|     null|1261737|
|  null|Logged Out|     null

In [9]:
# what are the auth states for users missing names:
df_missing_first_or_last_name.select("auth").distinct().show()

+----------+
|      auth|
+----------+
|Logged Out|
|     Guest|
+----------+



In [10]:
# whilst users are always Logged Out or a guest (also not sure what the difference between those is), there are both levels (paid/free) as well as for the same userId: 1261737, which is the one we commented on having many more rows than all the other users. Keep digging:
df_missing_first_or_last_name.select("userId").distinct().show()

+-------+
| userId|
+-------+
|1261737|
+-------+



In [11]:
# So actually that is the only userId causing nulls in many columns 

df_missing_first_or_last_name.select("page").distinct().show()

+-------------------+
|               page|
+-------------------+
|               Home|
|              About|
|Submit Registration|
|              Login|
|           Register|
|               Help|
|              Error|
+-------------------+



In [12]:
# This user never goes to the "Upgrade" / "Submit Upgrade" page, so is odd they can have the "paid" level in the data. It could be some trial scheme sparkify has or similar, but as we don't know and since the "user" (it could be many different users that just get given the same id when logged out / a guest) is missing a lot of information about them (both personally and what they are doing on the platform e.g. how many songs listening to etc - if any), we will remove this user from the data

In [13]:
# The ydata report suggested more first names than last names are missing, so look for
# rows that lack a first name but still have a last name.
# Only the new name is assigned here: a chained assignment would also overwrite
# `df_missing_first_or_last_name`, which holds a different filter from an earlier cell.
df_missing_first_got_last = df.where(
    (f.col("firstName").isNull()) &
    (f.col("lastName").isNotNull())
    )
df_missing_first_got_last.show()

+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+
|artist|auth|firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status| ts|userAgent|userId|
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+
+------+----+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+---+---------+------+



In [14]:
# Code inspired by: https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/

df.select([f.count(f.when(f.isnan(c) | f.col(c).isNull(), c)).alias(c) for c in ["firstName", "lastName"]]
   ).show()

+---------+--------+
|firstName|lastName|
+---------+--------+
|   778479|  778479|
+---------+--------+



In [15]:
# So there is not a different number of nulls in each of the columns. This is different to the values from ydata-profiling tool, which clearly has some sort of bug to be resolved.
#(reviewed and found an issue already exists that covers it: https://github.com/ydataai/ydata-profiling/issues/1429)

In [16]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [17]:
# Investigate / clean up registration==0/null rows
# observation from the ydata report is that it is the same number of rows as the missing first/last names so likely the same "issue"
df.where((f.col("registration")<1) |f.col("registration").isNull()).show()

+------+----------+---------+------+-------------+--------+------+-----+--------+------+-----+------------+---------+----+------+-------------+---------+-------+
|artist|      auth|firstName|gender|itemInSession|lastName|length|level|location|method| page|registration|sessionId|song|status|           ts|userAgent| userId|
+------+----------+---------+------+-------------+--------+------+-----+--------+------+-----+------------+---------+----+------+-------------+---------+-------+
|  null|Logged Out|     null|  null|           87|    null|  null| paid|    null|   GET| Home|        null|     8615|null|   200|1538352008000|     null|1261737|
|  null|Logged Out|     null|  null|            0|    null|  null| free|    null|   PUT|Login|        null|     7433|null|   307|1538352041000|     null|1261737|
|  null|Logged Out|     null|  null|            4|    null|  null| free|    null|   GET| Home|        null|    25003|null|   200|1538352182000|     null|1261737|
|  null|Logged Out|     null

In [18]:
df.select(f.min(f.col("registration"))).show()

+-----------------+
|min(registration)|
+-----------------+
|    1508018725000|
+-----------------+



In [19]:
#re-check some results given don't fully trust the ydata report now:
describe_pdf = df.describe().toPandas()
describe_pdf

# Data Cleaning

In [None]:
# filter out user 1261737
df = df.where(f.col("userId")!=1261737)

In [None]:
df.select([f.count(f.when(f.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# Target Extraction

As we have seen that most of the Sparkify users are on the "paid" level, I have decided to try to come up with a model that will predict users that are more likely to leave that level (either by downgrading to the free level or closing their account altogether) in the hope this will have biggest impact on maintaining future revenue.

need to filter to just paid and flag those that downgraded?

In [None]:
# Distinct user ids. The schema spells the column "userId", so use that exact casing
# instead of relying on Spark resolving column names case-insensitively.
df_user_ids = df.select("userId").distinct()
print(f"There are {df_user_ids.count()} unique userIds")

Filter the dataframe down to just the interactions had whilst users are on the paid tier, using the "phase" approach

In [None]:
# Score every event by its effect on the subscription level:
# +1 for an upgrade, -1 for a downgrade or a cancellation, 0 for anything else.
df = df.withColumn(
    "level_change",
    f.when(f.col("page") == "Submit Upgrade", 1)
    .when(f.col("page").isin("Submit Downgrade", "Cancellation Confirmation"), -1)
    .otherwise(0),
)

# Running total of the level changes per user, in timestamp order, up to and
# including the current row: this is the "phase" the user is in at each event.
window = (
    Window.partitionBy("userId")
    .orderBy(f.asc("ts"))
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)
df = df.withColumn("level_phase", f.sum("level_change").over(window))


In [None]:
# Show the running level phase at every level-changing event.
# Page names are matched case-sensitively, so the cancellation page has to be spelled
# "Cancellation Confirmation" or those events are silently left out.
(
    df.select("userId", "page", "ts", "level_phase")
    .where(f.col("page").isin(["Submit Upgrade", "Submit Downgrade", "Cancellation Confirmation"]))
    .sort("userId", "ts")
    .show()
)

In [None]:
# Check the raw input for user 1000214, as they shouldn't have a downgrade first (-1's).
# Page names are matched case-sensitively, so the cancellation page has to be spelled
# "Cancellation Confirmation" or those events are silently left out.
(
    raw_df.select("userId", "page", "ts")
    .where(
        (f.col("page").isin(["Submit Upgrade", "Submit Downgrade", "Cancellation Confirmation"]))
        & (f.col("userId")==1000214)
    )
    .sort("userId", "ts")
    .show()
)

In [None]:
df.where((f.col("page")=="Submit Upgrade")&(f.col("level_phase")!=1)).select("userId").distinct().count()

In [None]:
# So there are 926 users that had a downgrade / cancellation before an upgrade. sample ID's:
df.where((f.col("page")=="Submit Upgrade")&(f.col("level_phase")!=1)).select("userId").distinct().show()

In [None]:
# Double check a second sample.
# Page names are matched case-sensitively, so the cancellation page has to be spelled
# "Cancellation Confirmation" or those events are silently left out.
(
    raw_df.select("userId", "page", "ts")
    .where(
        (f.col("page").isin(["Submit Upgrade", "Submit Downgrade", "Cancellation Confirmation"]))
        & (f.col("userId")==1250440)
    )
    .sort("userId", "ts")
    .show()
)

So we see users whose first recorded level change is a downgrade, implying we are missing the earlier data that covers what they were doing as a paid user.
We want to know all the actions of paying users, because missing interactions could be important in predicting churn, so "paying periods" with missing data will be filtered out.

Clean up: For all users that upgraded, remove any data before their first upgrade:

In [None]:
df = (df
      .withColumn("upgrade_flag", f.when(f.col("page")=="Submit Upgrade", 1).otherwise(0))
      .withColumn("upgrade_phase", f.sum("upgrade_flag").over(window))
      .where(f.col("upgrade_phase")>0)
)


In [None]:
df.show(5)

In [None]:
# re-calculate level change and level phase cols with "cleaned" data:
df = df.withColumn("level_change",
    f.when(f.col("page")=="Submit Upgrade", 1)
    .when(f.col("page")=="Submit Downgrade", -1)
    .when(f.col("page")=="Cancellation Confirmation", -1)
    .otherwise(0))

window = Window.partitionBy("userId").orderBy(f.asc("ts")).rangeBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("level_phase", f.sum("level_change").over(window))


In [None]:
# Show the recalculated level phase at every level-changing event.
# Page names are matched case-sensitively, so the cancellation page has to be spelled
# "Cancellation Confirmation" or those events are silently left out.
(
    df.select("userId", "page", "ts", "level_phase")
    .where(f.col("page").isin(["Submit Upgrade", "Submit Downgrade", "Cancellation Confirmation"]))
    .sort("userId", "ts")
    .show()
)

In [None]:
# check max phase is 1
# check min phase is -1, for if downgrade and then cancel
# check always upgrade first, i.e. phase is 1 when do an upgrade
df.select(f.max("level_phase"), f.min("level_phase")).show()

In [None]:
df.where((f.col("page")=="Submit Upgrade")&(f.col("level_phase")!=1)).select("userId").distinct().count()

As we have seen in samples users have upgraded multiple times (with downgrades in-between), we want to be able to differentiate between each period they were a paying user, and determine for each period if they churned or not, and hopefully use the features of each in a model.

In [None]:
df = (
    df
    .withColumn("userId_paid_phase", f.concat_ws("_", f.col("userId"), f.col("upgrade_phase")))
)

In [None]:
# One window per paying period (user + upgrade counter). The minimum level_change in a
# period is -1 when it contains a downgrade or cancellation.
# The column is spelled "userId" in the schema, so use that exact casing instead of
# relying on Spark resolving column names case-insensitively.
downgrade_window = Window.partitionBy("userId","upgrade_phase")
df = df.withColumn("downgraded", f.min("level_change").over(downgrade_window))

In [None]:
print("Number of unique User ID's in data:")
raw_df.select(f.countDistinct("userId")).show()
print("Number of unique User ID's that were on a paid level at some point:")
raw_df.select("userId","page").where(f.col("page")=="Submit Upgrade").select(f.countDistinct("userId")).show()

In [None]:
# % of users that were on the paid level at some point.
# Computed from the data instead of from numbers copied out of the previous cell's
# output, so it cannot go stale when the input changes.
total_user_count = raw_df.select(f.countDistinct("userId")).first()[0]
paid_user_count = (
    raw_df.where(f.col("page")=="Submit Upgrade")
    .select(f.countDistinct("userId"))
    .first()[0]
)
paid_user_count / total_user_count * 100

In [None]:
# review interactions where level changes for any data that might need cleaning
df.where(f.col("level_change")!=0).sort("userId").select("page", "userId", "level", "level_change","level_phase","upgrade_flag","upgrade_phase","userId_paid_phase","downgraded").show()

In [None]:
df.select("downgraded").distinct().show()

In [None]:
# wasn't expecting +1, would indicate there is a user that has upgraded their account and done nothing else since? investigate:
df.where(f.col("downgraded")==1).select("ts", "page", "userId", "level", "level_change","level_phase","upgrade_flag","upgrade_phase","userId_paid_phase","downgraded").show()

In [None]:
# check_raw data for this user:
raw_df.where((f.col("userId")==1977992)&(f.col("ts")>=1543620507000)).show()

In [None]:
# SO indeed that is the case, data and analysis is correct. Could think about filtering brand new users out of the data as not enough information to predict on churn.
# Recode "downgraded" into a binary flag: -1 (the paying period contains a downgrade or
# cancellation) becomes 1, every other value (0 or +1) becomes 0.
# NOTE(review): the column is overwritten in place, so running this cell a second time
# maps every value to 0 - re-run the cell that creates "downgraded" first.
df = df.withColumn("downgraded", f.when(f.col("downgraded")==-1,1).otherwise(0))

Now remove data that is not for "paying" users - i.e data when level was free, or "level_phase" < 1

In [None]:
df = df.where(f.col("level_phase")==1)

In [None]:
# Review the dataframe we have so far:
# number of unique times a user has had a period they have been a "paying" customer
# number of those periods where users have churned back to free
feature_df = df.groupBy("userId_paid_phase").agg(f.max("downgraded").alias("churned"), f.max("userId").alias("userId"))
feature_df.show()

#f.countDistinct("userId").alias("distinct_count_userId")).sort(f.desc("distinct_count_userId"))

In [None]:
print(f'Number of "paying periods" in data: {feature_df.count()}')
print(f'Number of "churns" from paid to free/cancelled in data: {feature_df.where(f.col("churned")==1).count()}')
print(f'Number of paying periods that didn\'t churn: {feature_df.where(f.col("churned")==0).count()} ')
print(f'Number of unique users in the data: {feature_df.select("userId").distinct().count()}')
      

In [None]:
feature_df.groupBy("userId").agg(f.count("userId").alias("distinct_count_userId"), f.min("churned")).sort(f.desc("distinct_count_userId")).show()

In [None]:
# so one user has upgraded at least 8 times, and in our data are "currently" paying users. Maybe how many times they have downgraded before could also be a factor
# Page names are matched case-sensitively, so the cancellation page has to be spelled
# "Cancellation Confirmation" or those events are silently left out.
(
    raw_df.select("userId", "page", "ts")
    .where(
        (f.col("page").isin(["Submit Upgrade", "Submit Downgrade", "Cancellation Confirmation"]))
        & (f.col("userId")==1662781)
    )
    .sort("userId", "ts")
    .show()
)

Hence our goal is to predict which of the 7617 paying users we have are next to downgrade / cancel.

# Feature Extraction 
I first brainstorm possible features to investigate:

 - Duration of being at the paid level
 - Number of unique artists listened  (ever / over a period)
 - gender column
 - itemInSession column
 - Many from the pages column:
    - number of songs played (ever / over a period)
    - number of thumbs up (ever / over a period) 
    - number of thumbs down (ever / over a period)
    - number of visits to the downgrade and or cancel page (ever / over a period)
    - number of visits to the help page
    - number of HTTP errors (404 codes)

 - Number of times the user has churned previously

 I will then look at these to see if a model might be able to use them to differentiate between users about
 to downgrade and those that are not.


## Duration at the paid level

In [None]:
df = df.withColumn("level_start_ts",f.min(f.col("ts")).over(Window.partitionBy("userId_paid_phase")))


In [None]:
df.show()

In [None]:
df = df.withColumn("last_ts", f.max(f.col("ts")).over(Window.partitionBy("userId_paid_phase")))

In [None]:
df = df.withColumn("paid_duration_s", (f.col("last_ts") - f.col("level_start_ts"))/1000)

In [None]:
df_durations = df.select("userId_paid_phase", "paid_duration_s").distinct()

In [None]:
assert df_durations.count() == feature_df.count()

In [None]:
df_durations.show(3)

In [None]:
feature_df = feature_df.join(df_durations, on="userId_paid_phase", how="left")

In [None]:
feature_pdf = feature_df.toPandas()
feature_pdf

In [None]:
sns.violinplot(data=feature_pdf, x="churned", y="paid_duration_s")

In [None]:
feature_pdf["paid_duration_days"] = feature_pdf.paid_duration_s/(86400)
sns.violinplot(data=feature_pdf, x="churned", y="paid_duration_days")

In [None]:
feature_pdf["paid_duration_days"].mean()

In [None]:
raw_df.where(f.col("userId")==1814132).sort("ts",ascending=False).show()

## Unique Artists

In [None]:
df_unique_artists = df.groupby("userId_paid_phase").agg(f.countDistinct("artist").alias("unique_artist_count"))
feature_df = feature_df.join(df_unique_artists, on="userId_paid_phase", how="left")
feature_df.sort("unique_artist_count").show(5)
feature_df.sort(f.desc("unique_artist_count")).show(5)

In [None]:
# Plot some results
feature_pdf = feature_df.toPandas()

In [None]:
feature_pdf

In [None]:
feature_pdf.groupby("churned").describe()

In [None]:
feature_pdf["dummy"]=""

In [None]:
sns.violinplot(data=feature_pdf,y="unique_artist_count", x="dummy", hue="churned", split=True)

## Gender

In [None]:
df_gender = df.withColumn("gender_encoded_male_1",f.when(f.col("gender")=="M",1).otherwise(0)).groupby("userId_paid_phase").agg(f.max("gender_encoded_male_1").alias("male_1_female_0"))
feature_df = feature_df.join(df_gender, on="userId_paid_phase", how="left")
feature_df.show(5)

In [None]:
feature_pdf = feature_df.toPandas()
feature_pdf

In [None]:
# Look at the percentage of each gender that churn?
gender_summary_pdf = feature_pdf.groupby("male_1_female_0").apply(lambda x: pd.Series({"total_count":x["churned"].count(), "churned_count":x["churned"].sum()}))
gender_summary_pdf

In [None]:
gender_summary_pdf["pct_churn"] = gender_summary_pdf.churned_count / gender_summary_pdf.total_count * 100
gender_summary_pdf

In [None]:
# Overall churn percentage across both genders, derived from the summary table
# instead of hard-coded counts that go stale when the data changes.
gender_summary_pdf.churned_count.sum() / gender_summary_pdf.total_count.sum() * 100

The percentage of users that have churned is 49.7%, and that varies very minimally for different genders. It is tempting to omit gender as a feature, but it could be that when combined with other features it is useful in making predictions.

## itemInSession Column

In [None]:
# Understand how this column works:
df.sort("userId_paid_phase").select("userId", "userId_paid_phase", "ts", "itemInSession", "page").show()

In [None]:
# Each row increments the itemInSession count by 1, whereas I am interested in the total
# number of items in each session, i.e. the peak value each session reaches.
# Taking the maximum per (paying period, session) gives that peak directly: it is exact
# for the last session as well, and it does not depend on how rows that share a
# timestamp happen to be ordered.
# NOTE(review): assumes a sessionId is not reused by the same user within one paying
# period - confirm against the data.
df_max_items_in_sessions = (
    df
    .groupBy("userId_paid_phase", "sessionId")
    .agg(f.max("itemInSession").alias("items_in_session_counts"))
    .select("userId_paid_phase", "items_in_session_counts")
)
df_max_items_in_sessions.show(5)

In [None]:
#also need to keep last row per paid period?
df_max_items_in_sessions.where(f.col("userId_paid_phase")=="1999908_1").show()

In [None]:
df.where(f.col("userId_paid_phase")=="1999908_1").show()

In [None]:
# Possible improvement option to investigate in the future - if should "reset" the item count at the start of a paid session, or keep both.

In [None]:
df_itemInSession = df_max_items_in_sessions.groupby("userId_paid_phase").agg(f.mean("items_in_session_counts").alias("mean(items_in_session_counts)"))

In [None]:
feature_df = feature_df.join(df_itemInSession, on="userId_paid_phase", how="left")
feature_df.show(2)

In [None]:
feature_pdf = feature_df.toPandas()
feature_pdf

In [None]:
sns.boxplot(data=feature_pdf,y="mean(items_in_session_counts)",x='churned')

In [None]:
feature_df.columns

# Review various columns looking at total count, and average past week compared to overall average

In [None]:
# df_songs_played = (
#     df
#     .where(f.col("page")=="NextSong")
#     .groupby("userId_paid_phase").agg(f.count("artist").alias("total_song_count"))
# )
# df_songs_played.show()

In [None]:

# Future improvement - It sounds reasonable that some of the churned users could be using the service less than they used to,
# so would like to try and generate a feature that compares the number of songs played recently, to the 
# number of songs played in the past. Ideally need to know when "now" is to calculate features like:
# "average songs per day last week vs average songs per day since upgrading"

# As we are not sure when "now" is, just look at the previous 1 week before either churning or the last interaction

In [None]:
one_week_in_ms = 1000 * 60 * 60 * 24 * 7
df = df.withColumn("one_week_prior_ts", f.col("last_ts") - one_week_in_ms)

In [None]:
feature_df.columns

In [None]:
df.columns

In [None]:
# feature_df = feature_df.drop('page_NextSong_total_count',
#  'page_NextSong_last_week_count')

In [None]:
col_values_to_count = [
    {"c":"page", "v":"NextSong"},
    {"c":"page", "v":"Thumbs Up"},
    {"c":"page", "v":"Thumbs Down"},
    {"c":"page", "v":"Downgrade"},
    {"c":"page", "v":"Cancel"},
    {"c":"page", "v":"Help"},
    {"c":"status", "v":404},
]

In [None]:

def process_value_to_count(input_df, col, value,feature_df):
    """Add count-based features for one (column, value) pair to ``feature_df``.

    Three features are derived per ``userId_paid_phase`` from the rows of
    ``input_df`` where ``col == value``:

    * ``{col}_{value}_total_count`` - number of matching rows in the paying period.
    * ``{col}_{value}_last_week_count`` - number of matching rows with
      ``ts >= one_week_prior_ts``.
    * ``{col}_{value}_last_week_to_total_rate_ratio_pct`` - the per-second rate in the
      last week expressed as a percentage of the per-second rate over the whole
      paying period.

    Args:
        input_df: Spark DataFrame of events. Must contain ``userId_paid_phase``,
            ``ts``, ``one_week_prior_ts``, ``paid_duration_s`` and ``col``.
        col: Name of the column to match on.
        value: Value of ``col`` whose occurrences are counted.
        feature_df: Spark DataFrame keyed by ``userId_paid_phase`` to extend.

    Returns:
        ``feature_df`` left-joined with the three new columns. Paying periods with
        no matching rows (or an undefined ratio) get 0 in all three.
    """
    # Has to match the look-back used to build ``one_week_prior_ts`` (7 days).
    seconds_per_week = 7 * 24 * 60 * 60

    total_col = f"{col}_{value}_total_count"
    last_week_col = f"{col}_{value}_last_week_count"
    ratio_pct_col = f"{col}_{value}_last_week_to_total_rate_ratio_pct"

    matching_df = input_df.where(f.col(col)==value)

    totals_df = (
        matching_df
        .groupby("userId_paid_phase")
        .agg(
            f.count(col).alias(total_col),
            f.max("paid_duration_s").alias("paid_duration_s"),
        )
    )
    last_week_df = (
        matching_df
        .where(f.col("ts")>=f.col("one_week_prior_ts"))
        .groupby("userId_paid_phase")
        .agg(f.count(col).alias(last_week_col))
    )

    # A paying period shorter than a week only has ``paid_duration_s`` seconds of data
    # inside the look-back window, so that is the observation time to divide by;
    # dividing by a full week would understate the recent rate for new subscribers.
    last_week_window_s = f.least(f.col("paid_duration_s"), f.lit(seconds_per_week))

    # NOTE(review): a paying period with a single event has paid_duration_s == 0, which
    # makes the ratio a division by zero. With Spark's default (non-ANSI) settings that
    # presumably yields null, which the fillna below turns into 0 - confirm if ANSI
    # mode is enabled.
    new_feature_df = (
        totals_df
        .join(last_week_df, on="userId_paid_phase", how="left")
        .withColumn(
            ratio_pct_col,
            (f.col(last_week_col) / last_week_window_s)
            /
            (f.col(total_col) / f.col("paid_duration_s"))
            *100
        )
        .drop("paid_duration_s")
    )

    feature_df = feature_df.join(new_feature_df, on="userId_paid_phase", how="left")
    # The left joins leave nulls where nothing matched; fill all three new columns
    # (including the last-week count) so no nulls reach the model.
    feature_df = feature_df.fillna(0, subset=[total_col, last_week_col, ratio_pct_col])

    return feature_df

In [None]:
feature_df.columns

In [None]:
for c_v_dict in col_values_to_count:
    col = c_v_dict["c"]
    value = c_v_dict["v"]
    print(f"\nProcessing count of value {value} in column {col} ...")
    feature_df = process_value_to_count(input_df=df,col=col,value=value, feature_df=feature_df)

In [None]:
feat_pdf = feature_df.toPandas()
feat_pdf

In [None]:
feat_pdf.describe()

In [None]:
col = "page"
value = "NextSong"
new_feature_total_col_name = f"{col}_{value}_total_count"
new_feature_last_week_pct_col_name = f"{col}_{value}_last_week_to_total_rate_ratio_pct"

# plt.subplots already creates the figure, so no separate plt.figure() calls are made:
# each of those would only add an empty figure to the cell output.
fig, (bp,vp) = plt.subplots(1,2, sharey=True)
fig.suptitle(f"Plots for total counts of {value} in column {col}")
sns.boxplot(ax=bp, data=feat_pdf, y=new_feature_total_col_name, x="churned")
sns.violinplot(ax=vp, data=feat_pdf, y=new_feature_total_col_name, x="churned")

fig, (bp,vp) = plt.subplots(1,2, sharey=True)
fig.suptitle(f"Plots for % of last week vs total of {value} in column {col}")
sns.boxplot(ax=bp, data=feat_pdf, y=new_feature_last_week_pct_col_name, x="churned")
sns.violinplot(ax=vp, data=feat_pdf, y=new_feature_last_week_pct_col_name, x="churned")


In [None]:
# Plot New Features
def display_plots(pdf, list_of_col_val_dict):
    """Draw box and violin plots of the engineered count features, split by churn.

    For every entry in ``list_of_col_val_dict`` two figures are produced. Each
    figure holds a box plot (left) and a violin plot (right) of one feature
    against the ``churned`` column:

    * ``<col>_<value>_total_count``
    * ``<col>_<value>_last_week_to_total_rate_ratio_pct``

    Args:
        pdf: pandas DataFrame containing the feature columns and ``churned``.
        list_of_col_val_dict: iterable of dicts with keys ``"c"`` (source
            column name) and ``"v"`` (the value counted in that column).

    Returns:
        None. Each figure is shown and then closed.
    """
    for c_v_dict in list_of_col_val_dict:
        col = c_v_dict["c"]
        value = c_v_dict["v"]
        print(f"\nDisplaying visuals for value {value} in column {col} ...")

        panels = [
            (
                f"Plots for total counts of {value} in column {col}",
                f"{col}_{value}_total_count",
            ),
            (
                f"Plots for % of last week vs total of {value} in column {col}",
                f"{col}_{value}_last_week_to_total_rate_ratio_pct",
            ),
        ]

        for title, feature_col in panels:
            # plt.subplots creates its own figure; the extra plt.figure() calls
            # that used to surround it only produced empty figures.
            fig, (bp, vp) = plt.subplots(1, 2, sharey=True)
            fig.suptitle(title)
            sns.boxplot(ax=bp, data=pdf, y=feature_col, x="churned")
            sns.violinplot(ax=vp, data=pdf, y=feature_col, x="churned")

            # Show each figure right after its printed header, then release it
            # so figures do not accumulate in memory over the loop.
            plt.show()
            plt.close(fig)




# Render the plots for every (column, value) pair in col_values_to_count.
display_plots(feat_pdf, col_values_to_count)

In [None]:
# List the feature columns again now that the count features have been added.
feature_df.columns

## Comments:

- Visits to the cancel page: users who do not churn have never visited the cancel page. Some users who churn have not visited it either, which could be because they only downgraded and did not cancel their accounts. We are not sure what happens in this interaction; it seems that a user only reaches that page once they have already churned, so these features will be removed from the data.
(Otherwise the model would use the fact that a user never reached the cancel page to predict no churn, but in reality that signal arrives too late to be useful.)

In [None]:
# Drop every feature derived from visits to the "Cancel" page, because those
# visits leak the churn outcome into the inputs.
# The columns are selected by prefix rather than by hand-typed names: the
# generated names end in "_total_count" and "_last_week_to_total_rate_ratio_pct",
# so the previous literal 'page_Cancel_last_week_pct' matched nothing, and
# DataFrame.drop silently ignores names that do not exist.
# The trailing underscore keeps other pages whose name merely starts with
# "Cancel" (e.g. "Cancellation Confirmation") out of the match.
cancel_feature_cols = [c for c in feature_df.columns if c.startswith("page_Cancel_")]
if cancel_feature_cols:
    feature_df = feature_df.drop(*cancel_feature_cols)

Possible feature improvements: look at the percentage of each item relative to the other values, as well as the total counts. Also compare "recent" counts/percentages with overall counts/percentages to try to detect a change in behaviour.

# Making a Model to predict Churn

As we are trying to determine if a "user paying period" falls into a category of churned / not churned, we have a __classification__ problem to solve.

Supported classification algorithms in spark.ml include:
- Logistic Regression
- Random Forests - _chosen to try on our data_
- Gradient-Boosted Trees
- Support Vector Machines - _chosen to try on our data_
- Naive Bayes

In [None]:
# Check no missing values: one null count per column, every count should be 0.
null_count_exprs = [
    f.count(f.when(f.col(name).isNull(), name)).alias(name)
    for name in feature_df.columns
]
feature_df.select(null_count_exprs).show()

In [None]:
# Fill the remaining missing values with 0. A null here means the user had none
# of those events in the period (e.g. never visited the Help page), so 0 is the
# correct count rather than an imputed guess.
feature_df = feature_df.fillna(0)

In [None]:
# Mark feature_df for persistence so the cells below can reuse it.
feature_df = feature_df.persist()

In [None]:
# Report the size of the feature table.
n_rows = feature_df.count()
n_cols = len(feature_df.columns)
print(f"features_df has {n_rows} rows and {n_cols} cols")

In [None]:
# Show the column types of the feature table before building the model inputs.
feature_df.printSchema()

## Transform Features

In [None]:
# Column order matters here: the next cell selects the model inputs by position.
feature_df.columns

In [None]:
target_col = "churned"

# The first three columns are skipped by position.
# NOTE(review): presumably these are the identifier columns; confirm against the
# column listing printed above.
# The target is excluded explicitly as well, so the label can never end up among
# the model inputs if the column order changes.
input_cols = [c for c in feature_df.columns[3:] if c != target_col]

In [None]:
# Check the Spark data type of the userId column.
feature_df.schema["userId"].dataType

In [None]:
# Every input except the binary gender flag is treated as numerical; the flag is
# left out of scaling and re-attached to the feature vector afterwards.
numerical_cols = [name for name in input_cols if name != "male_1_female_0"]

In [None]:
# Pack the numerical columns into a single vector column so that they can be
# scaled together in the next step.
assembler = VectorAssembler(
    inputCols=numerical_cols,
    outputCol="numerical_features_vector",
)
feature_df = assembler.transform(feature_df)

In [None]:
# Confirm that the assembled vector column was added.
feature_df.columns

In [None]:
# Standardise the numerical vector using both centring (withMean) and scaling (withStd).
# NOTE(review): the scaler is fitted on all rows, before the train/test split below.
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="numerical_features_vector",
    outputCol="scaled_numerical_features_vector",
)
scalerModel = scaler.fit(feature_df)
feature_df = scalerModel.transform(feature_df)

In [None]:
# Add back the gender input col: append the unscaled binary flag to the scaled
# numerical vector to form the final feature vector.
assembler = VectorAssembler(
    inputCols=["scaled_numerical_features_vector", "male_1_female_0"],
    outputCol="features_vec",
)
feature_df = assembler.transform(feature_df)

In [None]:
# Keep only what the models need: the feature vector as "x" and the label as "Y".
model_df = feature_df.select(
    f.col("features_vec").alias("x"),
    f.col("churned").alias("Y"),
)
model_df.head()

In [None]:
# Temporary cell to speed things up: keep roughly 10% of the rows.
# When removing this cell, re-run the cell above to restore the full model_df.
# The split is seeded so the subsample is reproducible, and it is indexed rather
# than unpacked into `_` so IPython's last-output variable is not clobbered.
# NOTE: not idempotent; re-running this cell shrinks model_df again.
model_df = model_df.randomSplit([0.1, 0.9], seed=7)[0]

In [None]:
# 80/20 train/test split with a fixed seed.
train, test = model_df.randomSplit([0.8, 0.2], seed=7)

## Baseline
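We saw previously that close to 50% of "user paid periods" churned, so if we were to guess at random whether a period churned, with equal probability (50%), we should achieve an accuracy of about 50%. The models below are scored with the area under the ROC curve, for which random guessing likewise scores about 0.5.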

## Random Forest

In [None]:
# Random forest on the assembled feature vector; seeded so runs are reproducible.
rf = RandomForestClassifier(labelCol="Y", featuresCol="x", seed=7)

# One evaluator, used by the cross-validation below. The previous definition
# pointed at a non-existent "pred" column with the default "label" column and
# was never passed to the CrossValidator.
evaluator = BinaryClassificationEvaluator(labelCol="Y", rawPredictionCol="rawPrediction")

rf_param_grid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [5, 10])
    .addGrid(rf.maxBins, [25, 35])
    .build()
)

pipeline = Pipeline(stages=[rf])

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=rf_param_grid,
    evaluator=evaluator,
    numFolds=2,
    seed=7,  # fixes the fold assignment
)

rf_model = crossval.fit(train)

In [None]:

# Score the cross-validated random forest on the held-out test split.
model = rf_model
y_pred = model.transform(test)

# Evaluate on the raw scores rather than the hard 0/1 "prediction" column:
# thresholded labels collapse the ROC curve to a single operating point.
evaluator = BinaryClassificationEvaluator(labelCol="Y", rawPredictionCol="rawPrediction")
auc = evaluator.evaluate(y_pred)

print(f"\t{auc=}")

# bestModel is the fitted Pipeline; the tuned params live on its last stage,
# not on the pipeline wrapper itself.
best_stage = model.bestModel.stages[-1]
print(f"Best model params: {best_stage.extractParamMap()}")

In [None]:
def evaluate_model(model, test):
    """Score a fitted cross-validated model on held-out data and print the result.

    Args:
        model: fitted model exposing ``transform`` and ``bestModel`` (in this
            notebook, the result of ``CrossValidator.fit``).
        test: DataFrame holding the feature column and the label column ``Y``.

    Returns:
        None. Prints the metric computed by ``BinaryClassificationEvaluator``
        (area under ROC by default) and the param map of the best estimator.
    """
    y_pred = model.transform(test)

    # Evaluate on the raw scores rather than the hard 0/1 "prediction" column:
    # thresholded labels collapse the ROC curve to a single operating point.
    evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Y")
    auc = evaluator.evaluate(y_pred)

    print(f"\t{auc=}")

    # When the tuned estimator is a Pipeline, the tuned params live on its last
    # stage; the pipeline wrapper's own param map does not contain them.
    best = model.bestModel
    final_stage = best.stages[-1] if hasattr(best, "stages") else best
    print(f"Best model params: {final_stage.extractParamMap()}")

## Support Vector Machine (SVM)

In [None]:
svc = LinearSVC(labelCol="Y", featuresCol="x")
pipeline = Pipeline(stages=[svc])

# Tune the regularisation strength. The previous grid varied aggregationDepth,
# which only controls how gradients are aggregated across the cluster and does
# not change the fitted model, so both candidates were the same model.
# regParam=0.0 is the LinearSVC default, so the original model remains a candidate.
svc_param_grid = (
    ParamGridBuilder()
    .addGrid(svc.regParam, [0.0, 0.1])
    .build()
)

# Defined here rather than taken from whichever earlier cell last assigned it.
evaluator = BinaryClassificationEvaluator(labelCol="Y", rawPredictionCol="rawPrediction")

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=svc_param_grid,
    evaluator=evaluator,
    numFolds=2,
    seed=7,  # fixes the fold assignment
)

svc_model = crossval.fit(train)

In [None]:
# Score the cross-validated SVM on the held-out test split.
evaluate_model(svc_model, test)

## Re-run models with the full input data (if time allows)

In [None]:
# Rebuild the modelling table from feature_df (without the temporary 10%
# subsample) and split it again.
model_df = feature_df.select(
    f.col("features_vec").alias("x"),
    f.col("churned").alias("Y"),
)

train, test = model_df.randomSplit([0.8, 0.2], seed=77)

## Random Forest

In [None]:
# Random forest on the assembled feature vector; seeded so runs are reproducible.
rf = RandomForestClassifier(labelCol="Y", featuresCol="x", seed=77)

# The label column must be set explicitly: the default is "label", which does
# not exist in model_df (columns "x" and "Y"). The SVM cell below takes this
# evaluator from the notebook namespace, so without labelCol its cross-validation
# fails. Raw scores are used instead of the hard 0/1 "prediction" column so the
# ROC curve is not reduced to a single operating point.
evaluator = BinaryClassificationEvaluator(labelCol="Y", rawPredictionCol="rawPrediction")

rf_param_grid = (
    ParamGridBuilder()
    .addGrid(rf.maxDepth, [5, 10])
    .addGrid(rf.maxBins, [25, 35])
    .build()
)

pipeline = Pipeline(stages=[rf])

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=rf_param_grid,
    evaluator=evaluator,
    numFolds=2,
    seed=77,  # fixes the fold assignment
)

rf_model = crossval.fit(train)

In [None]:
# Score the re-trained random forest on the new test split.
evaluate_model(rf_model, test)

## Support Vector Machine (SVM)

In [None]:
svc = LinearSVC(labelCol="Y", featuresCol="x")
pipeline = Pipeline(stages=[svc])

# Tune the regularisation strength. The previous grid varied aggregationDepth,
# which only controls how gradients are aggregated across the cluster and does
# not change the fitted model, so both candidates were the same model.
# regParam=0.0 is the LinearSVC default, so the original model remains a candidate.
svc_param_grid = (
    ParamGridBuilder()
    .addGrid(svc.regParam, [0.0, 0.1])
    .build()
)

# Defined here with the label column set. The evaluator left in the namespace by
# the random-forest cell above had no labelCol, so it looked for a "label"
# column that model_df does not have.
evaluator = BinaryClassificationEvaluator(labelCol="Y", rawPredictionCol="rawPrediction")

crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=svc_param_grid,
    evaluator=evaluator,
    numFolds=2,
    seed=77,  # fixes the fold assignment
)

svc_model = crossval.fit(train)

In [None]:
# Score the re-trained SVM on the new test split.
evaluate_model(svc_model, test)

# Improvements

- check / deal with users that might have upgraded multiple times
- Tune the period of "recent" activity that is compared with overall activity; a value other than 1 week could work better. This could also be a hyperparameter for the grid search in the pipeline, which would mean adding the transform steps to the pipeline as well.
- Other feature ideas:
    - number of sessions (ever / over a period)
    - number of friends (ever / over a period) 
    - duration since registration
    - duration since upgrading to paid level
    -  day of the week / the week of the year?