##  Collaborative Filtering Recommendation Systems 



In [1]:
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg import DenseVector
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml import Pipeline
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

 


In [2]:
import pyspark.sql.functions as F

In [3]:
# We use matplotlib for plotting
import matplotlib.pyplot as plt

# This statement allow to display plot without asking to
%matplotlib inline

# Load data into Spark DataFrame

In [4]:
# load play log data 
df_play = spark.read.csv('../data/play_ds.csv',header=True)



In [5]:
df_play.show(5)

+---------+------+-------+----------+---------+-----------+
|      uid|device|song_id|      date|play_time|song_length|
+---------+------+-------+----------+---------+-----------+
|168535490|    ar|6616004|2017-03-30|      283|        283|
|168540348|    ar|  77260|2017-03-30|    64528|          0|
|168550572|    ar|4297299|2017-03-30|       11|        272|
|168548493|    ip|6661513|2017-03-30|       63|        243|
|168550571|    ar|      0|2017-03-30|       24|        251|
+---------+------+-------+----------+---------+-----------+
only showing top 5 rows



In [6]:
df_play.count()

10912714

## Data Cleaning

#### Note: bot users are already removed before the downsampling step

### Clean irregular play time and song_length

In [7]:
# filter play time and songs <=0 
# change type to double, so that the approxquntile method can work 
df_play = df_play \
        .withColumn('play_time',F.col('play_time').cast(LongType())) \
        .filter((F.col('song_length')>0) & (F.col('play_time')>0))  


In [8]:
# Remove records that are possibly misclicks or data error
# or song length extremely short  

top_length_threshold = df_play.approxQuantile('play_time',[0.93],0.05)

bot_length_threshold = df_play.approxQuantile('play_time',[0.01],0.05)


In [9]:
top_length_threshold[0]

301.0

In [10]:
bot_length_threshold[0]

1.0

In [11]:
df_play = df_play \
        .filter((F.col('play_time')>bot_length_threshold[0]) & (F.col('play_time')<top_length_threshold[0]))  


In [12]:
df_play.count()

7353847

## Exclude song id =0, i.e. missing song id

In [13]:
df_play = df_play \
        .withColumn('song_id',F.col('song_id').cast(LongType())) \
        .filter(F.col('song_id')>0)


          


In [14]:
df_play.count()

6749472

In [15]:
df_play.show(5)

+---------+------+--------+----------+---------+-----------+
|      uid|device| song_id|      date|play_time|song_length|
+---------+------+--------+----------+---------+-----------+
|168535490|    ar| 6616004|2017-03-30|      283|        283|
|168550572|    ar| 4297299|2017-03-30|       11|        272|
|168548493|    ip| 6661513|2017-03-30|       63|        243|
|168548714|    ar|23492572|2017-03-30|        2|        359|
|168535909|    ar| 7145935|2017-03-30|      130|        130|
+---------+------+--------+----------+---------+-----------+
only showing top 5 rows



### Exclude inactive users

In [16]:
# Check how many songs each user has played

user_song_count = df_play \
                 .groupBy('uid').agg(F.count(F.col('song_id')).alias('songs_played'))      


In [17]:
user_song_count.show(5)

+---------+------------+
|      uid|songs_played|
+---------+------------+
|168550986|           1|
|168554377|         138|
|168554339|         158|
|168559162|         154|
|168576985|          11|
+---------+------------+
only showing top 5 rows



In [18]:
# count the total number of users
user_song_count.count()

54310

In [19]:
# count the number of inactive users
inactive_user = user_song_count.filter('songs_played < 5')
inactive_user.count()

10619

In [20]:
# exclude inactive users 
df_play_join = df_play.join(user_song_count,"uid")


In [21]:
df_play_join

DataFrame[uid: string, device: string, song_id: bigint, date: string, play_time: bigint, song_length: string, songs_played: bigint]

In [22]:
df_play_join.show(5)

+---------+------+--------+----------+---------+-----------+------------+
|      uid|device| song_id|      date|play_time|song_length|songs_played|
+---------+------+--------+----------+---------+-----------+------------+
|117677098|    ar| 2216557|2017-04-02|      247|        246|         141|
|117677098|    ar| 3392015|2017-04-02|       69|        218|         141|
|117677098|    ar| 9845432|2017-04-02|       37|        271|         141|
|117677098|    ar|15171705|2017-04-02|       56|        243|         141|
|117677098|    ar|15171705|2017-04-02|        2|        243|         141|
+---------+------+--------+----------+---------+-----------+------------+
only showing top 5 rows



In [23]:
df_play_active = df_play_join \
                 .filter(F.col('songs_played')>5)            

