# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [27]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import avg, col, concat, desc, asc, explode, lit, split, udf
# NOTE: these shadow Python's builtin min, max and sum; the aggregation cell below relies on the Spark versions
from pyspark.sql.functions import min, max, sum
from pyspark.sql.functions import isnan, when, count
from pyspark.sql.types import *

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer,CountVectorizer, HashingTF, IDF, PCA 
from pyspark.ml.feature import VectorIndexer, OneHotEncoderEstimator, StopWordsRemover, VectorAssembler  # OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0
from pyspark.ml.feature import StandardScaler,IndexToString, StringIndexer
from pyspark.ml.classification import LogisticRegression,RandomForestClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.linalg import Vectors 

#from workspace_utils import active_session
import re
import datetime
import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt
from operator import add
from functools import reduce

In [28]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Spark Capstone Project") \
    .getOrCreate()

2.3.2


# Load and Clean Dataset
Clean your dataset, checking for invalid or missing data. For example, records without userids or sessionids. In this workspace, the filename is `mini_sparkify_event_data.json`.

In [55]:
mini_sparkify_event_data = 'mini_sparkify_event_data.json'
df = spark.read.json(mini_sparkify_event_data)
df.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

First look at the data:

In [56]:
print(df.take(1))
df.printSchema()

[Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')]
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

Dropping rows with missing values:

In [57]:
print("Total Count: ", df.count())
print("Missing Value in Columns :- ")
for colName, dtype in df.dtypes:
    print(colName,':', df.filter(df[colName].isNull()).count())

Total Count:  286500
Missing Value in Columns :- 
artist : 58392
auth : 0
firstName : 8346
gender : 8346
itemInSession : 0
lastName : 8346
length : 58392
level : 0
location : 8346
method : 0
page : 0
registration : 8346
sessionId : 0
song : 58392
status : 0
ts : 0
userAgent : 8346
userId : 0
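The loop above launches one Spark job per column. A single aggregation such as `df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])` computes every null count in one pass. The plain-Python sketch below shows the same one-pass idea on a few records modelled on rows printed in this notebook. `null_counts` is a hypothetical helper, not a PySpark function. The sketch also shows that an empty-string userId is not counted as missing.

```python
def null_counts(rows, columns):
    """Count None values per column in a single pass over the rows."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            if row.get(c) is None:
                counts[c] += 1
    return counts

# Records modelled on rows shown in this notebook: one NextSong event and two logged-out events
rows = [
    {"artist": "Martha Tilston", "registration": 1538173362000, "page": "NextSong", "userId": "30"},
    {"artist": None, "registration": None, "page": "Home", "userId": ""},
    {"artist": None, "registration": None, "page": "Help", "userId": ""},
]
print(null_counts(rows, ["artist", "registration", "page", "userId"]))
# {'artist': 2, 'registration': 2, 'page': 0, 'userId': 0}
```

An empty string is a value, not a null. For that reason, `isNull()` alone reports 0 missing userIds even though logged-out events carry no usable id.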


As we can see, userId, itemInSession, sessionId, ts (timestamp), auth, level, method, page and status have no null values. However, userId does contain empty strings for logged-out events, as the next cell shows.

1. artist, length and song have the same number of missing values (58392).
2. userAgent, registration, firstName, lastName, location and gender have the same number of missing values (8346).

In [58]:
df.filter(df.registration.isNull()).show(3,False)

print("Number of rows with missing registration: ", df.filter(df.registration.isNull()).count())
print("Number of rows with missing registration, name, gender, location and user agent: ",\
      df.filter(df.registration.isNull() & df.firstName.isNull() & df.lastName.isNull() &
               df.gender.isNull() & df.location.isNull() & df.userAgent.isNull()).count()) 

df = df.filter(df.registration.isNotNull())

+------+----------+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+-------------+---------+------+
|artist|auth      |firstName|gender|itemInSession|lastName|length|level|location|method|page|registration|sessionId|song|status|ts           |userAgent|userId|
+------+----------+---------+------+-------------+--------+------+-----+--------+------+----+------------+---------+----+------+-------------+---------+------+
|null  |Logged Out|null     |null  |100          |null    |null  |free |null    |GET   |Home|null        |8        |null|200   |1538355745000|null     |      |
|null  |Logged Out|null     |null  |101          |null    |null  |free |null    |GET   |Help|null        |8        |null|200   |1538355807000|null     |      |
|null  |Logged Out|null     |null  |102          |null    |null  |free |null    |GET   |Home|null        |8        |null|200   |1538355841000|null     |      |
+------+----------+---------+------+----

These values are all missing in the same rows: events where the user is not logged in (note the empty userId). We can't identify these users individually, and they make up only about 3% of the records, so we drop the rows where registration is null.

Total Records : 286500

Missing Registration: 8346

% Missing Registration : 2.91%
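The percentage above is the missing count divided by the total number of records. A quick check in Python follows; the helper name `percent_of` is illustrative.

```python
def percent_of(part, total):
    """Share of `part` in `total`, as a percentage rounded to two decimals."""
    return round(100 * part / total, 2)

print(percent_of(8346, 286500))  # 2.91
```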

Columns to drop:
- firstName and lastName aren't useful for predicting churn
- auth has no variability: all remaining users are logged in
- userId uniquely identifies a user, so registration isn't needed
- All of a user's activity will be aggregated without regard to when it happened, so ts isn't needed

In [59]:
dropIrrelevant = ["firstName","lastName","auth","ts","registration"]
keepCols = list(set(df.columns) - set(dropIrrelevant))
df = df.select(keepCols)
df.show(3,True)

+---------+------+------+-------------+--------------------+----------------+--------+------+-----+------+--------------------+-----------------+---------+
|sessionId|gender|method|itemInSession|            location|          artist|    page|userId|level|status|           userAgent|             song|   length|
+---------+------+------+-------------+--------------------+----------------+--------+------+-----+------+--------------------+-----------------+---------+
|       29|     M|   PUT|           50|     Bakersfield, CA|  Martha Tilston|NextSong|    30| paid|   200|Mozilla/5.0 (Wind...|        Rockpools|277.89016|
|        8|     M|   PUT|           79|Boston-Cambridge-...|Five Iron Frenzy|NextSong|     9| free|   200|"Mozilla/5.0 (Win...|           Canada|236.09424|
|       29|     M|   PUT|           51|     Bakersfield, CA|    Adam Lambert|NextSong|    30| paid|   200|Mozilla/5.0 (Wind...|Time For Miracles| 282.8273|
+---------+------+------+-------------+--------------------+----
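`set(df.columns) - set(dropIrrelevant)` has no defined order, so the columns come back shuffled. Compare the header above with the schema printed earlier. A list comprehension keeps the original order, and PySpark's `df.drop(*dropIrrelevant)` has the same effect. In the sketch below, the column list is copied from the schema printed earlier, and `keep_columns` is a hypothetical helper.

```python
def keep_columns(columns, drop):
    """Return the columns not listed in `drop`, preserving their original order."""
    drop = set(drop)
    return [c for c in columns if c not in drop]

schema_columns = ["artist", "auth", "firstName", "gender", "itemInSession", "lastName",
                  "length", "level", "location", "method", "page", "registration",
                  "sessionId", "song", "status", "ts", "userAgent", "userId"]
dropIrrelevant = ["firstName", "lastName", "auth", "ts", "registration"]
print(keep_columns(schema_columns, dropIrrelevant))
# ['artist', 'gender', 'itemInSession', 'length', 'level', 'location', 'method',
#  'page', 'sessionId', 'song', 'status', 'userAgent', 'userId']
```

In the notebook, this would be used as `df = df.select(keep_columns(df.columns, dropIrrelevant))`.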

In [60]:
print("Number of rows with missing artist: ", df.filter(df.artist.isNull()).count())
print("Number of rows with missing artist, song, song length: ",\
      df.filter(df.artist.isNull() & df.song.isNull() & df.length.isNull()).count()) 

print("Next song page with missing song: ", df.filter(df.song.isNull() & (df.page=="NextSong")).count()) 
print("Song on a different page: ", df.filter(df.song.isNotNull() & (df.page!="NextSong")).count()) 

Number of rows with missing artist:  50046
Number of rows with missing artist, song, song length:  50046
Next song page with missing song:  0
Song on a different page:  0


Looking into song, which has the same number of missing values as length and artist, we find two things. Whenever song is null, length and artist are null as well. A song is present only on the NextSong page.

Total Records : 286500

Missing Song (not a song page): 58392

% Missing Song : 20.38 %

About 20% of the rows do not contain a song (on the filtered data, 50046 of 278154 rows, about 18%). These other pages can be relevant in determining churn, so we keep them for later instead of dropping them.

In [61]:
df.groupby('level','page').count().sort('page').show(100,False)
df.groupby('page','method').count().sort('page','method').show(100,False)

+-----+-------------------------+------+
|level|page                     |count |
+-----+-------------------------+------+
|free |About                    |93    |
|paid |About                    |402   |
|free |Add Friend               |893   |
|paid |Add Friend               |3384  |
|paid |Add to Playlist          |5354  |
|free |Add to Playlist          |1172  |
|paid |Cancel                   |31    |
|free |Cancel                   |21    |
|free |Cancellation Confirmation|21    |
|paid |Cancellation Confirmation|31    |
|paid |Downgrade                |2055  |
|free |Error                    |53    |
|paid |Error                    |199   |
|paid |Help                     |1132  |
|free |Help                     |322   |
|paid |Home                     |7461  |
|free |Home                     |2621  |
|free |Logout                   |922   |
|paid |Logout                   |2304  |
|free |NextSong                 |41991 |
|paid |NextSong                 |186117|
|paid |Roll Adve

Looking into the page column:

Free users can reach the Upgrade and Submit Upgrade pages.
Paid users can reach the Downgrade and Submit Downgrade pages.

The remaining pages are common to both levels. Each page is requested with either GET or PUT, never both.
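The claim that each page uses only one HTTP method can be verified in Spark with `df.groupby('page').agg(F.countDistinct('method').alias('n_methods')).filter('n_methods > 1')`, which should return no rows. The same check in plain Python appears below. The (page, method) pairs come from rows printed earlier in this notebook, and `methods_per_page` is a hypothetical helper.

```python
from collections import defaultdict

def methods_per_page(events):
    """Map each page to the sorted list of HTTP methods seen with it."""
    seen = defaultdict(set)
    for page, method in events:
        seen[page].add(method)
    return {page: sorted(methods) for page, methods in seen.items()}

events = [("NextSong", "PUT"), ("Home", "GET"), ("Help", "GET"), ("Home", "GET")]
mapping = methods_per_page(events)
print(mapping)  # {'NextSong': ['PUT'], 'Home': ['GET'], 'Help': ['GET']}
print([page for page, methods in mapping.items() if len(methods) > 1])  # []
```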

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

In [38]:
# NOTE: na.fill(0) fills every numeric null, so `length` becomes 0 for non-song
# events; this lowers the per-user average length computed later
df = df.na.fill(0)

# Remove spaces from page names so they can be used in column names
df = df.withColumn('page', F.regexp_replace('page', ' ', ''))

# Bring only the distinct page names to the driver, not every row
pages = {row['page'] for row in df.select('page').distinct().collect()}
removePage = ['CancellationConfirmation', 'SubmitDowngrade']

# One integer 0/1 indicator column per page
for page in pages:
    df = df.withColumn('page_' + page, F.when(F.col('page') == page, 1).otherwise(0))

# churn = 1 if the event is a Cancellation Confirmation or a Submit Downgrade
df = df.withColumn('churn', reduce(add, [F.col('page_' + x) for x in removePage]))

# Drop the indicator columns that define the label so they don't leak into the features
for page in removePage:
    pages.remove(page)
    df = df.drop('page_' + page)

print(df.columns)

['sessionId', 'gender', 'method', 'itemInSession', 'location', 'artist', 'page', 'userId', 'level', 'status', 'userAgent', 'song', 'length', 'page_Cancel', 'page_RollAdvert', 'page_ThumbsDown', 'page_SubmitUpgrade', 'page_NextSong', 'page_Settings', 'page_Error', 'page_Logout', 'page_Home', 'page_AddtoPlaylist', 'page_Downgrade', 'page_ThumbsUp', 'page_SaveSettings', 'page_About', 'page_Upgrade', 'page_Help', 'page_AddFriend', 'churn']


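In each page column, 1 indicates that the event happened on that page, and 0 indicates that it did not.

I define churn as a user reaching either the Cancellation Confirmation page or the Submit Downgrade page. Submit Downgrade is included because a downgrade also tells us the user was not satisfied with the app (perhaps with the paid features).

Note that page_Cancel remains among the features. The page table above shows exactly as many Cancel events as Cancellation Confirmation events for each level (31 paid, 21 free). This feature therefore nearly duplicates part of the label and may inflate the model scores.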
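The churn label defined above has two steps. First, each event gets churn = 1 if its page is one of the two churn pages. The cell adds the two indicator columns for this, and they can't both be 1 on the same event. Second, the aggregation step later takes the max per group. The plain-Python sketch below shows both steps; the helper names are hypothetical, and page names are written without spaces, as in the cell.

```python
CHURN_PAGES = {"CancellationConfirmation", "SubmitDowngrade"}

def event_churn(page):
    """1 if this event is on one of the churn pages, else 0."""
    return int(page in CHURN_PAGES)

def group_churn(pages):
    """A user/level group is labelled 1 if any of its events is a churn event (max over events)."""
    return max((event_churn(p) for p in pages), default=0)

print(group_churn(["Home", "NextSong", "Cancel", "CancellationConfirmation"]))  # 1
print(group_churn(["Home", "NextSong", "Downgrade"]))  # 0 (visiting Downgrade is not Submit Downgrade)
```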


Many features can't be grouped as is, so I will do some feature transformation first. 

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

The userAgent strings are not in a form that is easy to aggregate, so each agent is mapped to one of four categories:
- Linux OS
- Windows OS
- iPhone
- Mac OS X (also the fallback for any agent that matches none of the above)

In [39]:
@F.udf
def countAgent(x):
    # Checks run in order; the first keyword found decides the category
    x = x.lower()
    if "linux" in x:
        return "LINUX"
    if "windows" in x:
        return "WINDOWS"
    if "iphone" in x:
        return "IPHONE"
    return "MAC"  # fallback for every other agent

df = df.withColumn('userAgent', countAgent('userAgent'))
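The same mapping can be written as an ordered rule table in plain Python, which makes it easy to unit-test outside Spark. The Windows agent below is copied from the first record printed above. The other three agent strings are illustrative examples, not taken from the dataset. The last line shows why the order matters: iPhone agents also contain "Mac OS X", so a Mac rule placed first would swallow them. Inside Spark, the same chain could avoid a Python UDF with `F.when(F.lower(F.col('userAgent')).contains('linux'), 'LINUX').when(...).otherwise('MAC')`.

```python
# Rules are checked in order; the first matching keyword wins
AGENT_RULES = [("linux", "LINUX"), ("windows", "WINDOWS"), ("iphone", "IPHONE")]

def classify_agent(user_agent, rules=AGENT_RULES, fallback="MAC"):
    ua = user_agent.lower()
    for keyword, label in rules:
        if keyword in ua:
            return label
    return fallback

windows_ua = "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0"
iphone_ua = "Mozilla/5.0 (iPhone; CPU iPhone OS 7_1_2 like Mac OS X) AppleWebKit/537.51.2 (KHTML, like Gecko)"
mac_ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko)"
linux_ua = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)"

print([classify_agent(ua) for ua in (windows_ua, iphone_ua, mac_ua, linux_ua)])
# ['WINDOWS', 'IPHONE', 'MAC', 'LINUX']
print(classify_agent(iphone_ua, rules=[("mac os x", "MAC")] + AGENT_RULES))  # MAC
```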

Users are spread over many locations in the USA. The state can be read from the location string, and each user is then assigned to one of the four US Census Bureau regions based on that state:
- North-East
- South
- West
- MidWest

In [None]:
def getStates(x):
    # "Bakersfield, CA" -> "CA"; multi-state metro areas keep every state, e.g. "MA-NH"
    x = x.replace(" ", "")
    return x.split(',')[-1]

NORTHEAST = {'NY', 'NJ', 'PA', 'NH', 'CT', 'ME', 'MA', 'RI', 'VT'}
SOUTH = {'DE', 'FL', 'GA', 'MD', 'NC', 'SC', 'VA', 'DC', 'WV', 'AL',
         'KY', 'MS', 'TN', 'AR', 'LA', 'OK', 'TX'}
WEST = {'AK', 'CA', 'WA', 'OR', 'HI', 'ID', 'MT', 'UT', 'AZ',
        'CO', 'NV', 'NM', 'WY'}
MIDWEST = {'IN', 'IL', 'MI', 'MN', 'MO', 'WI', 'OH', 'IA', 'NE',
           'KS', 'ND', 'SD'}

def getRegion(x):
    # Use the first listed state of a multi-state metro area
    state = x.split('-')[0]
    if state in NORTHEAST:
        return "NORTHEAST"
    if state in SOUTH:
        return "SOUTH"
    if state in WEST:
        return "WEST"
    if state in MIDWEST:
        return "MIDWEST"
    # NOTE: unrecognised state codes also fall back to NORTHEAST
    return "NORTHEAST"

locationtoStates = udf(getStates, StringType())
statesToRegion = udf(getRegion, StringType())

df = df.withColumn('location', locationtoStates('location'))
df = df.withColumn('location', statesToRegion('location'))
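As an alternative sketch, the four region lists can be inverted into a single state-to-region dictionary, which gives a constant-time lookup. As a design choice that differs from the notebook, unknown codes return "UNKNOWN" here instead of silently counting as NORTHEAST. The multi-state Philadelphia value is an illustrative input, not checked against the dataset, and `location_to_region` is a hypothetical helper.

```python
REGIONS = {
    "NORTHEAST": ["NY", "NJ", "PA", "NH", "CT", "ME", "MA", "RI", "VT"],
    "SOUTH": ["DE", "FL", "GA", "MD", "NC", "SC", "VA", "DC", "WV", "AL",
              "KY", "MS", "TN", "AR", "LA", "OK", "TX"],
    "WEST": ["AK", "CA", "WA", "OR", "HI", "ID", "MT", "UT", "AZ",
             "CO", "NV", "NM", "WY"],
    "MIDWEST": ["IN", "IL", "MI", "MN", "MO", "WI", "OH", "IA", "NE",
                "KS", "ND", "SD"],
}
# Invert to a state -> region lookup table
STATE_TO_REGION = {state: region for region, states in REGIONS.items() for state in states}

def location_to_region(location, unknown="UNKNOWN"):
    """'City, ST' or 'Metro-Area, ST1-ST2' -> region of the first listed state."""
    states = location.replace(" ", "").split(",")[-1]
    first_state = states.split("-")[0]
    return STATE_TO_REGION.get(first_state, unknown)

print(location_to_region("Bakersfield, CA"))  # WEST
print(location_to_region("Philadelphia-Camden-Wilmington, PA-NJ-DE-MD"))  # NORTHEAST
print(location_to_region("Toronto, ON"))  # UNKNOWN
```

The dictionary also makes it easy to assert that every code belongs to exactly one region: 50 states plus DC give 51 entries.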

Requests can return several HTTP status codes. Code 200 means success, and any other code means the page was not served as requested (strictly, 3xx codes are redirects rather than errors). Failed requests could frustrate users and make them churn. That is why status is transformed so that 1 represents a successful page hit and 0 represents anything else.

In [None]:
df.groupby('status').count().show()

# 1 = successful request (HTTP 200), 0 = any other status code
df = df.withColumn('status', F.when(F.col('status') == 200, 1).otherwise(0))
df.show()

Some artists could be popular on the app, while others might be unavailable or unpopular, causing their fans to leave. So we aggregate all the artists each user listens to.

In [None]:
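@F.udf
def simplifyArtists(x):
    # x is the list of artist credits of one group. Split each credit on ";", ",", "/"
    # and on "feat"/"ft" as whole words, then return all name fragments as a single
    # lower-case, whitespace-normalised string.
    parts = []
    for item in x:
        parts.extend(re.split(r';|,|/|\b(?:feat|ft)\b\.?', item.lower()))
    return ' '.join(' '.join(parts).split())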
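Splitting on the bare substrings `ft` and `feat` (with an unescaped `.`) also cuts ordinary names apart, because those letters occur inside words. The sketch below compares that unanchored pattern with a word-boundary version, using Python's `re` module. The artist names are illustrative and were not checked against the dataset.

```python
import re

# Unanchored: "." matches any character and "ft" matches inside words
OLD_PATTERN = r';|,|feat.|feat|ft.|ft|/'
# Word-boundary version: "feat"/"ft" only split when they are whole words (optional trailing ".")
NEW_PATTERN = r';|,|/|\b(?:feat|ft)\b\.?'

def split_credits(name, pattern):
    """Lower-case an artist credit and split it into trimmed, non-empty name fragments."""
    parts = re.split(pattern, name.lower())
    return [p.strip() for p in parts if p.strip()]

print(split_credits("Daft Punk", OLD_PATTERN))               # ['da', 'punk']
print(split_credits("Daft Punk", NEW_PATTERN))               # ['daft punk']
print(split_credits("Kanye West feat. Jay-Z", NEW_PATTERN))  # ['kanye west', 'jay-z']
```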

Now we can finally group the data by user and subscription level (free/paid). We group on both because a person's behaviour can differ depending on which subscription they currently have.
- Artists are aggregated into one string per group
- Numeric columns are summed (except song length, for which we take the average)
- For categorical columns we take the max, since many of them (for example gender and location) don't vary within a user. churn is also aggregated with max, so a group is labelled 1 if any of its events leads to churn

In [40]:
DF = df.groupby('userId','level').agg(sum("status").alias('status'),\
                                      avg('length').alias('length'),\
                                      max("gender").alias('gender'),\
                                      max("location").alias('location'),\
                                      max("userAgent").alias('userAgent'),\
                                      count("*").alias('action'),\
                                      sum("page_ThumbsDown").alias("page_ThumbsDown"),\
                                      sum("page_SubmitUpgrade").alias("page_SubmitUpgrade"),\
                                      sum("page_Upgrade").alias("page_Upgrade"),\
                                      sum("page_Help").alias("page_Help"),\
                                      sum("page_Logout").alias("page_Logout"),\
                                      sum("page_AddtoPlaylist").alias("page_AddtoPlaylist"),\
                                      sum("page_AddFriend").alias("page_AddFriend"),\
                                      sum("page_RollAdvert").alias("page_RollAdvert"),\
                                      sum("page_About").alias("page_About"),\
                                      sum("page_Downgrade").alias("page_Downgrade"),\
                                      sum("page_NextSong").alias("page_NextSong"),\
                                      sum("page_Cancel").alias("page_Cancel"),\
                                      sum("page_Home").alias("page_Home"),\
                                      sum("page_SaveSettings").alias("page_SaveSettings"),\
                                      sum("page_Error").alias("page_Error"),\
                                      sum("page_ThumbsUp").alias("page_ThumbsUp"),\
                                      sum("page_Settings").alias("page_Settings"),\
                                      max("churn").alias('churn'),\
                                      simplifyArtists(F.collect_list("artist")).alias("artist")
                                     ).sort('userId') # NEW DATAFRAME


@F.udf
def simplifyArtists2(x):
    x = list(set(x.split()))
    return (' '.join(x))

DF.select(simplifyArtists2('artist')).show(2,False)
DF = DF.withColumn('artist',simplifyArtists2('artist'))
DF.show(1,False)

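The grouping rules can be sketched in plain Python for a few events: count the actions, sum the page indicators, average the song length and take the max of churn per (userId, level) key. The two song lengths of user 30 come from rows printed earlier. The remaining events, including the churned free user, are hypothetical. Because `df.na.fill(0)` earlier set length to 0 for non-song events, those events pull the average down, as the Home event does here.

```python
from collections import defaultdict

def aggregate(events):
    """Group events by (userId, level): count actions, sum indicators, average length, max churn."""
    groups = defaultdict(list)
    for e in events:
        groups[(e["userId"], e["level"])].append(e)
    result = {}
    for key, evs in groups.items():
        result[key] = {
            "action": len(evs),
            "page_NextSong": sum(e["page_NextSong"] for e in evs),
            "length": sum(e["length"] for e in evs) / len(evs),
            "churn": max(e["churn"] for e in evs),
        }
    return result

events = [
    {"userId": "30", "level": "paid", "page_NextSong": 1, "length": 277.89016, "churn": 0},
    {"userId": "30", "level": "paid", "page_NextSong": 1, "length": 282.8273, "churn": 0},
    {"userId": "30", "level": "paid", "page_NextSong": 0, "length": 0.0, "churn": 0},  # e.g. a Home event
    {"userId": "9", "level": "free", "page_NextSong": 0, "length": 0.0, "churn": 1},   # hypothetical
]
summary = aggregate(events)
print(summary[("30", "paid")]["action"], summary[("9", "free")]["churn"])  # 3 1
```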

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [44]:
#Create pipeline stages
stages = []

Pages = [("page_"+ page) for page in pages]
numericColumns = ['action','length', 'status']
numericColumns += Pages
categoricalColumns = ['level','gender','location','userAgent']
tfidfColumns = ["artist"]

print(numericColumns + categoricalColumns + tfidfColumns)
print(len(numericColumns + categoricalColumns + tfidfColumns))

# Features
##category
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + '_index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "_vec"])
    stages += [stringIndexer, encoder]
    
for tfidfCol in tfidfColumns:
    tokenizer = Tokenizer(inputCol=tfidfCol, outputCol=tfidfCol+"_tokens")
    hashingTF = HashingTF(inputCol=tfidfCol+"_tokens", outputCol=tfidfCol+ "_tf")
    idf = IDF(inputCol=tfidfCol+"_tf", outputCol=tfidfCol+"_tfidf")
    #stages += [tokenizer,hashingTF,idf]    

#label
label_index = StringIndexer(inputCol = 'churn', outputCol = 'label')
stages += [label_index]

#assembler
assemblerInputs = [cat + "_vec" for cat in categoricalColumns] + numericColumns
# tf-idf artist features are deliberately excluded (see the note below the cell):
# assemblerInputs += [tfidf + "_tfidf" for tfidf in tfidfColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features_assem")
scaler = StandardScaler(inputCol="features_assem", outputCol="bPCfeatures",withStd=True, withMean=False)

# Cumulative explained variance (%) for different numbers of PCA components:
# kk = 3  -> 63.92
# kk = 10 -> 90.15
# kk = 15 -> 96.49

kk = 10
stages += [assembler,scaler]
pca = PCA(k=kk, inputCol="bPCfeatures", outputCol="features")
stagesPCA = stages + [pca]
rf = RandomForestClassifier()  # numTrees is tuned by the cross-validator below
stagesPCA += [rf]
print(stagesPCA)

pipeline = Pipeline(stages=stagesPCA)

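# To inspect the explained variance, fit the pipeline and read it from the PCA model,
# which is the second-to-last stage (the last stage is the random forest):
# model = pipeline.fit(DF)
# var = model.stages[-2].explainedVariance
# sumVar = 0
# for e in var:
#     print(e)
#     sumVar += e
# print("SUM: ", sumVar)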

['action', 'length', 'status', 'page_Cancel', 'page_RollAdvert', 'page_ThumbsDown', 'page_SubmitUpgrade', 'page_NextSong', 'page_Settings', 'page_Error', 'page_Logout', 'page_Home', 'page_AddtoPlaylist', 'page_Downgrade', 'page_ThumbsUp', 'page_SaveSettings', 'page_About', 'page_Upgrade', 'page_Help', 'page_AddFriend', 'level', 'gender', 'location', 'userAgent', 'artist']
25
[StringIndexer_4f6db73b08b848b806b4, OneHotEncoderEstimator_402f97cb9a817d5751e5, StringIndexer_4b3b8cdde502df8f34ae, OneHotEncoderEstimator_4cb18621e1a8afd9ad2c, StringIndexer_4208a096231301e28a41, OneHotEncoderEstimator_4922bc4bf35e59ac9e77, StringIndexer_4a3695cd15cb0c875728, OneHotEncoderEstimator_4026bad9f8e6f46eefcc, StringIndexer_4e93a6ea9870a6e53eb9, VectorAssembler_4edc80b75ad63ea3bea4, StandardScaler_4e0fb8eb2dda027c38b4, PCA_4f738cf9ae89166cdc9b, RandomForestClassifier_476a94ed7c03fcd80e19]


We apply PCA before the classifier to reduce overfitting, which otherwise led to poor performance: without PCA the accuracy was about 74%. We keep 10 components because they explain just over 90% of the variance (90.15%, per the comments in the cell above).
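The choice of k can be automated: take the smallest number of leading components whose cumulative explained variance reaches a threshold. In Spark, the per-component proportions are available from the fitted PCA stage as `PCAModel.explainedVariance` (a vector; `.toArray()` converts it). The per-component ratios below are hypothetical. The recorded percentages are the ones listed in the pipeline cell, and `smallest_k` is a hypothetical helper.

```python
from itertools import accumulate

def smallest_k(explained, threshold=0.90):
    """Smallest number of leading components whose cumulative explained variance reaches threshold."""
    for k, total in enumerate(accumulate(explained), start=1):
        if total >= threshold:
            return k
    return len(explained)  # threshold never reached: keep every component

# Hypothetical per-component proportions, in the form explainedVariance reports them
ratios = [0.5, 0.3, 0.15, 0.05]
print(smallest_k(ratios))  # 3

# Cumulative percentages recorded in the pipeline cell
recorded = {3: 63.92, 10: 90.15, 15: 96.49}
print(min(k for k, pct in recorded.items() if pct >= 90))  # 10
```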

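*Removing the tf-idf(artist) features is a deliberate decision. With them, the assembled vector has 262172 columns (HashingTF produces 2^18 = 262144 features by default), which is too many for PCA to fit. Moreover, without PCA (that is, without transforming the feature space), removing this column reduced accuracy by only 3-4%.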

In [49]:
trainingData, testData = DF.randomSplit([0.7, 0.3])
trainingData.cache()

paramGrid2 = (ParamGridBuilder()\
              .addGrid(rf.numTrees, [3,5,10,15]).build())

crossval = CrossValidator(estimator=pipeline,estimatorParamMaps=paramGrid2,\
                          evaluator=MulticlassClassificationEvaluator(metricName="weightedRecall"),numFolds=10)
#with active_session():
cvModel = crossval.fit(trainingData)
results = cvModel.transform(testData)

print(results.filter(results.label == results.prediction).count())
print(results.count())
results.select('label','prediction','probability').sort('label','prediction').show(200,False)
    
# 92 of 103 test rows correct = 89.32% accuracy

92
103
+-----+----------+-----------------------------------------+
|label|prediction|probability                              |
+-----+----------+-----------------------------------------+
|0.0  |0.0       |[0.7805737041335387,0.21942629586646145] |
|0.0  |0.0       |[0.9416666666666667,0.05833333333333333] |
|0.0  |0.0       |[0.9070196456824549,0.09298035431754505] |
|0.0  |0.0       |[0.9647058823529411,0.035294117647058816]|
|0.0  |0.0       |[0.9882352941176471,0.011764705882352941]|
|0.0  |0.0       |[0.9722352941176471,0.02776470588235294] |
|0.0  |0.0       |[0.8249609531566572,0.17503904684334287] |
|0.0  |0.0       |[1.0,0.0]                                |
|0.0  |0.0       |[0.7993867172074213,0.20061328279257878] |
|0.0  |0.0       |[0.9264179223654818,0.07358207763451817] |
|0.0  |0.0       |[0.9381826282478348,0.06181737175216524] |
|0.0  |0.0       |[0.9264179223654818,0.07358207763451817] |
|0.0  |0.0       |[0.9117780008834551,0.08822199911654491] |
|0.0  |0.0       

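Using the accuracy metric gave about 83%, so we use weightedRecall instead. Recall is relevant here because we do not want false negatives: identifying the users who are going to churn, and acting to keep them, is essential in a customer-facing business.

Note, however, that weighted recall (the recall of each class, weighted by its share of the rows) is mathematically equal to accuracy. The gap between the two runs therefore most likely comes from the unseeded random split rather than from the metric. The recall of the churn class alone would target false negatives directly.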
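The identity between weighted recall and accuracy can be checked in plain Python. The labels and predictions below are hypothetical (1 = churn), and the helper functions are illustrative, not Spark APIs. Weighting each class's recall by its share of the rows cancels the per-class denominators, which leaves the total number of correct predictions divided by the number of rows.

```python
from collections import Counter

def accuracy(labels, preds):
    return sum(l == p for l, p in zip(labels, preds)) / len(labels)

def recall_for(labels, preds, cls):
    """Fraction of rows whose true label is `cls` that were predicted as `cls`."""
    hits = [p == cls for l, p in zip(labels, preds) if l == cls]
    return sum(hits) / len(hits)

def weighted_recall(labels, preds):
    """Per-class recall weighted by each class's share of the rows."""
    n = len(labels)
    return sum(count / n * recall_for(labels, preds, cls) for cls, count in Counter(labels).items())

labels = [0, 0, 0, 0, 0, 0, 1, 1]  # hypothetical: 6 users stayed, 2 churned
preds = [0, 0, 0, 0, 0, 1, 1, 0]
print(accuracy(labels, preds))         # 0.75
print(weighted_recall(labels, preds))  # same value as accuracy
print(recall_for(labels, preds, 1))    # 0.5 (half of the churners were missed)
```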

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.