In [18]:
# Import relevant libraries
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
%matplotlib inline

import pandas as pd
import numpy as np
import datetime
import pyspark.sql.functions as F

In [19]:
spark = SparkSession.builder.appName("Sparkify").getOrCreate()

In [20]:
# Read data file
path = "medium-sparkify-event-data.json"
user_data = spark.read.json(path)

In [21]:
# Check schema
user_data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [22]:
# First row of user data
user_data.take(1)

[Row(artist='Martin Orford', auth='Logged In', firstName='Joseph', gender='M', itemInSession=20, lastName='Morales', length=597.55057, level='free', location='Corpus Christi, TX', method='PUT', page='NextSong', registration=1532063507000, sessionId=292, song='Grand Designs', status=200, ts=1538352011000, userAgent='"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='293')]

In [23]:
# Create table from data frame. Allows us to use the Spark sql api to interact with the data
user_data.createOrReplaceTempView("user_data_table")

In [24]:
# Run a query to retrieve all columns of the first row of user data
query = """select * from user_data_table"""
spark.sql(query).show(1)

+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+--------------------+------+
|       artist|     auth|firstName|gender|itemInSession|lastName|   length|level|          location|method|    page| registration|sessionId|         song|status|           ts|           userAgent|userId|
+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+--------------------+------+
|Martin Orford|Logged In|   Joseph|     M|           20| Morales|597.55057| free|Corpus Christi, TX|   PUT|NextSong|1532063507000|      292|Grand Designs|   200|1538352011000|"Mozilla/5.0 (Mac...|   293|
+-------------+---------+---------+------+-------------+--------+---------+-----+------------------+------+--------+-------------+---------+-------------+------+-------------+---------

In [25]:
# Are there any missing users? These are possibly people browsing the website and can be removed
# from the data set
spark.sql("""select userId, gender, registration, auth, page 
from user_data_table 
where userId is null or userId = ''""").show(5)

+------+------+------------+----------+-----+
|userId|gender|registration|      auth| page|
+------+------+------------+----------+-----+
|      |  null|        null|Logged Out| Home|
|      |  null|        null|Logged Out| Home|
|      |  null|        null|Logged Out| Home|
|      |  null|        null|Logged Out|Login|
|      |  null|        null|Logged Out| Home|
+------+------+------------+----------+-----+
only showing top 5 rows



In [26]:
# Remove the missing users and update the table
spark.sql("""select * 
from user_data_table 
where userId is null or userId <> ''""").createOrReplaceTempView("user_data_table")

In [27]:
# When did users register and when did they first start using the site?
spark.sql("""select userId, registration, ts 
from user_data_table 
order by userId, ts""").show(5)

+------+-------------+-------------+
|userId| registration|           ts|
+------+-------------+-------------+
|    10|1538159495000|1538965220000|
|    10|1538159495000|1538965243000|
|    10|1538159495000|1538965485000|
|    10|1538159495000|1538965724000|
|    10|1538159495000|1538965881000|
+------+-------------+-------------+
only showing top 5 rows



In [28]:
# Start time cannot be earlier than registration but there are such users in the data file
spark.sql("""select userId, registration, ts 
from user_data_table 
where registration > ts
order by userId, ts""").show(5)

+------+-------------+-------------+
|userId| registration|           ts|
+------+-------------+-------------+
|100051|1542205030000|1542143416000|
|100051|1542205030000|1542143492000|
|100051|1542205030000|1542143515000|
|100051|1542205030000|1542143852000|
|100051|1542205030000|1542143853000|
+------+-------------+-------------+
only showing top 5 rows



In [29]:
# How many such users are there?
spark.sql("""select count(*) 
from user_data_table 
where registration > ts""").show()

+--------+
|count(1)|
+--------+
|     100|
+--------+



In [30]:
# We remove these records and update the data table
spark.sql("""select * 
from user_data_table
where registration <= ts""").createOrReplaceTempView("user_data_table")

In [31]:
# Pages visited by the user
spark.sql("""
select distinct page
from user_data_table""").show()

+--------------------+
|                page|
+--------------------+
|              Cancel|
|    Submit Downgrade|
|         Thumbs Down|
|                Home|
|           Downgrade|
|         Roll Advert|
|              Logout|
|       Save Settings|
|Cancellation Conf...|
|               About|
|            Settings|
|     Add to Playlist|
|          Add Friend|
|            NextSong|
|           Thumbs Up|
|                Help|
|             Upgrade|
|               Error|
|      Submit Upgrade|
+--------------------+



**Churn**: We define churn as users visiting the 'Cancellation Confirmation' page.

In [32]:
# Create a table of churned users
is_churned = spark.sql("""select userId, 'yes' as churned
from user_data_table
where page = 'Cancellation Confirmation'
""")

is_churned.createOrReplaceTempView("churned_users_table")

In [33]:
# number of churned users
spark.sql("""select count(*) from churned_users_table""").show()

+--------+
|count(1)|
+--------+
|      98|
+--------+



In [34]:
# Merge churned users back into the main table and update the table
churned = spark.sql("""select t1.*, t2.churned
from user_data_table t1
left join churned_users_table t2
on t1.userId = t2.userId
order by userId, ts""")

churned = churned.na.fill({'churned': 'no'})
churned.createOrReplaceTempView("user_data_table")

In [35]:
# Number of churned users vs non-churned users
spark.sql("""select churned, count(*) 
from (
        select distinct userId, churned
        from user_data_table
        where userId <> ''
)
group by churned""").show()

+-------+--------+
|churned|count(1)|
+-------+--------+
|     no|     349|
|    yes|      98|
+-------+--------+



We create averages per user for page visits by day and month.

In [36]:
# create month and date variables from ts
spark.udf.register("get_month", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).month))
spark.udf.register("get_date", lambda x: datetime.datetime.utcfromtimestamp(x / 1000.0).strftime('%Y-%m-%d'))