In [24]:
df_play_active.count()

6722650

In [25]:
# calculate the most popular songs
df_play_active 

DataFrame[uid: string, device: string, song_id: bigint, date: string, play_time: bigint, song_length: string, songs_played: bigint]

## Proxy for rating: playtime/song_length

In [26]:
# generate play percentage as a proxy for rating
df_play_active = df_play_active \
        .withColumn("percent",F.col('play_time')/F.col('song_length'))
                                        

##  Aggregation by uid and song id

In [27]:
df_aggregate = df_play_active \
        .groupBy('uid','song_id').agg(F.mean(F.col('percent')).alias('rating'))      



In [28]:
df_aggregate.show(5)

+---------+--------+-------------------+
|      uid| song_id|             rating|
+---------+--------+-------------------+
|117677098| 2216557|0.41147423823123913|
|117677098| 3392015| 0.6559633027522935|
|117677098| 9845432|0.36070110701107017|
|117677098|15171705|  0.401920438957476|
|117677098|   69873|          0.3640625|
+---------+--------+-------------------+
only showing top 5 rows



## Data Transformation

ALS requires the song id to be numeric

In [29]:
df_index = df_aggregate \
          .withColumn('uid',F.col('uid').cast(IntegerType())) \
          .withColumn('song_id',F.col('song_id').cast(IntegerType())) \
          .select('uid','song_id',F.round(F.col('rating'), 3).alias('rate'))



In [30]:
df_index

DataFrame[uid: int, song_id: int, rate: double]

In [31]:
df_index.show(5)

+---------+--------+-----+
|      uid| song_id| rate|
+---------+--------+-----+
|117677098| 2216557|0.411|
|117677098| 3392015|0.656|
|117677098| 9845432|0.361|
|117677098|15171705|0.402|
|117677098|   69873|0.364|
+---------+--------+-----+
only showing top 5 rows



## Train Recommender 

• Collaborative Filtering

* Item-Based 




In [32]:
(training, test) = df_index.randomSplit([0.7, 0.3])

In [33]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=10, regParam=0.01, userCol="uid", itemCol="song_id", ratingCol="rate",
          coldStartStrategy="drop")


In [34]:
model = als.fit(training)

## Performance Evaluation 
• Mask Data Validation 



In [35]:
from pyspark.ml.evaluation import RegressionEvaluator


# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rate",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = 0.5673382119470404


### Performance benchmark:
for 5 level rating: RMSE~1.2-1.5, or <2 



## Make recommendations

In [41]:

# Generate top 10 songs recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each song
songRecs = model.recommendForAllItems(10)

In [37]:
userRecs.show()

+---------+--------------------+
|      uid|     recommendations|
+---------+--------------------+
| 35556737|[[6762450, 103.46...|
| 72854913|[[6762450, 70.460...|
|149954144|[[6762450, 64.763...|
|165428768|[[6762450, 75.927...|
|167415658|[[6762450, 54.702...|
|167570315|[[6762450, 125.99...|
|167575737|[[6762450, 121.32...|
|167579413|[[3288014, 52.417...|
|167580826|[[6762450, 41.008...|
|167584025|[[6762450, 130.17...|
|167587488|[[6762450, 105.33...|
|167588969|[[6762450, 70.224...|
|167590565|[[6762450, 154.22...|
|167595909|[[3288014, 63.964...|
|167597788|[[3288065, 121.96...|
|167605713|[[3288065, 60.772...|
|167608347|[[6762450, 70.228...|
|167627248|[[6762450, 104.45...|
|167645953|[[6762450, 72.958...|
|167655696|[[6762450, 52.226...|
+---------+--------------------+
only showing top 20 rows



In [None]:
songRecs.show()

• Matrix Factorization 
	* truncated SVD
	
	• GraphLab 
	• Spark ALS (DataBricks, Google(credit), AWS(small $),IBM(free)) 



In [40]:
from sklearn.decomposition import TruncatedSVD

def fit_uvd(M,k):
    # use TruncatedSVD to realize UVD
    svd = TruncatedSVD(n_components=k, n_iter=7, random_state=0)
    svd.fit(M)

    V = svd.components_
    U = svd.transform(M) # effectively, it's doing: U = M.dot(V.T)
    # we can ignore svd.singular_values_ for our purpose
    
    # why we can do this?
    # recall: 
    # SVD start from u*s*v=M => u*s=M*v.T, where M*v.T is our transformation above to get U in UVD
    # so the above U is effectively u*s in SVD
    # that's why U*V = u*s*v = M our original matrix
    # there are many ways to understand it!
    # here we by-passed singular values.
    
    return U,V, svd

# decompose

