# Setup

### Imports

In [1]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, to_timestamp

### Creating a Spark Session

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
  .appName('Movielens Recommender System')\
  .getOrCreate()

### Defining Data Folder

In [3]:
# Path where the cleaned data is stored
data = "gs://ca4022-recsys-data/data/movielens_small/clean/"

# Building the Recommender System

In [4]:
df = spark.read.csv(data + "movie_ratings_small.csv", header=True)

In [5]:
df.limit(10).toPandas()

Unnamed: 0,movieId,userId,rating,title,genres
0,1,1,4.0,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,3,1,4.0,Grumpier Old Men (1995),Comedy|Romance
2,6,1,4.0,Heat (1995),Action|Crime|Thriller
3,47,1,5.0,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
4,50,1,5.0,"Usual Suspects, The (1995)",Crime|Mystery|Thriller
5,70,1,3.0,From Dusk Till Dawn (1996),Action|Comedy|Horror|Thriller
6,101,1,5.0,Bottle Rocket (1996),Adventure|Comedy|Crime|Romance
7,110,1,4.0,Braveheart (1995),Action|Drama|War
8,151,1,5.0,Rob Roy (1995),Action|Drama|Romance|War
9,157,1,5.0,Canadian Bacon (1995),Comedy|War


In [6]:
df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [7]:
df = df.\
        withColumn('movieId', col('movieId').cast('integer')).\
        withColumn('userId', col('userId').cast('integer')).\
        withColumn('rating', col('rating').cast('float'))

### How sparse is the data?

A good first step is to try and understand how sparse the data is. Below, the sparsity of the movie ratings dataset is calculated.

In [8]:
# Count the total number of ratings in the dataset
numerator = df.select("rating").count()

# Count the number of distinct userIds and distinct movieIds
num_users = df.select("userId").distinct().count()
num_movies = df.select("movieId").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of movies
denominator = num_users * num_movies

# Divide the numerator by the denominator
sparsity = (1.0 - (numerator *1.0)/denominator) * 100
print("The movie ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The movie ratings dataframe is  98.30% empty.


As expected, the data is extremely sparse. However, this should not prevent the recommender system from performing well: the latent factor model approach which I will implement is designed to cope with this kind of sparsity.
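The sparsity figure is simply $\left(1 - \frac{\text{ratings}}{\text{users} \times \text{movies}}\right) \times 100$. As a minimal sketch of the same arithmetic in plain Python, the toy triples below are made up for illustration and are not taken from the MovieLens data:

```python
# Toy ratings as (userId, movieId, rating) triples; values are illustrative only.
ratings = [
    (1, 10, 4.0),
    (1, 20, 5.0),
    (2, 10, 3.0),
    (3, 30, 2.0),
]

num_ratings = len(ratings)
num_users = len({user for user, _, _ in ratings})
num_movies = len({movie for _, movie, _ in ratings})

# Percentage of the user x movie grid that holds no rating
sparsity = (1.0 - num_ratings / (num_users * num_movies)) * 100
print(f"The toy ratings grid is {sparsity:.2f}% empty.")
```

Here 4 ratings fill a 3 x 3 grid, so 5 of the 9 cells are empty.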

### Creating train and test sets

An 80:20 train-test split is chosen.

In [9]:
(train, test) = df.randomSplit([0.8, 0.2], seed = 99)

### Building an ALS model

Collaborative Filtering (CF) approaches to recommendation aggregate the past behaviour of all users. CF can be further broken down into *user-user* based CF and *item-item* based CF.

* In *user-user* based CF, recommendations are made to a user based on the items liked by another set of users whose likes and dislikes are similar to the user in question.
* In *item-item* based CF, to recommend an item to a user, the similarity between the items liked by the user and the other items is calculated.
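To make the notion of "similar users" concrete, here is a minimal sketch of a user-user similarity, assuming cosine similarity over co-rated items. The users, items and ratings are hypothetical, and this memory-based calculation is shown for illustration only; it is not what the notebook goes on to use.

```python
from math import sqrt

# Hypothetical ratings: user -> {item: rating}
ratings = {
    "alice": {"A": 5.0, "B": 4.0, "C": 1.0},
    "bob": {"A": 4.0, "B": 5.0},
    "carol": {"A": 1.0, "C": 5.0},
}

def cosine_similarity(u, v):
    """Cosine similarity between two users over the items both have rated."""
    common = set(ratings[u]) & set(ratings[v])
    if not common:
        return 0.0
    dot = sum(ratings[u][i] * ratings[v][i] for i in common)
    norm_u = sqrt(sum(ratings[u][i] ** 2 for i in common))
    norm_v = sqrt(sum(ratings[v][i] ** 2 for i in common))
    return dot / (norm_u * norm_v)

# alice's tastes are closer to bob's than to carol's
print(cosine_similarity("alice", "bob"), cosine_similarity("alice", "carol"))
```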

These approaches can be implemented using a *memory-based* or *model-based* approach. The implementation in Spark uses a model-based approach in which the latent user and item profiles are learned through matrix factorisation by minimising the squared error between the available ratings and their predicted values (reported later as the RMSE, root mean squared error).

Alternating Least Squares (ALS) matrix factorisation attempts to estimate the ratings matrix $R$ as the product of two lower-rank matrices, $X$ and $Y$. Typically these approximations are called ‘factor’ matrices. The general approach is iterative. During each iteration, one of the factor matrices is held constant, while the other is solved for using least squares. The newly-solved factor matrix is then held constant while solving for the other factor matrix.
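The alternating scheme can be sketched in a few lines of NumPy. This is a single-machine illustration under simplifying assumptions (a small dense array, a boolean `mask` marking observed ratings, a plain L2 penalty `reg`), not Spark's distributed implementation; the function name `als` and the toy matrix are hypothetical.

```python
import numpy as np

def als(R, mask, rank=2, reg=0.1, iters=10, seed=0):
    """Factorise R ~ X @ Y.T using only the entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    X = rng.normal(scale=0.1, size=(n_users, rank))
    Y = rng.normal(scale=0.1, size=(n_items, rank))
    eye = reg * np.eye(rank)
    losses = []
    for _ in range(iters):
        # Hold Y fixed; solve a small ridge regression for each user row
        for u in range(n_users):
            Yu = Y[mask[u]]
            X[u] = np.linalg.solve(Yu.T @ Yu + eye, Yu.T @ R[u, mask[u]])
        # Hold X fixed; solve a small ridge regression for each item row
        for i in range(n_items):
            Xi = X[mask[:, i]]
            Y[i] = np.linalg.solve(Xi.T @ Xi + eye, Xi.T @ R[mask[:, i], i])
        # Regularised squared error over the observed entries
        err = (R - X @ Y.T)[mask]
        losses.append(float(err @ err + reg * ((X ** 2).sum() + (Y ** 2).sum())))
    return X, Y, losses

# Toy ratings matrix; 0 marks a missing rating
R = np.array([[5, 4, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
X, Y, losses = als(R, mask)
print([round(loss, 3) for loss in losses])
```

Because each half-step solves its least squares problem exactly, the regularised loss recorded after each iteration should never increase.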

Below, I create an ALS model, tune its hyperparameters using cross-validation and then fit the model.

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

I create the ALS model below. The user, item and ratings columns are specified along with some extra arguments:

* The *nonnegative* parameter is set to True, which constrains the learned factors to be non-negative; this is appropriate since all ratings are positive.
* The ratings are categorised as explicit feedback from the user so we set *implicitPrefs* to False.
* The *coldStartStrategy* parameter value of "drop" ensures that NaN predictions are dropped so that the values for the evaluation metrics are non-null.

In [11]:
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, implicitPrefs = False, coldStartStrategy="drop")
type(als)

pyspark.ml.recommendation.ALS

Below, the options for tuning the ALS model are specified. Two candidate values are given for each of two hyperparameters, *rank* and *regParam*, resulting in a total of 4 models to be evaluated during cross-validation.

*rank* is the number of latent factors in the model (default value is 10). *regParam* is the regularisation parameter in the ALS model, which mitigates against overfitting (default value is 0.1).

In [12]:
# Add hyperparameters and their respective values to param_grid
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [10, 100]) \
            .addGrid(als.regParam, [.01, .1]) \
            .build()

print ("Number of models to be tested: ", len(param_grid))

Number of models to be tested:  4
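The count of 4 comes from taking every combination of one *rank* value with one *regParam* value. A minimal sketch of that Cartesian product in plain Python, using the same candidate values as the grid above (the `grid` and `combos` names are just for illustration):

```python
from itertools import product

# Candidate values copied from the parameter grid above
grid = {"rank": [10, 100], "regParam": [0.01, 0.1]}

# One dictionary per combination of a rank value and a regParam value
combos = [dict(zip(grid, values)) for values in product(*grid.values())]
for combo in combos:
    print(combo)
```

Adding a third value for either hyperparameter would raise the count to 6, so the grid grows multiplicatively.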


The evaluator is then defined as RMSE as mentioned previously.

In [14]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

A cross-validation pipeline is then defined below with 5 folds.

In [15]:
# Build cross validation using CrossValidator
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
print(cv)

CrossValidator_2ad38d15af83
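As a rough illustration of what 5-fold cross-validation does with the training rows, the sketch below splits a set of row indices into five disjoint folds and holds each one out in turn. The helper `k_fold_indices` and the row count of 23 are hypothetical; Spark's own fold assignment is handled internally by `CrossValidator`.

```python
import random

def k_fold_indices(n_rows, k, seed=0):
    """Split row indices 0..n_rows-1 into k roughly equal, disjoint folds."""
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)
    return [indices[i::k] for i in range(k)]

folds = k_fold_indices(n_rows=23, k=5)
for held_out, validation in enumerate(folds):
    # Train on every fold except the one currently held out
    training = [i for j, fold in enumerate(folds) if j != held_out for i in fold]
    print(f"fold {held_out}: train={len(training)} validate={len(validation)}")

# 4 parameter combinations x 5 folds, plus one final refit of the best combination
n_fits = 4 * len(folds) + 1
print("model fits:", n_fits)
```

With 4 parameter combinations and 5 folds, the fit in the next cell trains 20 models before refitting the best combination on the full training set, which is why it can take a while.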


In [None]:
#Fit cross validator to the 'train' dataset
model = cv.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

### Finding the best model

Now from the 4 different sets of parameters fed into the cross-validation model, we check which resulted in the best model.

In [None]:
print("**Best Model**")

print("Rank: ", best_model._java_obj.parent().getRank())
print("MaxIter: ", best_model._java_obj.parent().getMaxIter())
print("RegParam: ", best_model._java_obj.parent().getRegParam())

**Best Model**
Rank:  100
MaxIter:  10
RegParam:  0.1


So a Rank of 100 and a RegParam of 0.1 resulted in the best performance. Using this best model, predictions can now be made on the test set.

In [20]:
test_predictions = best_model.transform(test)
RMSE = evaluator.evaluate(test_predictions)
print(RMSE)

0.8776517554479847


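The above RMSE indicates that the model's predictions on the test set typically deviate from the actual ratings by about 0.878 on the rating scale. Because RMSE squares the errors before averaging, large errors are penalised more heavily than they would be by a simple average of absolute errors.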
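For reference, RMSE can be computed by hand as follows. The ratings and predictions are made-up values chosen to show how a single large error pulls RMSE above the mean absolute error (MAE):

```python
from math import sqrt

# Illustrative values only, not taken from the test set
actual = [4.0, 3.0, 5.0, 2.0]
predicted = [3.5, 3.0, 4.0, 4.0]

errors = [p - a for p, a in zip(predicted, actual)]

# Root of the mean squared error
rmse = sqrt(sum(e ** 2 for e in errors) / len(errors))
# Mean of the absolute errors, for comparison
mae = sum(abs(e) for e in errors) / len(errors)

print(f"RMSE = {rmse:.3f}, MAE = {mae:.3f}")
```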

### Generate recommendations

With the optimal model identified, we can now use this to generate $n$ recommendations for each user. In this case, I'm going to generate 3 movie recommendations for each user.

In [21]:
# Generate 3 Recommendations for all users
threerecommendations = best_model.recommendForAllUsers(3)
threerecommendations.limit(10).toPandas()

Unnamed: 0,userId,recommendations
0,496,"[(858, 4.199934005737305), (106920, 4.18192481..."
1,471,"[(3266, 4.463098526000977), (8477, 4.448762416..."
2,463,"[(80906, 4.431299209594727), (78836, 4.3735466..."
3,148,"[(98491, 4.437413692474365), (160718, 4.356680..."
4,243,"[(171495, 5.14157772064209), (84944, 5.0602149..."
5,540,"[(92535, 4.848649501800537), (86377, 4.8408169..."
6,392,"[(296, 4.431065559387207), (3153, 4.4141931533..."
7,516,"[(4429, 4.847044467926025), (7121, 4.556695938..."
8,31,"[(3200, 5.1146392822265625), (1361, 4.77681589..."
9,251,"[(858, 5.165389060974121), (750, 5.13303041458..."


As seen above, the recommendations are stored in an array in which each element contains a *movieId* and its predicted *rating*. To make these recommendations easier to read and inspect, more information such as the *title* and *genres* is needed. The first step is to explode this array to get one row per recommendation.

In [22]:
threerecommendations = threerecommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select("userId", col("rec_exp.movieId"), col("rec_exp.rating"))

threerecommendations.limit(10).toPandas()

Unnamed: 0,userId,movieId,rating
0,496,858,4.199934
1,496,106920,4.181925
2,496,6380,4.073887
3,471,3266,4.463099
4,471,8477,4.448762
5,471,2324,4.435744
6,463,80906,4.431299
7,463,78836,4.373547
8,463,27611,4.370327
9,148,98491,4.437414
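Conceptually, the explode step turns one row per user into one row per (user, recommendation) pair. A plain-Python sketch of the same reshaping, using the first two users from the output above (the `nested` and `flat` names are just for illustration):

```python
# One entry per user: (userId, [(movieId, predicted rating), ...])
nested = [
    (496, [(858, 4.199934), (106920, 4.181925), (6380, 4.073887)]),
    (471, [(3266, 4.463099), (8477, 4.448762), (2324, 4.435744)]),
]

# One row per recommendation, repeating the userId on each row
flat = [
    (user_id, movie_id, rating)
    for user_id, recs in nested
    for movie_id, rating in recs
]

for row in flat:
    print(row)
```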


### Are the recommendations logical?

To deduce whether the recommendations make sense, a join must be performed from the above table to the movies table to fetch the movie titles and genres. I will also select a user at random, for instance *userId* 148, to check if their recommendations make sense. For this, I must load back in the original movies DataFrame.

In [23]:
df_movies = spark.read.csv("gs://ca4022-recsys-data/data/movielens_small/movies.csv", header=True)
df_movies = df_movies.withColumn('movieId', col('movieId').cast('integer'))
df_movies.limit(10).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
5,6,Heat (1995),Action|Crime|Thriller
6,7,Sabrina (1995),Comedy|Romance
7,8,Tom and Huck (1995),Adventure|Children
8,9,Sudden Death (1995),Action
9,10,GoldenEye (1995),Action|Adventure|Thriller


In [25]:
threerecommendations.join(df_movies, on='movieId').filter('userId = 148').toPandas()

Unnamed: 0,movieId,userId,rating,title,genres
0,98491,148,4.437414,Paperman (2012),Animation|Comedy|Romance
1,160718,148,4.35668,Piper (2016),Animation
2,177593,148,4.197762,"Three Billboards Outside Ebbing, Missouri (2017)",Crime|Drama


Now we can compare this to userId 148's actual movie preferences...

In [27]:
df.filter('userId = 148').sort('rating', ascending=False).toPandas()

Unnamed: 0,movieId,userId,rating,title,genres
0,30816,148,5.0,"Phantom of the Opera, The (2004)",Drama|Musical|Romance
1,40629,148,5.0,Pride & Prejudice (2005),Drama|Romance
2,98491,148,5.0,Paperman (2012),Animation|Comedy|Romance
3,60069,148,4.5,WALL·E (2008),Adventure|Animation|Children|Romance|Sci-Fi
4,81847,148,4.5,Tangled (2010),Animation|Children|Comedy|Fantasy|Musical|Roma...
5,98243,148,4.5,Rise of the Guardians (2012),Adventure|Animation|Children|Fantasy|IMAX
6,116797,148,4.5,The Imitation Game (2014),Drama|Thriller|War
7,160718,148,4.5,Piper (2016),Animation
8,356,148,4.0,Forrest Gump (1994),Comedy|Drama|Romance|War
9,4308,148,4.0,Moulin Rouge (2001),Drama|Musical|Romance


The movies recommended to userId 148 belong to the Animation, Comedy, Crime, Drama and Romance genres. The movies the user rated most highly, shown in the table above, fall largely within these same genres.
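This comparison can be made a little more concrete by computing the genre overlap directly. The sketch below uses only the rows displayed in the two tables above, so it is a rough check rather than a full evaluation. The Tangled row is left out because its genre string is truncated in the displayed output.

```python
# Rows copied from the recommendation table for userId 148
recommended = {
    "Paperman (2012)": "Animation|Comedy|Romance",
    "Piper (2016)": "Animation",
    "Three Billboards Outside Ebbing, Missouri (2017)": "Crime|Drama",
}

# Rows copied from the user's displayed ratings (Tangled omitted, see above)
rated = {
    "Phantom of the Opera, The (2004)": "Drama|Musical|Romance",
    "Pride & Prejudice (2005)": "Drama|Romance",
    "Paperman (2012)": "Animation|Comedy|Romance",
    "WALL·E (2008)": "Adventure|Animation|Children|Romance|Sci-Fi",
    "Rise of the Guardians (2012)": "Adventure|Animation|Children|Fantasy|IMAX",
    "The Imitation Game (2014)": "Drama|Thriller|War",
    "Piper (2016)": "Animation",
    "Forrest Gump (1994)": "Comedy|Drama|Romance|War",
    "Moulin Rouge (2001)": "Drama|Musical|Romance",
}

def genre_set(movies):
    """Collect every genre that appears in a {title: 'A|B|C'} mapping."""
    return {g for genres in movies.values() for g in genres.split("|")}

rec_genres = genre_set(recommended)
rated_genres = genre_set(rated)

# Share of recommended genres that also appear among the user's rated movies
shared = rec_genres & rated_genres
coverage = len(shared) / len(rec_genres)

# Recommended titles that the user has in fact already rated
already_rated = sorted(set(recommended) & set(rated))

print("shared genres:", sorted(shared))
print("coverage:", coverage)
print("already rated:", already_rated)
```

On these rows, four of the five recommended genres also appear among the user's rated movies, with Crime the exception. Two of the three recommended titles, Paperman and Piper, are movies the user has already rated, so in practice it may be worth filtering previously rated movies out of the recommendations.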