# Song Recommendation System - Data Preprocessing
## Design strategy 
The act of downloading itself may not signal 'true' interest. But when combined with plays (the more the merrier), it is a smoking gun indicating strong preference. Therefore, we will form a utility matrix mainly based on play data, and add bonus values to those user/song pairs where there are also downloads.

To be clear, we would only recommend songs for users who have played *something* via streaming. For cold starters, use content-based recommendation instead (which is out of the scope of this data set and project).

Moreover, we will only base our recommendations on the **last 30 days of available data**. The assumption is that recent data is a better indication of user preference. From usage experience, Youtube has in fact adopted a similar strategy; individualized recommedations on the front page are reset after ~2 weeks of inactivity.

In [1]:
from pyspark.sql.types import *

import pyspark.sql.functions as F
import matplotlib.pyplot as plt
plt.style.use('ggplot')

import numpy as np

In [2]:
# Start a Spark session, in case not invoking from CMD as 'pyspark'
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("data_cleaning").getOrCreate()

In [3]:
play = spark.read.format('csv').options(header='true', inferSchema='true').load("../data/play_ds_cleaned.csv").cache()
down = spark.read.format('csv').options(header='true', inferSchema='true').load("../data/down_ds.csv").cache()

In [4]:
# Cast date to correct type, for easier manipulation
play = play.withColumn('date', F.col('date').cast('date'))
play = play.withColumn('song_id', F.col('song_id').cast('int'))
down = down.withColumn('date', F.col('date').cast('date'))

# play_time can't be in str form!
play = play.withColumn('play_time', F.col('play_time').cast('int'))

## Restrict the time window considered

Idea: recent data is better indicator of *current* user preference. We will take the recent 30 days, for starters. Later on in `B2_compute_score.ipynb` we will further divide the data up by date into training and test sets.

Filter early to reduce data size and speeds up computations! 

In [5]:
# Check last available date - both are May 12!
play.select('date').agg(F.max('date')).show()
down.select('date').agg(F.max('date')).show()

+----------+
| max(date)|
+----------+
|2017-05-12|
+----------+

+----------+
| max(date)|
+----------+
|2017-05-12|
+----------+



In [6]:
import datetime 
from dateutil import parser

lookback_window_size = 30
last_date = parser.parse('2017-05-12').date()

play = play.filter(F.col('date') >= (last_date - datetime.timedelta(lookback_window_size)))
down = down.filter(F.col('date') >= (last_date - datetime.timedelta(lookback_window_size)))

In [7]:
# Confirm filtering
play.select('date').agg(F.min('date')).show()
down.select('date').agg(F.min('date')).show()

+----------+
| min(date)|
+----------+
|2017-04-12|
+----------+

+----------+
| min(date)|
+----------+
|2017-04-12|
+----------+



## Data cleaning

In [8]:
# Check column names/types
play

DataFrame[uid: int, device: string, song_id: int, date: date, play_time: int, song_length: double]

In [9]:
# Check column names/types
down

DataFrame[uid: int, device: string, song_id: int, date: date]

In [10]:
# Drop unused column; not distinguishing between device types when recommending
play = play.drop('device')
down = down.drop('device')

#### Handle extreme `song_length`

There are observations where `song_length` is either shorter than 5 secs, or longer than 200 hours. Reasonable to suspect that the data itself is faulty --- because a simple search reveals other observations 1) with the same `song_id`; 2) but with "normal" `song_length`.

Strategy:
1. Define "normal" `song_length` as 1 secs <= `song_length` <= 80 mins. 80 mins is the longest possible length of a CD album. As for the lower bound...maybe some people do need to find really short tracks, so let's allow for that.
2. For songs (`song_id`) that actually have normal `song_length` elsewhere in the data, compute an avarage of those normal values, and impute the entries where `song_length` is anomalous.

In [11]:
# Show the extremes of song_length
play.select('song_length').agg(F.min('song_length')).show()
play.select('song_length').agg(F.max('song_length')).show()

+----------------+
|min(song_length)|
+----------------+
|            -1.0|
+----------------+

+----------------+
|max(song_length)|
+----------------+
|        776363.0|
+----------------+



In [12]:
# Count distinct song_id with *any* anomalous song_length
anomaly_sid = play.filter( (F.col('song_length') < 1) | (F.col('song_length') > 80*60) ).select('song_id').distinct()
print(anomaly_sid.count())

# Count distinct song_id with *any* normal song_length
normal_sid = play.filter( (F.col('song_length') >= 1) & (F.col('song_length') <= 80*60) ).select('song_id').distinct()
print(normal_sid.count())

71549
299114


