In [28]:
# Point findspark at the local Spark installation; this runs before the pyspark imports further down.
# NOTE(review): the install path is hard-coded for one Windows machine — adjust it when running elsewhere.
import findspark
findspark.init('C:/spark-2.3.2-bin-hadoop2.7')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Suppress every Python warning for the rest of the notebook.
# NOTE(review): this also hides deprecation warnings that may be worth seeing.
import warnings
warnings.filterwarnings('ignore')
%matplotlib inline

In [83]:
# Spark entry point plus the column helpers used by the analysis cells.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, avg, desc,countDistinct, count

# Directory holding the event-log JSON. The file name is appended with plain string
# concatenation when the data is read, so the trailing slash is required.
# NOTE(review): absolute, user-specific path — change it when running on another machine.
data_dir = 'C:/Users/John/PycharmProjects/Capstone/'

In [23]:
# Build (or reuse) the local SparkSession that the rest of the notebook works with.
spark = (
    SparkSession.builder
    .master('local')
    .appName("capstone_project")
    .getOrCreate()
)

In [24]:
# Load the event log into a DataFrame and mark it as persisted, since the cells below query it repeatedly.
events_path = data_dir + 'mini_sparkify_event_data.json'
df = spark.read.json(events_path).persist()
df

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

# Now that we have loaded the data, let's do some Exploratory Data Analysis (EDA) to get a feel for it.

In [25]:
# Size of the dataset: number of rows (records) and number of columns (features).
n_record, n_col = df.count(), len(df.columns)
size_message = 'the total number of records and features in the dataframe is: {} and {}'
print(size_message.format(n_record, n_col))

the total number of records and features in the dataframe is: 286500 and 18


In [26]:
# Inspect the column names and their data types.
# firstName and lastName can safely be left out of the analysis.
# NOTE(review): the original note trailed off after "lastName," — confirm whether further columns were meant.
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [29]:
# Summary statistics (count, mean, stddev, min, max) for the columns, converted to pandas
# so the notebook renders them as a table. String columns are summarised as well, so
# their min/max values follow string ordering rather than numeric ordering.
df.describe().toPandas()

