# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# Import libraries
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

# from pyspark.sql.functions import avg, col, concat, desc, explode, lit, min, max, split, udf
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType
# from pyspark.sql import Window

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, PCA, RegexTokenizer, StandardScaler, StopWordsRemover, StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import re
import numpy as np
import math 

from datetime import datetime

from matplotlib import pyplot as plt
%matplotlib inline

In [2]:
# Create a Spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify Project") \
    .config("spark.driver.bindAddress","localhost")\
    .config("spark.ui.port","4050")\
    .getOrCreate()

In [3]:
# Check config
# spark.sparkContext.getConf().getAll()

In [4]:
# Check session
# spark

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [5]:
# Load data
# Mini dataset
path = "../data/mini_sparkify_event_data.json"
# Full dataset
# path ="s3n://udacity-dsnd/sparkify/sparkify_event_data.json"
df_user_log = spark.read.json(path)

In [6]:
# check schema
df_user_log.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [7]:
# Describe
# user_log.describe()

In [8]:
# Show 1 row
# user_log.show(n=1)
df_user_log.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [9]:
# Check num of rows, columns
df_user_log.count(), len(df_user_log.columns)

(286500, 18)

We can drop the rows with a missing `userId` or `sessionId`, as well as the rows with an empty `userId`:

In [10]:
# Drop NaN
df_user_log_valid = df_user_log.dropna(how = "any", subset = ["userId", "sessionId"])
# Drop empty users
df_user_log_valid = df_user_log_valid.filter(df_user_log_valid["userId"] != "")

In [11]:
df_user_log_valid.count()

278154
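
Comparing the two counts, the cleaning step removed 286500 - 278154 = 8346 rows. The rule applied by `dropna` plus the empty-string filter can be sketched in plain Python as follows; the toy records are hypothetical and only stand in for rows of the event log.

```python
# Hypothetical toy records standing in for rows of the event log
records = [
    {"userId": "30", "sessionId": 29},
    {"userId": "", "sessionId": 8},       # empty userId
    {"userId": None, "sessionId": 12},    # missing userId
    {"userId": "9", "sessionId": None},   # missing sessionId
]


def is_valid(record):
    """Keep a record only if both ids are present and userId is not empty."""
    return (
        record["userId"] is not None
        and record["sessionId"] is not None
        and record["userId"] != ""
    )


valid_records = [record for record in records if is_valid(record)]
print(len(records) - len(valid_records), "records removed")
```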

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [12]:
# Check available levels
df_user_log_valid.select("level").dropDuplicates().show()

+-----+
|level|
+-----+
| free|
| paid|
+-----+



In [13]:
# Check available pages
df_user_log_valid.select("page").dropDuplicates().sort("page").show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|              Logout|
|            NextSong|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
|      Submit Upgrade|
|         Thumbs Down|
|           Thumbs Up|
|             Upgrade|
+--------------------+



In [14]:
# Defining a churn variable based on the Cancellation Confirmation page
cancellation = F.udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())  
df_user_log_valid = df_user_log_valid.withColumn("churn", cancellation("page"))

In [15]:
# Defining a sub_dwg variable based on the Submit Downgrade page
submit_dwg = F.udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
df_user_log_valid = df_user_log_valid.withColumn("sub_dwg", submit_dwg("page"))

In [16]:
# Convert UNIX timestamps from milliseconds to seconds
df_user_log_valid = df_user_log_valid.withColumn("ts", df_user_log_valid.ts/1000)                        
df_user_log_valid = df_user_log_valid.withColumn("registration", df_user_log_valid.registration/1000)

In [17]:
# Defining a last_ts variable showing the timestamp of the last entry 
# (Can be used to filter with time, will be used to calculate total duration of the permanence)
window = Window.partitionBy("userId")
df_user_log_valid= df_user_log_valid.withColumn("last_ts", F.max('ts').over(window))

In [21]:
# Defining a perm_days variable showing the (rounded) number of days a user has spent with the service so far
df_user_log_valid = df_user_log_valid.withColumn("perm_days", \
                                                 F.round((df_user_log_valid.last_ts - df_user_log_valid.registration)\
                                                 /(3600*24)))

In [23]:
# We can remove from the dataframe users whose rounded permanence is zero days (i.e. less than half a day)
df_user_log_valid = df_user_log_valid.filter(df_user_log_valid.perm_days > 0)       

In [22]:
# Check columns
df_user_log_valid.head()

Row(artist='Sleeping With Sirens', auth='Logged In', firstName='Darianna', gender='F', itemInSession=0, lastName='Carpenter', length=202.97098, level='free', location='Bridgeport-Stamford-Norwalk, CT', method='PUT', page='NextSong', registration=1538016340.0, sessionId=31, song='Captain Tyin Knots VS Mr Walkway (No Way)', status=200, ts=1539003534.0, userAgent='"Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko) Version/7.0 Mobile/11D257 Safari/9537.53"', userId='100010', churn=0, sub_dwg=0, last_ts=1542823952.0, perm_days=56.0)
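
As a quick sanity check, the `perm_days` value in the row above can be reproduced by hand from its `registration` and `last_ts` fields. This is a minimal plain-Python sketch; the constant name `SECONDS_PER_DAY` is introduced here only for illustration.

```python
# Values copied from the example row (already converted to seconds)
registration = 1538016340.0
last_ts = 1542823952.0

SECONDS_PER_DAY = 3600 * 24

elapsed_days = (last_ts - registration) / SECONDS_PER_DAY

# Python's round() and Spark's F.round() only differ on exact .5 ties,
# which is not the case here
perm_days = float(round(elapsed_days))

print(f"{elapsed_days:.2f} days elapsed, perm_days = {perm_days}")
```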

In [25]:
df_user_log_valid.count()

278148

In [26]:
# Create a list of users that cancelled vs. users that stayed
users_cancelled = df_user_log_valid.select(["userId"]).where(df_user_log_valid.churn == 1).dropDuplicates().collect()
users_staying = df_user_log_valid.select(["userId"]).where(df_user_log_valid.churn == 0).dropDuplicates().collect()

In [27]:
len(users_cancelled), len(users_staying)

(52, 224)

We can build two datasets, one with the users that left and one with the users that stayed:

In [29]:
# Select a subset of the dataframe with the users that left
users_cancelled_list = [us_c[0] for us_c in users_cancelled]
df_user_log_cancelled = df_user_log_valid.filter(df_user_log_valid.userId.isin(users_cancelled_list))

In [28]:
# Select a subset of the dataframe with the users that stayed
users_staying_list = [us_s[0] for us_s in users_staying]
df_user_log_staying = df_user_log_valid.filter(df_user_log_valid.userId.isin(users_staying_list))

In [30]:
# Check the size of the datasets
df_user_log_cancelled.count(), df_user_log_staying.count()

(44864, 278148)
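
Note that the second count equals the size of the whole cleaned dataframe (278148 rows): every user who cancelled also has earlier rows with `churn == 0`, so the `churn == 0` filter matches those users as well. A minimal plain-Python sketch of a user-level split that keeps the two groups disjoint is shown below; the toy events are hypothetical.

```python
# Hypothetical toy event log: (userId, page) pairs, not taken from the dataset
events = [
    ("1", "NextSong"),
    ("1", "Cancellation Confirmation"),
    ("2", "NextSong"),
    ("2", "Thumbs Up"),
    ("3", "Home"),
]

all_users = {user_id for user_id, _ in events}

# A user churned if ANY of their events is a cancellation confirmation
churned_users = {
    user_id for user_id, page in events if page == "Cancellation Confirmation"
}

# Staying users are everyone else, so the two groups cannot overlap
staying_users = all_users - churned_users

print(sorted(churned_users), sorted(staying_users))
```

In the notebook, one way to apply the same idea would be to build the staying subset with `~df_user_log_valid.userId.isin(users_cancelled_list)`; doing so would change the counts reported above.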

We can refine this further by building datasets that focus on each user's last week of activity, to see whether the behavior of churning users changes as they approach the moment they leave:

In [32]:
# Select last week of users that cancelled
df_user_log_cancelled_lweek = df_user_log_cancelled.filter((df_user_log_cancelled.last_ts - df_user_log_cancelled.ts) < \
                                                      (3600*24*7))

In [33]:
# Select last week of users that stay
df_user_log_staying_lweek = df_user_log_staying.filter((df_user_log_staying.last_ts - df_user_log_staying.ts) < \
                                                      (3600*24*7))

In [34]:
# Check the size of the datasets
df_user_log_cancelled_lweek.count(), df_user_log_staying_lweek.count()

(16594, 59910)
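
The last-week filter keeps an event only if it happened less than seven days before the same user's final event. A minimal plain-Python sketch of that rule, using hypothetical `(userId, ts)` pairs with timestamps in seconds:

```python
SECONDS_PER_WEEK = 3600 * 24 * 7

# Hypothetical (userId, ts) events, with ts in seconds
events = [("1", 0), ("1", 500_000), ("1", 1_000_000), ("2", 100), ("2", 200)]

# Timestamp of the last event per user (the role of the last_ts column)
last_ts = {}
for user_id, ts in events:
    last_ts[user_id] = max(ts, last_ts.get(user_id, ts))

# Keep only the events that fall within a week of the user's last event
last_week = [
    (user_id, ts) for user_id, ts in events
    if last_ts[user_id] - ts < SECONDS_PER_WEEK
]

print(last_week)
```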

In [None]:
# Check what happens to the first userID leaving
# user_canc_0_lifecycle = df_user_log_valid.select(["userId", "firstname", "page", "level", "ts"]).\
#     where(df_user_log_valid.userId == users_cancelled[12][0]).collect()

In [None]:
# len(user_canc_0_lifecycle)

In [None]:
# The beginning...
# user_canc_0_lifecycle[:10]

In [None]:
# ... the end
# user_canc_0_lifecycle[400:450]

As a variable of interest we can consider the amount of time spent with the service:

In [None]:
# How much time did he/she spend with the service:
# print('Example of a user leaving the service:')
# print('Date when joining: ', datetime.fromtimestamp(user_canc_0_lifecycle[0][4]/1000))
# print('Date when leaving: ', datetime.fromtimestamp(user_canc_0_lifecycle[-1][4]/1000))
      
# # Quantify that time in days...
# print('Interval in days: ', '{:.3f}'.format((user_canc_0_lifecycle[-1][4]/1000 - user_canc_0_lifecycle[0][4]/1000)\
#                                             /(3600*24)))

We can take a look at the same metric for a user that stayed with the service:

In [None]:
# First userID staying
# user_stay_0_lifecycle = user_log_valid.select(["userId", "firstname", "page", "level", "ts"]).\
#     where(user_log_valid.userId == users_staying[0][0]).collect()

# # How much time did he/she spend with the service:
# print('Example of a user staying with the service:')
# print('Date when joining: ', datetime.fromtimestamp(user_stay_0_lifecycle[0][4]/1000))
# print('Last date recorded: ', datetime.fromtimestamp(user_stay_0_lifecycle[-1][4]/1000))
      
# # Quantify that time in days...
# print('Interval in days: ', '{:.3f}'.format((user_stay_0_lifecycle[-1][4]/1000 - user_stay_0_lifecycle[0][4]/1000)\
#                                             /(3600*24)))

**Time spent/number of songs**

In [None]:
# Check data frame sizes
df_user_log_cancelled.count(), df_user_log_staying.count()

In [None]:
# Select, for every user that cancelled, the number of songs and the time span they spent with the service
song_time_canc = df_user_log_cancelled.\
                filter(df_user_log_cancelled["song"] != "").\
                groupBy("userId").\
                agg(F.min("ts"), F.max("ts"), F.countDistinct("song")).\
                collect()

In [None]:
# Get the time difference in days (ts was already converted to seconds above)
delta_time_cancelled = [(song_time[2] - song_time[1])/(3600*24) for song_time in song_time_canc]
# Get the number of songs listened to per day (on average); max(1, ...) guards against a zero-day span
num_songs_cancelled = [song_time[3]/max(1, math.ceil((song_time[2] - song_time[1])/(3600*24))) for song_time in song_time_canc]

In [None]:
# Select, for every user that stays, the number of songs and the time span they spent with the service
song_time_stay = df_user_log_staying.\
                filter(df_user_log_staying["song"] != "").\
                groupBy("userId").\
                agg(F.min("ts"), F.max("ts"), F.countDistinct("song")).\
                collect()

In [None]:
# Get the time difference in days (ts was already converted to seconds above)
delta_time_staying = [(song_time[2] - song_time[1])/(3600*24) for song_time in song_time_stay]
# Get the number of songs listened to per day (on average); max(1, ...) guards against a zero-day span
num_songs_staying = [song_time[3]/max(1, math.ceil((song_time[2] - song_time[1])/(3600*24))) for song_time in song_time_stay]
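
The per-day rate used here divides a count by the number of started days between a user's first and last event. A minimal plain-Python sketch of that arithmetic follows; the helper name `events_per_day` and the sample numbers are hypothetical, and treating a zero-length span as one day is an assumption of this sketch.

```python
import math

SECONDS_PER_DAY = 3600 * 24


def events_per_day(first_ts, last_ts, n_events):
    """Average number of events per started day between two timestamps (seconds)."""
    n_days = math.ceil((last_ts - first_ts) / SECONDS_PER_DAY)
    # A user whose first and last event coincide has a zero-day span;
    # count it as one day to avoid dividing by zero
    return n_events / max(1, n_days)


# Hypothetical user: 30 distinct songs over 2.5 days, i.e. 3 started days
busy_user = events_per_day(0, 2.5 * SECONDS_PER_DAY, 30)
# Hypothetical user with a single event
single_event_user = events_per_day(1000, 1000, 1)

print(busy_user, single_event_user)
```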

In [None]:
# Plot histogram for time
fig, (ax1, ax2) = plt.subplots(1, 2)

fig.suptitle('Distribution of time spent by users ', fontsize=12, fontweight='bold')
fig.set_figheight(7) 
fig.set_figwidth(15)

ax1.hist(delta_time_cancelled, bins=50)
ax1.set_xlabel('# of days spent with the service', fontsize=11)
ax1.set_ylabel('# of users', fontsize=11)
ax1.set_title('Cancelling Users', fontsize=11,fontweight='bold')

ax2.hist(delta_time_staying, bins=50)
ax2.set_xlabel('# of days spent with the service', fontsize=11)
ax2.set_ylabel('# of users', fontsize=11)
ax2.set_title('Staying Users', fontsize=11,fontweight='bold');

In [None]:
# Look at the statistics
print('Statistics for time spent with the service for users that cancelled:')
spark.createDataFrame(delta_time_cancelled, FloatType()).describe().show()

In [None]:
# Look at the statistics
print('Statistics for time spent with the service for users that stayed:')
spark.createDataFrame(delta_time_staying, FloatType()).describe().show()

In [None]:
# Plot histogram for songs
fig, (ax1, ax2) = plt.subplots(1, 2)

fig.suptitle('Distribution of songs listened/day by users', fontsize=12, fontweight='bold')
fig.set_figheight(7) 
fig.set_figwidth(15)

ax1.hist(num_songs_cancelled, bins=50)
ax1.set_xlabel('# of songs/day', fontsize=11)
ax1.set_ylabel('# of users', fontsize=11)
ax1.set_title('Cancelling Users', fontsize=11,fontweight='bold')

ax2.hist(num_songs_staying, bins=50)
ax2.set_xlabel('# of songs/day', fontsize=11)
ax2.set_ylabel('# of users', fontsize=11)
ax2.set_title('Staying Users', fontsize=11,fontweight='bold');

In [None]:
# Look at the statistics
print('Statistics for songs/day for users that cancelled:')
spark.createDataFrame(num_songs_cancelled, FloatType()).describe().show()

In [None]:
# Look at the statistics
print('Statistics for songs/day for users that stay:')
spark.createDataFrame(num_songs_staying, FloatType()).describe().show()

We can also check the approximate quartiles of the number of songs per day:

In [None]:
spark.createDataFrame(num_songs_cancelled, FloatType()).approxQuantile("value", [0.25, 0.5, 0.75], 0.25)

In [None]:
spark.createDataFrame(num_songs_staying, FloatType()).approxQuantile("value", [0.25, 0.5, 0.75], 0.25)
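
The third argument of `approxQuantile` is the relative error; with `0.25` the returned values can be far from the true quartiles, while `0` requests exact quantiles at a higher computational cost. For reference, exact quartiles on a small list can be sketched in plain Python as below. The nearest-rank definition used here is one common convention and not necessarily the one Spark applies, and the sample values are hypothetical.

```python
import math


def exact_quantile(values, q):
    """Nearest-rank quantile: smallest value with at least a fraction q of the data at or below it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(q * len(ordered)))
    return ordered[rank - 1]


# Hypothetical songs/day values, not taken from the dataset
songs_per_day = [2.0, 4.0, 5.0, 7.0, 9.0, 12.0, 15.0, 30.0]

quartiles = [exact_quantile(songs_per_day, q) for q in (0.25, 0.5, 0.75)]
print(quartiles)
```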

**Roll advert/Add friend**

Other potentially useful pages are those marked as `Roll Advert` or `Add Friend`:

In [None]:
# Defining a roll_adv variable based on the Roll Advert page
roll_adv = F.udf(lambda x: 1 if x == "Roll Advert" else 0, IntegerType())

df_user_log_cancelled = df_user_log_cancelled.withColumn("roll_adv", roll_adv("page"))
df_user_log_staying = df_user_log_staying.withColumn("roll_adv", roll_adv("page"))

In [None]:
# Defining an add_friend variable based on the Add Friend page
add_friend = F.udf(lambda x: 1 if x == "Add Friend" else 0, IntegerType())

df_user_log_cancelled = df_user_log_cancelled.withColumn("add_friend", add_friend("page"))
df_user_log_staying = df_user_log_staying.withColumn("add_friend", add_friend("page"))

In [None]:
df_user_log_cancelled.head()

In [None]:
df_user_log_staying.head()

Added friend event:

In [None]:
added_friend_canc = df_user_log_cancelled.\
                groupBy("userId").\
                agg(F.min("ts"), F.max("ts"), F.sum("add_friend")).collect()

In [None]:
# Added friends per day; ts is already in seconds, and max(1, ...) guards against a zero-day span
added_friend_event_canc = [added_friend[3]/max(1, math.ceil((added_friend[2] - added_friend[1])/(3600*24)))\
                           for added_friend in added_friend_canc]

In [None]:
added_friend_stay = df_user_log_staying.\
                groupBy("userId").\
                agg(F.min("ts"), F.max("ts"), F.sum("add_friend")).collect()

In [None]:
# Added friends per day; ts is already in seconds, and max(1, ...) guards against a zero-day span
added_friend_event_stay = [added_friend[3]/max(1, math.ceil((added_friend[2] - added_friend[1])/(3600*24)))\
                           for added_friend in added_friend_stay]

In [None]:
# Plot histogram for added friends
fig, (ax1, ax2) = plt.subplots(1, 2)

fig.suptitle('Distribution of added friends/day by users', fontsize=12, fontweight='bold')
fig.set_figheight(7) 
fig.set_figwidth(15)

ax1.hist(added_friend_event_canc, bins=100)
ax1.set_xlabel('# of added friends/day', fontsize=11)
ax1.set_ylabel('# of users', fontsize=11)
ax1.set_title('Cancelling Users', fontsize=11,fontweight='bold')

ax2.hist(added_friend_event_stay, bins=100)
ax2.set_xlabel('# of added friends/day', fontsize=11)
ax2.set_ylabel('# of users', fontsize=11)
ax2.set_title('Staying Users', fontsize=11,fontweight='bold');

In [None]:
# Look at the statistics
print('Statistics for added friend/day for users that cancelled:')
spark.createDataFrame(added_friend_event_canc, FloatType()).describe().show()

In [None]:
# Look at the statistics
print('Statistics for added friend/day for users that stay:')
spark.createDataFrame(added_friend_event_stay, FloatType()).describe().show()

Rolled advert event:

In [None]:
rolled_advert_canc = df_user_log_cancelled.\
                groupBy("userId").\
                agg(F.min("ts"), F.max("ts"), F.sum("roll_adv")).collect()

In [None]:
# Rolled adverts per day; ts is already in seconds, and max(1, ...) guards against a zero-day span
rolled_advert_event_canc = [rolled_adv[3]/max(1, math.ceil((rolled_adv[2] - rolled_adv[1])/(3600*24)))\
                            for rolled_adv in rolled_advert_canc]

In [None]:
rolled_advert_stay = df_user_log_staying.\
                groupBy("userId").\
                agg(F.min("ts"), F.max("ts"), F.sum("roll_adv")).collect()

In [None]:
# Rolled adverts per day; ts is already in seconds, and max(1, ...) guards against a zero-day span
rolled_advert_event_stay = [rolled_adv[3]/max(1, math.ceil((rolled_adv[2] - rolled_adv[1])/(3600*24))) \
                            for rolled_adv in rolled_advert_stay]

In [None]:
# Plot histogram for rolled advert
fig, (ax1, ax2) = plt.subplots(1, 2)

fig.suptitle('Distribution of rolled advert event', fontsize=12, fontweight='bold')
fig.set_figheight(7) 
fig.set_figwidth(15)

ax1.hist(rolled_advert_event_canc, bins=100)
ax1.set_xlabel('# of rolled adverts/day', fontsize=11)
ax1.set_ylabel('# of users', fontsize=11)
ax1.set_title('Cancelling Users', fontsize=11,fontweight='bold')

ax2.hist(rolled_advert_event_stay, bins=100)
ax2.set_xlabel('# of rolled adverts/day', fontsize=11)
ax2.set_ylabel('# of users', fontsize=11)
ax2.set_title('Staying Users', fontsize=11,fontweight='bold');

In [None]:
# Look at the statistics
print('Statistics for rolled advert/day for users that cancelled:')
spark.createDataFrame(rolled_advert_event_canc, FloatType()).describe().show()

In [None]:
# Look at the statistics
print('Statistics for rolled advert/day for users that stay:')
spark.createDataFrame(rolled_advert_event_stay, FloatType()).describe().show()

**Downgrade plan**

In [None]:
# Defining a dwg variable based on the change of level from "paid" to "free"

In [None]:
# Defining a sub_dwg variable based on the Submit Downgrade page
# (this overwrites the identical column created earlier)
sub_dwg = F.udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())

df_user_log_cancelled = df_user_log_cancelled.withColumn("sub_dwg", sub_dwg("page"))
df_user_log_staying = df_user_log_staying.withColumn("sub_dwg", sub_dwg("page"))

In [None]:
df_user_log_cancelled.head()

In [None]:
df_user_log_staying.head()

In [None]:
downgrade_canc = df_user_log_cancelled.select(F.sum("sub_dwg")).collect()

In [None]:
downgrade_stay = df_user_log_staying.select(F.sum("sub_dwg")).collect()

In [None]:
downgrade_canc[0][0], downgrade_stay[0][0]

In [None]:
# Defining a pay_plan variable based on the level column
pay_plan = F.udf(lambda x: 1 if x == "paid" else 0, IntegerType())

df_user_log_cancelled = df_user_log_cancelled.withColumn("pay_plan", pay_plan("level"))
df_user_log_staying = df_user_log_staying.withColumn("pay_plan", pay_plan("level"))

In [None]:
df_user_log_cancelled.head()

In [None]:
df_user_log_staying.head()

In [None]:
pay_plan_canc = df_user_log_cancelled.\
                groupBy("userId").\
                agg(F.max("pay_plan")).collect()

In [None]:
pay_plan_canc_list = [pay_plan_status[1] for pay_plan_status in pay_plan_canc]

In [None]:
pay_plan_stay = df_user_log_staying.\
                groupBy("userId").\
                agg(F.max("pay_plan")).collect()

In [None]:
pay_plan_stay_list = [pay_plan_status[1] for pay_plan_status in pay_plan_stay]

In [None]:
sum(pay_plan_canc_list), sum(pay_plan_stay_list)

In [None]:
downgrade_canc[0][0]/sum(pay_plan_canc_list), downgrade_stay[0][0]/sum(pay_plan_stay_list)

In [None]:
# window = Window.partitionBy("userId").rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
# user_log_valid_2 = user_log_valid.withColumn("Schurn", Ssum("churn").over(window))

In [None]:
# user_log_valid_2.head()

In [None]:
# user_log_valid_2.tail(5)

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the steps below.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.
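
As an illustration of what a feature-extraction script could produce, the plain-Python sketch below turns an event log into one feature vector per user by counting selected pages. The toy events and the choice of pages in `PAGES_OF_INTEREST` are hypothetical; the actual features are left to the analysis above.

```python
from collections import Counter, defaultdict

# Hypothetical toy event log: (userId, page) pairs, not taken from the dataset
events = [
    ("1", "NextSong"),
    ("1", "NextSong"),
    ("1", "Add Friend"),
    ("2", "Roll Advert"),
    ("2", "NextSong"),
]

# Hypothetical selection of pages to count per user
PAGES_OF_INTEREST = ("NextSong", "Add Friend", "Roll Advert")

# Count how often each user visited each page
page_counts = defaultdict(Counter)
for user_id, page in events:
    page_counts[user_id][page] += 1

# One fixed-length feature vector per user (a Counter returns 0 for unseen pages)
features = {
    user_id: [counts[page] for page in PAGES_OF_INTEREST]
    for user_id, counts in page_counts.items()
}

print(features)
```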

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
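
To see why F1 is a better target than accuracy when churned users are rare, consider the plain-Python sketch below. The labels are hypothetical, and the helper functions are written out by hand only to make the arithmetic visible; in the notebook, the imported `MulticlassClassificationEvaluator` accepts `metricName="f1"`.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    return sum(1 for t, p in zip(y_true, y_pred) if t == p) / len(y_true)


def f1_score(y_true, y_pred, positive=1):
    """F1 score of the positive class (here: churned users)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p == positive)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p == positive)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p != positive)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


# Hypothetical labels: 2 churned users out of 10
y_true = [1, 1, 0, 0, 0, 0, 0, 0, 0, 0]
always_stay = [0] * 10                      # never predicts churn
one_hit = [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]    # finds one of the two churned users

print(accuracy(y_true, always_stay), f1_score(y_true, always_stay))
print(accuracy(y_true, one_hit), f1_score(y_true, one_hit))
```

The model that never predicts churn still reaches a high accuracy, yet its F1 score for the churn class is zero.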

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a GitHub repository, as well as a web app or blog post.