# Sparkify Project Workspace
This workspace contains a tiny subset (128MB) of the full dataset available (12GB). Feel free to use this workspace to build your project, or to explore a smaller subset with Spark before deploying your cluster on the cloud. Instructions for setting up your Spark cluster are included in the last lesson of the Extracurricular Spark Course content.

You can follow the steps below to guide your data analysis and model building portion of this project.

In [1]:
# import standard libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import plotly.plotly as py
import plotly.graph_objs as go

import datetime
from time import time
%matplotlib inline
import re

#import pyspark libraries
import pyspark
from pyspark import SparkConf

# import sql Spark libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StringType, IntegerType,DataType
from pyspark.sql.functions import udf
from pyspark.sql.functions import date_format
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.functions import min as Fmin
from pyspark.sql.functions import max as Fmax
from pyspark.sql.functions import avg, col, min, max, regexp_replace, concat, count, desc, asc, explode, lit, split, stddev, udf, lower, isnan, when, rank, from_unixtime

# import ml Spark libraries
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler, MinMaxScaler, VectorAssembler
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier, DecisionTreeClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# import sklearn ml metrics libraries for metrics calculation
from sklearn.metrics import confusion_matrix, f1_score, classification_report

In [2]:
# create a Spark session
spark = SparkSession \
    .builder \
    .appName("My Sparkify app") \
    .getOrCreate()

In [3]:
# get the Spark context
spark.sparkContext.getConf().getAll()

