##  Collaborative Filtering Recommender Systems using pyspark
### *Goal: to recommend songs to the music box users based on the user play activities
### *Feature selected to measure user behavior similarity: counts( play time/song_length) of the song played by each user

In [1]:
from pyspark import SparkContext
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')# the last two lines have to be added for spark.read.csv to run
spark = SparkSession(sc)

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

### Load data

In [2]:
df = spark.read.csv('../data/play_ds.csv',header=True,inferSchema=True).cache()

In [11]:
df

DataFrame[uid: int, device: string, song_id: decimal(20,0), date: timestamp, play_time: string, song_length: double]

In [3]:
df.show(10)

+---------+------+--------+-------------------+---------+-----------+
|      uid|device| song_id|               date|play_time|song_length|
+---------+------+--------+-------------------+---------+-----------+
|168540348|    ar|   77260|2017-03-30 00:00:00|    64528|        0.0|
|168551323|    ar|16889394|2017-03-30 00:00:00|       93|       93.0|
|168548223|    ar|  908636|2017-03-30 00:00:00|        1|      179.0|
|168550237|    ar| 6511070|2017-03-30 00:00:00|      144|      237.0|
|168519439|    ar|       0|2017-03-30 00:00:00|      227|      228.0|
|168551579|    ar| 8762277|2017-03-30 00:00:00|      213|      232.0|
|168538990|    ar| 6369563|2017-03-30 00:00:00|        0|      281.0|
|168551086|    ar|23491655|2017-03-30 00:00:00|      253|      254.0|
|168549973|    ip| 3194852|2017-03-30 00:00:00|        3|      644.0|
|168551524|    ar| 6635279|2017-03-30 00:00:00|        1|      231.0|
+---------+------+--------+-------------------+---------+-----------+
only showing top 10 

#### Drop missing values

In [4]:
df=df.na.drop() ## drop missing values

In [5]:
from pyspark.sql.types import DoubleType

In [6]:
df_num=df.withColumn('play_time',df['play_time'].cast(DoubleType()))

In [7]:
df_num


DataFrame[uid: int, device: string, song_id: decimal(20,0), date: timestamp, play_time: double, song_length: double]

In [22]:
df_num.show(300)

+---------+------+--------+-------------------+---------+-----------+
|      uid|device| song_id|               date|play_time|song_length|
+---------+------+--------+-------------------+---------+-----------+
|168540348|    ar|   77260|2017-03-30 00:00:00|  64528.0|        0.0|
|168551323|    ar|16889394|2017-03-30 00:00:00|     93.0|       93.0|
|168548223|    ar|  908636|2017-03-30 00:00:00|      1.0|      179.0|
|168550237|    ar| 6511070|2017-03-30 00:00:00|    144.0|      237.0|
|168519439|    ar|       0|2017-03-30 00:00:00|    227.0|      228.0|
|168551579|    ar| 8762277|2017-03-30 00:00:00|    213.0|      232.0|
|168538990|    ar| 6369563|2017-03-30 00:00:00|      0.0|      281.0|
|168551086|    ar|23491655|2017-03-30 00:00:00|    253.0|      254.0|
|168549973|    ip| 3194852|2017-03-30 00:00:00|      3.0|      644.0|
|168551524|    ar| 6635279|2017-03-30 00:00:00|      1.0|      231.0|
|168551453|    ip|19759248|2017-03-30 00:00:00|      0.0|        0.0|
|168548239|    ar| 6

In [23]:
df_num.na.drop().show(276)

+---------+------+--------+-------------------+---------+-----------+
|      uid|device| song_id|               date|play_time|song_length|
+---------+------+--------+-------------------+---------+-----------+
|168540348|    ar|   77260|2017-03-30 00:00:00|  64528.0|        0.0|
|168551323|    ar|16889394|2017-03-30 00:00:00|     93.0|       93.0|
|168548223|    ar|  908636|2017-03-30 00:00:00|      1.0|      179.0|
|168550237|    ar| 6511070|2017-03-30 00:00:00|    144.0|      237.0|
|168519439|    ar|       0|2017-03-30 00:00:00|    227.0|      228.0|
|168551579|    ar| 8762277|2017-03-30 00:00:00|    213.0|      232.0|
|168538990|    ar| 6369563|2017-03-30 00:00:00|      0.0|      281.0|
|168551086|    ar|23491655|2017-03-30 00:00:00|    253.0|      254.0|
|168549973|    ip| 3194852|2017-03-30 00:00:00|      3.0|      644.0|
|168551524|    ar| 6635279|2017-03-30 00:00:00|      1.0|      231.0|
|168551453|    ip|19759248|2017-03-30 00:00:00|      0.0|        0.0|
|168548239|    ar| 6