<function __main__.<lambda>(x)>

In [40]:
query = """select t.userId, 
avg(sum_NextSong) avg_nextsong_per_day,
avg(sum_Submit_Downgrade) avg_submit_downgrade_per_day,
avg(sum_Thumbs_Down) avg_sum_thumbs_down_per_day,
avg(sum_Home) avg_sum_home_per_day,
avg(sum_Downgrade) avg_sum_downgrade_per_day,
avg(sum_Roll_Advert) avg_sum_roll_advert_per_day,
avg(sum_Logout) avg_sum_logout_per_day,
avg(sum_Save_Settings) avg_sum_save_settings_per_day,
avg(sum_About) avg_sum_about_per_day,
avg(sum_Settings) avg_sum_settings_per_day,
avg(sum_Add_to_Playlist) avg_sum_add_playlist_per_day,
avg(sum_Friend) avg_sum_add_friend_per_day,
avg(sum_Thumbs_Up) avg_sum_thumbs_up_per_day,
avg(sum_Help) avg_sum_help_per_day,
avg(sum_Upgrade) avg_sum_upgrade_per_day,
avg(sum_Error) avg_sum_error_per_day,
avg(sum_Submit_Upgrade) avg_submit_upgrade_per_day

from 
(
    select userId, 
    -- get_month(ts) month,
    get_date(ts) date,
    sum(case when page = 'NextSong' then 1 else 0 end) sum_NextSong,
    sum(case when page = 'Submit Downgrade' then 1 else 0 end) sum_Submit_Downgrade,
    sum(case when page = 'Thumbs Down' then 1 else 0 end) sum_Thumbs_Down,
    sum(case when page = 'Home' then 1 else 0 end) sum_Home,
    sum(case when page = 'Downgrade' then 1 else 0 end) sum_Downgrade,
    sum(case when page = 'Roll Advert' then 1 else 0 end) sum_Roll_Advert,
    sum(case when page = 'Logout' then 1 else 0 end) sum_Logout,
    sum(case when page = 'Save Settings' then 1 else 0 end) sum_Save_Settings,
    sum(case when page = 'About' then 1 else 0 end) sum_About,
    sum(case when page = 'Settings' then 1 else 0 end) sum_Settings,
    sum(case when page = 'Add to Playlist' then 1 else 0 end) sum_Add_to_Playlist,
    sum(case when page = 'Add Friend' then 1 else 0 end) sum_Friend,
    sum(case when page = 'Thumbs Up' then 1 else 0 end) sum_Thumbs_Up,
    sum(case when page = 'Help' then 1 else 0 end) sum_Help,
    sum(case when page = 'Upgrade' then 1 else 0 end) sum_Upgrade,
    sum(case when page = 'Error' then 1 else 0 end) sum_Error,
    sum(case when page = 'Submit Upgrade' then 1 else 0 end) sum_Submit_Upgrade
    
    from user_data_table
    group by userId, date
) t
group by t.userId
order by t.userId
"""

