# EDSA Movie Recommendation Challenge

# Introduction

## Context

In today’s technology-driven world, recommender systems are socially and economically critical for ensuring that individuals can make appropriate choices about the content they engage with on a daily basis. Movie recommendation is one application where this is especially true: intelligent algorithms can help viewers find great titles among tens of thousands of options.

## Problem Statement
With this context, EDSA is challenging you to construct a recommendation algorithm based on content or collaborative filtering, capable of accurately predicting how a user will rate a movie they have not yet viewed based on their historical preferences.

Providing an accurate and robust solution to this challenge has immense economic potential: users of the system are exposed to content they would like to view or purchase, which generates revenue and platform affinity.

## Evaluation
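The evaluation metric for this competition is Root Mean Square Error (RMSE). RMSE is commonly used in regression analysis and forecasting. It is the square root of the mean of the squared residuals between predicted and actual observed values, $\text{RMSE} = \sqrt{\frac{1}{N}\sum_{(u,i)} (r_{ui} - \hat{r}_{ui})^2}$ over the $N$ evaluated (user, movie) pairs, and it coincides with the standard deviation of the residuals only when the residuals have zero mean.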
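
To make the metric concrete, here is a minimal plain-Python sketch of RMSE. The function name `rmse` and the ratings are illustrative toy values, not competition data.

```python
import math

def rmse(actual, predicted):
    """Root mean square error between two equal-length sequences of ratings."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("inputs must be non-empty and of equal length")
    squared_residuals = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_residuals) / len(squared_residuals))

# Toy ratings: residuals are 0.5, -0.5 and 1.0, so RMSE = sqrt(1.5 / 3)
print(rmse([4.0, 3.0, 5.0], [3.5, 3.5, 4.0]))  # 0.7071067811865476
```

Because residuals are squared before averaging, RMSE penalises a few large errors more heavily than many small ones.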

##### Matrix Factorisation
Matrix factorization is a class of collaborative filtering algorithms used in recommender systems. These algorithms decompose the user-item interaction matrix into the product of two rectangular matrices that represent users and items in a lower-dimensional latent space: the first has a row for each user, while the second has a column for each item. The row (or column) associated with a specific user (or item) is referred to as that user's (or item's) latent factors. The expressive power of the model can be tuned by changing the number of latent factors. It has been shown that a matrix factorization with a single latent factor is equivalent to a "most popular" (top-popular) recommender. Increasing the number of latent factors improves personalization, and therefore recommendation quality, until the number of factors becomes so high that the model starts to overfit and recommendation quality decreases. A common strategy to avoid overfitting is to add regularization terms to the objective function. A user's rating is estimated using the following function:
$$\hat{r}_{ui} = \mu + b_{u} + b_{i} + q_{i}^Tp_{u}$$
where $\hat{r}_{ui}$ is the estimated rating of user $u$ for movie $i$, $\mu$ is the global average rating, $b_{u}$ is the user bias, $b_{i}$ is the movie bias, and $q_{i}^Tp_{u}$ is the dot product of the movie's latent factor vector $q_{i}$ and the user's latent factor vector $p_{u}$. The model's parameters are learned by minimising the following loss function with gradient descent:
$$\sum_{(u,i) \in K} (r_{ui} - \hat{r}_{ui})^2 + \lambda(b_{i}^2 + b_{u}^2 + \|q_{i}\|^2 + \|p_{u}\|^2)$$
where $K$ is the set of (user, movie) pairs with a known rating $r_{ui}$, $\sum_{(u,i) \in K} (r_{ui} - \hat{r}_{ui})^2$ is the sum of squared residuals, and $\lambda$ is a regularisation parameter that penalises the user bias, the movie bias and the latent factors. Note that a different regularisation parameter can be used for each of these terms. In this section we implement models that use this approach to predict ratings, i.e. matrix factorisation with alternating least squares (ALS), and one that decomposes the user-item matrix using Singular Value Decomposition (SVD). These models are far more feasible to implement because they never construct the full (and mostly empty) user-item rating matrix; instead, they iteratively learn the much smaller matrices of latent factors, whose product estimates the full matrix.
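
The prediction rule and loss above can be sketched as a small stochastic gradient descent loop in plain Python. This is a minimal illustration that assumes a single shared regularisation parameter and an arbitrary learning rate, factor count and epoch count; `predict`, `train_sgd` and the toy ratings are hypothetical, and this is not the exact implementation used by Spark or by any SVD library.

```python
import random

def predict(mu, b_u, b_i, p_u, q_i):
    """Biased MF estimate: mu + b_u + b_i + q_i . p_u."""
    return mu + b_u + b_i + sum(q * p for q, p in zip(q_i, p_u))

def train_sgd(ratings, n_factors=2, lr=0.01, reg=0.05, n_epochs=200, seed=0):
    """Fit the biased MF model to (user, movie, rating) triples with plain SGD."""
    rng = random.Random(seed)
    mu = sum(r for _, _, r in ratings) / len(ratings)  # global mean rating
    users = sorted({u for u, _, _ in ratings})
    items = sorted({i for _, i, _ in ratings})
    b_u = {u: 0.0 for u in users}
    b_i = {i: 0.0 for i in items}
    p = {u: [rng.gauss(0, 0.1) for _ in range(n_factors)] for u in users}
    q = {i: [rng.gauss(0, 0.1) for _ in range(n_factors)] for i in items}
    data = list(ratings)
    for _ in range(n_epochs):
        rng.shuffle(data)
        for u, i, r in data:
            err = r - predict(mu, b_u[u], b_i[i], p[u], q[i])
            # Gradient steps on the per-rating regularised squared error
            b_u[u] += lr * (err - reg * b_u[u])
            b_i[i] += lr * (err - reg * b_i[i])
            p_u_old = p[u]
            p[u] = [pf + lr * (err * qf - reg * pf) for pf, qf in zip(p[u], q[i])]
            q[i] = [qf + lr * (err * pf - reg * qf) for pf, qf in zip(p_u_old, q[i])]
    return mu, b_u, b_i, p, q

toy = [(1, "A", 5.0), (1, "B", 3.0), (2, "A", 4.0), (2, "C", 2.0), (3, "B", 4.0), (3, "C", 1.0)]
mu, b_u, b_i, p, q = train_sgd(toy)
# Estimate for user 1 and movie "C", a pair absent from the toy ratings
print(round(predict(mu, b_u[1], b_i["C"], p[1], q["C"]), 2))
```

The biases absorb systematic user and movie effects, so the latent factors only have to model what remains.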

###### Alternating Least Squares
Alternating Least Squares (ALS) is a matrix factorization algorithm designed to run in parallel. It is implemented in Apache Spark ML and built for large-scale collaborative filtering problems. While Apache Spark is written in Scala, there are APIs that allow Python, R and other programming languages to benefit from the technology. Like the SVD approach, Spark's ALS uses L2 (squared-norm) regularisation of the kind shown in the loss function above, although Spark scales the regularisation parameter by the number of ratings each user or item has. The main difference between the two is the training routine. ALS alternates between two least squares problems: it holds the item matrix fixed and solves for the user matrix, then holds the user matrix fixed and solves for the item matrix, and repeats. With one matrix held fixed, each user's (or item's) factors are the closed-form solution of a small regularised least squares problem, and these problems are independent of each other, so Spark solves them in parallel across partitions of the training data distributed over a cluster of machines. This makes ALS fast and highly scalable.

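
The alternating idea can be illustrated on a tiny dense matrix with NumPy. This is a minimal, single-machine sketch, not Spark's distributed implementation: the function `als`, the toy matrix, the rank and `reg` are assumptions, there are no bias terms, and unlike Spark it does not scale `reg` by each user's or item's rating count.

```python
import numpy as np

def als(R, mask, rank=2, reg=0.1, n_iters=20, seed=0):
    """Alternating least squares on a small dense ratings matrix.

    R    : (n_users, n_items) array of ratings (entries where mask is False are ignored)
    mask : boolean array of the same shape, True where a rating is observed
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    P = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    Q = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    I = np.eye(rank)
    for _ in range(n_iters):
        # Hold Q fixed: each user's factors are a closed-form ridge regression solution
        for u in range(n_users):
            Q_obs = Q[mask[u]]
            P[u] = np.linalg.solve(Q_obs.T @ Q_obs + reg * I, Q_obs.T @ R[u, mask[u]])
        # Hold P fixed: each item's factors are a closed-form ridge regression solution
        for i in range(n_items):
            P_obs = P[mask[:, i]]
            Q[i] = np.linalg.solve(P_obs.T @ P_obs + reg * I, P_obs.T @ R[mask[:, i], i])
    return P, Q

# Toy 4 users x 4 movies matrix; 0 marks a missing rating
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
P, Q = als(R, mask)
pred = P @ Q.T  # estimates for every (user, movie) cell, observed or not
obs_rmse = float(np.sqrt(np.mean((pred[mask] - R[mask]) ** 2)))
print(round(obs_rmse, 3))
```

Because each half-step solves its least squares problems exactly, the regularised loss cannot increase from one half-step to the next; in Spark, the per-user and per-item solves are what get distributed across the cluster.
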
Here we use PySpark, Python's API to Apache Spark. The logistics of setting up a cluster of computers is well beyond our expertise, so we use an Amazon Web Services managed service, Elastic MapReduce (EMR). The service lets users set up and run heavy jobs in a matter of minutes by providing EC2 instances with PySpark and all other dependencies, such as Java and Hadoop, preinstalled.

This environment runs on a cluster of 5 EC2 instances (including the master node), each with the following specifications:
- Instance type - m5.2xlarge
- Number of vCPUs - 8
- RAM - 32 GiB
- Instance storage - EBS only
- Pricing - spot

Let's first check that our environment is functioning as expected.

In [None]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1595175669035_0004,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7f388fb38c50>

It looks like it is. Next, we load the data from an S3 bucket that has been made public, since the dataset we are working with raises no security concerns.

In [None]:
data = spark.read.csv("s3://predict/train.csv", header=True)

In [None]:
# show the data
data.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|  5163|  57669|   4.0|1518349992|
|106343|      5|   4.5|1206238739|
|146790|   5459|   5.0|1076215539|
|106362|  32296|   2.0|1423042565|
|  9041|    366|   3.0| 833375837|
|120949|  81768|   3.0|1289595242|
| 19630|  62049|   4.0|1246729817|
| 21066|   2282|   1.0| 945785907|
|117563| 120474|   4.0|1515108225|
|144018|   1997|   5.0|1109967647|
| 40858|   5025|   3.5|1090061607|
| 80119|  92259|   3.5|1435635534|
|  6063|  33493|   3.0|1236048966|
| 97844|   1784|   3.5|1111630438|
| 55909|   3978|   2.5|1111555006|
| 12942|  48394|   0.5|1280365881|
|161472|   3355|   4.0|1229975011|
|117890| 152077|   3.5|1488975758|
| 46581| 108190|   2.0|1465961865|
| 33970|   1265|   4.0|1463280090|
+------+-------+------+----------+
only showing top 20 rows

We'd like to drop the timestamp column here, because the matrix factorisation algorithm uses only the ratings data (user, movie and rating).

In [None]:
columns_to_drop = ['timestamp']
data = data.drop(*columns_to_drop)

In [None]:
# show a description of the data
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|            userId|           movieId|            rating|
+-------+------------------+------------------+------------------+
|  count|          10000038|          10000038|          10000038|
|   mean| 81199.08814466505| 21389.11161287587|3.5333951730983424|
| stddev|46793.586155532874|39195.781053416096|1.0611240700473958|
|    min|                 1|                 1|               0.5|
|    max|             99999|             99999|               5.0|
+-------+------------------+------------------+------------------+

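Also, note that the data was imported and parsed as strings, because `spark.read.csv` was called without a schema. This is also why `describe()` reports a maximum of 99999 for both ID columns even though larger IDs (such as 106343) appear in the data: the minimum and maximum of string columns are compared lexicographically, not numerically. Below we convert the `userId` and `movieId` columns to integers and the `rating` column to a `double`, Spark's 64-bit floating-point type.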
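
Comparison semantics differ between strings and integers, which is one reason the cast matters. A plain-Python illustration with toy ID values:

```python
# Toy user IDs as they would arrive from a CSV read without a schema: strings
user_ids_as_strings = ["5163", "106343", "99999", "162541"]

# String comparison is lexicographic, character by character
print(max(user_ids_as_strings))                   # 99999
# After casting to int, the comparison is numeric
print(max(int(x) for x in user_ids_as_strings))  # 162541
```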





In [None]:
data = data.withColumn("userId", data["userId"].cast("integer"))
data = data.withColumn("movieId", data["movieId"].cast("integer"))
data = data.withColumn("rating", data["rating"].cast("double"))

We'll also want to test the performance of the model before making a submission on Kaggle. Here we use Spark's `randomSplit` method to split the data into roughly 70% training and 30% testing data.
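
`randomSplit` assigns rows to splits at random, so the resulting sizes only approximate the requested weights. The plain-Python sketch below mimics that behaviour by assigning each row independently; `random_split` is a hypothetical helper and this is not Spark's actual algorithm. Spark's `randomSplit` also accepts a `seed` argument (e.g. `data.randomSplit([0.7, 0.3], seed=42)`), which makes the split reproducible across runs.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, approximately [0.7, 1.0] for weights [0.7, 0.3]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0, 1)
        for k, bound in enumerate(bounds):
            if x < bound:
                splits[k].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off in bounds
    return splits

train_rows, test_rows = random_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train_rows), len(test_rows))  # roughly 7000 and 3000, rarely exactly
```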

In [None]:
(training, testing) = data.randomSplit([0.7, 0.3])

Now we import ALS. We also import the `RegressionEvaluator` class, which will allow us to evaluate the performance of the model on a test dataset using a metric of our choice. We are challenged to minimise root mean square error, so that's what we'll use here.

In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

An error was encountered:
Session 3 did not reach idle status in time. Current status is busy.


Next, we implement the model. Just like other machine learning algorithms, ALS has its own set of hyper-parameters, which we would ideally tune via hold-out validation or cross-validation. However, compute on AWS can quickly become expensive, so we'll limit ourselves to arbitrarily chosen values. The parameters we set are as follows:
- maxIter: the maximum number of iterations to run (defaults to 10)
- rank: the number of latent factors in the model (defaults to 10)
- regParam: the (L2) regularization parameter in ALS (defaults to 0.1)

We set maxIter to 20, rank to 10 and regParam to 0.05. Further, we need a strategy to deal with the cold start problem, i.e. users or movies in the evaluation data that never appear in the training data. Spark's ALS can either predict NaN for them (`coldStartStrategy="nan"`, the default) or drop the affected rows altogether (`coldStartStrategy="drop"`). Here we use the strategy of dropping them. See the cell below for the implementation.
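
The difference between the two cold start strategies can be sketched in plain Python. `apply_cold_start` and `dummy_predict` are hypothetical stand-ins for what Spark does inside `model.transform`, not its API.

```python
import math

def apply_cold_start(pairs, known_users, known_items, predict, strategy="nan"):
    """Mimic the two cold start strategies on (user, item) pairs (hypothetical helper)."""
    if strategy not in ("nan", "drop"):
        raise ValueError("strategy must be 'nan' or 'drop'")
    out = []
    for u, i in pairs:
        if u in known_users and i in known_items:
            out.append((u, i, predict(u, i)))
        elif strategy == "nan":
            out.append((u, i, math.nan))  # unknown user or item: NaN prediction
        # with "drop", rows with an unknown user or item are skipped
    return out

def dummy_predict(user, item):
    """Placeholder model that always predicts 3.5 (hypothetical)."""
    return 3.5

known_users, known_items = {1, 2}, {10, 20}
pairs = [(1, 10), (2, 20), (3, 10), (1, 30)]  # user 3 and movie 30 are unseen

nan_rows = apply_cold_start(pairs, known_users, known_items, dummy_predict, "nan")
drop_rows = apply_cold_start(pairs, known_users, known_items, dummy_predict, "drop")
print(len(nan_rows), len(drop_rows))  # 4 2
```

With `"nan"`, a single NaN prediction makes the RMSE computed by `RegressionEvaluator` NaN, which is why `"drop"` is convenient for evaluation. The flip side is that, when the same model is applied to the Kaggle test set, any (user, movie) pair involving a user or movie absent from the training split is dropped from the predictions, so those rows would need a fallback, such as the global mean rating, before submission.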

In [None]:
als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    maxIter=20,
    rank=10,
    regParam=0.05,
)

model = als.fit(training)

Next, we test the model on the held-out test dataset. The resulting RMSE is approximately 0.83.

In [None]:
## first get the prediction
predictions = model.transform(testing)

## next, evaluate the model using the RegressionEvaluator class
evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating", predictionCol = "prediction")
rmse = evaluator.evaluate(predictions)

## and print the rmse
rmse

0.8310646804166654

Now we want to load the test data from the S3 bucket and transform it the same way we did the train data.

In [None]:
test_data = spark.read.csv("s3://predict/test.csv", header=True)
test_data = test_data.withColumn("userId", test_data["userId"].cast("integer"))
test_data = test_data.withColumn("movieId", test_data["movieId"].cast("integer"))

In [None]:
test_data.show()

+------+-------+
|userId|movieId|
+------+-------+
|     1|   2011|
|     1|   4144|
|     1|   5767|
|     1|   6711|
|     1|   7318|
|     1|   8405|
|     1|   8786|
|     2|    150|
|     2|    356|
|     2|    497|
|     2|    588|
|     2|    653|
|     2|   1080|
|     2|   1196|
|     2|   1198|
|     2|   1201|
|     2|   1299|
|     2|   1485|
|     2|   1580|
|     2|   1693|
+------+-------+
only showing top 20 rows

Next, we make predictions on the test data.

In [None]:
# Generate rating predictions for the Kaggle test set (it has no ratings, so no RMSE can be computed here)
predictions = model.transform(test_data)

In [None]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    12|    313|   3.0|     2.371|
|    12|    466|   3.0| 2.3660858|
|    12|    514|   3.0| 3.2177682|
|    12|    923|   5.0| 4.3434906|
|    12|   1343|   4.5| 3.3593106|
|    12|   1409|   1.0|  2.432028|
|    12|   1499|   1.0| 1.2541748|
|    12|   1895|   3.0| 2.8242474|
|    12|   2071|   4.5| 3.9559107|
|    12|   2902|   1.0| 1.8404977|
|    12|   3082|   3.0| 2.6318996|
|    12|   3210|   4.0| 3.4207344|
|    12|   3363|   4.0| 3.8230362|
|    12|   4979|   4.5| 3.9237773|
|    12|  51694|   4.0| 3.5921807|
|    12|    316|   3.0|  2.674575|
|    12|    368|   2.0|  3.022237|
|    12|    480|   4.0| 3.4556544|
|    12|    724|   2.0| 2.3973897|
|    12|   1196|   5.0| 4.2088485|
+------+-------+------+----------+
only showing top 20 rows

Install pandas (needed to convert the Spark dataframe to a pandas dataframe with `toPandas()`) and s3fs (needed for pandas to write the predictions CSV directly to the public S3 bucket) on the cluster.

In [None]:
sc.install_pypi_package("pandas==0.25.1")
sc.install_pypi_package("s3fs")

In [None]:
submission = predictions.toPandas()
submission.head()

   userId  movieId  prediction
0      12        7    2.869163
1      12       21    3.558857
2      12       47    3.971080
3      12       50    4.427972
4      12      141    3.275634

In [None]:
# dump the submission file directly onto the S3 bucket.
submission.to_csv('s3://predict/submission_MF_Spark.csv', index = False)

Finally, this model was submitted to the Kaggle leaderboard and improved our score relative to the popularity recommender, reducing the RMSE from 0.96 to 0.85. We acknowledge that this model could provide better results if we had searched for the hyper-parameters that give the best scores in a cross-validation setting, but given our limitations, these are satisfactory results. Now we return to the main notebook and explore an alternative to ALS, i.e. SVD.