[('spark.app.id', 'local-1617139075698'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.port', '46533'),
 ('spark.driver.host', '5488a941163e'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'My Sparkify app')]

In [4]:
spark

# Load and Clean Dataset
In this workspace, the mini-dataset file is `mini_sparkify_event_data.json`. Load and clean the dataset, checking for invalid or missing data - for example, records without userids or sessionids. 

In [5]:
# import mini-dataset file
file = "./mini_sparkify_event_data.json"
df = spark.read.json(file)

In [6]:
# Let's check the dataset
df.persist()

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

Now, let's prepare some functions that we will use later

In [7]:
# Let's build a function to count the nulls/missing/empty values
def count_null_col(df, column):
    '''
    Function that counts the unusable values of a single column
    A value is counted when it is null, NaN or an empty string
    Input:
            df: dataset whose column is inspected
            column: name of the column to inspect
    Output:
            n_missing: number of rows whose value in that column is null/NaN/empty
    '''
    target = df[column]
    # the three conditions are OR-ed: a row is counted as soon as one of them holds
    is_unusable = (target.isNull() | isnan(target)) | (target == "")

    return df.filter(is_unusable).count()

In [8]:
# Let's build a function to count the nulls/missing/empty values of every column of the dataset
def count_null_df(df):
    '''
    Function that reports which columns of a dataset hold null/missing/empty values
    Input:
            df: dataset to inspect
    Output:
            missing: dictionary {column name: number of missing values}, listing only
            the columns that have at least one null/missing/empty value
    '''
    # one count per column, evaluated lazily in the order of df.columns
    counts = ((name, count_null_col(df, name)) for name in df.columns)

    return {name: n_missing for name, n_missing in counts if n_missing > 0}

In [9]:
# Let's build a function to count if the categorical and numerical columns of the dataset
def cat_or_num(df):
    '''
    Function that classifies the columns of a dataset as categorical or numerical
    Input:
            df: dataset whose columns we want to classify
    Output:
            cat_cols: list with the names of the string columns
            num_cols: list with the names of the long and double columns
            (columns of any other type appear in neither list)
    '''
    cat_cols = []
    num_cols = []

    for field in df.schema:
        # typeName() gives the short type name ("string", "long", "double").
        # Comparing str(dataType) with "StringType" is fragile because newer
        # PySpark releases render the type as "StringType()".
        type_name = field.dataType.typeName()
        if type_name == "string":
            cat_cols.append(field.name)
        elif type_name in ("long", "double"):
            num_cols.append(field.name)

    return cat_cols, num_cols

In [10]:
# get information about the cols and features
def show_cols_info(df):
    '''
    Function that prints, column by column, an overview of a dataset
    Input:
            df: dataset we want to show information about
    Output:
            none: for every column it prints the describe() summary
            followed by the distinct values of the column
    '''
    for name in df.columns:
        single = [name]
        # summary statistics of the column
        df.describe(single).show()
        # different values the column takes
        df.select(single).distinct().show()
    

In [None]:
# Let's see how many missing values there are in the dataset
missing = count_null_df(df)
print("These are the columns with nulls/missing/empty values: {}\n".format(missing))

# Drop Rows with Missing Values
As you'll see, it turns out there are no null values in the session column, but some values are empty strings: in particular, there are userId values that are empty strings. Let's detect and delete these rows, as they are not useful.

In [None]:
df_valid = df.dropna(how = "any", subset = ["userId", "sessionId"])

In [None]:
df_valid.count()

In [None]:
# Let's get the unique records of UserId
df.select("userId").dropDuplicates().sort("userId").show()

In [None]:
# After viewing the previous query, we can see there are some users with empty UserId
# Let's eliminate them

In [None]:
df_valid = df_valid.filter(df_valid["userId"] != "")

#### Let's count the number of records after remove the empty strings from userId field


In [None]:
df_valid.count()

In [None]:
# And check again if there are no empty field in userId column
df_valid.select("userId").dropDuplicates().sort("userId").show()

# Exploratory Data Analysis
When you're working with the full dataset, perform EDA by loading a small subset of the data and doing basic manipulations within Spark. In this workspace, you are already provided a small subset of data you can explore.

### Define Churn

Once you've done some preliminary analysis, create a column `Churn` to use as the label for your model. I suggest using the `Cancellation Confirmation` events to define your churn, which happen for both paid and free users. As a bonus task, you can also look into the `Downgrade` events.

### Explore Data
Once you've defined churn, perform some exploratory data analysis to observe the behavior for users who stayed vs users who churned. You can start by exploring aggregates on these two groups of users, observing how much of a specific action they experienced per a certain time unit or number of songs played.

For this use case, I am going to define churn as the appearance of a `Cancellation Confirmation` event for a user.
And now, let's start with the EDA phase:

In [None]:
# Let's take an overview of the mini dataset file -->  18 fields 
df_valid.describe()

In [None]:
# Let's take a overview of the schema of the dataset file
df_valid.printSchema()

In [None]:
Explanations of the fields:
 
 |-- artist: singer of the song
 |-- auth: method of accessing sparkify          (values: Logged In/Cancelled)
 |-- firstName: first name of the client
 |-- gender: gender of the client                (values: M/F)
 |-- itemInSession: item count in a session
 |-- lastName: last name of the client
 |-- length: length of the song
 |-- level: account contract type                (values: free/paid)
 |-- location: main location of the client
 |-- method: method for accessing sparkify (put, get, etc..)   (values: PUT/GET)
 |-- page: page the user visits
 |-- registration: time of the registration (unix timestamp)
 |-- sessionId: identification of the session
 |-- song: song the client has listened
 |-- status: status of the accessing method (http status) (values: 307/404/200)
 |-- ts: information about the time of the user event (unix timestamp)
 |-- userAgent: browser the user has accessed to sparkify with
 |-- userId: identification for the client

In [None]:
# Let's categorize the columns, depending on their type (categorical or numerical)
cat_cols,num_cols = cat_or_num(df)

In [None]:
print ("The categorical columns are: {}".format(cat_cols))

In [None]:
print ("The numerical columns are: {}".format(num_cols))

In [None]:
# Let's see how many registries the file has --> we have 278.154 records
df_valid.count()

In [None]:
# Let's take a look at the first lines of the file, to see what they look like
df_valid.head(2)

In [None]:
# Let's see just one row
df_valid.show(n=1)

In [None]:
df_valid.take(5)

In [None]:
# Let's have an statistic overview of the dataset --> but with this view it is difficult to see...
df_valid.describe().show(n=2)

In [None]:
# Let's better go field by field 
df_valid.describe('artist').show()

In [None]:
df_valid.describe('auth').show()

In [None]:
# We have two values for auth = Logged In or Cancelled
df_valid.select('auth').distinct().collect()

In [None]:
df_valid.describe('firstName').show()

In [None]:
df_valid.describe('gender').show()

In [None]:
df_valid.select('gender').distinct().collect()

In [None]:
df_valid.describe('itemInSession').show()

In [None]:
df_valid.describe('lastName').show()

In [None]:
df_valid.describe('length').show()

In [None]:
df_valid.describe('level').show()

In [None]:
df_valid.select('level').distinct().collect() #--> two values: free or paid 

In [None]:
df_valid.describe('location').show()

In [None]:
# Let's see the different locations, it is a string composed by two parts separated by a comma: 
# the first one is the name of the location, the second one after the comma is the abbreviation of the location
# for an easier management of this information, we will use the second part

df_valid.select('location').distinct().collect()

In [None]:
# Prepare the data for a better visualization of the location column
# The location column has the form "Name, Short_Name", e.g. 'Lexington-Fayette, KY'.
# We split it on the comma together with the blanks that follow it and keep the
# second part, so the state abbreviation is stored without a leading space
df_valid = df_valid.withColumn("state", split(col("location"), r",\s*").getItem(1))
# Show the distinct states only: collecting the column would bring one row per
# logged event to the driver
df_valid.select("state").distinct().show()

In [None]:
df_valid.describe('method').show()

In [None]:
df_valid.select('method').distinct().collect() # there are two methods: PUT/GET

In [None]:
df_valid.describe('page').show()

In [None]:
df_valid.select('page').distinct().collect() # there are 19 different pages

In [None]:
df_valid.describe('registration').show()

In [None]:
df_valid.describe('sessionId').show()

In [None]:
df_valid.describe('song').show()

In [None]:
df_valid.describe('status').show()

In [None]:
df_valid.select('status').distinct().collect() # there are 3 possible status

In [None]:
df_valid.describe('ts').show()

In [None]:
df_valid.describe('userAgent').show()

In [None]:
df_valid.describe('userId').show()

In [None]:
df_valid.select("userID").show()

In [None]:
# Let's see how many distinct users are in this dataset --> 225
df_valid.select("userId").distinct().count()

In [None]:
# And let's see how many different pages are --> 19
df_valid.select("page").dropDuplicates().sort("page").show()

In [None]:
# Let's see an example of row in the dataset, for instance let's see userid= 30
df_valid.select(["UserId", "firstname", "page", "song"]).where(df_valid.userId=="30").collect()[0]

In [None]:
# Now we are going to derive new columns from the ts field (epoch in milliseconds):
# the formatted datetime, the month, the day of month, the hour and the weekday.
# Everything is computed in UTC so the result does not depend on the time zone of
# the machine that runs Spark, and a missing timestamp yields null instead of an error.
def _utc_part(extract):
    '''
    Build the function wrapped by each udf below
    Input:
            extract: function that receives a timezone-aware UTC datetime and
                     returns the wanted part of it
    Output:
            function that receives an epoch in milliseconds (or None) and returns
            extract(datetime), or None when the input is None
    '''
    def apply(epoch_ms):
        if epoch_ms is None:
            return None
        moment = datetime.datetime.fromtimestamp(epoch_ms / 1000.0, tz=datetime.timezone.utc)
        return extract(moment)

    return apply


# No return type is declared, so the udfs keep the default (string) result type;
# the cells that use hour/day_of_month cast them to numbers when needed
get_date       = udf(_utc_part(lambda d: d.strftime("%Y-%m-%d %H:%M:%S")))
get_month      = udf(_utc_part(lambda d: d.month))
get_day        = udf(_utc_part(lambda d: d.day))
get_hour       = udf(_utc_part(lambda d: d.hour))
# '%w' numbers the weekdays from 0 (Sunday) to 6 (Saturday)
get_weekday    = udf(_utc_part(lambda d: d.strftime('%w')))

In [None]:
# we apply the lambda function to convert the ts into hour and data
# we apply also the lambda function to convert the registration field into a new field "registration_time" with the timestamp 
df_valid = df_valid.withColumn("registration_time", get_date(df_valid.registration))
df_valid = df_valid.withColumn("date", get_date(df_valid.ts))
df_valid = df_valid.withColumn("month", get_month(df_valid.ts))
df_valid = df_valid.withColumn("day_of_month", get_day(df_valid.ts))
df_valid = df_valid.withColumn("hour", get_hour(df_valid.ts))
df_valid = df_valid.withColumn('day_of_week', get_weekday(df_valid.ts))

In [None]:
# Let's prepare sparksql 
# To do that, we need to create a temporary table, where we'll perform the SQL queries
df_valid.createOrReplaceTempView("Sparkify_sql")

In [None]:
# Let's prepare the registration_time for human understanding, that is year-month-day hour:minute:second, 
# And also let's add the weekday
df_time = df_valid.select('registration_time', date_format('registration_time', 'u').alias('weekday'))
df_time.show()

In [None]:
# Now let's calculate how long the user has been subscribed to our Sparkify service
# To calculate that, we subtract the registration timestamp from the timestamp of each event

def get_days_from_registration(df_valid):
    '''
    Function that computes the time elapsed between the registration and each event
    Input:
            df_valid: dataset exposing the `ts` and `registration` columns
                      (the conversion below treats both as milliseconds)
    Output:
            days_from_registration: (ts - registration) converted from milliseconds to days
    '''
    elapsed_ms = df_valid.ts - df_valid.registration
    ms_per_day = 1000*3600*24

    return elapsed_ms/ms_per_day
    
    
df_valid = df_valid.withColumn("subscription_days", get_days_from_registration(df_valid))

In [None]:
# Longest subscription in the data: sort in descending order and fetch one row,
# instead of collecting every event to the driver just to read the last element
df_valid.select('subscription_days').orderBy(desc('subscription_days')).first()

In [None]:
df_valid.select('userID', 'subscription_days').take(3)

### Now we continue with our Data Exploration! Let's do it just making questions!


In [None]:
#### Question: How many days/months does our dataset contain?

In [None]:
# We have a dataset with an initial date = 2018-10-01
# first() brings only the earliest row to the driver instead of the whole column
day_from = list(df_valid.select('date').sort("date").first())
day_from 

In [None]:
# We have a dataset with an end date = 2018-12-03
# sorting in descending order and taking first() brings only the latest row to the driver
day_to = list(df_valid.select('date').sort(desc("date")).first())
day_to

In [None]:
#### We have a dataset with an initial date = 2018-10-01 and an end date = 2018-12-03
#### So we have a dataset of aprox 2 months information 

In [None]:
# Let's see the states
df_valid.select('userId','state').distinct().groupby('state').count().show()

#### Question: How is the proportion between Males and Females in our musical streaming service ? 


In [None]:
ngender = df_valid.select('gender','userId').distinct().groupby('gender').count().toPandas()
ngender.head(5)

In [None]:
# Let's plot this proportion with a graphic
# and show the percentage above
plt.figure(figsize = (7,7))
plt.title('Number of Users by Gender & Percentage')
ax = sns.barplot(x='gender', y='count',hue='gender', data=ngender);

# The total is the sum over every gender row, so the percentages stay correct
# however many distinct gender values the data holds
total = ngender['count'].sum()
for p in ax.patches:
    # only bars with a positive height get a label
    if p.get_height()>0:
        percentage = '{:.1f}%'.format(100 * p.get_height()/total)
        # place the text slightly above the bar, roughly centered on it
        x = p.get_x() + p.get_width()/2.6
        y = p.get_y() + p.get_height() + 0.7
        ax.annotate(percentage, (x, y))

plt.xticks(size=12)
plt.xlabel('Gender',size=12)
plt.yticks(size=12)
plt.ylabel('Number of Users', size=12)
plt.legend(title='Gender', loc='best');
plt.savefig('Users_by_Gender.png')

#### Our dataset contains 104 female users (46.2%) and 121 male users (53.8%)


#### Question: What about the level of the users? Is there a difference between male and female subscriptors?


In [None]:
# Now let's check the users by level of account and gender
# As we are going to use some sns plots, we have to convert the datasets to Pandas
df_gender_level = df_valid.select('userId','gender','level').distinct().groupby('gender','level').count().toPandas().sort_values(by='count')
df_gender_level.head()

#### For both free and paid services, the number of men is slightly higher


In [None]:
# Users per account level, one bar per gender
plt.figure(figsize = (6,6))
ax = sns.barplot(x='level', y='count',hue='gender', data=df_gender_level);

plt.xlabel('Account level')
plt.ylabel('Number of Users')
# the title is set once; a second plt.title() call would simply replace the first
plt.title('Level Account by Gender')
plt.legend(title='Gender', loc='best');
plt.savefig('Level_Account_by_Gender.png')

This graphic shows that there are more users with a paid account.
And, consistent with the counts above, most users are men.

#### Question: What about the location of the users? Does this feature affects on the level of service?

In [None]:
# Count each user once per (state, level) pair: without distinct() the bars would show
# the number of logged events, not the number of users named on the y axis
df_location_level = df_valid.select('userId','state','level').distinct().groupby('state','level').count().orderBy(desc('count')).toPandas()
fig = plt.figure(figsize=(15,6))
plt.ylabel('Number of Users')
ax = fig.gca()
# one row per state and one column per account level, drawn as grouped bars
df_location_level.pivot(index='state', columns='level', values='count').plot(kind='bar', ax=ax)
plt.title('Users and Account types by State')
plt.savefig('df_state_level.png')

The previous graphic shows there are more users (also with paid accounts) in California (CA), New York (NY) and Texas (TX)

#### Question: What about the UserAgent? Is there a difference between subscriptors?

In [None]:
# Now let's see the different browsers the users login, that indicates different Operating Systems from users
df_os = df_valid.select('userAgent','userId').distinct().groupby('userAgent').count().toPandas().sort_values(by='count')
df_os

In [None]:
df_userAgent_level = df_valid.select('userId','userAgent','level').groupby('userAgent','level').count().toPandas().sort_values(by='count')
df_userAgent_level.head()

In [None]:
# Maybe it's interesting to augment our dataset with the device the users utilize for our Sparkify service
# let's map the operating system, through the Browser

# First token found inside the first parentheses of the user agent -> operating system.
# (The table no longer takes the name `map`, which hid the Python builtin.)
OS_BY_PLATFORM_TOKEN = {'macintosh':'MAC', 'iphone':'IPHONE','ipad':'IPAD',
       'x11':'LINUX','compatible':'WINDOWS',
       'windows nt 5.0':'WINDOWS','windows nt 5.1':'WINDOWS',
       'windows nt 6.1':'WINDOWS', 'windows nt 6.0':'WINDOWS',
       'windows nt 6.2':'WINDOWS','windows nt 6.3':'WINDOWS'}


def _classify_os(user_agent):
    '''
    Function that derives the operating system from a browser user agent
    Input:
            user_agent: user agent text, e.g. 'Mozilla/5.0 (Macintosh; Intel ...) ...'
    Output:
            the mapped operating system; 'OTHER' when the user agent has no
            parenthesised part or its first token is not in the table;
            None when the user agent itself is missing
    '''
    if user_agent is None:
        return None
    # content of the first "(...)" group of the user agent
    found = re.search(r'\(([^)]*)\)', user_agent)
    if found is None:
        return 'OTHER'
    token = found.group(1).split(';')[0].lower()
    # an unknown token must not raise: one KeyError would fail the whole Spark job
    return OS_BY_PLATFORM_TOKEN.get(token, 'OTHER')


classify_os = udf(_classify_os)

df_valid = df_valid.withColumn('OS', classify_os(df_valid.userAgent))

In [None]:
# And now we would like to know the count of OS's connections depending on the subscription level of the users
df_os_level = df_valid.select('userId','OS','level').groupby('OS','level').count().orderBy(desc('count')).toPandas()
df_os_level.head(15)

In [None]:
# But for unique userid's, what is the favourite OS?
df_os_level = df_valid.select('userId','OS','level').distinct().groupby('OS','level').count().orderBy(desc('count')).toPandas()
df_os_level.head(15)

#### In the previous table, we can see that Windows is the favourite Operating Systems for both free and paid subscription
#### And on the below table we can confirm by number of distinct users (remember there are 225 users during the time range of this analysis)

In [None]:
df_os_test = df_valid.select('userId','OS').distinct().groupby('OS').count().orderBy(desc('count')).toPandas()
df_os_test.head(15)

In [None]:
# level subscription of the user by Operating System
level_os = df_valid.dropDuplicates(['userId','OS','level']).groupby(['OS', 'level']).count().orderBy(desc('count')).toPandas()

ax = sns.barplot(x='level', y='count',hue='OS', data=level_os);
plt.xlabel('Subscription type')
plt.ylabel('Number of Users')
plt.title('What OS do Clients use by subscription type?')
plt.legend(title='OS', loc='best');
plt.savefig('df_OS_by_subscription_type.png')

#### The above chart shows the OS the users use to connect to our Sparkify service, both for free and paid subscriptions

#### Now let's see the number of songs a users listens to during the day (24 hours)

In [None]:
songs_in_hour = df_valid.filter(df_valid.page == "NextSong").groupby(df_valid.hour).count().orderBy(df_valid.hour.cast("float"))

In [None]:
songs_in_hour.show()

In [None]:
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)

