# 1.1 Imports

In [41]:
# import libraries

import numpy as np
import pandas as pd

from pyspark.sql import SparkSession, Window
from pyspark import SparkFiles
from pyspark.sql.functions import avg, col, concat, count, desc, \
asc, explode, lit, min, max, split, stddev, udf, isnan, when, rank, \
log, sqrt, cbrt, exp, round
from pyspark.sql.functions import sum as Fsum
from pyspark.sql.types import IntegerType

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, \
LogisticRegressionModel, RandomForestClassifier, \
RandomForestClassificationModel, GBTClassifier, \
GBTClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.feature import CountVectorizer, IDF, Normalizer, \
PCA, RegexTokenizer, Tokenizer, StandardScaler, StopWordsRemover, \
StringIndexer, VectorAssembler, MaxAbsScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
# create the Spark session: local master, application name 'Sparkify'
# (getOrCreate reuses an already running session if there is one)

spark = (
    SparkSession.builder
    .master('local')
    .appName('Sparkify')
    .getOrCreate()
)

21/07/16 16:51:27 WARN Utils: Your hostname, Yats-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.192.149.232 instead (on interface en0)
21/07/16 16:51:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/07/16 16:51:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# read the event data JSON file into a Spark DataFrame

path = 'mini_sparkify_event_data.json'
json_reader = spark.read
df = json_reader.json(path)

# peek at the first record
df.head()



Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

In [4]:
# check schema: print every column's name, data type and nullability
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



# 1.2 Missing Values

# 1.2.1 Observations

In [5]:
# count all rows (events) of the loaded DataFrame
row_total = df.count()
print(f'total number of rows: {row_total}')

total number of rows: 286500


In [6]:
# number of NULL entries per column, shown as one small table per column
# (only NULLs are counted here; empty strings are not)

for c in df.columns:
    null_marker = when(df[c].isNull(), True)
    df.select([count(null_marker)]).show()

+-----------------------------------------------+
|count(CASE WHEN (artist IS NULL) THEN true END)|
+-----------------------------------------------+
|                                          58392|
+-----------------------------------------------+

+---------------------------------------------+
|count(CASE WHEN (auth IS NULL) THEN true END)|
+---------------------------------------------+
|                                            0|
+---------------------------------------------+

+--------------------------------------------------+
|count(CASE WHEN (firstName IS NULL) THEN true END)|
+--------------------------------------------------+
|                                              8346|
+--------------------------------------------------+

+-----------------------------------------------+
|count(CASE WHEN (gender IS NULL) THEN true END)|
+-----------------------------------------------+
|                                           8346|
+-----------------------------------------

# 1.2.2 Missing Values in userId

In [7]:
# the NULL check found no NULL userId, so look at the distinct
# userId values themselves (first 10 in ascending order)

distinct_user_ids = df.select(['userId']).drop_duplicates()
distinct_user_ids.orderBy(df['userId']).show(10)

+------+
|userId|
+------+
|      |
|    10|
|   100|
|100001|
|100002|
|100003|
|100004|
|100005|
|100006|
|100007|
+------+
only showing top 10 rows



In [8]:
# the first userId value is an empty string, which the NULL check does not count as missing

In [9]:
# create a temporary view named 'df_table' so df can be queried with spark.sql
# NOTE: the view is registered before the empty-userId rows are filtered out
# of df, so queries on 'df_table' still see those rows
df.createOrReplaceTempView('df_table')

In [10]:
# list the distinct pages visited by events whose userId is the empty string

empty_user_pages_query = '''
    SELECT DISTINCT page
    FROM df_table
    WHERE userId == ''
    '''
empty_user_pages = spark.sql(empty_user_pages_query)
empty_user_pages.show()

+-------------------+
|               page|
+-------------------+
|               Home|
|              About|
|Submit Registration|
|              Login|
|           Register|
|               Help|
|              Error|
+-------------------+



In [11]:
# display the page events of another user with a non-empty userId, for comparison

# userId is a string column, so it is compared against a string literal;
# an integer literal would rely on an implicit string-to-number cast of every
# userId (including the '' values), whose behaviour depends on the SQL mode
spark.sql(
    '''
    SELECT DISTINCT page
    FROM df_table
    WHERE userId = '100001'
    '''
).show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|         Thumbs Down|
|                Home|
|         Roll Advert|
|              Logout|
|Cancellation Conf...|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
+--------------------+



In [12]:
# we can see that
# the user with the empty-string userId visits almost no feature pages (e.g. no 'NextSong');
# the only exception is 'Submit Registration', which might indicate a user with incomplete registration

In [13]:
# keep only the rows whose userId is not the empty string
has_user_id = df['userId'] != ''
df = df.filter(has_user_id)

# 1.3 Exploratory Data Analysis (EDA)

# 1.3.1 Overview of Numerical Features

In [14]:
# numerical columns (typed long or double in the schema)
num_cols = ['itemInSession', 'registration', 'status', 'ts', 'length']

In [15]:
# summary statistics (count, mean, stddev, min, max) of the numerical columns
numeric_part = df.select(num_cols)
numeric_part.describe().show()

