# Load data into Spark DataFrame

In [188]:
from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np
import pandas as pd
from scipy import sparse
from sklearn.metrics.pairwise import cosine_similarity
from time import time

In [189]:
df2 = spark.read.csv('/Users/vl/Downloads/BitTiger-DS501-1805-master/Capstone/Capstone_Music_Box_Spark/src/data/play_ds.csv', header = 'TRUE').cache()


In [190]:
df2.show(5)

+---------+------+--------+----------+--------------------+-----------+
|      uid|device| song_id|      date|           play_time|song_length|
+---------+------+--------+----------+--------------------+-----------+
|168551042|    ar|  505355|2017-03-30|                 106|        277|
|168551417|    ar|  727161|2017-03-30|                  27|        226|
|168551408|    ar|15750838|2017-03-30|7>(123.138.230.80)TM|          0|
|168548493|    ip| 6661513|2017-03-30|                  63|        243|
|168551221|    ar| 5914258|2017-03-30|                   2|        276|
+---------+------+--------+----------+--------------------+-----------+
only showing top 5 rows



# Data cleaning

In [191]:
# change data type
df2 = df2.withColumn('date',F.col('date').cast('date'))
df2 = df2.withColumn('play_time', F.col('play_time').cast('int'))
df2 = df2.withColumn('song_length', F.col('song_length').cast('int'))

df2

DataFrame[uid: string, device: string, song_id: string, date: date, play_time: int, song_length: int]

In [192]:
# remove device column
df2 = df2.drop('device')

In [193]:
# verify song length are reasonable
df2.select('song_length').agg(F.min('song_length')).show()
df2.select('song_length').agg(F.max('song_length')).show()

+----------------+
|min(song_length)|
+----------------+
|     -2147483648|
+----------------+

+----------------+
|max(song_length)|
+----------------+
|       399313856|
+----------------+



In [194]:
## remove lines with song length less than 0 or greater than 600 seconds
df2 = df2.filter((F.col('song_length') > 0) & (F.col('song_length') <= 600))

In [195]:
# remove lines with negative play time
df2 = df2.filter((F.col('play_time') > 0))

In [196]:
# check for data inconsistency
df2.select('song_id', 'song_length').groupby('song_id')\
    .agg(F.countDistinct('song_length')).show(5)

+--------+---------------------------+
| song_id|count(DISTINCT song_length)|
+--------+---------------------------+
|11801086|                          1|
|  128181|                          2|
| 4254340|                          1|
|14321040|                          2|
| 1136678|                          1|
+--------+---------------------------+
only showing top 5 rows



In [197]:
# replace multiple song length with average length for the same song id 
avg = df2.select('song_id', 'song_length')\
    .groupby('song_id')\
    .agg(F.mean('song_length'))
df2 = df2.join(avg, on='song_id', how='left')
df2 = df2.drop('song_length')
df2

DataFrame[song_id: string, uid: string, date: date, play_time: int, avg(song_length): double]

# Construct utility matrix

In [198]:
# based on play time compute an implicit rating for each song j played by user i, rating ∈ [0:10]
def total_playtime(df):
    df = df \
        .groupBy('uid','song_id') \
        .agg(F.sum(F.col('play_time')).alias('playtime_t') 
            )
    return df

play = df2.drop('avg(song_length)','date')

play_total = total_playtime(play)

play_max = play_total.groupBy('uid').agg(F.max(F.col('playtime_t')).alias('playtime_m'))

utility_tmp = play_total.join(play_max, on = 'uid', how = 'left')

utility = utility_tmp.withColumn('rating', F.col('playtime_t')/F.col('playtime_m')*10)

utility_m = utility.drop('playtime_t', 'playtime_m')

In [264]:
utility_m.show(10)

+---------+--------+--------------------+
|      uid| song_id|              rating|
+---------+--------+--------------------+
|117677098|12792419| 0.03291314221768752|
|117677098| 5785728| 0.02567225092979627|
|117677098| 2943425|0.030609222262449397|
|117677098| 3630504| 0.09775203238653193|
|117677098|11677710| 0.02369746239673502|
|117677098|   69873| 0.07668762136721193|
|117677098|  611566|  0.0822828555442188|
|117677098| 5085956| 0.10992989500707634|
|117677098|15171705| 0.09643550669782444|
|117677098| 1099200| 0.09643550669782444|
+---------+--------+--------------------+
only showing top 10 rows



# Popularity-based recommender

In [250]:
# Recommend the top 10 songs that have been played for least 85% of it's length by the most people
df3 = df2.withColumn('playtime_85pp', F.col('play_time')/F.col('avg(song_length)'))
df4 = df3.filter(~(df3["song_id"].isNull()))
df5 = df4.filter(~(df4["song_id"] == 0))

In [261]:
# Recommender: past 7-day top 10
rec1 = df5.filter(F.col('date').between('2017-05-06', '2017-05-13'))
df_rec_7 = rec1.groupBy('song_id').agg(
    F.count(F.col('playtime_85pp') >= 0.85).alias('count')
).sort('count', ascending=False)

df_rec_7.show(10)

+--------+-----+
| song_id|count|
+--------+-----+
| 9950164| 6437|
| 5237384| 4966|
|15249349| 4218|
| 5114569| 3003|
|13273544| 2921|
| 6468891| 2752|
| 6657692| 2280|
|15807836| 2025|
| 3620537| 2016|
|16827761| 1873|
+--------+-----+
only showing top 10 rows



In [262]:
# Recommender: last 30-day top 10
rec2 = df5.filter(F.col('date').between('2017-04-13', '2017-05-13'))
df_rec_30 = rec2.groupBy('song_id').agg(
    F.count(F.col('playtime_85pp') >= 0.85).alias('count')
).sort('count', ascending=False)

df_rec_30.show(10)

+--------+-----+
| song_id|count|
+--------+-----+
| 9950164|35156|
|15249349|22034|
| 5237384|20734|
| 6468891|14302|
|13273544|13533|
| 5114569|13364|
|15807836|10904|
|23646084|10789|
| 6657692|10620|
| 3620537|10082|
+--------+-----+
only showing top 10 rows