In [None]:
plt.scatter(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlabel("Hour")
plt.ylabel("Songs played");

In [None]:
sns.set_color_codes("pastel")
plt.figure(figsize=(10,5))
sns.barplot(x='hour', y='count', data=songs_in_hour_pd, color="b")
plt.title('Number of Songs listened by hour')
plt.ylabel('count');
plt.savefig('df_number_songs_by_hour.png')

In [None]:
# Now let's see if there are days when people listen more songs
songs_in_days = df_valid.filter(df_valid.page == "NextSong").groupby(df_valid.day_of_month).count().orderBy(df_valid.day_of_month.cast("float"))

In [None]:
# Let's calculate how many songs people listen per month
songs_in_days_pd = songs_in_days.toPandas()
songs_in_days_pd.day_of_month = pd.to_numeric(songs_in_days_pd.day_of_month)

In [None]:
songs_in_days_pd

In [None]:
# And show these numbers with a nice plot
sns.set_color_codes("pastel")
plt.figure(figsize=(10,5))
sns.barplot(x='day_of_month', y='count', data=songs_in_days_pd, color="g")
plt.title('Number of Songs listened by day of month')
plt.ylabel('count');
plt.savefig('df_number_songs_by_day_of_month.png')

In [None]:
#### Question: Is there a day of week when users listen more songs?

In [None]:
# Let's prepare the query
songs_in_weekdays = df_valid.filter(df_valid.page == "NextSong").groupby(df_valid.day_of_week).count().orderBy(df_valid.day_of_week)

In [None]:
# and convert it to pandas to display 
songs_in_weekdays_pd = songs_in_weekdays.toPandas()
# The weekday comes out of the udf as text; make it numeric on the pandas frame that
# is plotted (assigning it to the Spark DataFrame would leave the pandas column untouched)
songs_in_weekdays_pd.day_of_week = pd.to_numeric(songs_in_weekdays_pd.day_of_week)

In [None]:
songs_in_weekdays.show()

In [None]:
# And plot with a barplot
sns.set_color_codes("pastel")
plt.figure(figsize=(10,5))
sns.barplot(x='day_of_week', y='count', data=songs_in_weekdays_pd, color="y")
plt.title('Number of Songs listened by day of week')
plt.ylabel('count');
plt.savefig('df_number_songs_by_day_of_week.png')

##### It seems that users prefer weekdays to use the service


In [None]:
#### With the graphic above, we cannot see a behavioural pattern 


# Users Downgrade Their Accounts

To find when users downgrade their accounts, let's use a window function and cumulative sum to distinguish each user's data as either pre or post downgrade events and then flag those log entries

### Now let's calculate the users who downgrade the service, that is, the users who submitted a downgrade

In [None]:
downgrade_select = udf(lambda x: 1 if x == 'Submit Downgrade' else 0, IntegerType())

In [None]:
df_valid = df_valid.withColumn('downgrade_select', downgrade_select('page'))

In [None]:
df_valid.printSchema()

###### With the previous commands, we get only the time when the user downgraded, 
###### but we would like to have the information of that user from the beginning

In [None]:
# Flag every event of a user who downgraded at any moment, not only the downgrade event itself.
# The window groups the rows per user; the maximum of the 0/1 event flag over that window
# is 1 on all the rows of a user as soon as one of them is a downgrade
windowSpec  = Window.partitionBy('userId')

downgraded_at_some_point = Fmax('downgrade_select').over(windowSpec)
df_valid = df_valid.withColumn('will_downgrade', downgraded_at_some_point)

In [None]:
# Users by gender and downgrade status. The per-user flag `will_downgrade` is used so
# each user is counted once; with the per-event flag `downgrade_select` every
# downgrading user would also be counted in the non-downgrading group
df_downgrade = df_valid.select('userId','gender','will_downgrade').distinct().groupby('gender','will_downgrade').count().toPandas().sort_values(by='count')
df_downgrade.head()

#### Women are slightly more likely to downgrade than men

In [None]:
df_downgrade_level = df_valid.select('userId','level','downgrade_select').distinct().groupby('level','downgrade_select').count().toPandas().sort_values(by='count')
df_downgrade_level.head()

# Now let's calculate the users who churn, that is, the users that confirmed cancellation

In [None]:
churn_confirmation = udf(lambda x: 1 if x == "Cancellation Confirmation" else 0, IntegerType())
df_valid = df_valid.withColumn("churn", churn_confirmation(df_valid.page))
df_users_churned = df_valid.withColumn("churned", churn_confirmation("page"))

In [None]:
df_valid = df_valid.withColumn('will_churn', max('churn').over(windowSpec))

In [None]:
# Let's remember the schema of the data
df_valid.printSchema()

### Question: How many users churned? And what's the rate of churned users vs the total number of users ? 

In [None]:
# Let's count the number of user cancellations (churns) --> there are 52 users who have cancel the service
users_churned = df_users_churned.filter(df_users_churned["churned"] ==1).count()
# Let's calculate the total number of users
total_users = df_valid.select('userId').distinct().count()
# Let's calculate the percentage
percentage_churn = df_users_churned.groupby("userId").agg({"churned":"sum"}).select(avg("sum(churned)")).collect()[0]["avg(sum(churned))"]

print("Number of users who churned: {} vs Total Users: {}".format(users_churned, total_users))
print("Percentage of users who churned: {:.2f}%".format(percentage_churn * 100))

##### With those numbers, we can affirm that we have an imbalanced dataset....

In [None]:
# Let's see some details of the users who churned
# Filter on this DataFrame's own `churned` flag before projecting, instead of
# referencing a column of another DataFrame after it has been dropped by select()
df_users_churned.where(col("churned") == 1).select(["userId", "gender", "level", "state", "ts"]).sort("ts").show(20)

In [None]:
# Let's count the users who churned and the users who did not
# `will_churn` is the per-user flag, so each user lands in exactly one group; the
# per-event `churn` flag would also count every churned user among the non-churned.
# The flag is exposed under the name `churn`, the column name the plot expects.
# The result is converted to a Pandas Dataframe to be plotted with sns
total_churn = df_valid.select('userId', col('will_churn').alias('churn')).distinct().groupby('churn').count().toPandas()
print("Number of users who churned: {} ".format(total_churn))

In [None]:
# We like to see it graphically
sns.barplot(x='churn', y='count', data=total_churn);
plt.xlabel('Churn')
plt.ylabel('Number of Users')
plt.title('The number of Users who churned or not')
plt.legend(title='Churn', loc='best');
plt.savefig('Users_who_churned.png')

#### Question: Is there a difference in the gender between the users who churned and who did not churn? 


In [None]:
# Let's see the churn flag and also the gender
# The per-user flag `will_churn` (shown as `churn`) puts each user in a single group.
# show() only prints and returns None, so its result is not assigned
df_valid.select('userId','gender', col('will_churn').alias('churn')).distinct().groupby('gender','churn').count().show()

In [None]:
# Let's prepare a query, convert the sorted data to pandas 
# The per-user flag `will_churn` is exposed as `churn`, so a churned user is not
# counted a second time as non-churned because of the events before the cancellation
df_churn_gender = df_valid.select('userId','gender', col('will_churn').alias('churn')).distinct().groupby('gender','churn').count().toPandas().sort_values(by='count')

In [None]:
sns.barplot(x='gender', y='count',hue='churn', data=df_churn_gender);
plt.xlabel('Gender')
plt.ylabel('Number of Users')
plt.title('Gender by Churn')
plt.legend(title='Churn', loc='best');
plt.savefig('Gender by Churn.png')

#### Question: Is there a difference in the subscription type between the users who churned and who did not churn? 


In [None]:
# Users per subscription level and churn status; the per-user flag `will_churn`
# (exposed as `churn`) labels the user, not the single cancellation event
df_churn_level = df_valid.select('userId','level', col('will_churn').alias('churn')).distinct().groupby('level','churn').count().toPandas().sort_values(by='count')

In [None]:
sns.barplot(x='level', y='count',hue='churn', data=df_churn_level);
plt.xlabel('Level')
plt.ylabel('Number of Users')
plt.title('Level by Churn')
plt.legend(title='Churn', loc='best');
plt.savefig('Level by Churn.png')

#### Question: Is there a difference in the location between the users who churned and who did not churn? 


In [None]:
# Users per state and churn status; the per-user flag `will_churn` (exposed as
# `churn`) keeps a churned user out of the non-churned group
df_churn_location = df_valid.select('userId','state', col('will_churn').alias('churn')).distinct().groupby('state','churn').count().orderBy(desc('count')).toPandas()

In [None]:
fig = plt.figure(figsize=(12, 6))
g = sns.barplot(x='state', y='count',hue='churn', data=df_churn_location);
plt.xticks(rotation=90)
plt.xlabel('State')
plt.ylabel('Number of Users')
plt.title('Location by Churn')
plt.legend(title='Churn', loc='best');
plt.savefig('Location by Churn.png')

#### Question: Is there a difference in the day_of_week between the users who churned and who did not churn? 

In [None]:
# Users active on each weekday, split by churn status.
# The per-user flag `will_churn` (exposed as `churn`) is part of the distinct key, so the
# count is reproducible; dropDuplicates(['userId','day_of_week']) would keep an arbitrary
# event per pair and therefore an arbitrary value of the per-event churn flag
day_of_week_by_churn = df_valid.select('userId','day_of_week', col('will_churn').alias('churn')).distinct().groupby(['day_of_week','churn']).count().toPandas()

In [None]:
fig, ax = plt.subplots(figsize=(10, 5))
g = sns.barplot(x="day_of_week",y="count", hue="churn", data=day_of_week_by_churn, ax=ax);
plt.xlabel('Day_of_week')
plt.ylabel('Number of Users')
plt.title('Day_of_Week by Churn')
plt.legend(title='Churn', loc='best');
plt.savefig('Day_of_week by Churn.png')

In [None]:
##### The data is very imbalanced, but it seems that the favourite day to churn is day_of_week = 0, which is Sunday (strftime('%w') numbers the days from 0 = Sunday to 6 = Saturday)
##### Maybe the people responsible for Sparkify could prepare some campaigns for that day (free trials, discounts, offers, etc.)

In [None]:
df_valid.select(['userId','page','churn','downgrade_select']).groupby(['page','downgrade_select']).count().toPandas()

In [None]:
page_total_downgraded = df_valid.dropDuplicates(['userId','page']).groupby(['page','downgrade_select']).count().toPandas()

fig = plt.figure(figsize=(12, 6))
g = sns.barplot(x='page', y='count',hue='downgrade_select', data=page_total_downgraded);
plt.xticks(rotation=90)
plt.xlabel('Page')
plt.ylabel('Number of Users')
plt.title('Activities of the User')
plt.legend(title='Downgraded', loc='best');
plt.savefig('Activities_User_downgraded.png')

In [None]:
#### Question: Is there a difference in the actions between the users who churned and those who did not churn? 
There is less activity among the users who are going to churn: fewer Thumbs-Up, fewer Thumbs-Down, etc.

In [None]:
page_total_churn = df_valid.dropDuplicates(['userId','page']).groupby(['page','will_churn']).count().toPandas()

fig = plt.figure(figsize=(12, 6))
g = sns.barplot(x='page', y='count',hue='will_churn', data=page_total_churn);
plt.xticks(rotation=90)
plt.xlabel('Page')
plt.ylabel('Number of Users')
plt.title('Activities of the User')
plt.legend(title='Churn', loc='best');
plt.savefig('Activities_User_Page.png')

In [None]:
# Let's try sparksql 
# To do that, we need to create a temporary table, where we'll perform the SQL queries
df_valid.createOrReplaceTempView("Sparkify_sql")

#### Question: What are the most played songs?

##### Now let's use spark.sql

In [None]:
# Ten most played songs. The query result is kept in the variable and shown afterwards:
# show() only prints and returns None, so assigning its result would lose the DataFrame
songs_artists = spark.sql('''
        SELECT song, count(song) AS TOP_SONGS
        FROM Sparkify_sql
        GROUP BY song
        ORDER BY TOP_SONGS DESC
        LIMIT 10
         
''')
songs_artists.show()

#### Question: What are the artist, users listen the most?

In [None]:
# Ten most listened artists. The query result is kept in the variable and shown afterwards:
# show() only prints and returns None, so assigning its result would lose the DataFrame
songs_artists = spark.sql('''
        SELECT artist, count(artist) AS TOP_ARTIST
        FROM Sparkify_sql
        GROUP BY artist
        ORDER BY TOP_ARTIST DESC
        LIMIT 10
         
''')
songs_artists.show()

#### Question: What is the average length of song, users listen?


In [None]:
songs_length = spark.sql('''
        SELECT length
        FROM Sparkify_sql
''')

In [None]:
sns.distplot(songs_length.toPandas().dropna());

##### The average duration is about 250 seconds


In [None]:
sessionId_total_churn = df_valid.dropDuplicates(['userId','will_churn','sessionId']).groupby(['userId','will_churn']).count().toPandas()

# compare two groups of users
g = sns.FacetGrid(sessionId_total_churn, col="will_churn", sharey=False)
g.map(plt.hist, "count");

##### The users who don't churn use the Sparkify service much more than users who will churn

In [None]:
sessionId_total_churn.head(15)

In [None]:
# Let's see the earliest record (lowest ts) of one user, with the time log, page and level
# first() asks Spark for that single row instead of collecting every row of the user to the driver
df_valid.select(["userId", "firstname", "ts", "page", "level"]).where(df_valid.userId == "125").sort("ts").first()

In [None]:
# Let's see the earliest record (lowest ts) among the rows flagged with churn == 1
# first() asks Spark for that single row instead of collecting every matching row to the driver
df_valid.select(["userId", "firstname", "ts", "page", "level"]).where(df_valid.churn == 1).sort("ts").first()

In [None]:
# Let's calculate the max 
avg_time_user = df_valid.agg({"itemInSession": "max"}).collect()[0]

In [None]:
avg_time_user

#### Question: Is there a difference in time between the users who churned and who did not churn? 

In [None]:
df_membership = df_valid.select(["userId", "registration", "ts", "churn"]) \
    .withColumn('membership_time',(df_valid.ts-df_valid.registration)) \
    .groupBy('userId', 'churn') \
    .agg({'membership_time':'max'}) \
    .withColumnRenamed('max(membership_time)', 'membership_time') \
    .select('userId', (col('membership_time')/1000/86400).alias('membership_time'),'churn') \
    .sort('membership_time') \
    .toPandas()                       

In [None]:
ax = sns.boxplot(data=df_membership, y='membership_time', x='churn', orient='v')
plt.ylabel('Membership time in days until churn')
plt.xlabel('Churn')
plt.title('Membership time (churn/no churn)')
sns.despine(ax=ax);
plt.savefig('Membership_time.png')

##### The previous boxplot shows that users who churn
##### stay with the Sparkify service for less time than those who don't churn
##### Users who churn stay about 50 days on average
##### Users who don't churn stay approx. 75 days on average

# Feature Engineering
Once you've familiarized yourself with the data, build out the features you find promising to train your model on. To work with the full dataset, you can follow the following steps.
- Write a script to extract the necessary features from the smaller subset of data
- Ensure that your script is scalable, using the best practices discussed in Lesson 3
- Try your script on the full data set, debugging your script if necessary

If you are working in the classroom workspace, you can just extract features based on the small subset of data contained here. Be sure to transfer over this work to the larger dataset when you work on your Spark cluster.

In [None]:
# Let's remember the fields of our datset
# There are some fields that we can transform from categorical to numerical: e.g: gender, level
df_valid.printSchema()

In [None]:
These are the features I will include to build the model
Features:
 |-- userId: string (nullable = true)
 |-- churn: cancellation of the service
 |-- level: type of subscription + I will convert into number 0/1
 |-- n_artists: number of artist a user listens to + I will convert into number
 |-- n_songs:number of songs a user listens to + I will convert into number  
 |-- n_songs_play:number of songs added to the playlist + I will convert into number   
 |-- n_thumbs_up: number of thumbs up + I will convert into number
 |-- n_thumbs_down:  number of thumbs down + I will convert into number 
 |-- n_Errors: long (nullable = true)
 |-- n_friends: number of friends + I will convert into number 
 |-- n_Rolls: number of spots displayed to the user
 |-- n_Help: number of visits to help page
 |-- total_sdays: time of the subscription (in days)
 |-- n_sessions: number of sessions of a user
 |-- IPAD: the user connects with an IPAD to our Sparkify service
 |-- IPHONE: the user connects with an IPHONE to our Sparkify service
 |-- LINUX: the user connects with a Linux device to our Sparkify service
 |-- MAC: the user connects with a MAC device to our Sparkify service
 |-- WINDOWS: the user connects with a WINDOWS to our Sparkify service

#### Features related to the session


#### 1. number of sessions by user

In [None]:
# Let's calculate the number of the sessions of a user
n_sessions = df_valid.select('userId','sessionId').dropDuplicates().groupby('userId').count().withColumnRenamed('count','n_sessions')
df_n_sessions = n_sessions.withColumn('n_sessions', n_sessions.n_sessions.cast('bigint'))
df_n_sessions.count()

In [None]:
df_n_sessions.take(2)

#### 2. Operating System (device)

In [None]:
# Let's take the OS the user connects to the Sparkify session and make one hot encoding with the pivot option
df_os = df_valid.select('userId','OS').dropDuplicates().groupby('userId').pivot('OS').agg(count('OS')).fillna(0)

In [None]:
df_os.show(2)

#### Features related to the subscription
#### 1. subscription days of a user
#### 2. subscription cancelled (churn)
#### 3. level of subscription (payment)

In [None]:
# Let's calculate the subscription days a user stays in the service
subscription_days = df_valid.select('userId','subscription_days').groupBy('userId').agg({'subscription_days':'max'}).withColumnRenamed('max(subscription_days)','total_sdays')
df_subscription_days = subscription_days.withColumn('total_sdays', subscription_days.total_sdays.cast('double'))
df_subscription_days.count()

In [None]:
df_subscription_days.take(4)

In [None]:
# Let's calculate if a user churned the service
# We need to be careful with this searchs as we can forget basic things:
# We have to search through "churn" flag, but we will take the max value (1) 
# because the user before churning, he was subscribed in the service 
# and so the "churn" flag was equal to 0
# If we do a fast search with churn=1, the user is repeated
churn = df_valid.groupby('userId').max("churn").withColumnRenamed("max(churn)", "churn")
df_churn = churn.withColumn('churn', churn.churn.cast('int'))
df_churn.count()

In [None]:
df_churn.groupby("churn").count().show()

In [None]:
df_churn.take(2)

In [None]:
# Let's select the last level a user selected and change level into a numerical,binary field (0/1)
level = df_valid.select(['userId', 'level','ts']).orderBy(desc('ts')).dropDuplicates(['userId']).select(['userId', 'level']).replace(['free', 'paid'], ['0', '1'], 'level')

## Let's change level into a numerical,binary field (0/1)
#level = df_valid.select(['userId', 'level']).dropDuplicates(['userId']).replace(['free', 'paid'], ['0', '1'], 'level')
df_level = level.withColumn('level', level.level.cast('int'))
df_level.count()

In [None]:
df_level.take(2)

#### Features related to the user
#### 1. Gender of a user --> I will not include this feature in the model
#### 2. Number of artists listened by a user
#### 3. Number of songs listened by a user
#### 4. Number of songs added to the playlist
#### 5. Duration of songs added to the playlist
#### 6. Average number of songs a user listens to music during a session
#### 7. Number of thumbs_up a user does
#### 8. Number of thumbs_down a user does
#### 9. Number of friends a user adds
#### 10.Subscription days the user has been with our service

In [None]:
# Let's change gender into a numerical,binary field (0/1)
# But I will not include this feature in the training model
gender = df_valid.select(['userId', 'gender']).dropDuplicates(['userId']).replace(['F', 'M'], ['0', '1'], 'gender')
df_gender = gender.withColumn('gender', gender.gender.cast('int'))
df_gender.count()

In [None]:
df_gender.take(2)

In [None]:
# Let's calculate the number of artists a user listens to 
n_artists = df_valid.filter(df_valid.page=="NextSong").select(['userId', 'artist']).dropDuplicates().groupby('userId').count().withColumnRenamed('count', 'n_artists')
df_n_artists= n_artists.withColumn('n_artists', n_artists.n_artists.cast('int'))
df_n_artists.count()

In [None]:
df_n_artists.take(2)

In [None]:
# Let's calculate the number of songs a user listens to
# NOTE(review): groupBy('userId').count() counts every row selected for the user,
# not only the rows that correspond to a played song. Confirm whether the rows should
# first be restricted to page == "NextSong", as is done for n_artists.
n_songs = df_valid.select('userId','song').groupBy('userId').count().withColumnRenamed('count', 'n_songs')
df_n_songs= n_songs.withColumn('n_songs', n_songs.n_songs.cast('double'))
df_n_songs.count()

In [None]:
df_n_songs.take(2)

In [None]:
# Let's calculate the number of songs a user adds to a playlist
n_songs_playlist = df_valid.select('userId','page').where(df_valid.page == 'Add to Playlist').groupBy('userId').count().withColumnRenamed('count', 'n_songs_playlist')
df_n_songs_playlist= n_songs_playlist.withColumn('n_songs_playlist', n_songs_playlist.n_songs_playlist.cast('double'))
df_n_songs_playlist.count()

In [None]:
df_n_songs_playlist.take(2)

In [None]:
# Let's calculate the total duration per user: the sum of `length` over all the rows of the user
# NOTE(review): no page filter is applied here, so this is not limited to songs
# added to a playlist - confirm which of the two is intended.
n_time_songs = df_valid.select('userId','length').groupBy('userId').sum().withColumnRenamed('sum(length)', 'n_time_songs')
df_n_time_songs= n_time_songs.withColumn('n_time_songs', n_time_songs.n_time_songs.cast('double'))
df_n_time_songs.count()

In [None]:
df_n_time_songs.take(2)

In [None]:
# Let's calculate the number of songs a user adds to a playlist
n_songs_play = df_valid.select('userId', 'page').where(df_valid.page =='Add to Playlist').groupBy('userId').agg({'page':'count'}).withColumnRenamed('count(page)','n_songs_play')
df_n_songs_play = n_songs_play.withColumn('n_songs_play', n_songs_play.n_songs_play.cast('double'))
df_n_songs_play.count()

In [None]:
# Let's calculate the average number of songs a user listens to music during a session
n_avg_songs = df_valid.where('page == "NextSong"').groupby(['userId', 'sessionId']).count().groupby(['userId']).agg({'count':'avg'}).withColumnRenamed('avg(count)','n_avg_songs')
df_n_avg_songs= n_avg_songs.withColumn('n_avg_songs', n_avg_songs.n_avg_songs.cast('double'))
df_n_avg_songs.count()

In [None]:
df_n_avg_songs.take(2)

In [None]:
# Let's calculate the number of thumbs_up a user does
n_thumbs_up = df_valid.select('userId','page').where(df_valid.page == 'Thumbs Up').groupBy('userId').count().withColumnRenamed('count', 'n_thumbs_up')
df_n_thumbs_up= n_thumbs_up.withColumn('n_thumbs_up', n_thumbs_up.n_thumbs_up.cast('bigint'))
df_n_thumbs_up.count()

In [None]:
df_n_thumbs_up.take(2)

In [None]:
# Let's calculate the number of thumbs_down a user does
n_thumbs_down = df_valid.select('userId','page').where(df_valid.page == 'Thumbs Down').groupBy('userId').count().withColumnRenamed('count', 'n_thumbs_down')
df_n_thumbs_down= n_thumbs_down.withColumn('n_thumbs_down', n_thumbs_down.n_thumbs_down.cast('bigint'))
df_n_thumbs_down.count()

In [None]:
n_thumbs_down.take(2)

In [None]:
# Let's calculate the number of Error Pages a user experiments
n_Errors = df_valid.select('userId','page').where(df_valid.page == 'Error').groupBy('userId').count().withColumnRenamed('count', 'n_Errors')
df_n_Errors = n_Errors.withColumn('n_Errors', n_Errors.n_Errors.cast('bigint'))
df_n_Errors.count()

In [None]:
df_n_Errors.take(2)

In [None]:
# Let's calculate the number of friends a user adds
n_friends = df_valid.select('userId','page').where(df_valid.page == 'Add Friend').groupBy('userId').count().withColumnRenamed('count', 'n_friends')
df_n_friends= n_friends.withColumn('n_friends', n_friends.n_friends.cast('bigint'))
df_n_friends.count()

In [None]:
df_n_friends.take(2)

In [None]:
# Let's calculate the number of Roll Advert a user experiments
n_Rolls = df_valid.select('userId','page').where(df_valid.page == 'Roll Advert').groupBy('userId').count().withColumnRenamed('count', 'n_Rolls')
df_n_Rolls = n_Rolls.withColumn('n_Rolls', n_Rolls.n_Rolls.cast('bigint'))
df_n_Rolls.count()

In [None]:
df_n_Rolls.take(2)

In [None]:
# Let's calculate the number of Help a user visits
n_Help = df_valid.select('userId','page').where(df_valid.page == 'Help').groupBy('userId').count().withColumnRenamed('count', 'n_Help')
df_n_Help = n_Help.withColumn('n_Help', n_Help.n_Help.cast('bigint'))
df_n_Help.count()

In [None]:
df_n_Help.take(2)

In [None]:
# Let's calculate the membership time of a user
#df_lifetime = df_valid.select('userId', 'subscription_days')
df_subscription_days.count()

In [None]:
df_subscription_days.take(2)

In [None]:
df_n_sessions.count()

In [None]:
df_n_sessions.take(2)

In [None]:
# Now let's collect all the previous features in a set
features = [df_n_sessions, df_total_time, df_subscription_days, df_downgraded,
            df_gender, df_level, df_n_artists, df_n_songs, df_n_songs_play,
            df_n_time_songs, df_n_avg_songs, df_n_thumbs_up, 
            df_n_thumbs_down, df_n_Errors, df_n_friends, df_n_Rolls, df_n_Help]

In [None]:
# Let's check the number of churn users
df_valid.groupby("churn").count().show()

In [None]:
# # Now let's collect all the previous features in a set

final_features = df_churn.join(df_level,'userId','outer')\
     .join(df_n_artists,'userId','outer') \
     .join(df_n_songs,'userId','outer') \
     .join(df_n_songs_play,'userId','outer')\
     .join(df_n_thumbs_up,'userId','outer') \
     .join(df_n_thumbs_down,'userId','outer') \
     .join(df_n_Errors,'userId','outer') \
     .join(df_n_friends,'userId','outer') \
     .join(df_n_Rolls,'userId','outer') \
     .join(df_n_Help,'userId','outer') \
     .join(df_subscription_days,'userId','outer') \
     .join(df_n_sessions,'userId','outer') \
     .join(df_os,'userId','outer')

In [None]:
# Let's have a general overview of the final features dataframe
final_features.printSchema()

In [None]:
# Let's check if churn proportion maintains (as I had some problems with this)
final_features.groupby("churn").count().show()

In [None]:
# Let's see the size of the final features dataframe
rows = final_features.count()
cols = len(final_features.columns)
print("The number of rows is {} and the number of columns is {}".format(rows, cols))

In [None]:
# We don't need userId, as we will predict if a user will churn, but we are not interested in the id of that user
final_features = final_features.drop('userId', 'sessionId')

In [None]:
# But we need to imputate nulls with a value, 0 for instance
final_features= final_features.na.fill(0)

In [None]:
final_features.show(2)

# Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.

In [None]:
# Change the name of the churn column into label, that will be the feature to predict
final_features = final_features.withColumnRenamed("churn","label")

In [None]:
final_features.printSchema()

In [None]:
numerical_features = final_features

In [None]:
ff_pandas = numerical_features.toPandas()

In [None]:
ff_pandas.shape

In [None]:
columns_features = numerical_features.columns

In [None]:
columns_features

In [None]:
# Let's calculate the correlation matrix to see if there are 
# correlation between features
corr = final_features.toPandas().corr()
corr

In [None]:
# Let's plot a heatmap with the correlation of the variables
# we can see there are some correlated features, such as number of songs and the number of artist, 
# that are related, the number of songs added to the playlist
sns.heatmap(corr, xticklabels=corr.columns, yticklabels=corr.columns,  cmap="Blues");

In [None]:
# First we have to vectorize our features with a VectorAssembler
# because SparkML requires a vector of features
assembler = VectorAssembler(inputCols=columns_features[1:], outputCol="NumFeatures")
df = assembler.transform(numerical_features)
df

In [None]:
# The second step is to standardize the features, to avoid that a feature that has higher values,
# dominates the rest of features
# To do that we use the StandardScaler method (scaling the standard deviation) 
scaler = StandardScaler(inputCol="NumFeatures", outputCol="features", withStd=True)
scalerModel = scaler.fit(df)
df = scalerModel.transform(df)
df

In [None]:
# We eliminate NumFeatures column as we don't need it anymore
df = df.drop('NumFeatures')

In [None]:
df.take(2)

In [None]:
df_final = df.select('label','features')

In [None]:
df_final.take(5)

In [None]:
df_final.groupby("label").count().show()

In [None]:
# Split dataset into train and test 70% vs 30%
train, test = df_final.randomSplit([0.7, 0.3], seed = 42)
#train, test, validation = df_final.randomSplit([0.6, 0.2, 0.2], seed = 42)

In [None]:
# Let's check if we have both label (0/1)
train.groupby("label").count().show()

In [None]:
# Let's check if we have both label (0/1)
test.groupby("label").count().show()

In [None]:
# Let's build a function to calculate the performance of our models
# inspired by https://stackoverflow.com/questions/60772315/how-to-evaluate-a-classifier-with-apache-spark-2-4-5-and-pyspark-python
def calculate_performace(results):
    '''
    Function that calculates the performance of a model from its predictions
    Input:  
            results: dataframe returned by model.transform(); it must contain the
                     'label' and 'prediction' columns. When it also contains a
                     'rawPrediction' column, that column is used for the AUC
    Output: 
            performance: pandas DataFrame indexed by 'Accuracy', 'F1' and 'AUC'
                         with a single column 'Performance value'
    '''  
    # The area under the ROC curve needs a continuous score: with hard 0/1 predictions
    # the curve collapses to a single operating point. Use the raw scores when the
    # classifier produced them and fall back to the hard predictions otherwise.
    score_col = 'rawPrediction' if 'rawPrediction' in results.columns else 'prediction'

    # Create both evaluators
    evaluatorMulti = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction')
    evaluator = BinaryClassificationEvaluator(labelCol='label', rawPredictionCol=score_col, metricName='areaUnderROC')

    # Keep only the columns the evaluators read
    wanted_cols = ['label', 'prediction']
    if score_col not in wanted_cols:
        wanted_cols.append(score_col)
    predictionAndTarget = results.select(wanted_cols)

    # Get metrics
    acc = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "accuracy"})
    f1 = evaluatorMulti.evaluate(predictionAndTarget, {evaluatorMulti.metricName: "f1"})
    auc = evaluator.evaluate(predictionAndTarget)

    # Prepare a DataFrame with all metrics
    performance = pd.DataFrame(index=['Accuracy', 'F1', 'AUC'],
                               data={'Performance value': [acc, f1, auc]})

    # Return metrics
    return performance