In [13]:
# song_id's common to both sets above
# ~80% of anomaly_sid.count()!
anomaly_sid.select('song_id').intersect(normal_sid.select('song_id')).count()

54487

In [14]:
# Demonstrate necessity of averaging: same song_id can correspond to multiple song_length
play.select('song_id', 'song_length').groupby('song_id')\
    .agg(F.countDistinct('song_length')).show(5)

+--------+---------------------------+
| song_id|count(DISTINCT song_length)|
+--------+---------------------------+
|21965412|                         13|
| 2997638|                          2|
| 9541609|                          1|
| 4795040|                          1|
| 6105580|                          1|
+--------+---------------------------+
only showing top 5 rows



In [15]:
# Compute average song_length for each of normal_sid (song_id) 
averaged_lengths = play.select('song_id', 'song_length')\
                        .filter( (F.col('song_length') >= 1) & (F.col('song_length') <= 80*60) )\
                        .groupby('song_id')\
                        .agg(F.mean('song_length'))

In [16]:
# Join with existing play data!
play = play.join(averaged_lengths, on='song_id', how='left')

# Fill gaps in avg(song_length) with existing song_length
play = play.withColumn("avg(song_length)", 
                       F.when(F.col('avg(song_length)').isNull(), 
                              F.col('song_length')).otherwise(F.col('avg(song_length)'))
                      )

In [17]:
# Drop the now redundant information
play = play.drop('song_length')

In [18]:
# Examine statistics of the new feature; seems reasonable...
play.select('avg(song_length)').agg(F.mean('avg(song_length)'), 
                                    F.stddev('avg(song_length)')).show()

+---------------------+-----------------------------+
|avg(avg(song_length))|stddev_samp(avg(song_length))|
+---------------------+-----------------------------+
|    267.2323546138402|           373.62636500890176|
+---------------------+-----------------------------+



In [19]:
# Much longer than the length considered 'normal'
play.approxQuantile("avg(song_length)", [0.5], 0.2)[0]

12091.0

In [21]:
# Set null values in avg(song_length), if any, to avg (Not using median because it's 20 hours)
# It would take too much additional data processing to utilize fancier imputation methods
# e.g. encoding the date...train K-means model...

# Copied from above
song_length_avg = 267.2323546138402

play = play.withColumn('avg(song_length)', 
                       F.when(F.col('avg(song_length)').isNull(), song_length_avg)\
                       .otherwise(F.col('avg(song_length)'))
                      )

In [22]:
# Last but not least, drop the rows with anolamous song_length
# Measure proportion of data thus affected: ~0.6%. Ok no problem.
print(play.filter((F.col('avg(song_length)') > 80.*60.) | (F.col('avg(song_length)') < 1.0)).count() \
      / play.count())

play = play.filter((F.col('avg(song_length)') <= 80.*60.) & (F.col('avg(song_length)') >= 1.0))

0.006739115838407458


#### Drop rows with missing `song_id`

In [23]:
'''
    Demonstrate that there are missing values in song_id
    Must drop observations missing song_id, because they are useless for making recommendations,
    which involves associating songs with users
'''

print("Missing values in raw data:")
for df in [play, down]:
    df.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()
    
play = play.dropna(how='any', subset=['song_id'])
down = down.dropna(how='any', subset=['song_id'])

Missing values in raw data:
+-------+---+----+---------+----------------+
|song_id|uid|date|play_time|avg(song_length)|
+-------+---+----+---------+----------------+
|   1434|  0|   0|      596|               0|
+-------+---+----+---------+----------------+

+---+-------+----+
|uid|song_id|date|
+---+-------+----+
|  0|     72|   0|
+---+-------+----+



In [24]:
# Confirm results of dropping NAs in song_id
# Notice how there are less NAs in play_time now
# So some observations were missing both song_id and play_time!

print("Missing values after dropping NAs in song_id:")
for df in [play, down]:
    df.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

Missing values after dropping NAs in song_id:
+-------+---+----+---------+----------------+
|song_id|uid|date|play_time|avg(song_length)|
+-------+---+----+---------+----------------+
|      0|  0|   0|      419|               0|
+-------+---+----+---------+----------------+

+---+-------+----+
|uid|song_id|date|
+---+-------+----+
|  0|      0|   0|
+---+-------+----+



#### Impute missing `play_time`
There are entries marked explicitly as NA, and then there are the ones marked '0.0'. The latter is in fact a stand-in for null values. We will have to deal with both.

For the former, we will compute the median `play_time` for each corresponding `song_id`; hopefully that covers most of the missing values. Then, we will impute the latter (whatever that remains) using the global median.