+-------+------------------+--------------------+------------------+--------------------+-----------------+
|summary|     itemInSession|        registration|            status|                  ts|           length|
+-------+------------------+--------------------+------------------+--------------------+-----------------+
|  count|            278154|              278154|            278154|              278154|           228108|
|   mean|114.89918174824018|1.535358834084427...|209.10321620397335|1.540958915431871...|249.1171819778458|
| stddev|  129.851729399489| 3.291321616327586E9|30.151388851328214|1.5068287123306298E9|99.23517921058361|
|    min|                 0|       1521380675000|               200|       1538352117000|          0.78322|
|    max|              1321|       1543247354000|               404|       1543799476000|       3024.66567|
+-------+------------------+--------------------+------------------+--------------------+-----------------+



In [43]:
# 'status' is the HTTP status: show the count and percentage of each value
# (round is pyspark.sql.functions.round from the import cell, not the builtin)

header = 'status'
total_rows = df.count()
share = round(col('count') / total_rows * 100, 2)
(
    df.groupBy(header)
    .count()
    .withColumn('%', share)
    .orderBy('%', ascending=False)
    .show()
)

+------+------+-----+
|status| count|    %|
+------+------+-----+
|   200|254718|91.57|
|   307| 23184| 8.33|
|   404|   252| 0.09|
+------+------+-----+



In [17]:
# 'status' has only three distinct values, so it should be treated as a categorical feature

# 1.3.2 Overview of Categorical Features

In [21]:
# categorical columns (all typed string in the schema)
# NOTE(review): 'status' was judged to be categorical in the numerical overview
# but is not listed here — confirm whether it should be added
cat_cols = [
    'artist', 'auth', 'firstName', 'lastName', 
    'gender', 'level', 'location','method', 
    'page', 'song', 'userAgent'
]

In [None]:
# we only look at the categorical features whose values can be grouped into classes

In [42]:
# count and percentage of each 'auth' value, largest share first

header = 'auth'
total_rows = df.count()
share = round(col('count') / total_rows * 100, 2)
(
    df.groupBy(header)
    .count()
    .withColumn('%', share)
    .orderBy('%', ascending=False)
    .show()
)

+---------+------+-----+
|     auth| count|    %|
+---------+------+-----+
|Logged In|278102|99.98|
|Cancelled|    52| 0.02|
+---------+------+-----+



In [44]:
# count and percentage of each 'gender' value, largest share first

header = 'gender'
total_rows = df.count()
share = round(col('count') / total_rows * 100, 2)
(
    df.groupBy(header)
    .count()
    .withColumn('%', share)
    .orderBy('%', ascending=False)
    .show()
)

+------+------+-----+
|gender| count|    %|
+------+------+-----+
|     F|154578|55.57|
|     M|123576|44.43|
+------+------+-----+



In [45]:
# count and percentage of each 'level' value, largest share first

header = 'level'
total_rows = df.count()
share = round(col('count') / total_rows * 100, 2)
(
    df.groupBy(header)
    .count()
    .withColumn('%', share)
    .orderBy('%', ascending=False)
    .show()
)

+-----+------+-----+
|level| count|    %|
+-----+------+-----+
| paid|222433|79.97|
| free| 55721|20.03|
+-----+------+-----+



In [47]:
# the 5 most frequent values of 'location', with their count and percentage

header = 'location'
total_rows = df.count()
(
    df.groupBy(header)
    .count()
    .withColumn('%', round(col('count') / total_rows * 100, 2))
    # rank by the exact count rather than the rounded '%': values that tie
    # after rounding would be ordered arbitrarily, so limit(5) could keep
    # the wrong locations
    .orderBy('count', ascending=False)
    .limit(5)
    .show()
)

+--------------------+-----+-----+
|            location|count|    %|
+--------------------+-----+-----+
|Los Angeles-Long ...|30131|10.83|
|New York-Newark-J...|23684| 8.51|
|Boston-Cambridge-...|13873| 4.99|
|Houston-The Woodl...| 9499| 3.42|
|Charlotte-Concord...| 7780|  2.8|
+--------------------+-----+-----+



In [48]:
# count and percentage of each 'method' value, largest share first

header = 'method'
total_rows = df.count()
share = round(col('count') / total_rows * 100, 2)
(
    df.groupBy(header)
    .count()
    .withColumn('%', share)
    .orderBy('%', ascending=False)
    .show()
)

+------+------+-----+
|method| count|    %|
+------+------+-----+
|   PUT|257818|92.69|
|   GET| 20336| 7.31|
+------+------+-----+



In [54]:
# collect the distinct 'page' values to the driver and print them as a list

header = 'page'
distinct_rows = df.select(header).drop_duplicates().collect()
print([row[header] for row in distinct_rows])

['Cancel', 'Submit Downgrade', 'Thumbs Down', 'Home', 'Downgrade', 'Roll Advert', 'Logout', 'Save Settings', 'Cancellation Confirmation', 'About', 'Settings', 'Add to Playlist', 'Add Friend', 'NextSong', 'Thumbs Up', 'Help', 'Upgrade', 'Error', 'Submit Upgrade']


# 1.3.3 Define Churn

In [None]:
# churn is detected by the appearance of 'Cancellation Confirmation' in column 'page'

# TODO: decide how to flag the churned users: udf, lambda or map?

In [None]:
# END