In [None]:
clf_LR = LogisticRegression(maxIter=10, regParam=0.0)
clf_DT = DecisionTreeClassifier(seed=5)
clf_GBT = GBTClassifier(maxDepth=5, maxIter = 10, seed=42)
clf_RF = RandomForestClassifier(seed=5)
clf_SVM = LinearSVC(maxIter = 10, regParam=0.01)

In [None]:
%%time
############## LOGISTIC REGRESSION ##############
# Train the model
LR_model = clf_LR.fit(train)
# Test the model
prediction_LR = LR_model.transform(test)

# Let's prepare the confusion matrix
# and cast the results to Pandas 
# in order to use sklearn metrics (confussion matrix and classification report)
y_true = prediction_LR.select("label").toPandas()
y_pred = prediction_LR.select("prediction").toPandas()

conf_matrix_LR = confusion_matrix(y_true, y_pred)
clas_rep_LR = classification_report(y_true, y_pred)

print("Logistic Regression:\n")
print(conf_matrix_LR)
print("\n")
print(clas_rep_LR)

In [None]:
%%time
############## DECISION TREES ##############
# Train the model
DT_model = clf_DT.fit(train)
# Test the model
prediction_DT = DT_model.transform(test)

# Let's prepare the confusion matrix
# and cast the results to Pandas 
# in order to use sklearn metrics (confussion matrix and classification report)
y_true = prediction_DT.select("label").toPandas()
y_pred = prediction_DT.select("prediction").toPandas()

