#### Collaborative Filtering: ALS Recommender System Using Spark MLlib
Adapted from the Spark Summit 2014 Recommender System training example.

Developed By: Pranav Masariya and Aditya Patel <br/>
Supervisor: Dr. Magdalini Eirinaki<br/>


In [2]:
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark import SparkContext

from pyspark.mllib.recommendation import ALS
import math
import pyspark.sql
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator

<img src="https://databricks.com/wp-content/uploads/2016/08/image04.png"/>

#### Spark Session
We create (or reuse) a `SparkSession`, the entry point for the DataFrame API used throughout this notebook.

In [3]:
# Create (or reuse) a Spark session and register the application name
spark = SparkSession \
    .builder \
    .appName("Recom") \
    .config("spark.recom.demo", "1") \
    .getOrCreate()

##### Loading and Parsing Dataset
Each line in the ratings dataset (ratings.csv) is formatted as:<br/>
+ userId,movieId,rating,timestamp<br/> 



Each line in the movies (movies.csv) dataset is formatted as:<br/>
+ movieId,title,genres<br/>
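
As a quick illustration of these two layouts, the sketch below parses one line of each with Python's standard `csv` module. The sample lines are illustrative, and the helper names `parse_rating` and `parse_movie` are hypothetical (they are not part of this notebook). The quoted title shows why a real CSV parser is safer than splitting on commas.

```python
import csv

def parse_rating(line):
    """Parse 'userId,movieId,rating,timestamp' into typed fields."""
    user_id, movie_id, rating, timestamp = next(csv.reader([line]))
    return int(user_id), int(movie_id), float(rating), int(timestamp)

def parse_movie(line):
    """Parse 'movieId,title,genres'; genres are separated by '|'."""
    movie_id, title, genres = next(csv.reader([line]))
    return int(movie_id), title, genres.split("|")

# Illustrative sample lines (assumed, in the formats described above)
rating_row = parse_rating("1,1,4.0,964982703")
movie_row = parse_movie('11,"American President, The (1995)",Comedy|Drama|Romance')
print(rating_row)
print(movie_row)
```

In the notebook itself, `spark.read.format("csv")` handles this parsing, and `inferSchema` takes care of the type conversion.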

In [4]:
# Load ratings
ratings_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("ratings.csv")

In [5]:
ratings_df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [6]:
# No-op: drop() ignores column names that are not in the schema
ratings_df = ratings_df.drop('')

In [7]:
print(type(ratings_df))

<class 'pyspark.sql.dataframe.DataFrame'>


###### Dropping timestamp
To keep this tutorial simple, we keep only a (UserID, MovieID, Rating) tuple for each line in the ratings dataset.<br/>
We drop the timestamp because we do not need it for this recommender.

In [8]:
ratings_df = ratings_df.drop('timestamp')
ratings_df.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [9]:
# Load movies
movies_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("movies.csv")

In [10]:
movies_df.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



##### For each line in the movies dataset, we create a tuple of (MovieID, Title).  We drop the genres because we do not use them for this recommender.

In [12]:
movies_df = movies_df.drop('genres')
movies_df.show(5)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
+-------+--------------------+
only showing top 5 rows



#### To determine the best ALS parameters, we use the small dataset. We first split it into training, validation, and test sets.


In [15]:
# randomSplit(weights, seed): 60% training, 20% validation, 20% test
trainingData, validationData, testData = ratings_df.randomSplit([0.6, 0.2, 0.2], seed=5)
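
To make the idea of a weighted random split concrete, here is a minimal pure-Python sketch. It is a conceptual illustration only, not Spark's implementation: the function name `random_split` is hypothetical, and it assumes each row is assigned independently by drawing one uniform random number. As with Spark's `randomSplit`, the resulting sizes only approximate the requested weights.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing a uniform number per row."""
    total = sum(weights)
    # Cumulative boundaries of the normalized weights, e.g. [0.6, 0.8, 1.0]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for idx, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding at the boundary
            if x < upper or idx == len(bounds) - 1:
                splits[idx].append(row)
                break
    return splits

rows = list(range(1000))
train, validation, test = random_split(rows, [0.6, 0.2, 0.2], seed=5)
print(len(train), len(validation), len(test))
```

Every row lands in exactly one split, so the three sets are disjoint and together cover the whole dataset.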

In [16]:
trainingData.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
|     1|     70|   3.0|
|     1|    101|   5.0|
|     1|    110|   4.0|
|     1|    151|   5.0|
|     1|    163|   5.0|
|     1|    223|   3.0|
|     1|    260|   5.0|
|     1|    296|   3.0|
|     1|    316|   3.0|
|     1|    333|   5.0|
|     1|    356|   4.0|
|     1|    362|   5.0|
|     1|    423|   3.0|
|     1|    441|   4.0|
|     1|    480|   4.0|
|     1|    500|   3.0|
+------+-------+------+
only showing top 20 rows



In [17]:
# Prepare the validation and test inputs for prediction: keep only (userId, movieId), without the ratings

validation_for_predict = validationData.select('userId','movieId')
test_for_predict = testData.select('userId','movieId')


##### Spark MLlib provides a collaborative filtering implementation based on Alternating Least Squares (ALS). The implementation in MLlib has the following parameters:

    1. numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
        In pyspark.mllib, ALS.train exposes it as blocks, with a default of -1
        (the default of 10 belongs to numUserBlocks/numItemBlocks in the DataFrame-based pyspark.ml ALS).
    2. rank is the number of latent factors in the model.
    3. iterations is the number of iterations to run.
    4. lambda specifies the regularization parameter in ALS (lambda_ in the Python API).
    5. implicitPrefs specifies whether to use the explicit
        feedback ALS variant or one adapted for implicit feedback data.
    6. alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline
        confidence in preference observations.
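
To show how rank, iterations, and lambda interact, here is a deliberately simplified sketch of explicit-feedback ALS in plain Python. It is not MLlib's implementation: it assumes rank 1 (so each least-squares step has a scalar closed form), uses a plain L2 penalty, and runs on a tiny made-up ratings dict. The names `als_rank1`, `rmse`, and `toy` are hypothetical.

```python
def als_rank1(ratings, iterations=10, lam=0.1):
    """Rank-1 ALS on a dict {(user, item): rating}.

    Returns (user_factor, item_factor) dicts; the predicted rating is
    user_factor[u] * item_factor[i].
    """
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    user_f = {u: 1.0 for u in users}
    item_f = {i: 1.0 for i in items}
    for _ in range(iterations):
        # Hold item factors fixed; each user factor has a closed-form ridge solution
        for u in users:
            obs = [(item_f[i], r) for (uu, i), r in ratings.items() if uu == u]
            user_f[u] = sum(f * r for f, r in obs) / (lam + sum(f * f for f, _ in obs))
        # Hold user factors fixed; solve for each item factor the same way
        for i in items:
            obs = [(user_f[u], r) for (u, ii), r in ratings.items() if ii == i]
            item_f[i] = sum(f * r for f, r in obs) / (lam + sum(f * f for f, _ in obs))
    return user_f, item_f

def rmse(ratings, user_f, item_f):
    """Root mean squared error of the rank-1 predictions on the given ratings."""
    se = [(r - user_f[u] * item_f[i]) ** 2 for (u, i), r in ratings.items()]
    return (sum(se) / len(se)) ** 0.5

# Made-up toy data: 2 users x 2 items
toy = {(1, 10): 5.0, (1, 20): 3.0, (2, 10): 4.0, (2, 20): 2.0}
user_f, item_f = als_rank1(toy, iterations=10, lam=0.1)
print(rmse(toy, user_f, item_f))
```

With a higher rank, each scalar division becomes a small regularized linear system per user or item, which is the general form of the alternating step.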

In [20]:
seed = 5  # Random seed for initial matrix factorization model. A value of None will use system time as the seed.
iterations = 10
regularization_parameter = 0.1  # lambda; run for different values, e.g. 0.01
ranks = [4, 8, 12]  # candidate numbers of latent factors

In [23]:
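# Predict ratings for the test set as ((userId, movieId), predicted rating) pairs.
# `model` comes from the training loop (In [24]), so that cell must be run first.
predictions_test = model.predictAll(test_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))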

In [24]:
# Train one model per candidate rank and keep the rank with the lowest validation RMSE.
# predictAll only accepts an RDD, so we convert each DataFrame with df.rdd.
min_error = float('inf')
best_rank = None
for rank in ranks:
    model = ALS.train(trainingData, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)

    # Convert predictions into key-value pairs: key = (userId, movieId), value = predicted rating
    predictions = model.predictAll(validation_for_predict.rdd).map(lambda r: ((r[0], r[1]), r[2]))

    # Join the original ratings with the predicted ratings on (userId, movieId)
    rates_and_preds = validationData.rdd.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    # Root mean squared error (RMSE) over the joined pairs
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

    print('For rank', rank, 'the RMSE is ', error)
    if error < min_error:
        min_error = error
        best_rank = rank

# Note: after the loop, `model` is the last model trained, not necessarily the best one
print('The best model was trained with rank', best_rank)

For rank 4 the RMSE is  0.9089434795751171
For rank 8 the RMSE is  0.9060388685385029
For rank 12 the RMSE is  0.9087544551531763
The best model was trained with rank 8
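
The error computation in the loop above can be sketched without Spark. The example below uses made-up numbers and a hypothetical helper `rmse_on_join`; plain dicts stand in for the key-value RDDs. Like the RDD `join`, it keeps only the (user, movie) keys present on both sides, so a rating with no matching prediction does not contribute to the error.

```python
import math

def rmse_on_join(actual, predicted):
    """RMSE over the (user, movie) keys present in both dicts (an inner join)."""
    common = actual.keys() & predicted.keys()
    squared_errors = [(actual[k] - predicted[k]) ** 2 for k in common]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up ratings and predictions keyed by (userId, movieId)
actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
predicted = {(1, 10): 3.5, (1, 20): 3.0, (2, 30): 4.0}
error = rmse_on_join(actual, predicted)
print(error)
```

Here only (1, 10) and (1, 20) appear in both dicts, so the RMSE is computed over two pairs rather than three.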


In [25]:
# Inspect the test predictions: each element is ((userId, movieId), rating predicted by the ALS model)
predictions_test.take(3)

[((156, 1084), 3.982931929448584),
 ((372, 1084), 3.493401132959945),
 ((597, 1084), 4.814418860410022)]

#### Let's start recommending movies.
I have written a function that returns the top-k recommendations for a particular user from the test data.


In [28]:
def getRecommendations(user, testDf, trainDf, model, k):
    # Movies the selected user has rated in testDf
    userDf = testDf.filter(testDf.userId == user)
    # Candidate movies: every movie in trainDf except those the user rated in testDf
    # (movies the user rated only in trainDf remain candidates)
    mov = trainDf.select('movieId').subtract(userDf.select('movieId'))

    # predictAll needs an RDD of (user, movie) pairs, so we convert the DataFrame
    pred_rat = model.predictAll(mov.rdd.map(lambda x: (user, x[0]))).collect()

    # Sort by predicted rating (third field) and keep the top k
    recommendations = sorted(pred_rat, key=lambda x: x[2], reverse=True)[:k]

    return recommendations
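
The selection logic can also be sketched in plain Python. This is an illustrative variant, not the notebook's method: the function `top_k`, the `predict` callable, and the `fake_scores` values are all hypothetical stand-ins for a trained model. It uses `heapq.nlargest` instead of sorting the full list, which is a reasonable choice when k is much smaller than the number of candidates.

```python
import heapq

def top_k(user, candidate_movies, rated_movies, predict, k):
    """Return the k unrated candidates with the highest predicted rating."""
    unseen = set(candidate_movies) - set(rated_movies)
    scored = [(user, movie, predict(user, movie)) for movie in unseen]
    # nlargest avoids sorting the full list when k is small
    return heapq.nlargest(k, scored, key=lambda x: x[2])

# Hypothetical stand-in for a trained model's predictions
fake_scores = {101: 4.8, 102: 3.1, 103: 4.2, 104: 2.5}
recs = top_k(399, fake_scores.keys(), [102], lambda u, m: fake_scores[m], k=2)
print(recs)
```

Each result keeps the (user, movie, rating) layout of the tuples returned by `getRecommendations`.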

In [33]:
# User ID for which we want recommendations
user = 399
# Number of recommendations to return
k = 5

# Call getRecommendations method
derived_rec = getRecommendations(user, testData, trainingData, model, k)

print("Movies recommended for:", user)

# Print the result
# TODO: we can convert derived_rec into a dataframe to present it properly
for i in range(len(derived_rec)):
    print(i + 1)
    # Look up the title of the recommended movie by its movieId
    movies_df.filter(movies_df.movieId == derived_rec[i][1]).select('title').show()

Movies recommended for: 399
1
+--------------------+
|               title|
+--------------------+
|Wallace & Gromit:...|
+--------------------+

2
+----------------+
|           title|
+----------------+
|Barcelona (1994)|
+----------------+

3
+------------+
|       title|
+------------+
|Senna (2010)|
+------------+

4
+--------------------+
|               title|
+--------------------+
|Seven Samurai (Sh...|
+--------------------+

5
+-------------------+
|              title|
+-------------------+
|Strange Brew (1983)|
+-------------------+