Unnamed: 0,summary,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,count,228108,286500,278154,278154,286500.0,278154,228108.0,286500,278154,286500,286500,278154.0,286500.0,228108,286500.0,286500.0,278154,286500.0
1,mean,551.0852017937219,,,,114.41421291448516,,249.1171819778372,,,,,1535358834085.557,1041.526554973822,Infinity,210.05459685863875,1540956889810.4714,,59682.02278593872
2,stddev,1217.7693079161374,,,,129.76726201141085,,99.23517921058324,,,,,3291321616.328068,726.7762634630834,,31.50507848842202,1507543960.8187113,,109091.9499991052
3,min,!!!,Cancelled,Adelaida,F,0.0,Adams,0.78322,free,"Albany, OR",GET,About,1521380675000.0,1.0,ÃÂg ÃÂtti GrÃÂ¡a ÃÂsku,200.0,1538352117000.0,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10)...",
4,max,ÃÂlafur Arnalds,Logged Out,Zyonna,M,1321.0,Wright,3024.66567,paid,"Winston-Salem, NC",PUT,Upgrade,1543247354000.0,2474.0,ÃÂau hafa sloppiÃÂ° undan ÃÂ¾unga myrkursins,404.0,1543799476000.0,Mozilla/5.0 (compatible; MSIE 9.0; Windows NT ...,99.0


**Let's first scrutinize the 'artist' column and see whether it could be used as a feature. However, there are likely to be lots of unique artists, which would make using this column as a feature unworkable.**

In [115]:
# Count the distinct artist names, skipping missing and empty values.
has_artist = col('artist').isNotNull() & (col('artist') != '')
n_of_artists = df.filter(has_artist).select('artist').distinct().count()
print('the number of artists present in the dataframe is {}'.format(n_of_artists))

the number of artists present in the dataframe is 17655


**There are way too many artists, so we can't use this column as a feature just as we suspected.**

In [31]:
# Register the DataFrame as the temporary view 'df_table' so that it can be queried
# with spark.sql(); an existing view of the same name is replaced.
df.createOrReplaceTempView('df_table')

In [124]:
# Explore the 'auth' column: list its distinct values,
# leaving out null and empty entries.
auth_present = col('auth').isNotNull() & (col('auth') != '')
df.filter(auth_present).select('auth').distinct().show()

+----------+
|      auth|
+----------+
|Logged Out|
| Cancelled|
|     Guest|
| Logged In|
+----------+



**It looks like 'auth' is the authentication state. Here 'Cancelled' probably means churn. Since we are trying to predict user churn, let's work out the churn ratio for the dataset, just for the fun of it.**

In [135]:
# Number of distinct users seen in each authentication state, and the resulting churn ratio.
#
# Equivalent DataFrame-API form, kept for reference:
# df.filter((col('userId').isNotNull()) & (col('userId') != '')).groupby('auth').agg(countDistinct('userId').alias('unique_user_count')).show()

# SQL method (events with a missing or empty userId are excluded).
# The column is spelled `userId` throughout, matching the schema, so the query
# does not depend on Spark's case-insensitive name resolution.
df_auth = spark.sql('''
    SELECT auth, COUNT(DISTINCT userId) as unique_user_count
    FROM df_table  
    WHERE userId IS NOT NULL AND userId != ''
    GROUP BY auth
    ORDER BY unique_user_count DESC
''')

df_auth.show()

# Collect the (tiny) aggregate once instead of launching one Spark job per lookup.
# A state that does not occur in the data counts as 0 rather than failing on `None[0]`.
auth_counts = {row['auth']: row['unique_user_count'] for row in df_auth.collect()}
n_users_registered = auth_counts.get('Logged In', 0)
n_users_left = auth_counts.get('Cancelled', 0)

# Guard the division so that a dataset without logged-in users reports NaN instead of crashing.
if n_users_registered:
    churn_ratio = round(n_users_left / n_users_registered, 2)
else:
    churn_ratio = float('nan')

print('The number of users registered: {} and the number of users left: {} with a churn ratio of: {}'
      .format(n_users_registered, n_users_left, churn_ratio))


+---------+-----------------+
|     auth|unique_user_count|
+---------+-----------------+
|Logged In|              225|
|Cancelled|               52|
+---------+-----------------+

The number of users registered: 225 and the number of users left: 52 with a churn ratio of: 0.23


**We can't tell with any certainty whether the users who left later came back to the service, because we don't know whether a returning user is assigned the same user id as before. It would have been interesting to find that out, though.**

In [142]:
# The next feature of interest is the 'page' column: list its distinct values.
# describe() reported as many non-null 'page' values as there are records, so the
# null/empty filter below is only a defensive measure.
page_present = col('page').isNotNull() & (col('page') != '')
df_page = df.filter(page_present).select('page').distinct()
print('the number of unique values in the page column is: {}'.format(df_page.count()))

df_page.toPandas()

the number of unique values in the page column is: 22


Unnamed: 0,page
0,Cancel
1,Submit Downgrade
2,Thumbs Down
3,Home
4,Downgrade
5,Roll Advert
6,Logout
7,Save Settings
8,Cancellation Confirmation
9,About


**It looks like the dataset dutifully captures every single interaction a user has with the website. From the moment they log in to the moment they leave, every page the user visits — in other words, every click the user makes — is captured.**

**Also, notice that there is a value called 'Cancellation Confirmation' which also could be denoting customer churn. If we find out how many users clicked 'Cancellation Confirmation' and if it is the same as what we found in the 'auth' column then we can confirm that 'Cancellation Confirmation' definitely means user churn.**

In [150]:
# Cross-check: the number of events with auth == 'Cancelled' must equal the number of
# 'Cancellation Confirmation' page events.
n_auth_cancelled = df.filter(df.auth == 'Cancelled').count()
n_page_confirmed = df.filter(df.page == 'Cancellation Confirmation').count()
assert n_auth_cancelled == n_page_confirmed, "Oops, our assumption seems to be wrong"
print('If you seeing this that means "Cancelled" in auth column is the same as "Cancellation Confirmation" in page column')

# Distinct users seen on each page, ignoring events without a userId.
has_user = col('userId').isNotNull() & (col('userId') != '')
df.filter(has_user).groupby('page').agg(countDistinct('userId')).toPandas()

If you seeing this that means "Cancelled" in auth column is the same as "Cancellation Confirmation" in page column


Unnamed: 0,page,count(DISTINCT userId)
0,Cancel,52
1,Submit Downgrade,49
2,Thumbs Down,203
3,Home,223
4,Downgrade,154
5,Roll Advert,207
6,Logout,213
7,Save Settings,132
8,Cancellation Confirmation,52
9,About,155


**I have a sneaking suspicion that any non-essential service will lose customers for two major reasons: 1) the customer didn't like the service or some particular aspect of it; 2) the customer has a predisposition to leave — this type of customer will leave even if the service is perfect and meets or exceeds their expectations. One of the reasons could be time consciousness (they don't want to spend a lot of time listening to music, they believe the music service is wasting their time, and so they quit).**