conf_matrix_DT = confusion_matrix(y_true, y_pred)
clas_rep_DT = classification_report(y_true, y_pred)

print("Decission Trees:\n")
print(conf_matrix_DT)
print("\n")
print(clas_rep_DT)

In [None]:
%%time
############## GRADIENT BOOSTED TREE ##############
# Train the model
GBT_model = clf_GBT.fit(train)
# Test the model
prediction_GBT = GBT_model.transform(test)

# Let's prepare the confusion matrix
# and cast the results to Pandas 
# in order to use sklearn metrics (confussion matrix and classification report)
y_true = prediction_GBT.select("label").toPandas()
y_pred = prediction_GBT.select("prediction").toPandas()

conf_matrix_GBT = confusion_matrix(y_true, y_pred)
clas_rep_GBT = classification_report(y_true, y_pred)

print("Gradient Boosted Tree:\n")
print(conf_matrix_GBT)
print("\n")
print(clas_rep_GBT)

In [None]:
%%time
############## RANDOM FOREST ##############
# Train the model
RF_model = clf_RF.fit(train)
# Test the model
prediction_RF = RF_model.transform(test)

# Let's prepare the confusion matrix
# and cast the results to Pandas 
# in order to use sklearn metrics (confussion matrix and classification report)
y_true = prediction_RF.select("label").toPandas()
y_pred = prediction_RF.select("prediction").toPandas()