page_avg_per_day = spark.sql(query)

In [41]:
query = """select t.userId, 
avg(sum_NextSong) avg_nextsong_per_month,
avg(sum_Submit_Downgrade) avg_submit_downgrade_per_month,
avg(sum_Thumbs_Down) avg_sum_thumbs_down_per_month,
avg(sum_Home) avg_sum_home_month,
avg(sum_Downgrade) avg_sum_downgrade_per_month,
avg(sum_Roll_Advert) avg_sum_roll_advert_per_month,
avg(sum_Logout) avg_sum_logout_per_month,
avg(sum_Save_Settings) avg_sum_save_settings_per_month,
avg(sum_About) avg_sum_about_per_month,
avg(sum_Settings) avg_sum_settings_per_month,
avg(sum_Add_to_Playlist) avg_sum_add_playlist_per_month,
avg(sum_Friend) avg_sum_add_friend_per_month,
avg(sum_Thumbs_Up) avg_sum_thumbs_up_per_month,
avg(sum_Help) avg_sum_help_per_month,
avg(sum_Upgrade) avg_sum_upgrade_per_month,
avg(sum_Error) avg_sum_error_per_month,
avg(sum_Submit_Upgrade) avg_submit_upgrade_per_month

from 
(
    select userId, 
    get_month(ts) month,
    -- get_date(ts) date,
    sum(case when page = 'NextSong' then 1 else 0 end) sum_NextSong,
    sum(case when page = 'Submit Downgrade' then 1 else 0 end) sum_Submit_Downgrade,
    sum(case when page = 'Thumbs Down' then 1 else 0 end) sum_Thumbs_Down,
    sum(case when page = 'Home' then 1 else 0 end) sum_Home,
    sum(case when page = 'Downgrade' then 1 else 0 end) sum_Downgrade,
    sum(case when page = 'Roll Advert' then 1 else 0 end) sum_Roll_Advert,
    sum(case when page = 'Logout' then 1 else 0 end) sum_Logout,
    sum(case when page = 'Save Settings' then 1 else 0 end) sum_Save_Settings,
    sum(case when page = 'About' then 1 else 0 end) sum_About,
    sum(case when page = 'Settings' then 1 else 0 end) sum_Settings,
    sum(case when page = 'Add to Playlist' then 1 else 0 end) sum_Add_to_Playlist,
    sum(case when page = 'Add Friend' then 1 else 0 end) sum_Friend,
    sum(case when page = 'Thumbs Up' then 1 else 0 end) sum_Thumbs_Up,
    sum(case when page = 'Help' then 1 else 0 end) sum_Help,
    sum(case when page = 'Upgrade' then 1 else 0 end) sum_Upgrade,
    sum(case when page = 'Error' then 1 else 0 end) sum_Error,
    sum(case when page = 'Submit Upgrade' then 1 else 0 end) sum_Submit_Upgrade
    
    from user_data_table
    group by userId, month
) t
group by t.userId
order by t.userId
"""

page_avg_per_month = spark.sql(query)

In [42]:
# Create tables for averages
page_avg_per_day.createOrReplaceTempView("page_avg_per_day_table")
page_avg_per_month.createOrReplaceTempView("page_avg_per_month_table")

