# Sparkify Project

In [58]:
# import libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import isnan, when, count, col, length
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# create a Spark session
spark = SparkSession.builder\
    .master("local")\
    .appName('Sparkify')\
    .getOrCreate()

# Load and Clean Dataset
Using tiny subset (128MB) of the full dataset available (12GB) for local development.

Mini dataset: `s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json`

Full dataset: `s3n://udacity-dsnd/sparkify/sparkify_event_data.json`


In [None]:
# Uncomment to download

# import requests
# url = 'https://udacity-dsnd.s3.amazonaws.com/sparkify/mini_sparkify_event_data.json'

# def download_file(url):
#     local_filename = url.split('/')[-1]
#     with requests.get(url, stream=True) as r:
#         r.raise_for_status()
#         with open(local_filename, 'wb') as f:
#             for chunk in r.iter_content(chunk_size=8192): 
#                 f.write(chunk)
#     return local_filename

# download_file(url)

In [6]:
event_data = "mini_sparkify_event_data.json"
df = spark.read.json(event_data)
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [7]:
df.toPandas()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Martha Tilston,Logged In,Colin,M,50,Freeman,277.89016,paid,"Bakersfield, CA",PUT,NextSong,1.538173e+12,29,Rockpools,200,1538352117000,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,30
1,Five Iron Frenzy,Logged In,Micah,M,79,Long,236.09424,free,"Boston-Cambridge-Newton, MA-NH",PUT,NextSong,1.538332e+12,8,Canada,200,1538352180000,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",9
2,Adam Lambert,Logged In,Colin,M,51,Freeman,282.82730,paid,"Bakersfield, CA",PUT,NextSong,1.538173e+12,29,Time For Miracles,200,1538352394000,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,30
3,Enigma,Logged In,Micah,M,80,Long,262.71302,free,"Boston-Cambridge-Newton, MA-NH",PUT,NextSong,1.538332e+12,8,Knocking On Forbidden Doors,200,1538352416000,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",9
4,Daft Punk,Logged In,Colin,M,52,Freeman,223.60771,paid,"Bakersfield, CA",PUT,NextSong,1.538173e+12,29,Harder Better Faster Stronger,200,1538352676000,Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) G...,30
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
286495,,Logged Out,,,41,,,paid,,GET,Home,,500,,200,1543622240000,,
286496,,Logged Out,,,42,,,paid,,PUT,Login,,500,,307,1543622241000,,
286497,,Logged In,Emilia,F,43,House,,paid,"New York-Newark-Jersey City, NY-NJ-PA",GET,Home,1.538337e+12,500,,200,1543622248000,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT ...,300011
286498,,Logged In,Emilia,F,44,House,,paid,"New York-Newark-Jersey City, NY-NJ-PA",GET,About,1.538337e+12,500,,200,1543622398000,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT ...,300011


#### Loading, cleaning the dataset and checking for invalid or missing data - for example, records without userids or sessionids. 

In [29]:
#Checking if there are NaNs
df.select([count(when(isnan(c), c)).alias(c+'IsNan') for c in df.columns]).toPandas()

Unnamed: 0,artistIsNan,authIsNan,firstNameIsNan,genderIsNan,itemInSessionIsNan,lastNameIsNan,lengthIsNan,levelIsNan,locationIsNan,methodIsNan,pageIsNan,registrationIsNan,sessionIdIsNan,songIsNan,statusIsNan,tsIsNan,userAgentIsNan,userIdIsNan
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


No Nans in the mini-dataset.

In [31]:
#Checking if there are null values
df.select([count(when(col(c).isNull(), c)).alias(c+'IsNull') for c in df.columns]).toPandas()

Unnamed: 0,artistIsNull,authIsNull,firstNameIsNull,genderIsNull,itemInSessionIsNull,lastNameIsNull,lengthIsNull,levelIsNull,locationIsNull,methodIsNull,pageIsNull,registrationIsNull,sessionIdIsNull,songIsNull,statusIsNull,tsIsNull,userAgentIsNull,userIdIsNull
0,58392,0,8346,8346,0,8346,58392,0,8346,0,0,8346,0,58392,0,0,8346,0


Appearently missing data is correlated (missing counts of columns consistently appear having 8346 or 58392 Null values).

In [None]:
# TODO: print corr matrix for missing values