conf_matrix_RF = confusion_matrix(y_true, y_pred)
clas_rep_RF = classification_report(y_true, y_pred)

print("Random Forest:\n")
print(conf_matrix_RF)
print("\n")
print(clas_rep_RF)

In [None]:
%%time
############## SUPPORT VECTOR MACHINE ##############
# Train the model
SVM_model = clf_SVM.fit(train)
# Test the model
prediction_SVM = SVM_model.transform(test)

# Let's prepare the confusion matrix
# and cast the results to Pandas 
# in order to use sklearn metrics (confussion matrix and classification report)
y_true = prediction_SVM.select("label").toPandas()
y_pred = prediction_SVM.select("prediction").toPandas()

conf_matrix_SVM = confusion_matrix(y_true, y_pred)
clas_rep_SVM = classification_report(y_true, y_pred)

print("Support Vector Machine:\n")
print(conf_matrix_SVM)
print("\n")
print(clas_rep_SVM)

In [None]:
# Gradient Boosted Trees model has the best F1-score
# Let's see the feature importances of this model
# `indices` and `values` of featureImportances only cover the entries stored in the vector,
# so each value has to be paired with the feature name found at its own index
# (pairing the values with the full list of names would shift the importances onto wrong features)
feat_imp_ind   = GBT_model.featureImportances.indices.tolist()
# the feature names come from final_features, skipping the first column (the label)
features_without_label = final_features.columns[1:]
feat_imp_key   = [features_without_label[ind] for ind in feat_imp_ind]
# the weights, that is, the importance values, in the same order as feat_imp_ind
feat_imp_value = GBT_model.featureImportances.values.tolist()
# next we get all together in a dataframe, to be able to display it with a nice sns barplot
feat_df        = pd.DataFrame(list(zip(feat_imp_key, feat_imp_value)), columns=['Feature','Importance']).sort_values('Importance', ascending=False)
# prepare the graphical details
plt.figure(figsize=(6,6))
plt.title("Feature Importance on Gradient Boosted Trees model")
sns.barplot(x='Importance', y='Feature', data=feat_df);
plt.savefig('Feature_Importance_DT.png')