**The reason I am bringing this up is that what a customer does before he becomes a paid member (if he ever does) might correlate strongly with what he does after he becomes one.**

In [108]:
# Gender split: percentage of distinct users per gender
# (users whose gender is missing are left out).
gender_share_query = '''
    SELECT T.gender, COUNT(*) * 100.0/ SUM(count(*)) OVER() AS percentage
    FROM (SELECT DISTINCT userId, gender
    FROM df_table
    WHERE gender IS NOT NULL) AS T 
    GROUP BY T.gender    
'''
spark.sql(gender_share_query).show()

+------+-----------------+
|gender|       percentage|
+------+-----------------+
|     F|44.19642857142857|
|     M|55.80357142857143|
+------+-----------------+



In [38]:
# Keep only the records in which every column is populated.
df_no_missing = df.dropna(how='any')
n_df_no_missing = df_no_missing.count()
print('the number of records in the df_no_missing is: {}'.format(n_df_no_missing))

the number of records in the df_no_missing is: 432877


In [39]:
# Let's scrutinize the 'artist' column: split the events on whether an artist is
# recorded and count each side. (No record count is hard-coded here; the printed
# values below are the source of truth for the dataset that was loaded.)
# isNotNull() / isNull() already produce boolean columns, so comparing them with
# `== True` is redundant and has been dropped.
df_artist_exist = df.filter(col('artist').isNotNull())
df_artist_null = df.filter(col('artist').isNull())
n_artist_exist = df_artist_exist.count()
n_artist_null = df_artist_null.count()
print('the number of records that have artists present: {}'.format(n_artist_exist))
print('the number of records that doesnt have artists present: {}'.format(n_artist_null))

the number of records that have artists present: 432877
the number of records that doesnt have artists present: 110828


In [40]:
# Verify that df_no_missing and df_artist_exist hold exactly the same rows.
# Both frames are first grouped on all of their columns with a row count, so that
# duplicate rows are compared by multiplicity as well; the two frames match when
# neither subtraction leaves any row behind.
a_prime = df_no_missing.groupBy(sorted(df_no_missing.columns)).count()
b_prime = df_artist_exist.groupBy(sorted(df_artist_exist.columns)).count()

n_only_in_a = a_prime.subtract(b_prime).count()
n_only_in_b = b_prime.subtract(a_prime).count()
assert n_only_in_a == 0 and n_only_in_b == 0, "If you seeing this the two dataframes are not equal"
print("The two data frames are equal")

The two data frames are equal