In [43]:
# Examine gender distribution
spark.sql("""select gender, count(*)
from 
(
    select distinct userId, gender from user_data_table)
    group by gender
""").show()

+------+--------+
|gender|count(1)|
+------+--------+
|     F|     197|
|     M|     250|
+------+--------+



In [44]:
# Gender distribution by churned/non-churned. There does not appear to be much difference in churn rates by gender
spark.sql("""select churned, gender, count(*)
from 
(select distinct userId, gender, churned from user_data_table)
group by churned, gender
order by churned, gender
""").show()

+-------+------+--------+
|churned|gender|count(1)|
+-------+------+--------+
|     no|     F|     153|
|     no|     M|     196|
|    yes|     F|      44|
|    yes|     M|      54|
+-------+------+--------+



In [45]:
# Users visit the NextSong page most frequently
spark.sql("""select page, count(*) num_visits
from user_data_table
group by page
order by num_visits desc""").show()

+--------------------+----------+
|                page|num_visits|
+--------------------+----------+
|            NextSong|    432808|
|           Thumbs Up|     23824|
|                Home|     19084|
|     Add to Playlist|     12347|
|          Add Friend|      8086|
|         Roll Advert|      7760|
|              Logout|      5988|
|         Thumbs Down|      4909|
|           Downgrade|      3811|
|            Settings|      2963|
|                Help|      2644|
|               About|      1025|
|             Upgrade|       968|
|       Save Settings|       585|
|               Error|       503|
|      Submit Upgrade|       287|
|    Submit Downgrade|       117|
|              Cancel|        98|
|Cancellation Conf...|        98|
+--------------------+----------+



We create a separate column 'hour' and then calculate the average number of times the user visits the 'NextSong' per hour during his or her time on the site.

In [46]:
# Create hour variable and update table 
spark.udf.register("get_hour", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))

spark.sql('''
          SELECT *, get_hour(ts) AS hour
          FROM user_data_table 
          '''
          ).createOrReplaceTempView("user_data_table")

In [47]:
# Average number of visits to the NextSong page for all users and update table
next_song_visits = spark.sql("""select t.userId as userId, 
avg(t.visits_next_songs) avg_visits_next_songs, 
max(churned) as churned
from
(
   select userId, page, hour, count(*) as visits_next_songs, max(churned) as churned
    from user_data_table 
    where page = 'NextSong'
    group by userId, page, hour
    order by cast(hour as int) asc
) t
group by t.userId
order by t.userId
""")

next_song_visits.createOrReplaceTempView("next_song_visits_table")

In [48]:
# average mins on site per day by user and update table
spark.sql("""select t2.userId, 
avg(t2.mins_on_date) avg_mins_on_date

from
(
    select t1.userId, t1.date, sum(t1.mins_in_session) mins_on_date
    from
        (
            select userId, 
            get_date(ts) date,
            sessionId,
            ((max(ts) - min(ts))/1000)/60 mins_in_session
            from user_data_table
            group by 1, 2, 3
            order by 1, 2, 3
        ) t1
    group by 1, 2
    order by 1, 2
) t2
group by 1
order by 1
""").createOrReplaceTempView("mins_on_date_table")

In [49]:
# Number of paid vs free; most observations involve paid users
spark.sql(
"""
select level, count(*)
from user_data_table
group by level
""").show()

+-----+--------+
|level|count(1)|
+-----+--------+
| free|  109861|
| paid|  418044|
+-----+--------+



For a given user what proportion of time spent on the site is spent as a paid user? We calculate the total session minutes per user and calculate the proportion of that time the user spent at the paid level.

In [50]:
# Create a table with number of minutes spent in each session per user.

spark.sql("""
select userId, 
sessionId, 
((max(ts) - min(ts))/1000)/60 mins_in_session
from user_data_table
group by userId, sessionId
order by userId, sessionId
""").createOrReplaceTempView("mins_on_site_per_session")

In [51]:
# How much time does the user spend on the site as a paid vs free user?

