# Movie Recommender System Design With Spark


In this project, we design a movie recommender system using the cluster-computing framework **Spark**.

We study two collaborative filtering algorithms:

* **ALS:** A model-based collaborative filtering method that uses latent factors to predict missing entries. The Alternating Least Squares (ALS) algorithm is used to learn the latent factors.
* **ALS + bias:** The same algorithm, with bias terms added when computing the latent factors.

This project was carried out remotely on the **IBM Cloud Pak for Data** platform.


In [1]:
import sys
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200826021103-0000
KERNEL_ID = b3b7be40-8e6b-4e97-b87d-f71b9901bd7e


<div style="border-bottom: 3px solid black; margin-bottom:5px"></div>
<div style="border-bottom: 3px solid black"></div>

## 1. Dataset
The data comes from the [MovieLens dataset](https://grouplens.org/datasets/movielens/latest/). We use the **ml-latest-small** dataset, which contains about 100,000 ratings applied to 9,000 movies by 600 users. In 'movies.csv', each row contains a movieId, a movie title and the genres of the movie; in 'ratings.csv', each row consists of a userId, a movieId, a rating and a timestamp.

In [2]:
import ibmos2spark
# @hidden_cell
credentials = {
    'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
    'service_id': 'iam-ServiceId-d559645e-c94d-425d-b965-ab25ae7b548f',
    'iam_service_endpoint': 'https://iam.cloud.ibm.com/oidc/token',
    'api_key': '6erENksJWLklBfBTqCHIctD_S8tVfLQ8h9rO1aGe7hcK'
}

configuration_name = 'os_765d26212c5f430ea1447d428b4901d3_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df_movie = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('movies.csv', 'datascience-donotdelete-pr-zij4u5l3bs4ov0'))

df_movie.show(10, False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death

In [3]:
df_rating = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('ratings.csv', 'datascience-donotdelete-pr-zij4u5l3bs4ov0'))
df_rating.show(10, False)


+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|1     |1      |4.0   |964982703|
|1     |3      |4.0   |964981247|
|1     |6      |4.0   |964982224|
|1     |47     |5.0   |964983815|
|1     |50     |5.0   |964982931|
|1     |70     |3.0   |964982400|
|1     |101    |5.0   |964980868|
|1     |110    |4.0   |964982176|
|1     |151    |5.0   |964984041|
|1     |157    |5.0   |964984100|
+------+-------+------+---------+
only showing top 10 rows



<div style="border-bottom: 3px solid black"></div>

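### 1.1 Data Type Casting:
The CSV reader loads every column as a string, so we cast the columns to integer type before training the model. Note that casting `rating` to an integer truncates half-star ratings such as 4.5 to 4.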

In [7]:
from pyspark.sql.types import IntegerType
# Cast every column from string to integer in place (column order is unchanged)
for name in ["userId", "movieId", "rating", "timestamp"]:
    df_rating = df_rating.withColumn(name, df_rating[name].cast(IntegerType()))
# df_rating.show()

<div style="border-bottom: 3px solid black; margin-bottom:5px"></div>
<div style="border-bottom: 3px solid black"></div>

## 2. Model
Let's briefly discuss collaborative filtering. Suppose there are $m$ users and $n$ items. We build a matrix of size $m \times n$ in which each cell holds the rating that a user gave to an item. Such a matrix is called a **utility matrix**. Some items are not rated by some users, and the whole idea is to predict those missing entries.

There are many ways to fill an empty cell. The simplest is to use the mean rating of that movie, the mean rating of that user, or the global mean over all movies. However, these baselines are not very accurate, so more refined models are needed. Many higher-accuracy algorithms exist; we implement two of them.
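As a rough illustration of the three simple fills mentioned above, here is a minimal plain-Python sketch. The small `utility` matrix and the helper names (`global_mean`, `user_mean`, `movie_mean`) are made up for this example and are not part of the notebook.

```python
# Toy utility matrix: rows are users, columns are movies, None marks a missing rating.
# The numbers are invented for illustration only.
utility = [
    [5.0, 3.0, None],
    [4.0, None, 2.0],
    [None, 1.0, 3.0],
]

def mean(values):
    values = list(values)
    return sum(values) / len(values)

def global_mean(matrix):
    # Mean over every observed rating
    return mean(r for row in matrix for r in row if r is not None)

def user_mean(matrix, user):
    # Mean over the ratings given by one user
    return mean(r for r in matrix[user] if r is not None)

def movie_mean(matrix, movie):
    # Mean over the ratings received by one movie
    return mean(row[movie] for row in matrix if row[movie] is not None)

# Three candidate fills for the missing entry of user 0, movie 2
print(global_mean(utility))    # 3.0
print(user_mean(utility, 0))   # 4.0
print(movie_mean(utility, 2))  # 2.5
```

The three fills disagree (3.0, 4.0 and 2.5) for the same cell, which hints at why a model that combines user and item information is preferable.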

<div style="border-bottom: 3px solid black"></div>

### 2.1 ALS:
The idea of latent factors is simple: find two matrices whose product is very close to the utility matrix. Suppose our utility matrix is $A(m,n)$, where $m$ and $n$ are the numbers of users and items. We then look for two matrices $P(m,k)$ and $Q(k,n)$ such that
$$ A \approx P \cdot Q$$
which means
$$ \hat{a}_{mn} = p_{m} \cdot q_{n}=\displaystyle\sum_{k} p_{mk} \cdot q_{kn}$$

where $\hat{a}$ is the predicted rating and $k$ is the rank of the latent factors. $p_{m} \cdot q_{n}$ is also called the **user-item interaction**. So, we now have to solve an optimization problem
$$ \min\limits_{P, Q} \displaystyle\sum_{m,n\in A } \left[  a_{mn} - p_{m} \cdot q_{n} \right]^2$$

Finally, the model performance is evaluated by the root-mean-square error (RMSE) of the rating predictions.
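To make the alternating idea concrete, the following is a minimal plain-Python sketch for the special case $k = 1$, with a small ridge penalty `reg` added so that each update is well defined. It is not Spark's implementation (Spark's `ALS` handles general rank and applies its own regularization scheme); the toy `ratings` and the helper names `objective`, `rmse` and `solve_side` are assumptions made for this example.

```python
import math

# Observed ratings as (user, movie, rating) triples; toy values, not MovieLens data.
ratings = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0),
           (1, 2, 2.0), (2, 1, 1.0), (2, 2, 3.0)]
n_users, n_movies = 3, 3
reg = 0.01  # ridge penalty (hypothetical value)

def objective(p, q):
    """Squared error over observed ratings plus the ridge penalty."""
    sse = sum((a - p[m] * q[n]) ** 2 for m, n, a in ratings)
    return sse + reg * (sum(x * x for x in p) + sum(x * x for x in q))

def rmse(p, q):
    """Root-mean-square error over the observed ratings."""
    sse = sum((a - p[m] * q[n]) ** 2 for m, n, a in ratings)
    return math.sqrt(sse / len(ratings))

def solve_side(fixed, size, own, other):
    """Closed-form rank-1 update of one factor while the other is held fixed."""
    num = [0.0] * size
    den = [reg] * size
    for row in ratings:
        i, j, a = row[own], row[other], row[2]
        num[i] += a * fixed[j]
        den[i] += fixed[j] ** 2
    return [num[i] / den[i] for i in range(size)]

p = [1.0] * n_users   # user factors
q = [1.0] * n_movies  # movie factors
history = [objective(p, q)]
for _ in range(20):
    p = solve_side(q, n_users, 0, 1)   # update users with movies fixed
    q = solve_side(p, n_movies, 1, 0)  # update movies with users fixed
    history.append(objective(p, q))

print("objective: %.3f -> %.3f" % (history[0], history[-1]))
print("training RMSE: %.3f" % rmse(p, q))
```

Each half-step solves a least-squares problem exactly for one factor, so the regularized objective never increases from one iteration to the next.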

To train the model, the dataset is split into training and test sets in a 0.8:0.2 ratio. We set the rank of the latent factors to $10$. We set the cold start strategy to 'drop' to ensure we don't get $NaN$ evaluation metrics.

In [25]:
(training, test) = df_rating.randomSplit([0.8, 0.2], seed=123)
als = ALS(maxIter=10, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
als.setSeed(123)
model_als = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model_als.transform(test)
# print(predictions.show())
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error using ALS : " + str(rmse))
error_als = rmse

Root-mean-square error using ALS : 1.1840927612688594


<div style="border-bottom: 3px solid black"></div>

### 2.2 ALS + bias:
In this algorithm, we add bias terms.
$$ \hat{a}_{mn} = b_m + b_n -\mu + p_{m} \cdot q_{n}$$
where $\mu$ is the global mean rating, $b_m$ is the mean rating of user $m$ and $b_n$ is the mean rating of item $n$. Now, we have to solve
$$ \min\limits_{P, Q} \displaystyle\sum_{m,n\in A } \left[  a_{mn} - \left(b_m + b_n - \mu + p_{m} \cdot q_{n} \right) \right]^2$$
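The same training and test split is used to train the model, and the rank is kept at $10$. Note that the code below uses `maxIter=5` and `regParam=1.0`, whereas the plain ALS model used `maxIter=10` and `regParam=0.01`.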

For the implementation, ALS only computes the matrices $P$ and $Q$, $i.e.$ the latent factors, from the values in its rating column. Here we train it on the rating with the bias removed instead of on the raw rating, so the ALS model predicts the user-item interaction, and we have to add the bias back to recover the predicted rating.
The RMSE should then compare the original rating column with the recomputed predicted rating column.
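The arithmetic of removing the bias before training and adding it back after prediction can be sketched in plain Python as follows. The toy `train` data and the helper names (`mean_by`, `baseline`, `residual`, `full_prediction`) are hypothetical and only mirror the formulas above.

```python
# Made-up training ratings as (user, movie, rating) triples
train = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0),
         (1, 2, 2.0), (2, 1, 1.0), (2, 2, 3.0)]

def mean_by(data, key_idx):
    """Mean rating grouped by the user (key_idx=0) or the movie (key_idx=1)."""
    sums, counts = {}, {}
    for row in data:
        k = row[key_idx]
        sums[k] = sums.get(k, 0.0) + row[2]
        counts[k] = counts.get(k, 0) + 1
    return {k: sums[k] / counts[k] for k in sums}

user_mean = mean_by(train, 0)                    # b_m
item_mean = mean_by(train, 1)                    # b_n
mu = sum(r for _, _, r in train) / len(train)    # global mean

def baseline(user, movie):
    # b_m + b_n - mu
    return user_mean[user] + item_mean[movie] - mu

def residual(user, movie, rating):
    """Target that ALS is trained on: the rating minus the bias baseline."""
    return rating - baseline(user, movie)

def full_prediction(user, movie, interaction):
    """Add the baseline back to a predicted interaction to obtain a rating."""
    return interaction + baseline(user, movie)

r = residual(0, 0, 5.0)
print(r)                          # -0.5
print(full_prediction(0, 0, r))   # 5.0
```

Here user 0 has mean 4.0, movie 0 has mean 4.5 and the global mean is 3.0, so the baseline is 5.5 and the residual of a 5.0 rating is -0.5; adding the baseline back recovers the original rating.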

In [64]:
(training, test) = df_rating.randomSplit([0.8, 0.2], seed=123)

movie_mean_rating = training.groupBy('movieId').mean('rating').withColumnRenamed("avg(rating)", "item_mean")
training = training.join(movie_mean_rating, on='movieId')
user_mean_rating = training.groupBy('userId').mean('rating').withColumnRenamed("avg(rating)", "user_mean")
training = training.join(user_mean_rating, on='userId')
global_mean = (training.groupBy().mean('rating').collect())[0][0]
training = training.withColumn("global_mean", lit(global_mean))
training = training.withColumn("user_item_interaction", lit(training.rating
                                                                 - training.user_mean
                                                                 - training.item_mean
                                                                 + training.global_mean))

# The model is trained on user_item_interaction = rating - (user_mean + item_mean - global_mean) instead of the raw rating
als = ALS(maxIter=5, rank=10, regParam=1.0, userCol="userId", itemCol="movieId", 
          ratingCol="user_item_interaction", coldStartStrategy="drop")

als.setSeed(123)
model_bias = als.fit(training)
predictions = model_bias.transform(test)

predictions = predictions.join(movie_mean_rating, on='movieId')
predictions = predictions.join(user_mean_rating, on='userId')
predictions = predictions.withColumn("global_mean", lit(global_mean))
predictions = predictions.withColumn("uii_prediction", lit(predictions.prediction
                                                               + predictions.user_mean
                                                               + predictions.item_mean
                                                               - predictions.global_mean))

# print(predictions.show())
# model makes predictions for the user-item interaction
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="uii_prediction")

rmse = evaluator.evaluate(predictions)
print("Root-mean-square error using ALS + Bias : " + str(rmse))
error_als_bias = rmse
# print(training.show())

Root-mean-square error using ALS + Bias : 0.9448272896382308


<div style="border-bottom: 3px solid black"></div>

### 2.3 Model Performance Comparison:
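We see that the RMSE improves by more than $20\%$ after adding bias (from about $1.18$ to about $0.94$). This model is an extension of the latent factor model. Note that the two runs also differ in `maxIter` and `regParam`, so part of the difference may come from those settings.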

In [65]:
# Calculate improvement of model performance
improvement = 100.0*(error_als - error_als_bias)/error_als
print("The improvement after using ALS + Bias : " + str(improvement))

The improvement after using ALS + Bias : 20.206649297833273


In [66]:
def userMovieList(userid):
    users = df_rating.where(df_rating.userId == userid)
    users = users.orderBy("rating", ascending=False).collect()
    user_movie_list = []
    for x in users:
        user_movie_list.append(x[1])

    movie_name_list = df_movie.where(df_movie.movieId == user_movie_list[0])   
    for i in range(1, len(user_movie_list)):
        movie_name = df_movie.where(df_movie.movieId == user_movie_list[i])
        movie_name_list = movie_name_list.union(movie_name)
    
    movie_name_list.show(10, False)

In [67]:
# Recommend 10 movies to a user
def userRecommendation(userid, model_name):
    users = df_rating.where(df_rating.userId == userid)
    userSubsetRecs = model_name.recommendForUserSubset(users, 10).collect()[0][1]
    
    movie_name_list = df_movie.where(df_movie.movieId == userSubsetRecs[0][0])   
    for i in range(1, len(userSubsetRecs)):
        movie_name = df_movie.where(df_movie.movieId == userSubsetRecs[i][0])
        movie_name_list = movie_name_list.union(movie_name)
    
    movie_name_list.show(10, False)

We can easily check both the movies a user has rated and the movies recommended to that user. First, we look at the 10 highest-rated movies of a user. Next, we look at the top 10 recommendations for the same user, and compare the two models.

In [63]:
# Show the user's 10 highest-rated movies
#userMovieList(userid)
userMovieList(8)

+-------+---------------------------------------------------------+-----------------------------------------------+
|movieId|title                                                    |genres                                         |
+-------+---------------------------------------------------------+-----------------------------------------------+
|34     |Babe (1995)                                              |Children|Drama                                 |
|50     |Usual Suspects, The (1995)                               |Crime|Mystery|Thriller                         |
|185    |Net, The (1995)                                          |Action|Crime|Thriller                          |
|253    |Interview with the Vampire: The Vampire Chronicles (1994)|Drama|Horror                                   |
|318    |Shawshank Redemption, The (1994)                         |Crime|Drama                                    |
|357    |Four Weddings and a Funeral (1994)                       |Comed

In [68]:
# Show top 10 movie recommendations for user
# userRecommendation(userid, model_name)
userRecommendation(8, model_bias)

+-------+------------------------------------------------------+---------------------------+
|movieId|title                                                 |genres                     |
+-------+------------------------------------------------------+---------------------------+
|1124   |On Golden Pond (1981)                                 |Drama                      |
|6238   |Green Card (1990)                                     |Comedy|Drama|Romance       |
|688    |Operation Dumbo Drop (1995)                           |Action|Adventure|Comedy|War|
|720    |Wallace & Gromit: The Best of Aardman Animation (1996)|Adventure|Animation|Comedy |
|1272   |Patton (1970)                                         |Drama|War                  |
|647    |Courage Under Fire (1996)                             |Action|Crime|Drama|War     |
|1093   |Doors, The (1991)                                     |Drama                      |
|5048   |Snow Dogs (2002)                                      |Advent

In [70]:
userRecommendation(8, model_als)

+-------+-------------------------------------------------------------+-------------------------+
|movieId|title                                                        |genres                   |
+-------+-------------------------------------------------------------+-------------------------+
|1461   |Vegas Vacation (National Lampoon's Las Vegas Vacation) (1997)|Comedy                   |
|194    |Smoke (1995)                                                 |Comedy|Drama             |
|5080   |Slackers (2002)                                              |Comedy                   |
|994    |Big Night (1996)                                             |Comedy|Drama             |
|2340   |Meet Joe Black (1998)                                        |Romance                  |
|1147   |When We Were Kings (1996)                                    |Documentary              |
|97225  |Hotel Transylvania (2012)                                    |Animation|Children|Comedy|
|79242  |Kids Are Al

The results suggest that the ALS + bias model recommends movies that better match this user's tastes.