In [66]:
# Check schema and column types
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [65]:
# userId is string and shold be always with length greater than 0
# df.filter(df.userId).toPandas()
df.select('userId', length(col('userId')).alias('userIdLength')).distinct().orderBy(col('userIdLength')).show(5)

+------+------------+
|userId|userIdLength|
+------+------------+
|      |           0|
|     6|           1|
|     3|           1|
|     5|           1|
|     4|           1|
+------+------------+
only showing top 5 rows



Looks like there are users with userId equals to `''`. We'll drop them from our dataframe:

In [None]:
df.drop()

In [20]:
numCols = [col[0] for col in df.dtypes if not col[1]=='string']; numCols

['itemInSession', 'length', 'registration', 'sessionId', 'status', 'ts']

In [21]:
df.select(numCols).describe().show()

+-------+------------------+-----------------+--------------------+-----------------+------------------+--------------------+
|summary|     itemInSession|           length|        registration|        sessionId|            status|                  ts|
+-------+------------------+-----------------+--------------------+-----------------+------------------+--------------------+
|  count|            286500|           228108|              278154|           286500|            286500|              286500|
|   mean|114.41421291448516|249.1171819778458|1.535358834084427...|1041.526554973822|210.05459685863875|1.540956889810483...|
| stddev|129.76726201140994|99.23517921058361| 3.291321616327586E9|726.7762634630741| 31.50507848842214|1.5075439608226302E9|
|    min|                 0|          0.78322|       1521380675000|                1|               200|       1538352117000|
|    max|              1321|       3024.66567|       1543247354000|             2474|               404|       1543799

# Exploratory Data Analysis

### Define Churn

We create a column `Churn` to use as the label for our model. We choose the `Cancellation Confirmation` events to define the churn, which happen for both paid and free users. We also analyze the `Downgrade` events.

### Explore Data
Analyzing the behavior for users who stayed vs users who churned. 
We explore aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [None]:
# How many songs do users listen to on average between visiting our home page

function = udf(lambda ishome : int(ishome == 'Home'), IntegerType())

user_window = Window \
    .partitionBy('userID') \
    .orderBy(desc('ts')) \
    .rangeBetween(Window.unboundedPreceding, 0)

cusum = df.filter((df.page == 'NextSong') | (df.page == 'Home')) \
    .select('userID', 'page', 'ts') \
    .withColumn('homevisit', function(col('page'))) \
    .withColumn('period', Fsum('homevisit').over(user_window))

cusum.filter((cusum.page == 'NextSong')) \
    .groupBy('userID', 'period') \
    .agg({'period':'count'}) \
    .agg({'count(period)':'avg'}).show()

In [None]:
cusum.show()

In [None]:
# Calculating Statistics by Hour

get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0). hour)

user_log = user_log.withColumn("hour", get_hour(user_log.ts))

user_log.head()

songs_in_hour = user_log.filter(user_log.page == "NextSong").groupby(user_log.hour).count().orderBy(user_log.hour.cast("float"))

songs_in_hour.show()

songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)

plt.scatter(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlim(-1, 24);
plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
plt.xlabel("Hour")
plt.ylabel("Songs played");

In [None]:
# Drop Rows with Missing Values
# As wee see, it turns out there are no missing values in the userID or session columns. But there are userID values that are empty strings.
user_log_valid = user_log.dropna(how = "any", subset = ["userId", "sessionId"])

user_log_valid.count()

user_log.select("userId").dropDuplicates().sort("userId").show()

user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")

user_log_valid.count()

In [None]:
# Users Downgrade Their Accounts
# We find when users downgrade their accounts and then flag those log entries. Then we use a window function and cumulative sum to distinguish each user's data as either pre or post downgrade events.

user_log_valid.filter("page = 'Submit Downgrade'").show()

user_log.select(["userId", "firstname", "page", "level", "song"]).where(user_log.userId == "1138").collect()

flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())

user_log_valid = user_log_valid.withColumn("downgraded", flag_downgrade_event("page"))

user_log_valid.head()

windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)

user_log_valid = user_log_valid.withColumn("phase", Fsum("downgraded").over(windowval))

user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"]).where(user_log.userId == "1138").sort("ts").collect()

# Feature Engineering

# Modeling
We split the full dataset into train, test, and validation sets. 

We evaluate the accuracy of the various models, tuning parameters as necessary. 

The winning model is chosen based on test accuracy and we report the results of the model on the validation set. Since the churned users are a fairly small subset, we use F1 score as the metric to optimize.

# Final Steps