In [25]:
songs_missing_play_time = play.filter(F.col('play_time').isNull()).select('song_id')

In [26]:
# Compute average song_length for each of normal_sid (song_id) 
def median(values_list):
    med = np.median(values_list)
    return float(med)
udf_median = F.udf(median, FloatType())

median_play_time = play.select('song_id', 'play_time')\
                        .filter( (~F.col('play_time').isNull()) & (F.col('song_id').isin(*songs_missing_play_time)) )\
                        .groupby('song_id')\
                        .agg(udf_median(F.collect_list(F.col('play_time'))))

# Rename for easy reference
median_play_time = median_play_time.withColumnRenamed('median(collect_list(play_time, 0, 0))',
                                                      'median_play_time')

In [27]:
play = play.join(median_play_time, how='left', on='song_id')

In [28]:
# Retain median_value where play_time was null
# Replace with existing value otherwise
play = play.withColumn('median_play_time',
                       F.when(F.col('play_time').isNull(), F.col('median_play_time'))\
                       .otherwise(F.col('play_time'))
                      )

In [29]:
# There are still 32 NAs in play_time!
play.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in play.columns)).show()

+-------+---+----+---------+----------------+----------------+
|song_id|uid|date|play_time|avg(song_length)|median_play_time|
+-------+---+----+---------+----------------+----------------+
|      0|  0|   0|      419|               0|              32|
+-------+---+----+---------+----------------+----------------+



In [30]:
# Compute (approximate) global median of play_time
global_median_play_time = play.select('play_time').approxQuantile('play_time', [0.5], 0.1)[0]

In [31]:
# 4 mins and 8 seconds --- reasonable when most tracks are likely to be pop songs!
print(global_median_play_time)

248.0


In [32]:
# Compute global median and impute remaining missing values with it
# global_median_play_time = play.select('play_time').approxQuantile('play_time', [0.5], 0.1)[0]
play = play.withColumn('play_time',
                      F.when((F.col('play_time')==0.0) | (F.col('play_time').isNull()), global_median_play_time)\
                       .otherwise(F.col('play_time'))
                      )

In [33]:
# Confirm completion of imputation!
play.select(F.sum(F.col('play_time').isNull().cast('int'))).show()

+-------------------------------------+
|sum(CAST((play_time IS NULL) AS INT))|
+-------------------------------------+
|                                    0|
+-------------------------------------+



### Save song_ids and uids that're relevant

In [None]:
# songs = play.select('song_id').union(down.select('song_id'))
# users = play.select('uid').union(down.select('uid'))

# # MUST run distinct(), because the union method above preserves duplicates
# songs = songs.distinct()
# users = users.distinct()

# # Save to file
# songs.toPandas().to_csv("../data/rec/rec_songs.csv", index=False)
# users.toPandas().to_csv("../data/rec/rec_users.csv", index=False)

## Engineering intermediate features

For each user, we want
1. Number of plays per song, throughout the 30 days
2. Number of downloads per song, ditto
3. Ratio of play length / song length, ditto

#### Plays and downloads per song, per user

In [34]:
plays_per_song = play.groupby('uid', 'song_id').count()
downloads_per_song = down.groupby('uid', 'song_id').count()

# Rename to faciliate joining later
plays_per_song = plays_per_song.withColumnRenamed('count', 'p_count')
downloads_per_song = downloads_per_song.withColumnRenamed('count', 'd_count')

In [35]:
# Confirm that there are no missing values
plays_per_song.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in plays_per_song.columns)).show()
downloads_per_song.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in downloads_per_song.columns)).show()

+---+-------+-------+
|uid|song_id|p_count|
+---+-------+-------+
|  0|      0|      0|
+---+-------+-------+

+---+-------+-------+
|uid|song_id|d_count|
+---+-------+-------+
|  0|      0|      0|
+---+-------+-------+



#### Ratio of play length / song length

From exploring the data, several issues emerged; they must be solved before the titular feature can be computed.
- `play_time` may exceed `song_length`, sometimes by a lot (!)

Here are the remedies taken
- In lieu of further information, will assume that the entire song is played (i.e. ratio 1)

We will demonstrate the issues below, and tackle them one-by-one.

In [39]:
'''
    Illustrating how play_time > song_length sometimes
    Unfortunate anomalies: e.g. for song 1645 below, there are various play_time, some of which
    may not actually be the whole song. But in the current scheme, the whole song is *assumed* to be played
    
    Note that 1645's song_length isn't an integer; it's probably an imputed value. To avoid the anaomaly described above,
    we should start from making sure that 1) the logging system functions properly; 2) we have metadata for as many
    songs as possible.
'''

