### Sparkify EDA & ETL Notebook
This notebook contains the exploratory data analysis (EDA) and extract, transform and load (ETL) process of the full dataset (12GB) from the project Sparkify. 

This is a working version of the final notebook, so it is only lightly commented. You can find the final, commented notebook in this GitHub repository under the name Sparkify_AWS_EMR.

In [1]:
from pyspark.sql import SparkSession, Window

# NOTE: the star import shadows Python built-ins such as sum() and round();
# later cells rely on the Spark versions of these functions.
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark.sql.functions as F

import datetime
import time

from pyspark.ml import Pipeline
from pyspark.ml.classification import (LogisticRegression, RandomForestClassifier, GBTClassifier,
                                       DecisionTreeClassifier, LinearSVC)
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder



Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1610887512695_0010,pyspark,idle,,,✔


SparkSession available as 'spark'.


In [2]:
# create spark session
spark = SparkSession \
    .builder \
    .appName('Sparkify') \
    .getOrCreate()



# 2 Load and Clean Dataset and Data Understanding
In this workspace, I use the full dataset. The dataset is loaded and cleaned, checking for invalid or missing data, for example records without a userId or sessionId.



In [6]:
event_data = 's3n://udacity-dsnd/sparkify/sparkify_event_data.json'
df = spark.read.json(event_data)


In [7]:
df.show(2)

+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|     artist|     auth|firstName|gender|itemInSession|lastName|   length|level|            location|method|    page| registration|sessionId|                song|status|           ts|           userAgent| userId|
+-----------+---------+---------+------+-------------+--------+---------+-----+--------------------+------+--------+-------------+---------+--------------------+------+-------------+--------------------+-------+
|  Popol Vuh|Logged In|    Shlok|     M|          278| Johnson|524.32934| paid|Dallas-Fort Worth...|   PUT|NextSong|1533734541000|    22683|Ich mache einen S...|   200|1538352001000|"Mozilla/5.0 (Win...|1749042|
|Los Bunkers|Logged In|  Vianney|     F|            9|  Miller|238.39302| paid|San Francisco-Oak...|   PUT|NextSong|1537500318000|    20836|         MiÃ

In [8]:
print(f'Total rows: {df.count():,}')
print('Total columns {}'.format(len(df.columns)))

Total rows: 26,259,199
Total columns 18

In [9]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

In [10]:
# check null values for all columns in the dataset
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
| artist|auth|firstName|gender|itemInSession|lastName| length|level|location|method|page|registration|sessionId|   song|status| ts|userAgent|userId|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+
|5408927|   0|   778479|778479|            0|  778479|5408927|    0|  778479|     0|   0|      778479|        0|5408927|     0|  0|   778479|     0|
+-------+----+---------+------+-------------+--------+-------+-----+--------+------+----+------------+---------+-------+------+---+---------+------+

In [11]:
# list the distinct page (event) types
df.select('page').dropDuplicates().sort('page').show()

+--------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|               Login|
|              Logout|
|            NextSong|
|            Register|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
| Submit Registration|
|      Submit Upgrade|
|         Thumbs Down|
+--------------------+
only showing top 20 rows

In [12]:
# find userId's that have cancelled
df.filter("page = 'Cancellation Confirmation'").show(n=2, vertical=True)

-RECORD 0-----------------------------
 artist        | null                 
 auth          | Cancelled            
 firstName     | Harrison             
 gender        | M                    
 itemInSession | 32                   
 lastName      | Turner               
 length        | null                 
 level         | paid                 
 location      | Dayton, OH           
 method        | GET                  
 page          | Cancellation Conf... 
 registration  | 1536683985000        
 sessionId     | 24588                
 song          | null                 
 status        | 200                  
 ts            | 1538360145000        
 userAgent     | Mozilla/5.0 (Wind... 
 userId        | 1768454              
-RECORD 1-----------------------------
 artist        | null                 
 auth          | Cancelled            
 firstName     | Andrew               
 gender        | M                    
 itemInSession | 16                   
 lastName      | Lutz    

In [13]:
df.select(['userId', 'level','page','artist', 'song', 'length']).where(df.userId == '18').collect()

[]

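The query for userId '18' returns no rows in the full dataset, but the Cancellation Confirmation records above show the pattern: when a customer visits any page other than NextSong, the artist, song, and length values are null. If you drop every row that contains a null, you lose most of the information on customer behavior, including the cancellation and downgrade events.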
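To make the point about null values concrete without a cluster, here is a minimal pure-Python sketch. The three events and the helper `drop_rows_with_nulls` are hypothetical and only mimic the shape of the log: the NextSong row carries song fields, the other pages do not.

```python
# Hypothetical sample events; only the NextSong row has artist/song/length.
events = [
    {"userId": "u1", "page": "NextSong", "artist": "Some Artist", "song": "Some Song", "length": 238.4},
    {"userId": "u1", "page": "Thumbs Up", "artist": None, "song": None, "length": None},
    {"userId": "u1", "page": "Cancellation Confirmation", "artist": None, "song": None, "length": None},
]


def drop_rows_with_nulls(rows):
    """Keep only rows in which every field has a value."""
    return [row for row in rows if all(value is not None for value in row.values())]


pages_before = {row["page"] for row in events}
pages_after = {row["page"] for row in drop_rows_with_nulls(events)}
lost_pages = pages_before - pages_after

# The churn-defining event is among the pages that disappear.
print(sorted(lost_pages))
```

A blanket null drop would therefore remove exactly the rows that are needed later to define churn.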

In [14]:
# checking empty values for userID
df.filter(df['userId']=='').count()

0

In [15]:
# checking missing values for sessionId (a long column, so test for null rather than '')
df.filter(df['sessionId'].isNull()).count()

0

In [16]:
# create timestamp column from original ts column
get_timestamp = udf(lambda x : datetime.datetime.fromtimestamp(x/1000.0).strftime('%Y-%m-%d %H:%M:%S'))
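One detail of this conversion is worth spelling out: `datetime.fromtimestamp` without a `tz` argument uses the local time zone of the machine that runs it. The sketch below is a plain-Python variant that pins the result to UTC, assuming UTC is the time zone you want; the function name `ms_to_utc_string` is hypothetical and not part of the notebook.

```python
from datetime import datetime, timezone


def ms_to_utc_string(ts_ms):
    """Convert epoch milliseconds to a 'YYYY-MM-DD HH:MM:SS' string in UTC."""
    moment = datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M:%S')


# ts value of the first row shown by df.show(2)
print(ms_to_utc_string(1538352001000))  # 2018-10-01 00:00:01
```

With an explicit time zone the derived hour and weekday columns do not depend on how the cluster nodes are configured.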


In [17]:
# drop rows with a missing gender; create timestamp, month, day, weekday, hour and date columns
# convert registration from long (epoch milliseconds) to timestamp for further processing
def clean_data(df):
    df = df\
        .filter(col('gender').isNotNull())\
        .withColumn('timestamp', get_timestamp(df['ts']).cast('timestamp'))\
        .withColumn('month', month('timestamp'))\
        .withColumn('day', dayofmonth(col('timestamp')))\
        .withColumn('weekday', date_format(col('timestamp'),'EEEE'))\
        .withColumn('hour', hour('timestamp'))\
        .withColumn('reg_timestamp', get_timestamp(df['registration']).cast('timestamp'))\
        .withColumn('date', from_unixtime(col('ts')/1000).cast(DateType()))

    return df




In [18]:
df = clean_data(df)

In [19]:
df.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- weekday: string (nullable = true)
 |-- hour: integer (nullable = true)
 |-- reg_timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)

In [14]:
# check gender
df.groupby('gender').count().show()

+------+--------+
|gender|   count|
+------+--------+
|     F|12181158|
|     M|13299562|
+------+--------+

In [20]:
df.count()

25480720
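As a quick consistency check, the row count after cleaning should equal the original row count minus the rows with a null gender, and it should also equal the sum of the two gender groups. The sketch below only redoes that arithmetic with the numbers printed in the outputs above.

```python
total_rows = 26_259_199       # df.count() before cleaning
null_gender_rows = 778_479    # null count reported for the gender column
female_rows = 12_181_158      # groupby('gender') count for F
male_rows = 13_299_562        # groupby('gender') count for M

rows_after_cleaning = total_rows - null_gender_rows
print(rows_after_cleaning)            # 25480720
print(female_rows + male_rows)        # 25480720
```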

# 3 Exploratory Data Analysis

Here I'm working with the full dataset (12GB) rather than the tiny subset (128MB).

### Define Churn

A churn column will be created to use as the label for the ML models. It flags customers who have canceled or downgraded their subscriptions: the Cancellation Confirmation and Submit Downgrade events define customer churn, which can happen for both paid and free users.
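The churn definition can be sketched as a small row-level rule. This is a plain-Python illustration, not the notebook's Spark code; the names `CHURN_PAGES` and `churn_flag` are hypothetical.

```python
# The two page values that define churn in this notebook.
CHURN_PAGES = {"Cancellation Confirmation", "Submit Downgrade"}


def churn_flag(page):
    """Return 1 if the event is a churn event, otherwise 0."""
    return int(page in CHURN_PAGES)


sample_pages = ["NextSong", "Downgrade", "Submit Downgrade", "Cancel", "Cancellation Confirmation"]
flags = [churn_flag(page) for page in sample_pages]
print(flags)  # [0, 0, 1, 0, 1]
```

Note that visiting the Downgrade or Cancel page does not count; only the confirming events do.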

### Explore Data
I will perform some exploratory data analysis to compare the behavior of users who stayed with that of users who churned. First, I explore aggregates for these two groups: how often they performed a specific action per time unit and how many songs they played.

In [21]:
# process churn variable
cancel_churn = udf(lambda x: int(x=='Cancellation Confirmation'), IntegerType())
downgrade_churn = udf(lambda x: int(x=='Submit Downgrade'), IntegerType())

In [22]:
# create new columns: cancelled and downgraded
df = df.withColumn('downgraded', downgrade_churn('page')).withColumn('cancelled', cancel_churn('page'))

In [23]:
# create churn column
df = df.withColumn('churn', when((col('cancelled')==1) | (col('downgraded')==1),1).otherwise(0))

In [24]:
# number of customers
df.select('userId').distinct().count()

22277

In [25]:
# total number of downgrade and cancellation events
df.agg({'cancelled' : 'sum', 'downgraded' : 'sum'}).show()

+---------------+--------------+
|sum(downgraded)|sum(cancelled)|
+---------------+--------------+
|           6494|          5003|
+---------------+--------------+

In [27]:
# total churn events (cancellations + downgrades); this counts events, not customers
df.agg(sum(df.cancelled + df.downgraded)).show()

+-----------------------------+
|sum((cancelled + downgraded))|
+-----------------------------+
|                        11497|
+-----------------------------+

In [28]:
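# churn events per customer; this overstates the share of churned customers,
# because one customer can have several churn events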
11497/22277

0.5160928311711631
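The ratio above divides the number of churn events by the number of customers. A customer with several downgrades is counted several times in the numerator, so a user-level rate that counts each customer once is lower or equal. The pure-Python sketch below contrasts the two on a handful of hypothetical events.

```python
# Hypothetical (userId, page) events.
events = [
    ("u1", "Submit Downgrade"),
    ("u1", "Submit Downgrade"),
    ("u1", "Cancellation Confirmation"),
    ("u2", "NextSong"),
    ("u3", "Cancellation Confirmation"),
    ("u4", "NextSong"),
]
churn_pages = {"Cancellation Confirmation", "Submit Downgrade"}

all_users = {user for user, _ in events}
churn_events = sum(1 for _, page in events if page in churn_pages)
churned_users = {user for user, page in events if page in churn_pages}

events_per_customer = churn_events / len(all_users)   # 4 events / 4 users
user_churn_rate = len(churned_users) / len(all_users)  # 2 users / 4 users
print(events_per_customer, user_churn_rate)  # 1.0 0.5
```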

In [29]:
# churned customers by gender
df.dropDuplicates(["userId", "churn"]).groupby(["churn", "gender"]).count().sort("churn").show()

+-----+------+-----+
|churn|gender|count|
+-----+------+-----+
|    0|     M|11651|
|    0|     F|10626|
|    1|     M| 4687|
|    1|     F| 4276|
+-----+------+-----+

### Save cleaned data

In [42]:
df.write.mode('overwrite').save('s3://<BUCKET_NAME>/df_clean', format='json')

In [None]:
# load df_clean
#df = spark.read.json('s3://sparkifyown/df_clean')

In [None]:
# DATABRICKS data frame
#gender distribution between regular and churn customers
churn_gender = df.drop_duplicates(['userId', 'churn']).groupby(['churn', 'gender']).count().sort('churn')
display(churn_gender)

<img src='gender_churn.png'>

In [43]:
# churned customers by level
df.drop_duplicates(['userId', 'churn']).groupby(['churn', 'level']).count().sort('churn').show()

+-----+-----+-----+
|churn|level|count|
+-----+-----+-----+
|    0| free|14406|
|    0| paid| 7871|
|    1| free| 1204|
|    1| paid| 7759|
+-----+-----+-----+

In [None]:
# DATABRICKS data frame
# churn and levels
level_data = df.drop_duplicates(['userId', 'churn']).groupby(['churn', 'level']).count().sort('churn')
display(level_data)

<img src="level_churn.png">

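**Note** `drop_duplicates(['userId', 'churn'])` keeps one row per customer and churn value, so a customer with a churn event is counted once under churn = 0 (for their other events) and once under churn = 1. Customers can also change level several times through upgrade and downgrade events, and the level shown is the one on whichever row is kept. That is why the free and paid counts add up to more than the number of customers.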
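The effect of de-duplicating on the pair (userId, churn) can be reproduced on a few hypothetical rows in plain Python: the number of distinct pairs exceeds the number of distinct customers as soon as one customer has both churn and non-churn events.

```python
# Hypothetical (userId, churn, level) rows.
rows = [
    ("u1", 0, "paid"),
    ("u1", 1, "paid"),   # u1 has a churn event
    ("u1", 0, "free"),
    ("u2", 0, "free"),
]

distinct_pairs = {(user, churn) for user, churn, _ in rows}
distinct_users = {user for user, _, _ in rows}

print(len(distinct_pairs), len(distinct_users))  # 3 2
```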

In [44]:
df.groupby('page').agg(countDistinct('userId')).sort('page').show()

+--------------------+----------------------+
|                page|count(DISTINCT userId)|
+--------------------+----------------------+
|               About|                 14460|
|          Add Friend|                 20305|
|     Add to Playlist|                 21260|
|              Cancel|                  5003|
|Cancellation Conf...|                  5003|
|           Downgrade|                 15209|
|               Error|                 11272|
|                Help|                 18243|
|                Home|                 22072|
|              Logout|                 21160|
|            NextSong|                 22261|
|         Roll Advert|                 20068|
|       Save Settings|                 12237|
|            Settings|                 18933|
|    Submit Downgrade|                  5103|
|      Submit Upgrade|                 12082|
|         Thumbs Down|                 20031|
|           Thumbs Up|                 21732|
|             Upgrade|            

In [45]:
songs_in_hour = df.filter(df.page == 'NextSong').groupby(df.hour).count().orderBy(df.hour.cast('float'))
songs_in_hour.show()

+----+-------+
|hour|  count|
+----+-------+
|   0| 841968|
|   1| 815369|
|   2| 792394|
|   3| 772736|
|   4| 757519|
|   5| 746738|
|   6| 738686|
|   7| 734562|
|   8| 737505|
|   9| 746179|
|  10| 762729|
|  11| 791741|
|  12| 828841|
|  13| 881877|
|  14| 951486|
|  15|1027741|
|  16|1068131|
|  17|1069106|
|  18|1050910|
|  19|1021136|
+----+-------+
only showing top 20 rows

In [None]:
# DATABRICKS data frame
songs_in_hour = df.filter(df.page == 'NextSong').groupby(df.hour).count().orderBy(df.hour.cast('float'))
display(songs_in_hour)

<img src="songs_hour.png">

In [46]:
# songs per weekday
songs_in_day = df.filter(df.page == 'NextSong').groupby(df.weekday).count()
songs_in_day.show()


+---------+-------+
|  weekday|  count|
+---------+-------+
|Wednesday|3498672|
|  Tuesday|3545454|
|   Friday|3448315|
| Thursday|3402855|
| Saturday|2014448|
|   Monday|3188927|
|   Sunday|1751601|
+---------+-------+

In [None]:
# DATABRICKS data frame
songs_in_day = df.filter(df.page == 'NextSong').groupby(df.weekday).count()
display(songs_in_day)

<img src="songs_weekday.png">

In [None]:
# DATABRICKS data frame
# the state is the part after the comma; trim() removes the leading space
df = df.withColumn('state', trim(split(col('location'), ',').getItem(1)))
state_data = df.dropDuplicates(['churn','userId']).groupby(['state','churn']).count().sort('state')
display(state_data)
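Locations such as `Dayton, OH` have a space after the comma, so the raw second element of a comma split is ` OH` with a leading space. The plain-Python sketch below shows the parsing with the whitespace stripped; `parse_state` is a hypothetical helper, and locations without a comma are mapped to `None` as an assumption.

```python
def parse_state(location):
    """Return the part after the first comma, stripped of whitespace."""
    parts = location.split(",")
    return parts[1].strip() if len(parts) > 1 else None


raw_second_part = "Dayton, OH".split(",")[1]
print(repr(raw_second_part))      # ' OH'
print(parse_state("Dayton, OH"))  # OH
```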

<img src="location1.png">

In [47]:
# sessions duration per day per userId
day_session = df.groupby('userId','date','sessionId').\
    agg(F.max('ts').alias('end'), F.min('ts').alias('start')).\
    withColumn('day_session_duration', (col('end')-col('start'))*0.001)
day_session.show()

+-------+----------+---------+-------------+-------------+--------------------+
| userId|      date|sessionId|          end|        start|day_session_duration|
+-------+----------+---------+-------------+-------------+--------------------+
|1372655|2018-10-01|    26264|1538438255000|1538424626000|             13629.0|
|1339441|2018-10-01|     8410|1538438388000|1538395196000|             43192.0|
|1351140|2018-10-01|     2368|1538437762000|1538420713000|             17049.0|
|1472238|2018-10-01|     7331|1538434551000|1538405518000|             29033.0|
|1141148|2018-10-01|    20490|1538438390000|1538364679000|             73711.0|
|1223922|2018-10-01|    14820|1538438251000|1538390515000|             47736.0|
|1596949|2018-10-01|    28166|1538438241000|1538424357000|             13884.0|
|1449452|2018-10-01|    28560|1538438308000|1538418299000|             20009.0|
|1319363|2018-10-02|    24768|1538443608000|1538438416000|              5192.0|
|1227242|2018-10-02|    19079|1538452795

In [48]:
day_session.agg({'day_session_duration': 'max'}).show()

+-------------------------+
|max(day_session_duration)|
+-------------------------+
|                  86397.0|
+-------------------------+

In [49]:
# max seconds in hours
86397/3600

23.999166666666667
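The maximum stays just under 24 hours because the grouping includes the date: a session that runs across midnight is split into one piece per calendar day, and no piece can be longer than a day. The pure-Python sketch below shows this on one hypothetical session, using UTC dates as an assumption (Spark derives the date in its session time zone).

```python
from collections import defaultdict
from datetime import datetime, timezone


def utc_date(ts_ms):
    """Calendar date (UTC) of an epoch-millisecond timestamp, as 'YYYY-MM-DD'."""
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).date().isoformat()


# Hypothetical session: events at 23:00 and 23:30 on 2018-10-01 and at 01:00 on 2018-10-02 (UTC).
start = 1538352000000 + 23 * 3600 * 1000
events_ms = [start, start + 30 * 60 * 1000, start + 2 * 3600 * 1000]

# Group the events by date, then take max - min per group, as the notebook does per (user, date, session).
by_date = defaultdict(list)
for ts in events_ms:
    by_date[utc_date(ts)].append(ts)

per_day_seconds = {day: (max(values) - min(values)) / 1000 for day, values in by_date.items()}
whole_session_seconds = (max(events_ms) - min(events_ms)) / 1000

print(per_day_seconds)        # {'2018-10-01': 1800.0, '2018-10-02': 0.0}
print(whole_session_seconds)  # 7200.0
```

The per-day pieces add up to less than the whole session, so per-day durations are a lower bound on time spent in sessions that cross midnight.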

In [50]:
# daily session average duration 
daily_duration = df.groupby('userId','date','sessionId').\
    agg(F.max('ts').alias('end'), F.min('ts').alias('start')).\
    withColumn('session_duration', (col('end')-col('start'))*0.001).\
    groupby('userId','date').\
    avg('session_duration').\
    groupby('userId').\
    agg(round(F.avg('avg(session_duration)')).alias('daily_sec_avg')).\
    orderBy('daily_sec_avg', ascending=False)
daily_duration.show()

+-------+-------------+
| userId|daily_sec_avg|
+-------+-------------+
|1685455|      72852.0|
|1815032|      71574.0|
|1867030|      63449.0|
|1945321|      63116.0|
|1133049|      62968.0|
|1788891|      61363.0|
|1005217|      60681.0|
|1032628|      59231.0|
|1242486|      57459.0|
|1645549|      57315.0|
|1021090|      57043.0|
|1382275|      56060.0|
|1947971|      54438.0|
|1366694|      54239.0|
|1368082|      54042.0|
|1435384|      52866.0|
|1730053|      51444.0|
|1358369|      49304.0|
|1840331|      48942.0|
|1578939|      48424.0|
+-------+-------------+
only showing top 20 rows

In [51]:
# sessions duration per month per userId
month_session = df.groupby('userId','month','sessionId').\
    agg(F.max('ts').alias('end'), F.min('ts').alias('start')).\
    withColumn('month_session_duration', (col('end')-col('start'))*0.001)
month_session.show()


+-------+-----+---------+-------------+-------------+----------------------+
| userId|month|sessionId|          end|        start|month_session_duration|
+-------+-----+---------+-------------+-------------+----------------------+
|1656284|   10|    18520|1538587885000|1538513994000|               73891.0|
|1762251|   10|    18962|1538585050000|1538577708000|                7342.0|
|1864952|   10|    17447|1538604942000|1538574190000|               30752.0|
|1016701|   10|    30374|1538582760000|1538580822000|                1938.0|
|1927775|   10|    34246|1538588861000|1538570873000|               17988.0|
|1676531|   10|    29305|1538632261000|1538579481000|               52780.0|
|1392515|   10|    31924|1538585478000|1538580000000|                5478.0|
|1453014|   10|    37327|1538601680000|1538582054000|               19626.0|
|1452292|   10|     1296|1538585922000|1538583524000|                2398.0|
|1199076|   10|    36948|1538605531000|1538599710000|                5821.0|

In [52]:
month_duration = df.groupby('userId','month','sessionId').\
    agg(F.max('ts').alias('end'), F.min('ts').alias('start')).\
    withColumn('session_duration', (col('end')-col('start'))*0.001).\
    groupby('userId','month').\
    avg('session_duration').\
    groupby('userId').\
    agg(round(F.avg('avg(session_duration)')).alias('month_sec_avg')).\
    orderBy('month_sec_avg', ascending=False)
month_duration.show()

+-------+-------------+
| userId|month_sec_avg|
+-------+-------------+
|1815032|     143408.0|
|1272739|     142548.0|
|1151521|     136324.0|
|1788643|     125212.0|
|1334667|     120627.0|
|1021090|     114250.0|
|1382275|     112276.0|
|1719533|     111821.0|
|1839248|     111230.0|
|1730053|     107110.0|
|1483106|     102642.0|
|1005217|     101344.0|
|1788891|      99570.0|
|1578939|      97063.0|
|1872977|      94126.0|
|1379420|      94065.0|
|1502280|      93627.0|
|1073319|      92369.0|
|1835097|      92294.0|
|1221432|      89689.0|
+-------+-------------+
only showing top 20 rows

In [53]:
# days between registration and last event, per user
reg_days = df.groupby('userId').agg(round(((F.max(unix_timestamp('timestamp')) - F.min(unix_timestamp('reg_timestamp')))/86400)).alias('reg_days'))
reg_days.show()

+-------+--------+
| userId|reg_days|
+-------+--------+
|1390009|    36.0|
|1519090|    64.0|
|1394508|    92.0|
|1178731|    93.0|
|1351489|    74.0|
|1358765|    93.0|
|1500901|    84.0|
|1718034|    46.0|
|1384823|    54.0|
|1083324|   149.0|
|1633577|   133.0|
|1875484|    44.0|
|1492713|   127.0|
|1658815|    66.0|
|1114507|   100.0|
|1331962|    72.0|
|1588738|   119.0|
|1338783|    61.0|
|1965481|    75.0|
|1057724|    96.0|
+-------+--------+
only showing top 20 rows
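The day count is the difference between two timestamps divided by 86400 seconds and rounded. A plain-Python sketch with the `registration` and `ts` values from the cancellation record shown earlier; `days_between` is a hypothetical helper. Python's `round` rounds exact halves to the nearest even number while Spark's `round` rounds them up, so the two can differ on exact .5 values.

```python
SECONDS_PER_DAY = 86400


def days_between(start_ms, end_ms):
    """Whole days between two epoch-millisecond timestamps, rounded."""
    return round((end_ms - start_ms) / 1000 / SECONDS_PER_DAY)


registration_ms = 1536683985000  # registration of the first cancelled user shown above
last_event_ms = 1538360145000    # ts of that user's Cancellation Confirmation event

print(days_between(registration_ms, last_event_ms))  # 19
```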

In [54]:
# active days per user 2018-10-1 - 2018-11-30
active_days = df.groupby('userId').agg(round(((F.max(unix_timestamp('timestamp')) - F.min(unix_timestamp('timestamp')))/86400)).alias('active_days'))
active_days.show()

+-------+-----------+
| userId|active_days|
+-------+-----------+
|1658815|       60.0|
|1178731|       61.0|
|1057724|       58.0|
|1718034|        9.0|
|1069552|       56.0|
|1588738|       58.0|
|1875484|       28.0|
|1396828|       43.0|
|1983423|       41.0|
|1142513|       57.0|
|1384823|       45.0|
|1200956|       59.0|
|1271218|       60.0|
|1965481|       59.0|
|1339632|       59.0|
|1803077|       44.0|
|1190352|       57.0|
|1619792|       56.0|
|1612069|       57.0|
|1002185|       58.0|
+-------+-----------+
only showing top 20 rows

# 4 Feature Engineering
Next, we build the features that look promising for training the model. After aggregating the features per customer, I will also examine them with visuals and some tests.

Selected features are:
- gender
- level of customer: paid or free
- sum of artists per customer
- sum of songs per customer
- sum of sessions per customer
- sum of length per customer
- sum of thumbs up given per customer
- sum of thumbs down given per customer
- sum of friends added per customer
- sum of songs added to playlist per customer
- sum of advertisements shown per customer
- sum of errors per customer
- sum of help page visited per customer
- average items in session per customer
- average session length per customer



I also added these features to describe churned customers better:
- cancellation confirmation
- submit downgrade
- sum churn feature (calculates all variations to churn)

In [55]:
# DATABRICKS data frame
# create user_df with aggregated features, grouping by userId
# new columns: gender, level, cancelled, downgraded, sum_churn, sum_songs, sum_sessions, avg_items,
# sum_lenght, avg_lenght, sum_tup, sum_tdown, sum_friend, sum_playlist, sum_advert, sum_errors, sum_help


user_df = df.groupby('userId')\
    .agg(
    # gender (1 = female, 0 = male) and level (1 = paid, 0 = free), taken from one row per user
    first(when(col('gender') == "F", 1).otherwise(0)).alias('gender'),
    first(when(col('level') == "paid", 1).otherwise(0)).alias('level'),
    # churn events per user
    F.sum(when(col('page') == "Cancellation Confirmation", 1).otherwise(0)).alias('cancelled'),
    F.sum(when(col('page') == "Submit Downgrade", 1).otherwise(0)).alias('downgraded'),
    F.sum(when((col('cancelled')==1) | (col('downgraded')==1),1).otherwise(0)).alias('sum_churn'),
    F.sum(when(col('page') == "NextSong", 1).otherwise(0)).alias('sum_songs'),
    # NOTE: the next four features are built from 0/1 indicators, so they count (or average)
    # matching rows rather than distinct sessions, item positions or seconds of listening
    F.sum(when(col('sessionId')> 1, 1).otherwise(0)).alias('sum_sessions'),
    F.avg(when(col('itemInSession')> 1, 1).otherwise(0)).alias('avg_items'),
    F.sum(when(col('length')>1, 1).otherwise(0)).alias('sum_lenght'),
    F.avg(when(col('length')>1, 1).otherwise(0)).alias('avg_lenght'),
    # page-visit counts per user
    F.sum(when(col('page') == "Thumbs Up", 1).otherwise(0)).alias('sum_tup'),
    F.sum(when(col('page') == "Thumbs Down", 1).otherwise(0)).alias('sum_tdown'),
    F.sum(when(col('page') == "Add Friend", 1).otherwise(0)).alias('sum_friend'),
    F.sum(when(col('page') == "Add to Playlist", 1).otherwise(0)).alias('sum_playlist'),
    F.sum(when(col('page') == "Roll Advert", 1).otherwise(0)).alias('sum_advert'),
    F.sum(when(col('page') == "Error", 1).otherwise(0)).alias('sum_errors'),
    F.sum(when(col('page') == "Help", 1).otherwise(0)).alias('sum_help'))
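Summing a 0/1 indicator counts the rows that match a condition. For page visits that is exactly the intended feature, but for `sessionId` and `length` it yields an event count rather than the number of distinct sessions or the total listening time. The pure-Python sketch below contrasts the two readings on a few hypothetical events; in PySpark the value-based variants would use `countDistinct('sessionId')` and `F.sum('length')`, which is offered here as a possible alternative, not as what the notebook does.

```python
# Hypothetical events of one user.
events = [
    {"sessionId": 10, "page": "NextSong", "length": 200.0},
    {"sessionId": 10, "page": "NextSong", "length": 180.5},
    {"sessionId": 11, "page": "Home", "length": None},
    {"sessionId": 11, "page": "NextSong", "length": 240.0},
]

# Indicator-based aggregates: count the matching rows.
rows_with_session = sum(1 for e in events if e["sessionId"] > 1)
rows_with_length = sum(1 for e in events if e["length"] is not None and e["length"] > 1)

# Value-based aggregates: distinct sessions and total seconds listened.
distinct_sessions = len({e["sessionId"] for e in events})
total_length = sum(e["length"] for e in events if e["length"] is not None)

print(rows_with_session, distinct_sessions)  # 4 2
print(rows_with_length, total_length)        # 3 620.5
```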



In [57]:
# join previously created features registered and active days in user_df 
temp_df = reg_days.join(active_days,['userId'])
user_df = user_df.join(temp_df,['userId'])

In [59]:
# create a binary user-level churn label (0 or 1) instead of the sum_churn counts:
# churn = 1 if the user has both churn and non-churn rows, i.e. at least one churn event
label = df.groupby('userId').agg(countDistinct('churn').alias('sum_churn')).dropDuplicates()
label = label.groupby('userId')\
    .agg(first(when(col('sum_churn') > 1, 1).otherwise(0)).alias('churn'))
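The label logic relies on the fact that a churned user has two distinct values (0 and 1) in the row-level `churn` column. The pure-Python sketch below reproduces that rule on hypothetical rows and compares it with simply taking the maximum flag per user. The two agree whenever every user has at least one non-churn row, which is the case in the counts shown earlier (all 22277 users appear under churn = 0).

```python
from collections import defaultdict

# Hypothetical (userId, row-level churn flag) pairs.
rows = [("u1", 0), ("u1", 1), ("u2", 0), ("u2", 0), ("u3", 0), ("u3", 1), ("u3", 1)]

flags_per_user = defaultdict(set)
for user, churn in rows:
    flags_per_user[user].add(churn)

# Rule used in the notebook: more than one distinct flag value means the user churned.
label_by_distinct = {user: int(len(flags) > 1) for user, flags in flags_per_user.items()}
# Alternative reading: the user churned if any row has churn = 1.
label_by_max = {user: max(flags) for user, flags in flags_per_user.items()}

print(label_by_distinct)  # {'u1': 1, 'u2': 0, 'u3': 1}
```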


In [60]:
user_df = user_df.join(label, ['userId'])

In [61]:
# the whole dataset is now ready for examination, so run a last check for missing values
user_df.select([count(when(isnull(c), c)).alias(c) for c in user_df.columns]).show()

+------+------+-----+---------+----------+---------+---------+------------+---------+----------+----------+-------+---------+----------+------------+----------+----------+--------+--------+-----------+-----+
|userId|gender|level|cancelled|downgraded|sum_churn|sum_songs|sum_sessions|avg_items|sum_lenght|avg_lenght|sum_tup|sum_tdown|sum_friend|sum_playlist|sum_advert|sum_errors|sum_help|reg_days|active_days|churn|
+------+------+-----+---------+----------+---------+---------+------------+---------+----------+----------+-------+---------+----------+------------+----------+----------+--------+--------+-----------+-----+
|     0|     0|    0|        0|         0|        0|        0|           0|        0|         0|         0|      0|        0|         0|           0|         0|         0|       0|       0|          0|    0|
+------+------+-----+---------+----------+---------+---------+------------+---------+----------+----------+-------+---------+----------+------------+----------+----------+--------+--------+-----------+-----+

In [62]:
# all churn variations per customer (gender: 1 = female, 0 = male)
user_df.dropDuplicates(['userId', 'cancelled','downgraded','sum_churn']).groupby(['gender','cancelled','downgraded','sum_churn']).count().sort('sum_churn').show()

+------+---------+----------+---------+-----+
|gender|cancelled|downgraded|sum_churn|count|
+------+---------+----------+---------+-----+
|     1|        0|         0|        0| 6350|
|     0|        0|         0|        0| 6964|
|     1|        0|         1|        1| 1521|
|     0|        0|         1|        1| 1623|
|     1|        1|         0|        1| 1788|
|     0|        1|         0|        1| 2072|
|     1|        0|         2|        2|  330|
|     0|        0|         2|        2|  319|
|     1|        1|         1|        2|  431|
|     0|        1|         1|        2|  442|
|     0|        1|         2|        3|  109|
|     1|        0|         3|        3|   61|
|     1|        1|         2|        3|   89|
|     0|        0|         3|        3|   72|
|     1|        1|         3|        4|   36|
|     0|        0|         4|        4|   13|
|     0|        1|         3|        4|   23|
|     1|        0|         4|        4|   11|
|     1|        1|         4|     

As the table above shows, 1788 female and 2072 male customers cancelled directly, without downgrading first. With 3860 customers together, this is the largest group among the churned customers.

sum of churn 1
The second-largest group consists of customers who downgraded once: 1521 female and 1623 male, 3144 customers together.

sum of churn 2
Customers who have downgraded once and cancelled are 431 female and 442 male, together 873.

sum of churn 3-5
Some customers have a churn sum of 3 to 5, meaning several downgrades with or without a final cancellation. This is not a large number of customers, but it would be interesting to investigate further. The customers in this group are:

female 61 + 89 + 36 + 11 + 2 = 199

male 109 + 72 + 13 + 23 + 3 = 220
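
The group totals above can be re-derived with a few lines of plain Python. This is only a minimal sketch: the counts are copied by hand from the table and from the two sums above, and the names `churn_3_to_5`, `cancelled_directly` and `downgraded_once` are introduced here for illustration.

```python
# Counts copied from the table and the sums above (sum_churn 3 to 5).
churn_3_to_5 = {
    'female': [61, 89, 36, 11, 2],
    'male': [109, 72, 13, 23, 3],
}

# Total per gender for the sum_churn 3 to 5 group.
totals = {gender: sum(counts) for gender, counts in churn_3_to_5.items()}
print(totals)

# The two largest churn groups, for comparison (female + male).
cancelled_directly = 1788 + 2072
downgraded_once = 1521 + 1623
print(cancelled_directly, downgraded_once)
```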

In [63]:
# save a backup copy of user_df, just in case
#user_df.write.mode('overwrite').save('s3://<BUCKET_NAME>/user_df.json', format='json')

In [64]:
# save user_df to csv file
#user_df\
#.coalesce(1)\
#.write\
#.mode('overwrite')\
#.option('header', 'true')\
#.csv('s3://<BUCKET_NAME>/user_df.csv')

In [61]:
user_df.count()

22277

# 5 Modeling
Split the full dataset into train, test, and validation sets. Test out several of the machine learning methods you learned. Evaluate the accuracy of the various models, tuning parameters as necessary. Determine your winning model based on test accuracy and report results on the validation set. Since the churned users are a fairly small subset, I suggest using F1 score as the metric to optimize.
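
To make the choice of metric concrete, the sketch below computes accuracy and a support-weighted F1 score from a confusion matrix in plain Python. The counts are made up for illustration and are not results from this dataset, and the helper names `f1` and `weighted_f1` are hypothetical. The weighting by class frequency is meant to imitate what the `f1` metric of Spark's `MulticlassClassificationEvaluator` reports.

```python
def f1(tp, fp, fn):
    """F1 score of one class from true positives, false positives and false negatives."""
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def weighted_f1(tp, fp, fn, tn):
    """Average the per-class F1 scores, weighted by the number of true rows per class."""
    n_pos, n_neg = tp + fn, tn + fp
    f1_pos = f1(tp, fp, fn)
    # For the negative class the roles of the confusion-matrix cells are mirrored.
    f1_neg = f1(tn, fn, fp)
    return (n_pos * f1_pos + n_neg * f1_neg) / (n_pos + n_neg)


# Hypothetical imbalanced test set: 100 churned users and 900 loyal users,
# scored by a model that predicts "no churn" for everyone.
tp, fp, fn, tn = 0, 0, 100, 900
accuracy = (tp + tn) / (tp + fp + fn + tn)
print(accuracy, weighted_f1(tp, fp, fn, tn))
```

In this made-up case the accuracy is 0.9 although the F1 score of the churn class is 0, which pulls the weighted F1 below the accuracy. That gap is why F1 is the more informative metric when churned users are the minority.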

In [3]:
# load dataset for setting up ML pipeline
#user_df = spark.read.json('s3://<BUCKET_NAME>/user_df.json')

In [4]:
user_df.persist()

DataFrame[active_days: double, avg_items: double, avg_lenght: double, cancelled: bigint, churn: bigint, downgraded: bigint, gender: bigint, level: bigint, reg_days: double, sum_advert: bigint, sum_churn: bigint, sum_errors: bigint, sum_friend: bigint, sum_help: bigint, sum_lenght: bigint, sum_playlist: bigint, sum_sessions: bigint, sum_songs: bigint, sum_tdown: bigint, sum_tup: bigint, userId: string]

In [4]:
# drop columns that are not used for modeling (the result is still named user_df)
user_df = user_df.drop('userId', 'cancelled','downgraded','sum_churn', 'gender','level')

In [5]:
# set the feature columns and split the dataset into train and test sets
#features = [col for col in user_df.columns if col!='churn'] 
features = ['sum_songs', 'sum_sessions', 'sum_lenght', 'sum_tup',
            'sum_tdown', 'sum_friend', 'sum_playlist', 'sum_advert',
            'sum_help', 'sum_errors', 'avg_items', 'avg_lenght', 'reg_days', 'active_days']
train, test = user_df.randomSplit([0.8, 0.2], seed=42)
train = train.cache()

In [6]:
def model_pipeline(clf, params):
    """
    Build a 3-fold CrossValidator around a pipeline that
    assembles the feature columns into one vector,
    standardizes the vectorized features,
    and fits the classifier `clf` over the parameter grid `params`.
    The grid is scored by F1 on the 'churn' label.
    """
    assembler = VectorAssembler(inputCols=features, outputCol='vect_features')
    scaler = StandardScaler(inputCol='vect_features', outputCol='scaled_features', withStd=True)
    #scaler = MinMaxScaler(inputCol='vect_features1', outputCol='scaled_features1')
    pipeline = Pipeline(stages=[assembler, scaler, clf])

    model = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=params,
        evaluator = MulticlassClassificationEvaluator(labelCol='churn', metricName='f1'),
        numFolds=3,
    )
    return model
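
The scaling step in the pipeline above can be pictured with a plain-Python sketch. It assumes the scaler only divides each feature by its sample standard deviation and does not center it, which is my reading of `StandardScaler` with `withStd=True` and the default `withMean=False`. The helper `scale_to_unit_std` and the sample values are hypothetical.

```python
import statistics


def scale_to_unit_std(values):
    """Divide each value by the column's sample standard deviation (no centering)."""
    std = statistics.stdev(values)  # sample standard deviation (n - 1)
    if std == 0:
        # Guard against dividing by zero for a constant column.
        return [0.0 for _ in values]
    return [v / std for v in values]


sum_songs = [10.0, 200.0, 3000.0]  # hypothetical values on a large scale
reg_days = [30.0, 60.0, 90.0]      # hypothetical values on a small scale

scaled = [scale_to_unit_std(col) for col in (sum_songs, reg_days)]
for col in scaled:
    # After scaling, both columns have the same spread.
    print(round(statistics.stdev(col), 6))
```

Putting features on a comparable scale matters mostly for logistic regression; tree-based models are largely insensitive to it.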

In [7]:
# evaluation metrics for ML models
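def model_metrics(pred_model):
    """
    Print the F1 score, accuracy and area under ROC for a
    DataFrame of predictions (the output of model.transform).
    """
    evaluator = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='churn')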
    print('The metrics for the model:')
    print('The f1 Score is {}'.format(evaluator.evaluate(pred_model, {evaluator.metricName : 'f1'})))
    print('The accuracy is {}'.format(evaluator.evaluate(pred_model, {evaluator.metricName : 'accuracy'})))

    auc_evaluator = BinaryClassificationEvaluator(labelCol='churn')
    metric = auc_evaluator.evaluate(pred_model, {auc_evaluator.metricName: 'areaUnderROC'})
    print('The areaUnderROC: {:.2%}'.format(metric))
    

#### logistic regression classifier

In [70]:
# logistic regression classifier
mod1 = LogisticRegression(featuresCol='scaled_features', labelCol='churn')
mod1_param = ParamGridBuilder().build()
model_lr = model_pipeline(mod1, mod1_param)
# fit logistic regression 
cv_model_lr = model_lr.fit(train)
# predict logistic regression 
pred_lr = cv_model_lr.transform(test)

#### random forest classifier

In [73]:
# random forest classifier
mod2 = RandomForestClassifier(featuresCol='scaled_features', labelCol='churn')
mod2_param = ParamGridBuilder().build()
model_rf = model_pipeline(mod2, mod2_param)
# fit random forest
cv_model_rf = model_rf.fit(train)
# predict random forest
pred_rf = cv_model_rf.transform(test)

#### gradient boost classifier

In [74]:
# gradient boost classifier
mod3 = GBTClassifier(featuresCol='scaled_features', labelCol='churn')
mod3_param = ParamGridBuilder().build()
model_gb = model_pipeline(mod3, mod3_param)
# fit gradient boost
cv_model_gb = model_gb.fit(train)
# predict gradient boost
pred_gb = cv_model_gb.transform(test)

In [71]:
# metrics for logistic regression classifier
model_metrics(pred_lr)

The metrics for the model:
The f1 Score is 0.7339593769127118
The accuracy is 0.738580931263858
The areaUnderROC: 80.04%

In [75]:
# metrics for random forest classifier
model_metrics(pred_rf)

The metrics for the model:
The f1 Score is 0.7576566440782728
The accuracy is 0.7625277161862528
The areaUnderROC: 82.45%

In [76]:
# metrics for gradient boost classifier
model_metrics(pred_gb)

The metrics for the model:
The f1 Score is 0.7619165060479449
The accuracy is 0.7662971175166298
The areaUnderROC: 84.21%

**Note** By F1 score, the best two untuned models were the Gradient Boost Classifier and the Random Forest Classifier, with Logistic Regression third. Next, all three models are tuned for the final ranking.
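
The three untuned models can be ranked directly from the printed metrics. The sketch below is plain Python with the numbers copied by hand from the outputs above (area under ROC converted from percent to a fraction); the dictionary `results` is only an illustration and not part of the pipeline.

```python
# Test-set metrics copied from the outputs above (untuned models).
results = {
    'logistic regression': {'f1': 0.7339593769127118, 'accuracy': 0.738580931263858, 'auc': 0.8004},
    'random forest': {'f1': 0.7576566440782728, 'accuracy': 0.7625277161862528, 'auc': 0.8245},
    'gradient boost': {'f1': 0.7619165060479449, 'accuracy': 0.7662971175166298, 'auc': 0.8421},
}

# Sort the model names by F1 score, best first.
ranking = sorted(results, key=lambda name: results[name]['f1'], reverse=True)
for rank, name in enumerate(ranking, start=1):
    m = results[name]
    print('{}. {:<20} f1={:.3f} accuracy={:.3f} auc={:.3f}'.format(
        rank, name, m['f1'], m['accuracy'], m['auc']))
```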

#### Tuning the logistic regression classifier

In [9]:
# tuned logistic regression classifier
# https://spark.apache.org/docs/latest/ml-tuning.html
lr_tuned = LogisticRegression(featuresCol='scaled_features', labelCol='churn')
param_grid_tuned_lr = ParamGridBuilder()\
    .addGrid(lr_tuned.regParam, [0.1, 0.01]) \
    .addGrid(lr_tuned.fitIntercept, [False, True])\
    .build()
    #.addGrid(lr_tuned.elasticNetParam, [0.0, 0.5, 1.0])\
    
model_lr_tuned = model_pipeline(lr_tuned, param_grid_tuned_lr)


In [10]:
# fit tuned best logistic regression classifier
cv_model_lr_tuned = model_lr_tuned.fit(train)
best_model = cv_model_lr_tuned.bestModel

# predict tuned best logistic regression
pred_best_model = best_model.transform(test)


In [11]:
# metrics for tuned best logistic regression classifier
model_metrics(pred_best_model)

The metrics for the model:
The f1 Score is 0.7372242705436406
The accuracy is 0.7440609592111161
The areaUnderROC: 80.10%
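
The cost of the grid search above grows with the number of parameter combinations times the number of folds. The following plain-Python sketch expands the same grid values with `itertools.product` to show this. It only imitates what `ParamGridBuilder` produces and does not call Spark; the names `grid`, `combinations` and `fits_during_cv` are introduced for illustration.

```python
from itertools import product

# Grid values taken from the tuning cell above.
grid = {
    'regParam': [0.1, 0.01],
    'fitIntercept': [False, True],
}
num_folds = 3

# Cartesian product of all parameter values, one dict per combination.
combinations = [dict(zip(grid, values)) for values in product(*grid.values())]
for params in combinations:
    print(params)

# Each combination is fitted once per fold; the final refit of the best one is extra.
fits_during_cv = len(combinations) * num_folds
print(len(combinations), fits_during_cv)
```

With two values for each of the two parameters and three folds, this gives 4 combinations and 12 fits during cross-validation, so adding a third parameter such as the commented-out `elasticNetParam` with three values would triple the work.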

#### Tuning the random forest classifier

In [12]:
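# works
def tune_pipeline(train, test, numTrees=[10,100], maxDepth=[5,20]):
    """
    Tune a random forest classifier with 3-fold cross-validation over
    numTrees and maxDepth, then print the test metrics of the best model,
    its feature importances and its best parameters.
    """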
    assembler = VectorAssembler(inputCols=features, outputCol='vect_features')
    scaler = StandardScaler(inputCol='vect_features', outputCol='scaled_features',withStd = True)
    #scaler = MinMaxScaler(inputCol='vect_features', outputCol='scaled_features')
    
    clf = RandomForestClassifier(featuresCol='scaled_features', labelCol='churn')
    

        
    param_grid_tuned = ParamGridBuilder()\
    .addGrid(clf.numTrees,numTrees)\
    .addGrid(clf.maxDepth,maxDepth).build()

    mod = CrossValidator(
        estimator = Pipeline(stages=[assembler,scaler, clf]),
        estimatorParamMaps=param_grid_tuned,
        evaluator = MulticlassClassificationEvaluator(labelCol='churn', metricName='f1'),
        numFolds=3,
    )
    
    cvModel = mod.fit(train)
    predictions = cvModel.transform(test)
    
    model_metrics(predictions)
    
    bestPipeline = cvModel.bestModel
    
    # importances of features
    for i in range(len(bestPipeline.stages[2].featureImportances)):
        print("{} : {} \n".format(features[i], bestPipeline.stages[2].featureImportances[i]))
    
    print('Best parameters : max depth:{}, num Trees:{}'.\
          format(bestPipeline.stages[2].getOrDefault('maxDepth'), bestPipeline.stages[2].getNumTrees))
    

In [13]:
tune_pipeline(train,test)

The metrics for the model:
The f1 Score is 0.774039264960112
The accuracy is 0.7767817122366651
The areaUnderROC: 85.14%
sum_songs : 0.058721937888561955 

sum_sessions : 0.05391335325872726 

sum_lenght : 0.030341235572362502 

sum_tup : 0.04209069671310241 

sum_tdown : 0.08644245059371634 

sum_friend : 0.05446991946920268 

sum_playlist : 0.04793915039726194 

sum_advert : 0.1394908224101629 

sum_help : 0.045185897933685854 

sum_errors : 0.02596352726246073 

avg_items : 0.07709279624876421 

avg_lenght : 0.07191347188733141 

reg_days : 0.11152934642639839 

active_days : 0.1549053939382614 

Best parameters : max depth:20, num Trees:100

The best model was the tuned Random Forest Classifier (max depth 20, 100 trees), with an F1 score of 0.77, accuracy of 0.78 and area under ROC of 85%.
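
The feature importances printed above are easier to read when sorted. This is a small plain-Python sketch with the values copied by hand from the output and rounded to four decimals; the names `importances`, `ranked` and `top3` are illustrative only.

```python
# Feature importances copied from the output above, rounded to four decimals.
importances = {
    'sum_songs': 0.0587, 'sum_sessions': 0.0539, 'sum_lenght': 0.0303,
    'sum_tup': 0.0421, 'sum_tdown': 0.0864, 'sum_friend': 0.0545,
    'sum_playlist': 0.0479, 'sum_advert': 0.1395, 'sum_help': 0.0452,
    'sum_errors': 0.0260, 'avg_items': 0.0771, 'avg_lenght': 0.0719,
    'reg_days': 0.1115, 'active_days': 0.1549,
}

# Sort from most to least important.
ranked = sorted(importances.items(), key=lambda kv: kv[1], reverse=True)
for name, value in ranked:
    print('{:<13} {:.4f}'.format(name, value))

top3 = [name for name, _ in ranked[:3]]
print(top3)
```

The three most important features in this run are `active_days`, `sum_advert` and `reg_days`, which together account for roughly 40% of the total importance.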

#### Tuning the gradient boost classifier

In [8]:
# tuned gradient boost classifier
gb_tuned = GBTClassifier(featuresCol='scaled_features', labelCol='churn',maxIter = 15, seed = 42)

param_grid_tuned_gb = ParamGridBuilder()\
    .addGrid(gb_tuned.maxIter, [10, 15, 20])\
    .addGrid(gb_tuned.maxDepth, [5, 10])\
    .build()

model_gb_tuned = model_pipeline(gb_tuned, param_grid_tuned_gb)


In [9]:
# fit tuned gradient boost classifier
cv_model_gb_tuned = model_gb_tuned.fit(train)
best_model_gb = cv_model_gb_tuned.bestModel

# predict with the tuned best gradient boost classifier
pred_best_model_gb = best_model_gb.transform(test)

In [10]:
# metrics for tuned best gradient boost classifier
model_metrics(pred_best_model_gb)

The metrics for the model:
The f1 Score is 0.7679592206034864
The accuracy is 0.7698341550874047
The areaUnderROC: 84.59%
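
To see what tuning bought, the F1 scores before and after tuning can be compared in plain Python. The numbers are copied by hand from the outputs in this section. The runs come from different notebook sessions, so small differences may partly reflect a different train/test split rather than the tuning itself; treat the gains as indicative only.

```python
# F1 scores on the test set, copied from the outputs in this section.
f1_untuned = {'logistic regression': 0.7339593769127118,
              'random forest': 0.7576566440782728,
              'gradient boost': 0.7619165060479449}
f1_tuned = {'logistic regression': 0.7372242705436406,
            'random forest': 0.774039264960112,
            'gradient boost': 0.7679592206034864}

# Absolute change in F1 per model, and the best tuned model overall.
gains = {name: f1_tuned[name] - f1_untuned[name] for name in f1_untuned}
best = max(f1_tuned, key=f1_tuned.get)

for name, gain in gains.items():
    print('{:<20} tuned f1={:.4f} gain={:+.4f}'.format(name, f1_tuned[name], gain))
print('best tuned model:', best)
```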