query = """
select t.userId, t.level, ((sum(t.ts_in_session))/1000)/60 mins_in_session, max(t.churned) churned
from
(
    select userId, sessionId, level, max(ts) - min(ts) ts_in_session, max(churned) churned
    from user_data_table
    group by userId, sessionId, level
    order by userId, sessionId, level
) t
group by t.userId, t.level
order by t.userId, t.level
"""
spark.sql(query).createOrReplaceTempView("paid_free_table")

In [52]:
# Calculate percentage of time a given user spends as a paid user and create table

query = """
select t1.*, t2.total_mins_in_session, 
(t1.mins_in_session/t2.total_mins_in_session) * 100 as paid_pct,
t2.total_mins_in_session/t2.num_sessions avg_mins_per_session

from paid_free_table t1
join
(
    select userId, sum(mins_in_session) total_mins_in_session, count(*) num_sessions
    from mins_on_site_per_session
    group by userId
    order by userId
) t2
on t1.userId = t2.userId and t1.level = 'paid'
order by t1.userId
"""
spark.sql(query).createOrReplaceTempView("paid_pct_table")

In [53]:
# Does location matter? We group users by regions.
# We split 'location', keep information on the state and map it to regions

user_data_df = spark.sql("""select * from user_data_table""")
user_data_df2 = user_data_df.withColumn('location', F.split('location', ', ')[1])
user_data_df2.createOrReplaceTempView("user_data_table")

In [54]:
# Map states to regions and update user table
spark.sql("""
select *,
case when location in ('NC-SC', 'TN-MS-AR', 'TN-VA', 'OK', 'FL', 'TX', 'WV', 'AL', 'MD-WV',
                        'VA', 'GA', 'VA-NC', 'TN', 'MS', 'KY', 'SC', 'GA-AL', 'LA', 'MD', 'DC-VA-MD-WV') then 'south'
     when location in ('RI-MA', 'NJ', 'PA-NJ', 'NY-NJ-PA', 'NH', 'CT', 'PA', 'NY', 'PA-NJ-DE-MD',
                         'PA-NJ-DE-MD', 'MA-NH') then 'northeast'
     when location in ('UT', 'AR', 'CO', 'NV', 'WA', 'OR-WA', 'OR', 'CA', 'AK'
                         'OH', 'IN', 'IL', 'NE-IA', 'UT-ID') then 'west'
else 'midwest'
end as region
from user_data_table
""").createOrReplaceTempView("user_data_table")

In [55]:
# Distribution of churned vs non-churned users by region
query = """select churned, region, count(*)
from (
    select distinct userId, region, churned
    from user_data_table
)
group by churned, region
order by churned, region
"""
spark.sql(query).show()

+-------+---------+--------+
|churned|   region|count(1)|
+-------+---------+--------+
|     no|  midwest|      88|
|     no|northeast|      57|
|     no|    south|     111|
|     no|     west|      93|
|    yes|  midwest|      23|
|    yes|northeast|      22|
|    yes|    south|      35|
|    yes|     west|      18|
+-------+---------+--------+



In [56]:
# Visiting certain pages may be considered as positive engagement with the site
# What proportion of visits are positive?

query = """select userId, 
sum(num_interactions) total_interactions, 
sum(pos_engagement) total_pos_engagement,
sum(pos_engagement)/sum(num_interactions) * 100 pct_total_pos_engagement,
max(churned) churned
from (
        select userId, sessionId, count(*) num_interactions,
        sum(case when page in ("Add to Playlist", "Add Friend", 
                            "NextSong", "Thumbs Up", "Upgrade", "Submit Upgrade") then 1
        else 0
        end) as pos_engagement, max(churned) churned
        from user_data_table
        group by userId, sessionId
        order by userId, sessionId
    ) t
group by userId
order by userId
"""

spark.sql(query).createOrReplaceTempView("pct_pos_engagement_table")

Create an aggregate table where each row is a user and each column is a nominal or aggregate statistic. This table will be used for modelling.

In [57]:
# Create a table with aggregate data
query = """select distinct userId, gender, region, churned from user_data_table"""
spark.sql(query).createOrReplaceTempView("user_data_agg_table")