#### Hyperparameter Tuning

In [None]:
%%time
# Now I am going to fine-tune the Decision Tree model
# as it is the one with the highest F1-Score
# Inspiration in https://gist.github.com/colbyford/7758088502211daa90dbc1b51c408762

# Create the initial GBT Model

dt = GBTClassifier(featuresCol="features", labelCol="label", maxDepth=2)

# Prepare the parameters to find the best combination
dt_param_grid = ParamGridBuilder() \
    .addGrid(dt.maxBins,[2, 4]) \
    .addGrid(dt.maxDepth,[2, 4]) \
    .build()

# Evaluate the model
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Create 5-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dt_param_grid,
                      evaluator = dtevaluator,
                      numFolds = 3)

# Run Cross Validations
dtcvModel = dtcv.fit(train)
print(dtcvModel)

# Use test set here so we can measure the accuracy of our model on new data
dtpredictions = dtcvModel.transform(test)

# Evaluate best model
print('Accuracy:', dtevaluator.evaluate(dtpredictions))

In [None]:
dtcvModel.avgMetrics

In [None]:
best_model = dtcvModel.bestModel

In [None]:
best_model_pred = best_model.transform(test)

In [None]:
# Label and prediction are brought to Pandas in a single toPandas() call: one Spark job
# instead of two, and each label is guaranteed to sit on the same row as its prediction.
scored_best = best_model_pred.select("label", "prediction").toPandas()
y_true_best = scored_best[["label"]]
y_pred_best = scored_best[["prediction"]]