play.filter(F.col('play_time') > F.col('avg(song_length)')).select('song_id', 'play_time', 'avg(song_length)').show(5)

play.filter(F.col('play_time') > 2*F.col('avg(song_length)')).select('song_id', 'play_time', 'avg(song_length)').show(5)

+-------+---------+------------------+
|song_id|play_time|  avg(song_length)|
+-------+---------+------------------+
|   1645|    274.0|272.82608695652175|
|   1645|    289.0|272.82608695652175|
|   1645|    317.0|272.82608695652175|
|   1645|    305.0|272.82608695652175|
|   1645|    278.0|272.82608695652175|
+-------+---------+------------------+
only showing top 5 rows

+-------+---------+------------------+
|song_id|play_time|  avg(song_length)|
+-------+---------+------------------+
| 124743|    248.0|              84.0|
| 133948|  60160.0|             233.0|
| 235318|  48540.0|219.35714285714286|
| 235318|   3252.0|219.35714285714286|
| 235318|   6360.0|219.35714285714286|
+-------+---------+------------------+
only showing top 5 rows



Resolution

In [41]:
# For each play event, compute the ratio of played length over total song length
# In case song_length == 0, set the ratio to 0 also

play_over_song_length = play.withColumn('ps_ratio', 
                                        F.when(F.col('play_time') < F.col('avg(song_length)'), F.col('play_time')/F.col('avg(song_length)'))\
                                        .otherwise(1.0))

play_over_song_length = play_over_song_length.select('uid', 'song_id', 'ps_ratio')

In [48]:
# Verify integrity; and what is the average anyways?
play_over_song_length.agg(F.round(F.min('ps_ratio'),4), F.max('ps_ratio'), F.mean('ps_ratio')).show()

+-----------------------+-------------+-----------------+
|round(min(ps_ratio), 4)|max(ps_ratio)|    avg(ps_ratio)|
+-----------------------+-------------+-----------------+
|                 2.0E-4|          1.0|0.742947795602251|
+-----------------------+-------------+-----------------+



In [80]:
'''
    Last but not least, consider the fact that there are many rows
    with the same (uid, song_id) in play_over_song_length. However,
    to build our utility matrix, we can only have one value.
    
    For simplicity, for each (uid, song_id) pair, we will take the average 
    ps_ratio. A more elaborate (and probably better) method would be to take 
    an average weighted by event date --- more recent, higher the weight.
'''

play_over_song_length = play_over_song_length.groupby('uid', 'song_id').agg(F.mean('ps_ratio'))
play_over_song_length = play_over_song_length.withColumnRenamed("avg(ps_ratio)", "ps_ratio")

## Join features into one DataFrame


In [49]:
# Left-join from plays_per_song,
# to ensure that we keep and prioritize songs that have been played
events_per_song = plays_per_song.join(downloads_per_song, how='left', on=['uid', 'song_id'])

In [50]:
events_per_song.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in events_per_song.columns)).show()

# Fill missing d_count with 0; 
# NAs arise because that particular song has not been downloaded by that particular user
events_per_song = events_per_song.fillna(0, subset=['d_count'])

+---+-------+-------+-------+
|uid|song_id|p_count|d_count|
+---+-------+-------+-------+
|  0|      0|      0|1646084|
+---+-------+-------+-------+



In [81]:
rec_features = events_per_song.join(play_over_song_length, on=['uid', 'song_id'], how='left')

In [82]:
# Check for NAs --- none, which is perfect!
rec_features.select(*( F.sum(F.col(c).isNull().cast('int')) for c in rec_features.columns)).show()

+-------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+------------------------------------+
|sum(CAST((uid IS NULL) AS INT))|sum(CAST((song_id IS NULL) AS INT))|sum(CAST((p_count IS NULL) AS INT))|sum(CAST((d_count IS NULL) AS INT))|sum(CAST((ps_ratio IS NULL) AS INT))|
+-------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+------------------------------------+
|                              0|                                  0|                                  0|                                  0|                                   0|
+-------------------------------+-----------------------------------+-----------------------------------+-----------------------------------+------------------------------------+



## Save intermediate features to file


In [87]:
rec_features.coalesce(1).write\
            .format("com.databricks.spark.csv")\
            .option("header", "true")\
            .save("../data/rec/rec_features.csv")

In [83]:
rec_features.count()

1777079

In [84]:
rec_features.select('uid', 'song_id').distinct().count()

1777079

In [75]:
rec_features

DataFrame[uid: int, song_id: int, p_count: bigint, d_count: bigint, ps_ratio: double]