In [58]:
# Join the percentage of paid user time table
query = """select t1.*, t2.paid_pct 
from user_data_agg_table t1
left join (
        select userId, paid_pct
        from paid_pct_table
) t2
on t1.userId = t2.userId
"""

user_data_agg = spark.sql(query)
user_data_agg = user_data_agg.na.fill({'paid_pct': 0.0})
user_data_agg.createOrReplaceTempView("user_data_agg_table")

In [59]:
# Join the positive engagements table
query = """select t1.*, t2.pct_total_pos_engagement 
from user_data_agg_table t1
join (
        select userId, pct_total_pos_engagement
        from pct_pos_engagement_table
) t2
on t1.userId = t2.userId
"""

user_data_agg = spark.sql(query)
user_data_agg.createOrReplaceTempView("user_data_agg_table")

In [60]:
# Join the average number of visits to the NextSong page table
query = """select t1.*, t2.avg_visits_next_songs 
from user_data_agg_table t1
join (
        select userId, avg_visits_next_songs
        from next_song_visits_table
) t2
on t1.userId = t2.userId
"""

user_data_agg = spark.sql(query)
user_data_agg.createOrReplaceTempView("user_data_agg_table")

In [61]:
# Join the average number of minutes table
query = """select t1.*, t2.avg_mins_on_date 
from user_data_agg_table t1
join (
        select userId, avg_mins_on_date
        from mins_on_date_table
) t2
on t1.userId = t2.userId
"""

user_data_agg = spark.sql(query)
user_data_agg.createOrReplaceTempView("user_data_agg_table")

In [62]:
# Join the average visits to different pages per day table
query = """select t1.churned,
t1.gender, 
t1.region,
t1.paid_pct,
t1.pct_total_pos_engagement,
t1.avg_visits_next_songs,
t1.avg_mins_on_date,
t2.userId,
t2.avg_nextsong_per_day,
t2.avg_submit_downgrade_per_day,
t2.avg_sum_thumbs_down_per_day,
t2.avg_sum_home_per_day,
t2.avg_sum_downgrade_per_day,
t2.avg_sum_roll_advert_per_day,
t2.avg_sum_logout_per_day,
t2.avg_sum_save_settings_per_day,
t2.avg_sum_about_per_day,
t2.avg_sum_settings_per_day,
t2.avg_sum_add_playlist_per_day,
t2.avg_sum_add_friend_per_day,
t2.avg_sum_thumbs_up_per_day,
t2.avg_sum_help_per_day,
t2.avg_sum_upgrade_per_day,
t2.avg_sum_error_per_day,
t2.avg_submit_upgrade_per_day

from user_data_agg_table t1
join (
        select *
        from page_avg_per_day_table
) t2
on t1.userId = t2.userId
"""

user_data_agg = spark.sql(query)
user_data_agg.createOrReplaceTempView("user_data_agg_table")

In [63]:
# Join the average visits to different pages per month table
query = """select t1.*,
t2.avg_nextsong_per_month,
t2.avg_submit_downgrade_per_month,
t2.avg_sum_thumbs_down_per_month,
t2.avg_sum_home_month,
t2.avg_sum_downgrade_per_month,
t2.avg_sum_roll_advert_per_month,
t2.avg_sum_logout_per_month,
t2.avg_sum_save_settings_per_month,
t2.avg_sum_about_per_month,
t2.avg_sum_settings_per_month,
t2.avg_sum_add_playlist_per_month,
t2.avg_sum_add_friend_per_month,
t2.avg_sum_thumbs_up_per_month,
t2.avg_sum_help_per_month,
t2.avg_sum_upgrade_per_month,
t2.avg_sum_error_per_month,
t2.avg_submit_upgrade_per_month

from user_data_agg_table t1
join (
        select *
        from page_avg_per_month_table
) t2
on t1.userId = t2.userId
"""

user_data_agg = spark.sql(query)
user_data_agg.createOrReplaceTempView("user_data_agg_table")

In [66]:
# Save dataframe with aggregate data
path = "user_data_agg.json"
user_data_agg.write.json(path, mode="overwrite")