In [8]:
# remove the songs with 0 length
# one day play_time cannot exceed 86400s 
df_num=df_num[(df_num['song_length']>0) & (df_num['play_time']>0)&(df_num['play_time']<86400)]

In [9]:
df_num.show(20)

+---------+------+--------+-------------------+---------+-----------+
|      uid|device| song_id|               date|play_time|song_length|
+---------+------+--------+-------------------+---------+-----------+
|168551323|    ar|16889394|2017-03-30 00:00:00|     93.0|       93.0|
|168548223|    ar|  908636|2017-03-30 00:00:00|      1.0|      179.0|
|168550237|    ar| 6511070|2017-03-30 00:00:00|    144.0|      237.0|
|168519439|    ar|       0|2017-03-30 00:00:00|    227.0|      228.0|
|168551579|    ar| 8762277|2017-03-30 00:00:00|    213.0|      232.0|
|168551086|    ar|23491655|2017-03-30 00:00:00|    253.0|      254.0|
|168549973|    ip| 3194852|2017-03-30 00:00:00|      3.0|      644.0|
|168551524|    ar| 6635279|2017-03-30 00:00:00|      1.0|      231.0|
|168548239|    ar| 6673573|2017-03-30 00:00:00|    205.0|      205.0|
|168551182|    ar|23491653|2017-03-30 00:00:00|    311.0|      312.0|
|168550845|    ar| 7183671|2017-03-30 00:00:00|     25.0|      303.0|
|168550945|    ar|23

### Feature generation

In [10]:
import pyspark.sql.functions as F
df_num=df_num.withColumn('play_time_ratio',(F.col('play_time')/F.col('song_length')))

In [11]:
df_num.show(20)


+---------+------+--------+-------------------+---------+-----------+--------------------+
|      uid|device| song_id|               date|play_time|song_length|     play_time_ratio|
+---------+------+--------+-------------------+---------+-----------+--------------------+
|168551323|    ar|16889394|2017-03-30 00:00:00|     93.0|       93.0|                 1.0|
|168548223|    ar|  908636|2017-03-30 00:00:00|      1.0|      179.0| 0.00558659217877095|
|168550237|    ar| 6511070|2017-03-30 00:00:00|    144.0|      237.0|  0.6075949367088608|
|168519439|    ar|       0|2017-03-30 00:00:00|    227.0|      228.0|  0.9956140350877193|
|168551579|    ar| 8762277|2017-03-30 00:00:00|    213.0|      232.0|  0.9181034482758621|
|168551086|    ar|23491655|2017-03-30 00:00:00|    253.0|      254.0|  0.9960629921259843|
|168549973|    ip| 3194852|2017-03-30 00:00:00|      3.0|      644.0|0.004658385093167702|
|168551524|    ar| 6635279|2017-03-30 00:00:00|      1.0|      231.0|0.004329004329004329|

#### Remove bots and outliers

In [12]:
## remove the outliers from the dataset
Q3=df_num.approxQuantile('play_time_ratio',[0.75],0.25)
Q1=df_num.approxQuantile('play_time_ratio',[0.25],0.25)
IQR=Q3[0]-Q1[0]
print(Q3,Q1,IQR)

[4150.0] [1.634879379907254e-07] 4149.999999836512


In [59]:
df_num=df_num.withColumn('song_id',df['song_id'].cast(DoubleType()))

In [23]:
df_num=df_num.na.drop()
df_num=df_num[df_num['play_time_ratio']<=(Q3[0]+1.5*IQR)]

In [25]:
Q4=df_num.approxQuantile('play_time_ratio',[0.9],0.5)
print(Q4)