conf_matrix_GBT_best = confusion_matrix(y_true_best, y_pred_best)
clas_rep_GBT_best = classification_report(y_true_best, y_pred_best)

# Print the variables computed just above (the *_DT_best names were never defined)
print("Best Gradient Boosted Trees:\n")
print(conf_matrix_GBT_best)
print("\n")
print(clas_rep_GBT_best)

In [None]:
# The best model after the hyperparameter tuning is not better than the original
# So we will keep the original as the best model

In [None]:
# In order to execute it on the larger dataset, I will prepare some functions
# based on the previous code

In [None]:
def load_data_spark(path_to_dataset):
    '''
    Read a JSON dataset with the global `spark` object
    Input:  
            path_to_dataset: path to the JSON file containing the data
    Output: 
            the dataframe returned by spark.read.json
    '''  
    return spark.read.json(path_to_dataset)

In [None]:
def feature_engineerme(df_input):
    '''
    Function that collects the features to build the ML model
    Every feature is computed from df_input only (no notebook-level dataframe is read),
    and nothing is executed eagerly: the function only builds the Spark plan.
    Input:  
            df_input: event dataframe. The code reads the columns userId, churn, level,
                      ts, page, artist, song, subscription_days, sessionId and OS
    Output: 
            df_output: output dataframe with one row per userId and the features
             |-- userId
             |-- churn: cancellation of the service (max of the churn flag of the user)
             |-- level: type of subscription of the row kept for the user, free=0 / paid=1
             |-- n_artists: number of distinct artists on the user's "NextSong" rows
             |-- n_songs: number of rows of the user (see NOTE in the code)
             |-- n_songs_play: number of "Add to Playlist" events
             |-- n_thumbs_up: number of "Thumbs Up" events
             |-- n_thumbs_down: number of "Thumbs Down" events
             |-- n_Errors: number of "Error" events
             |-- n_friends: number of "Add Friend" events
             |-- n_Rolls: number of "Roll Advert" events
             |-- n_Help: number of "Help" events
             |-- total_sdays: max of subscription_days of the user
             |-- n_sessions: number of distinct sessionId of the user
             |-- one column per distinct value of OS (count of that OS for the user, 0 if none)
            A user without any row for a given feature gets a null in that column
            (outer joins), except for the OS columns.
    '''  
    def count_page_events(page_name, feature_name, dtype):
        # One row per user holding the number of `page_name` events of that user;
        # users without such an event are absent from the result
        per_user = df_input.where(df_input.page == page_name).groupBy('userId').count()
        return per_user.select('userId', per_user['count'].cast(dtype).alias(feature_name))

    # The flag is 0 on the rows logged before the cancellation, so take the max per user
    churn = df_input.groupby('userId').max("churn").withColumnRenamed("max(churn)", "churn")
    df_churn = churn.withColumn('churn', churn.churn.cast('int'))

    # Level of the user, changed into a numerical, binary field (0/1)
    # NOTE(review): the intent is to keep the most recent level, but dropDuplicates() after
    # orderBy() may not be guaranteed to keep the first row of each user - confirm.
    level = df_input.select(['userId', 'level','ts']).orderBy(desc('ts')).dropDuplicates(['userId']).select(['userId', 'level']).replace(['free', 'paid'], ['0', '1'], 'level')
    df_level = level.withColumn('level', level.level.cast('int'))

    # Number of distinct artists a user listens to
    n_artists = df_input.filter(df_input.page == "NextSong").select(['userId', 'artist']).dropDuplicates().groupby('userId').count().withColumnRenamed('count', 'n_artists')
    df_n_artists = n_artists.withColumn('n_artists', n_artists.n_artists.cast('int'))

    # NOTE(review): this counts every row of the user, not only the played songs -
    # confirm whether it should be restricted to page == "NextSong"
    n_songs = df_input.select('userId','song').groupBy('userId').count().withColumnRenamed('count', 'n_songs')
    df_n_songs = n_songs.withColumn('n_songs', n_songs.n_songs.cast('double'))

    # Per-page event counts
    df_n_songs_play = count_page_events('Add to Playlist', 'n_songs_play', 'double')
    df_n_thumbs_up = count_page_events('Thumbs Up', 'n_thumbs_up', 'bigint')
    df_n_thumbs_down = count_page_events('Thumbs Down', 'n_thumbs_down', 'bigint')
    df_n_Errors = count_page_events('Error', 'n_Errors', 'bigint')
    df_n_friends = count_page_events('Add Friend', 'n_friends', 'bigint')
    df_n_Rolls = count_page_events('Roll Advert', 'n_Rolls', 'bigint')
    df_n_Help = count_page_events('Help', 'n_Help', 'bigint')

    # Subscription days a user stays in the service
    subscription_days = df_input.select('userId','subscription_days').groupBy('userId').agg({'subscription_days':'max'}).withColumnRenamed('max(subscription_days)','total_sdays')
    df_subscription_days = subscription_days.withColumn('total_sdays', subscription_days.total_sdays.cast('double'))

    # Number of distinct sessions of a user
    n_sessions = df_input.select('userId','sessionId').dropDuplicates().groupby('userId').count().withColumnRenamed('count','n_sessions')
    df_n_sessions = n_sessions.withColumn('n_sessions', n_sessions.n_sessions.cast('bigint'))

    # OS the user connects with, one hot encoded with the pivot option
    df_os = df_input.select('userId','OS').dropDuplicates().groupby('userId').pivot('OS').agg(count('OS')).fillna(0)

    # Outer joins keep every user even when one of the features has no row for that user
    df_output = df_churn.join(df_level,'userId','outer')\
     .join(df_n_artists,'userId','outer') \
     .join(df_n_songs,'userId','outer') \
     .join(df_n_songs_play,'userId','outer')\
     .join(df_n_thumbs_up,'userId','outer') \
     .join(df_n_thumbs_down,'userId','outer') \
     .join(df_n_Errors,'userId','outer') \
     .join(df_n_friends,'userId','outer') \
     .join(df_n_Rolls,'userId','outer') \
     .join(df_n_Help,'userId','outer') \
     .join(df_subscription_days,'userId','outer') \
     .join(df_n_sessions,'userId','outer') \
     .join(df_os,'userId','outer')

    return df_output

In [None]:
df_final_feat = feature_engineerme(df_valid)

# Final Steps
Clean up your code, adding comments and renaming variables to make the code easier to read and maintain. Refer to the Spark Project Overview page and Data Scientist Capstone Project Rubric to make sure you are including all components of the capstone project and meet all expectations. Remember, this includes thorough documentation in a README file in a Github repository, as well as a web app or blog post.