# Sparkify Churn Analysis

We want to understand which customers have a high likelihood of downgrading from their premium subscription or cancelling their service altogether.

Since the full dataset is 12GB, we'll use a Spark cluster to provide the extra computing power.

In [70]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_date, to_timestamp, from_unixtime, concat_ws, substring, month, year, count, countDistinct
from pyspark.sql.functions import sum as Fsum

In [2]:
# Create spark session
spark = SparkSession \
    .builder \
    .appName("Sparkify") \
    .getOrCreate()

# Load and Clean Dataset

For much of the process of exploring the data and training the model, we will use a small 128MB subset of the 12GB dataset, before using the full 12GB dataset for the final churn analysis.

The dataset contains rows for unregistered users who have yet to sign in to Sparkify. These rows have no userId or sessionId, so we remove them for the purposes of our analysis.

In [3]:
# Read in full sparkify dataset
# event_data = "s3n://udacity-dsnd/sparkify/sparkify_event_data.json"

# Mini sparkify dataset hosted in an S3 bucket
# event_data = "s3n://udacity-dsnd/sparkify/mini_sparkify_event_data.json"

# Get local file
event_data = "mini_sparkify_event_data.json"

df = spark.read.json(event_data)
df.head()

Row(artist='Martha Tilston', auth='Logged In', firstName='Colin', gender='M', itemInSession=50, lastName='Freeman', length=277.89016, level='paid', location='Bakersfield, CA', method='PUT', page='NextSong', registration=1538173362000, sessionId=29, song='Rockpools', status=200, ts=1538352117000, userAgent='Mozilla/5.0 (Windows NT 6.1; WOW64; rv:31.0) Gecko/20100101 Firefox/31.0', userId='30')

Remove rows without a user or session ID.

In [4]:
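# Keep only rows that have both a userId and a sessionId.
# sessionId is numeric, so test it for null rather than comparing it to ''.
df = df.where((col('userId') != '') & col('sessionId').isNotNull())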

In [5]:
df.count()

278154

# Exploratory Data Analysis

We want to understand the churn rate. Let's first look at the pages that users can visit. Each page represents the event that occurred.

In [6]:
df.groupBy('page').count().orderBy('page').show(truncate=False)

+-------------------------+------+
|page                     |count |
+-------------------------+------+
|About                    |495   |
|Add Friend               |4277  |
|Add to Playlist          |6526  |
|Cancel                   |52    |
|Cancellation Confirmation|52    |
|Downgrade                |2055  |
|Error                    |252   |
|Help                     |1454  |
|Home                     |10082 |
|Logout                   |3226  |
|NextSong                 |228108|
|Roll Advert              |3933  |
|Save Settings            |310   |
|Settings                 |1514  |
|Submit Downgrade         |63    |
|Submit Upgrade           |159   |
|Thumbs Down              |2546  |
|Thumbs Up                |12551 |
|Upgrade                  |499   |
+-------------------------+------+



## Churn Rate

In this analysis, we define churn as a user cancelling their subscription, which is recorded when they reach the 'Cancellation Confirmation' page. Each user's cancellation is counted only once, which gives us a binary label per user.

In [7]:
df = df.withColumn("Churn", when(df.page == 'Cancellation Confirmation', 1).otherwise(0))
user_df = df.groupBy('userId').agg(when(Fsum('churn')>=1, 1).otherwise(0).alias('churn'))
user_df.groupBy('churn').count().show()

+-----+-----+
|churn|count|
+-----+-----+
|    1|   52|
|    0|  173|
+-----+-----+



In [8]:
churn_rate = user_df.select(Fsum('churn')).collect()[0][0]/user_df.select(user_df.churn).count()
print(f'{churn_rate:.2%}')

23.11%
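
The labelling logic above can be checked by hand on a toy example. The sketch below uses plain Python rather than Spark, and the event list is made up for illustration (it is not taken from the Sparkify data). A user counts as churned if any of their events is a 'Cancellation Confirmation', and the churn rate is the number of churned users divided by the number of users.

```python
# Toy events as (userId, page) pairs; hypothetical data, not from the dataset
events = [
    ("1", "NextSong"),
    ("1", "Cancel"),
    ("1", "Cancellation Confirmation"),
    ("2", "NextSong"),
    ("2", "Thumbs Up"),
    ("3", "Home"),
    ("4", "NextSong"),
]


def churn_flags(events):
    """Return {userId: 1 if the user ever hit 'Cancellation Confirmation', else 0}."""
    flags = {}
    for user_id, page in events:
        hit = 1 if page == "Cancellation Confirmation" else 0
        # max() keeps the flag binary even if a user had several matching events
        flags[user_id] = max(flags.get(user_id, 0), hit)
    return flags


flags = churn_flags(events)
churn_rate = sum(flags.values()) / len(flags)
print(f"{churn_rate:.2%}")
```

On the mini dataset the same ratio is 52 / (52 + 173), which is the 23.11% printed above.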


# User-Level Features

- Thumbs up/down in last 3 months (+change from month before?)
- Lifetime as a registered user
- Number of different artists listened to
- Time since last active
- Average number of songs listened to per month
- Total number of sessions

## Potential Key Features

- Number of friends
- Number of songs listened to in last month
- Time spent as paid user
- Number of adverts
- Thumbs up/down - counts as an engagement interaction
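
Two of the ideas listed above, lifetime as a registered user and time since last active, come down to simple arithmetic on the millisecond timestamps in `registration` and `ts`. The plain-Python sketch below shows one possible definition. The helper names, the choice of "last event minus registration" for lifetime, and the reference time are all assumptions made for illustration, not something the notebook has settled on.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # registration and ts are in milliseconds


def lifetime_days(registration_ms, last_event_ms):
    """Days between registration and the user's most recent event (assumed definition)."""
    return (last_event_ms - registration_ms) / MS_PER_DAY


def days_since_last_active(last_event_ms, reference_ms):
    """Days between the user's most recent event and a chosen reference time."""
    return (reference_ms - last_event_ms) / MS_PER_DAY


# Values taken from the first row shown by df.head()
registration = 1538173362000
ts = 1538352117000

print(round(lifetime_days(registration, ts), 2))
```

In Spark, the per-user inputs would likely come from aggregating `max('ts')` and `min('registration')` per `userId`, with the reference time set to, for example, the latest timestamp in the dataset.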

In [74]:
df.groupBy('userId').agg(
    when(Fsum('churn')>=1, 1).otherwise(0).alias('churn'),
    count(when(col('Page') == 'NextSong', True)).alias('songs'),
    countDistinct('artist').alias('uniqueArtists'),
    count(when(col('Page').isin(['Thumbs Up', 'Thumbs Down']), True)).alias('thumbs'),
).show()

+------+-----+-----+-------------+------+
|userId|churn|songs|uniqueArtists|thumbs|
+------+-----+-----+-------------+------+
|100010|    0|  275|          252|    22|
|200002|    0|  387|          339|    27|
|   125|    1|    8|            8|     0|
|    51|    1| 2111|         1385|   121|
|   124|    0| 4079|         2232|   212|
|     7|    0|  150|          142|     8|
|    15|    0| 1914|         1302|    95|
|    54|    1| 2841|         1744|   192|
|   155|    0|  820|          643|    61|
|   132|    0| 1928|         1299|   113|
|   154|    0|   84|           78|    11|
|100014|    1|  257|          233|    20|
|   101|    1| 1797|         1241|   102|
|    11|    0|  647|          534|    49|
|   138|    0| 2070|         1332|   119|
|300017|    0| 3632|         2070|   331|
|    29|    1| 3028|         1804|   176|
|    69|    0| 1125|          865|    81|
|100021|    1|  230|          207|    16|
|    42|    0| 3573|         2073|   191|
+------+-----+-----+-------------+------+

# Appendix

## Downgrades

Note how people can downgrade multiple times. Here we see people downgrading up to 3 times. This was not the case for cancellations.

In [None]:
dff = df.withColumn("Downgrades", when(df.page == 'Submit Downgrade', 1).otherwise(0))
user_dff = dff.groupBy('userId').agg(Fsum('Downgrades').alias('Downgrades'))
user_dff.groupBy('Downgrades').count().show()


+----------+-----+
|Downgrades|count|
+----------+-----+
|         0|  176|
|         1|   37|
|         3|    2|
|         2|   10|
+----------+-----+



## Monthly figures

These were dropped because computing monthly numbers took a long time for a subset of data that only spans 3 months. Figures like these are key for reporting, but may be less useful for churn analysis.

### Key metrics

- Monthly active users over time
- Daily active users
- Average listen time per user, incl. partial listens and repeats
- Total paid vs. unpaid users
- Total ads served over time

In [None]:
# Convert the 'ts' column into a datetime feature
df = df.withColumn(
    'unix_timestamp',
    concat_ws(
        ".",
        # ts is in milliseconds but from_unixtime expects seconds, so divide by 1000;
        # the last three digits (the milliseconds) are appended separately below
        from_unixtime((col("ts")/1000), "yyyy-MM-dd HH:mm:ss"),
        substring(col("ts"), -3, 3)
    )
)

# Convert this to a daily date feature
df = df.withColumn('date', to_date(col('unix_timestamp')))

df = df.withColumn('month_year', concat_ws('-', month('date'), year('date')))

# Indicator columns for counting the number of songs/adverts played per month
df = df.withColumn("NextSong", when(df.page == 'NextSong', 1).otherwise(0))
df = df.withColumn("RollAdvert", when(df.page == 'Roll Advert', 1).otherwise(0))
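
To sanity-check the conversion above, the same arithmetic can be done in plain Python on a single value. This is a minimal sketch, assuming timestamps are interpreted in UTC; Spark's `from_unixtime` uses the session time zone, so dates near midnight may differ. The helper name `ms_to_utc` is hypothetical.

```python
from datetime import datetime, timezone


def ms_to_utc(ts_ms):
    """Convert a millisecond epoch timestamp to a timezone-aware UTC datetime."""
    seconds, millis = divmod(ts_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000
    )


# ts value from the first row shown by df.head()
event_time = ms_to_utc(1538352117000)

# Same 'month-year' layout as the month_year column
month_year = f"{event_time.month}-{event_time.year}"
print(event_time.isoformat(), month_year)
```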

In [None]:
daily_df = df.groupby('userId', 'date').sum().select(
    col('userId'),
    col('date'),
    col('sum(length)').alias('total_length'),
    col('sum(NextSong)').alias('songs'),
    col('sum(RollAdvert)').alias('adverts'),
).toPandas()

In [None]:
daily_df.date = pd.to_datetime(daily_df.date)

In [None]:
monthly_df = df.groupby('userId', 'month_year').sum().select(
    col('userId'),
    col('month_year'),
    col('sum(length)').alias('total_length'),
    col('sum(churn)').alias('churn'),
    col('sum(NextSong)').alias('songs'),
    col('sum(RollAdvert)').alias('adverts'),
)
monthly_df.show(3)

In [None]:
monthly_df.groupBy('month_year').count().show()