[4150.0]


### Build model for the recommendation system

In [1]:
#### Note:The DataFrame-based API for ALS currently only supports integers for user and item ids. Other numeric types are supported for the user and item id columns, but the ids must be within the integer value range.

#### Onehotencode the user and/or item id 

In [26]:
from pyspark.ml.feature import OneHotEncoder,StringIndexer

stringIndexer=StringIndexer(inputCol='song_id',outputCol='song_id_ind')# some big value in song_id cannot be handled by ALS
indexed=stringIndexer.fit(df_num)

In [27]:
df_trans=indexed.transform(df_num)
df_trans.show(20)

+---------+------+--------+-------------------+---------+-----------+--------------------+-----------+
|      uid|device| song_id|               date|play_time|song_length|     play_time_ratio|song_id_ind|
+---------+------+--------+-------------------+---------+-----------+--------------------+-----------+
|168551323|    ar|16889394|2017-03-30 00:00:00|     93.0|       93.0|                 1.0|      402.0|
|168548223|    ar|  908636|2017-03-30 00:00:00|      1.0|      179.0| 0.00558659217877095|      229.0|
|168550237|    ar| 6511070|2017-03-30 00:00:00|    144.0|      237.0|  0.6075949367088608|    68715.0|
|168519439|    ar|       0|2017-03-30 00:00:00|    227.0|      228.0|  0.9956140350877193|        0.0|
|168551579|    ar| 8762277|2017-03-30 00:00:00|    213.0|      232.0|  0.9181034482758621|       46.0|
|168551086|    ar|23491655|2017-03-30 00:00:00|    253.0|      254.0|  0.9960629921259843|      143.0|
|168549973|    ip| 3194852|2017-03-30 00:00:00|      3.0|      644.0|0.00

In [28]:
(training, test) = df_trans.randomSplit([0.7, 0.3]) #split the dataset to training and test set

In [29]:
# Build the recommendation model using ALS 
als = ALS(maxIter=5, regParam=0.01, userCol="uid", itemCol="song_id_ind", ratingCol="play_time_ratio",
          coldStartStrategy="drop") # set the coldstartstrategy to 'drop' to ensure we don't get nan evaluation metrics

In [30]:
model = als.fit(training)

In [31]:
userRecs = model.recommendForAllUsers(10)

In [32]:
userRecs


DataFrame[uid: int, recommendations: array<struct<song_id_ind:int,rating:float>>]

In [33]:
userRecs.show(10) # Generate top 10 song recommendations for each user

+---------+--------------------+
|      uid|     recommendations|
+---------+--------------------+
| 12515276|[[4317, 193.63855...|
| 60505304|[[63922, 240.0021...|
|151339272|[[42031, 274.2140...|
|167569932|[[323584, 128.406...|
|167570199|[[4317, 234.80182...|
|167571369|[[42031, 469.7313...|
|167574909|[[13018, 0.046466...|
|167575737|[[42031, 143.4431...|
|167577848|[[111078, 119.710...|
|167580775|[[21507, 69.57201...|
+---------+--------------------+
only showing top 10 rows



In [34]:
songRecs=model.recommendForAllItems(10) #Generate top 10 user recommendations for each song

In [35]:
# Evaluate the model by computing hte RMSEon the test data
predictions=model.transform(test)

In [36]:
evaluator=RegressionEvaluator(metricName='rmse',labelCol='play_time_ratio',predictionCol='prediction')

In [100]:
songRecs.show(10)

+-----------+--------------------+
|song_id_ind|     recommendations|
+-----------+--------------------+
|        148|[[168761587, 260....|
|        463|[[169014517, 452....|
|        471|[[168761587, 427....|
|        496|[[168824377, 1107...|
|        833|[[168761587, 3489...|
|       1088|[[168761587, 492....|
|       1238|[[168283987, 2174...|
|       1342|[[168761587, 907....|
|       1580|[[169011743, 311....|
|       1591|[[168283987, 5681...|
+-----------+--------------------+
only showing top 10 rows



In [37]:
rmse=evaluator.evaluate(predictions)

In [38]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 26.135108717580128
