## Build a Movie Recommendation Model with Apache Spark and Amazon EMR

**COMP4651 2017 Spring Project**

*O Pui Wai, opw0011 | CHAN Hon Sum, samchan1995 | TSUI Ka Wai, kwtsui*

<img src="https://raw.githubusercontent.com/hortonworks/data-tutorials/1f3893c64bbf5ffeae4f1a5cbf1bd667dcea6b06/tutorials/hdp/hdp-2.6/intro-to-machine-learning-with-apache-spark-and-apache-zeppelin/assets/spark-mllib-logo.png" align="left" height="100" >
<img src="https://conceptdraw.com/a3131c3/p1/preview/640/pict--amazon-emr-aws-analytics-vector-stencils-library" width="140">


### Introduction

In this notebook, we train a movie recommendation model using Spark MLlib, Spark's machine learning library. With the trained model, we can predict the rating that a user is likely to give a movie. We can then make tailor-made movie recommendations based on the predicted ratings, such as listing the 25 movies with the highest predicted ratings for a given user.


### Project Setup

The dataset is stored on AWS S3. To access the files, fill in `ACCESS_KEY`, `SECRET_KEY` and `AWS_BUCKET_NAME`.

In [2]:
# Set up AWS S3 access credentials
ACCESS_KEY = "FILL_IN_YOUR_KEY_HERE"
SECRET_KEY = "FILL_IN_YOUR_KEY_HERE"
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "comp4651-movie-data"
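
The cell above escapes only the `/` character in the secret key. As a hedged alternative, the sketch below percent-encodes every reserved character (for example `+` as well) with the standard library. The helper names `encode_secret` and `s3a_uri` and the sample keys are made up for illustration. Keep in mind that keys embedded in a URI can end up in logs, so this style is best kept to throwaway notebooks.

```python
try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote        # Python 2

def encode_secret(secret_key):
    """Percent-encode every reserved character so the key can sit inside a URI."""
    return quote(secret_key, safe="")

def s3a_uri(access_key, secret_key, bucket, file_name):
    """Build an s3a:// URI of the form key:secret@bucket/file (hypothetical helper)."""
    return "s3a://%s:%s@%s/%s" % (access_key, encode_secret(secret_key), bucket, file_name)

# Made-up credentials, for illustration only
print(encode_secret("ab/cd+ef"))
print(s3a_uri("AKIAEXAMPLE", "ab/cd+ef", "comp4651-movie-data", "ratings-small.csv"))
```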

#### Dataset Preparation

The dataset used for training the model is downloaded from [MovieLens](https://grouplens.org/datasets/movielens/), a movie recommendation service.

We downloaded two versions of the dataset:

Small: 100,000 ratings and 1,300 tag applications applied to 9,000 movies by 700 users. (compressed size: 1 MB)

- [ml-latest-small.zip](http://files.grouplens.org/datasets/movielens/ml-latest-small.zip) (Last updated 10/2016)

Full: 24,000,000 ratings and 670,000 tag applications applied to 40,000 movies by 260,000 users. (compressed size: 224 MB)

- [ml-latest.zip](http://files.grouplens.org/datasets/movielens/ml-latest.zip) (Last updated 10/2016)

Then, we uploaded all the files to AWS S3:

![s3](https://cloud.githubusercontent.com/assets/10897048/26273579/459ee37a-3d65-11e7-8460-215adf4e7335.png)

Since the raw data is in CSV format, we need a function that converts it into a Spark DataFrame.

In [4]:
# Convert csv file to Spark DataFrame (Databricks version)
def loadDataFrame(fileName, fileSchema):
  # Read a CSV file with a header row from S3 using an explicit schema,
  # dropping any row that does not fit the schema
  return (spark.read
               .schema(fileSchema)
               .option("header", "true")
               .option("mode", "DROPMALFORMED")
               .csv("s3a://%s:%s@%s/%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME, fileName)))

#### Dataset Summary

- `ratings-small.csv` is a small dataset extracted from the full rating dataset. It contains the users' movie ratings in the following format:

  - `userId,movieId,rating,timestamp`

- `movies-small.csv` is a small dataset extracted from the full movie dataset. It contains the details of each movie in the following format:

  - `movieId,title,genres`

In order to parse the CSV files into DataFrames, we have to explicitly specify the data type of each field.

In [6]:
from pyspark.sql.types import *

movieRatingSchema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("timestamp", StringType(), True)])

movieSchema = StructType([
    StructField("movieId", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("genres", StringType(), True)])

smallMovieRatingsDF = loadDataFrame("ratings-small.csv", movieRatingSchema).cache()
smallMoviesDF = loadDataFrame("movies-small.csv", movieSchema).cache()
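
To illustrate what an explicit schema combined with the `DROPMALFORMED` mode does, here is a small plain-Python analogue (Python 3). It is only a sketch of the idea, not how Spark parses files: `RATING_SCHEMA`, `parse_rows` and the sample text are assumptions made up for this example.

```python
import csv
import io

# (column name, converter) pairs mirroring movieRatingSchema
RATING_SCHEMA = [("userId", int), ("movieId", int), ("rating", float), ("timestamp", str)]

def parse_rows(text, schema):
    """Parse CSV text with a header row, skipping rows that do not fit the schema."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header line
    rows = []
    for fields in reader:
        if len(fields) != len(schema):
            continue  # wrong number of columns: drop the row
        try:
            rows.append({name: cast(value) for (name, cast), value in zip(schema, fields)})
        except ValueError:
            continue  # a field could not be converted: drop the row
    return rows

# Made-up sample; the second data row has a non-numeric movieId and is dropped
sample = ("userId,movieId,rating,timestamp\n"
          "1,31,2.5,1260759144\n"
          "1,oops,3.0,1260759179\n"
          "2,10,4.0,835355493\n")
parsed = parse_rows(sample, RATING_SCHEMA)
for row in parsed:
    print(row)
```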

After successfully creating the DataFrames, we can verify them by printing their schemas.

Also, we can write some code using the [Spark DataFrame API](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame) to get a more detailed dataset summary.

In [8]:
# Print out the DataFrame schema, and a few lines as example
smallMovieRatingsDF.printSchema()
smallMovieRatingsDF.show(5)

smallMoviesDF.printSchema()
smallMoviesDF.show(5)

In [9]:
# Note: this import shadows Python's built-in min and max for the rest of the notebook
from pyspark.sql.functions import mean, min, max, stddev

# Data summary of the dataset
print("Number of ratings: %s" % (smallMovieRatingsDF.count()))
print("Number of distinct users: %s" % (smallMovieRatingsDF.select('userId').distinct().count()))
print("Number of distinct movies: %s" % (smallMovieRatingsDF.select('movieId').distinct().count()))
smallMovieRatingsDF.select([mean('rating'), min('rating'), max('rating'), stddev('rating')]).show()
smallMovieRatingsDF.groupBy('rating').count().orderBy('rating').show()
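
When reading the summary, note that Spark's `stddev` is an alias for the sample standard deviation (`stddev_samp`), which divides by n - 1 rather than n. The plain-Python sketch below computes the same four statistics on a made-up list of ratings; the function name `summarize` is an assumption for this example.

```python
import math

def summarize(values):
    """Return mean, min, max and sample standard deviation of a list of numbers."""
    n = len(values)
    mean_value = sum(values) / float(n)
    squared_dev = sum((v - mean_value) ** 2 for v in values)
    sample_std = math.sqrt(squared_dev / (n - 1))  # divides by n - 1, like stddev_samp
    return {"mean": mean_value, "min": min(values), "max": max(values), "stddev": sample_std}

stats = summarize([1.0, 2.0, 3.0, 4.0])
print(stats)
```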

### Model Training

#### Overview

MLlib is Spark’s machine learning (ML) library. It provides high-level APIs that make it easier to develop scalable machine learning programs.

Spark MLlib provides two machine learning packages, namely `spark.mllib` and `spark.ml`.

`spark.mllib` consists of RDD-based APIs that take RDDs as input. However, according to the Spark 2.1 MLlib guide, the RDD-based API is in maintenance mode and is expected to be deprecated once the DataFrame-based API reaches feature parity (roughly estimated for Spark 2.2).

In contrast, the DataFrame-based APIs in `spark.ml` are now the primary API, because DataFrames provide a more user-friendly API than RDDs. DataFrames also benefit from Spark's query optimizations, such as the Catalyst optimizer.

For more detailed explanations, please visit [Spark's official MLlib guide](https://spark.apache.org/docs/2.1.0/ml-guide.html).

As suggested by the official guide, we will mainly use the DataFrame-based [pyspark.ml.recommendation module](https://spark.apache.org/docs/2.1.0/api/python/pyspark.ml.html#module-pyspark.ml.recommendation) for collaborative filtering.

#### Collaborative Filtering

Collaborative filtering makes predictions about a user's interests by collecting preference information from many users. It is based on the assumption that if user A has the same opinion as user B on one issue, then A is more likely to share B's opinion on a different issue. The image below illustrates this idea:

![collaborative Filtering](https://upload.wikimedia.org/wikipedia/commons/5/52/Collaborative_filtering.gif) 

*(Image from [Wikipedia](https://en.wikipedia.org/wiki/Collaborative_filtering))*

Spark's ML library performs recommendation with Alternating Least Squares (ALS) matrix factorization. Two of its parameters matter most here:

- `rank` is the number of latent factors in the model, that is, the number of hidden factors assumed to explain the ratings.

- `regParam` specifies the regularization parameter in ALS.

Of course, we do not know how many underlying factors the real model has, so we have to make a reasonable guess for these parameters and choose the best combination after evaluation.
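
To make the roles of `rank` and `regParam` more concrete, here is a minimal NumPy sketch of alternating least squares on a tiny, made-up rating matrix. It is an illustration only and not Spark's implementation: the function name `als_sketch`, the toy data and the plain (unweighted) regularization are assumptions, whereas Spark scales the regularization by the number of ratings of each user or item.

```python
import numpy as np

def als_sketch(ratings, mask, rank=2, reg_param=0.1, iterations=20, seed=0):
    """Factor ratings (users x movies) into U and V so that U.dot(V.T) fits the observed entries."""
    rng = np.random.RandomState(seed)
    n_users, n_movies = ratings.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))   # user factors
    V = rng.normal(scale=0.1, size=(n_movies, rank))  # movie factors
    ridge = reg_param * np.eye(rank)
    for _ in range(iterations):
        # Fix V and solve a small regularized least-squares problem for each user
        for u in range(n_users):
            seen = mask[u]
            Vs = V[seen]
            U[u] = np.linalg.solve(Vs.T.dot(Vs) + ridge, Vs.T.dot(ratings[u, seen]))
        # Fix U and do the same for each movie
        for m in range(n_movies):
            seen = mask[:, m]
            Us = U[seen]
            V[m] = np.linalg.solve(Us.T.dot(Us) + ridge, Us.T.dot(ratings[seen, m]))
    return U, V

# Toy matrix: rows are users, columns are movies, 0 marks a missing rating
ratings = np.array([[5.0, 4.0, 0.0, 1.0],
                    [4.0, 5.0, 1.0, 0.0],
                    [1.0, 0.0, 5.0, 4.0],
                    [0.0, 1.0, 4.0, 5.0]])
mask = ratings > 0

U, V = als_sketch(ratings, mask, rank=2, reg_param=0.1)
predicted = U.dot(V.T)
train_rmse = np.sqrt(np.mean((predicted[mask] - ratings[mask]) ** 2))
print("RMSE on observed ratings: %.3f" % train_rmse)
print("Predicted ratings for the missing entries:", predicted[~mask])
```

A larger `rank` gives the factor matrices more columns and therefore more freedom to fit the observed ratings, while a larger `reg_param` pulls the factors toward zero to limit overfitting.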


#### Pick the Best ALS Parameters with Small Dataset

In order to determine a good combination of model training parameters, we split the small dataset into 3 non-overlapping subsets, namely training (60%), validation (20%) and testing (20%), with [randomSplit](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit).

Then, we define two lists of parameter values to try during training:
```
ranks = [2, 4, 8, 12, 16, 20, 24]
regParams = [0.01, 0.05, 0.1, 0.15, 0.2, 0.3]
```
We train one model on the training set for each combination of parameters. Then, we pick the model with the lowest RMSE (Root Mean Squared Error) on the validation set. Finally, we evaluate that best model on the testing set.
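
As a reminder of what the metric measures, the plain-Python sketch below computes RMSE over (actual, predicted) pairs. It also shows why the cells in this notebook call `dropna()` before evaluating: a single NaN prediction, which ALS can return for a user or movie it never saw during training (SPARK-14489), turns the whole metric into NaN. The helper names and sample pairs are made up for illustration. Spark 2.2 and later also offer `coldStartStrategy="drop"` on `ALS` for the same purpose.

```python
import math

def rmse(pairs):
    """Root mean squared error over (actual, predicted) pairs."""
    return math.sqrt(sum((actual - predicted) ** 2 for actual, predicted in pairs) / len(pairs))

def drop_nan(pairs):
    """Keep only the pairs whose prediction is a real number."""
    return [(actual, predicted) for actual, predicted in pairs if not math.isnan(predicted)]

pairs = [(4.0, 3.0), (2.0, 4.0), (5.0, float("nan"))]
print(rmse(pairs))            # nan: one missing prediction poisons the metric
print(rmse(drop_nan(pairs)))  # sqrt((1 + 4) / 2)
```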

In [11]:
# Partition the dataset into training, validation and testing sets (hold-out validation)
(trainingSet, validationSet, testingSet) = smallMovieRatingsDF.randomSplit([0.6, 0.2, 0.2], seed=12345)
training = trainingSet.cache()
validation = validationSet.cache()
testing = testingSet.cache()
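
The idea behind a seeded random split can be sketched in plain Python as follows. This is an assumption-laden illustration rather than Spark's algorithm: the function `random_split` is hypothetical, and it simply assigns each row to a subset by comparing one random draw against the cumulative weights. As a result the 60/20/20 proportions hold only approximately, while the same seed always reproduces the same split.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to exactly one subset, with probabilities proportional to weights."""
    rng = random.Random(seed)
    total = float(sum(weights))
    bounds = []
    cumulative = 0.0
    for weight in weights:
        cumulative += weight / total
        bounds.append(cumulative)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for index, bound in enumerate(bounds):
            # The last subset also catches draws that slip past rounding error
            if draw < bound or index == len(bounds) - 1:
                splits[index].append(row)
                break
    return splits

train, validation, test = random_split(range(1000), [0.6, 0.2, 0.2], seed=12345)
print(len(train), len(validation), len(test))
```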

In [12]:
# Use ml instead of mllib for DataFrames
# http://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import Row
from time import time

ranks = [2, 4, 8, 12, 16, 20, 24]
regParams = [0.01, 0.05, 0.1, 0.15, 0.2, 0.3]
minError = float('inf')
bestRank = -1
bestRegParam = -1
bestModel = None

# An RMSE evaluator using the rating and predicted rating columns
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
# Initialize the ALS (Alternating Least Squares) estimator
als = ALS(userCol = "userId", itemCol = "movieId", ratingCol = "rating", seed = 12345)

timeBegin = time()

for regParam in regParams:
  for rank in ranks:
    # Build the recommendation model using ALS on the training data
    als.setParams(rank = rank, regParam = regParam)
    model = als.fit(training)

    # Evaluate the model by computing the RMSE on the validation data
    predictions = model.transform(validation)
    predictions = predictions.dropna() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)
    error = evaluator.evaluate(predictions)
    
    if error < minError:
      bestRank = rank
      bestRegParam = regParam
      minError = error
      bestModel = model
    print('For rank %s, regParams %s, the RMSE is %s' % (rank, regParam, error))

timeElapsed = time() - timeBegin
print("Trained %s models in %s seconds" % (len(ranks) * len(regParams), round(timeElapsed, 3)))
print("Best Rank = %s, Best regParam = %s, with RMSE = %s"  % (bestRank, bestRegParam, minError))
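
The nested loop above is a grid search: it evaluates every (rank, regParam) pair and keeps the one with the smallest validation error. The plain-Python sketch below expresses the same selection with `itertools.product` and `min`. The function `toy_validation_error` is a hypothetical stand-in for "fit ALS, then compute RMSE on the validation set"; its values are made up and say nothing about the real dataset.

```python
from itertools import product

ranks = [2, 4, 8, 12, 16, 20, 24]
reg_params = [0.01, 0.05, 0.1, 0.15, 0.2, 0.3]

def toy_validation_error(rank, reg_param):
    """Hypothetical error surface standing in for the real train-and-evaluate step."""
    return 0.9 + ((rank - 8) ** 2) / 100.0 + (reg_param - 0.1) ** 2

# Evaluate every (rank, regParam) pair and keep the one with the smallest error
best_rank, best_reg = min(product(ranks, reg_params),
                          key=lambda pair: toy_validation_error(*pair))
print("Best rank = %s, best regParam = %s" % (best_rank, best_reg))
```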

In [13]:
# After getting the best rank and RegParam, test the model on test dataset
predictions = bestModel.transform(testing)
predictions = predictions.dropna() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)
rmse = evaluator.evaluate(predictions)
print("The model had an RMSE of %s on test dataset"  % (rmse))

#### Train the Model with Full Dataset

After finding the best training parameters with the small dataset, we now use them to train on the full dataset (~600MB in size).

During the training, we record the time spent to train on the whole dataset.

In [15]:
# Load the full dataset (ratings and movies)
MovieRatingsDF = loadDataFrame("ratings.csv", movieRatingSchema).cache()
MoviesDF = loadDataFrame("movies.csv", movieSchema).cache()

In [16]:
# Data summary of the full dataset on movie rating
print("Number of ratings: %s" % (MovieRatingsDF.count()))
print("Number of distinct users: %s" % (MovieRatingsDF.select('userId').distinct().count()))
print("Number of rated distinct movies: %s" % (MovieRatingsDF.select('movieId').distinct().count()))
print("Total number of movies: %s" % (MoviesDF.select('movieId').count()))

MovieRatingsDF.select([mean('rating'), min('rating'), max('rating'), stddev('rating')]).show()

print("Distribution of ratings:")
MovieRatingsDF.groupBy('rating').count().orderBy('rating').show()
RatingsCountGroupByMovieId = MovieRatingsDF.groupBy('movieId').count()
# first() returns a Row, so take its first field to print the number itself
print("Average number of ratings per movie: %s" % (RatingsCountGroupByMovieId.select(mean('count')).first()[0]))

In [17]:
als.setParams(rank = bestRank, regParam = bestRegParam)
print("Training full data set with Rank = %s, regParam = %s ..." % (bestRank, bestRegParam))

timeBegin = time()

model = als.fit(MovieRatingsDF) # use the full dataset for training

timeElapsed = time() - timeBegin

print("Final model trained in %s seconds" % round(timeElapsed, 3))

In [18]:
# Evaluate the performance of the final model with the testing data
# Note: the final model was fit on the full dataset, so if these ratings also appear there, this is not a held-out estimate
predictions = model.transform(testing)
predictions = predictions.dropna() # drop all NaN prediction value to ensure not to have NaN RMSE (due to SPARK-14489)
rmse = evaluator.evaluate(predictions)
print("The final model had an RMSE of %s"  % (rmse))

### Movie Recommendations

Now that the model is trained, we can use it to predict users' movie ratings.

The high-level idea is as follows:

1. We pick a user and get the list of movies that he/she has rated.

2. From the full movie list, we exclude all the movies that the user has rated, which gives a list of unrated movies.

3. The movies in the unrated list form the recommendation pool. They are then fed into the model.

4. Each unrated movie gets a predicted rating based on the ratings of other, similar users.

5. Finally, we sort the movies to find the top N with the highest predicted ratings. We can also make recommendations by movie category.
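
The five steps above can be sketched in plain Python as follows. This is a toy illustration under stated assumptions, not the notebook's Spark code: the function `recommend`, the sample ratings and the `predict` function (a hypothetical stand-in for the trained ALS model) are all made up.

```python
def recommend(user_id, ratings, all_movie_ids, predict, top_n=3):
    """Return the top_n unrated movies for user_id, ranked by predicted rating."""
    rated = {movie for (user, movie) in ratings if user == user_id}        # step 1
    unrated = [movie for movie in all_movie_ids if movie not in rated]     # steps 2 and 3
    scored = [(movie, predict(user_id, movie)) for movie in unrated]       # step 4
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_n]  # step 5

# Toy data: keys are (userId, movieId) pairs, values are ratings
ratings = {(1, 10): 4.0, (1, 20): 3.5, (2, 10): 5.0, (2, 30): 2.0}
all_movie_ids = [10, 20, 30, 40, 50]

# Hypothetical predictor standing in for the trained model
toy_scores = {30: 3.1, 40: 4.6, 50: 2.4}

def predict(user_id, movie_id):
    return toy_scores[movie_id]

top_two = recommend(1, ratings, all_movie_ids, predict, top_n=2)
print(top_two)
```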

#### Our First Try

We follow the idea illustrated above and pick the user with UserId = 1000, and try to make movie recommendations for him/her.

In the following example, we print some of the movies that the user has and has not watched, and then make predictions for all the unwatched movies.

In [20]:
from pyspark.sql.functions import lit
UserId = 1000
userWatchedList = (MovieRatingsDF.filter(MovieRatingsDF.userId == UserId)
                                 .join(MoviesDF, 'movieId')
                                 .select(['movieId', 'userId', 'title', 'rating']))
# Collect the ids of the movies that the user has already rated
watchedMovieList = [movie.movieId for movie in userWatchedList.collect()]
print("User %s has watched and rated %s movies (sorted by rating):" % (UserId, len(watchedMovieList)))
userWatchedList.orderBy('rating', ascending = False).show(20, False)

# Find the unwatched movies and attach the user id to each of them
userUnwatchedList = (MoviesDF.filter(~MoviesDF.movieId.isin(watchedMovieList)).withColumn('userId', lit(UserId)).cache())
print("%s unwatched movies:" % (userUnwatchedList.count()))
userUnwatchedList.show(20, False)

predictedMovies = model.transform(userUnwatchedList)
predictedMovies = predictedMovies.dropna().cache() # drop movies whose prediction is NaN (due to SPARK-14489)

#### Our First Try Result

As shown in the output below, we list the 25 movies with the highest predicted ratings.

However, the list contains some obscure movies that few people have heard of. Take "The Thorn (1971)" as an example and try searching for it on IMDb.

![the-thorn](https://cloud.githubusercontent.com/assets/10897048/26272746/1e6cdc38-3d53-11e7-9483-554bf6b98e2d.png)

**The problem is that ONLY 48 people have rated this movie!**

Since the sample size is so small, the ratings may be biased and not representative. Besides, a user may not be interested in watching unpopular movies. To fix this problem, we need to filter out these little-known movies.

In [22]:
print("Top 25 predicted movies with highest rating:")
top25Movies = predictedMovies.orderBy('prediction', ascending = False).show(25, False)

print("Top 25 comedies with highest rating:")
top25Comedy = (predictedMovies.filter(predictedMovies.genres.like("%Comedy%"))
                              .orderBy('prediction', ascending = False)
                              .show(25, False))

#### Our Second Try

This time we exclude movies with fewer than 50 ratings from the recommendation pool. This removes 27575 movies.
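
The filtering idea can be sketched in plain Python with a counter over the rated movie ids. The function `popular_movies` and the sample data are assumptions made up for this illustration; the notebook itself does the equivalent with a Spark `groupBy` and `count`.

```python
from collections import Counter

def popular_movies(rated_movie_ids, min_ratings):
    """Return the set of movie ids that received at least min_ratings ratings."""
    counts = Counter(rated_movie_ids)
    return {movie for movie, count in counts.items() if count >= min_ratings}

# One entry per rating: movie 7 was rated three times, movie 8 once, movie 9 twice
rated_movie_ids = [7, 8, 7, 9, 7, 9]
candidates = [7, 8, 9]

keep = popular_movies(rated_movie_ids, min_ratings=2)
filtered = [movie for movie in candidates if movie in keep]
print(filtered)
```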

In [24]:
N = 50
MovieWithLessThanNRatings = RatingsCountGroupByMovieId.filter('count < %d' % N)
print("Movies with less than %s rating count: %s" % (N, MovieWithLessThanNRatings.count()))

# Collect the ids of the rarely rated movies, then drop them from the candidate list
movieToBeExcluded = [movie.movieId for movie in MovieWithLessThanNRatings.collect()]

userUnwatchedListWithAtLeastNRatings = userUnwatchedList.filter(~userUnwatchedList.movieId.isin(movieToBeExcluded)).cache()

In [25]:
timeBegin = time()

predictedMovies = model.transform(userUnwatchedListWithAtLeastNRatings)
predictedMovies = predictedMovies.dropna().cache() # drop movies whose prediction is NaN (due to SPARK-14489)

# Note: transform() and cache() are lazy, so this mostly times query planning;
# the predictions themselves are computed when an action such as show() runs
timeElapsed = time() - timeBegin
print("Final model made predictions in %s seconds" % round(timeElapsed, 3))

#### Our Second Try Result

This time, all the unpopular movies have been excluded from our top recommendations list.

Now, the top recommendation is "Planet Earth (2006)", which looks like a surprisingly well-rated movie that is worth watching!

![planet-earth](https://cloud.githubusercontent.com/assets/10897048/26272914/a4750c62-3d56-11e7-9f90-7b4c0882376c.png)

We can also list the top-rated movies by movie category.

**Now we have successfully built a practical movie recommendation model!**

In [27]:
def showTop25(title, moviesDF):
  # Print the 25 movies with the highest predicted rating and time the query
  print(title)
  timeBegin = time()
  moviesDF.orderBy('prediction', ascending = False).show(25, False)
  print("Took %s seconds" % round(time() - timeBegin, 3))

showTop25("Top 25 predicted movies with highest rating:", predictedMovies)
showTop25("Top 25 comedies with highest rating:",
          predictedMovies.filter(predictedMovies.genres.like("%Comedy%")))
showTop25("Top 25 Science Fiction movies with highest rating:",
          predictedMovies.filter(predictedMovies.genres.like("%Sci-Fi%")))
showTop25("Top 25 Animation movies with highest rating:",
          predictedMovies.filter(predictedMovies.genres.like("%Animation%")))

### Persisting the model

After training the model, we may want to store it for later use, for example to save time when starting up a server. We can easily save it to AWS S3 by specifying the bucket name, access key, secret key and file path.

The following are examples for saving and loading the trained ALS model:

In [29]:
timeBegin = time()

# Save the trained model to S3
model.write().overwrite().save("s3a://%s:%s@%s/%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME, "model"))

timeElapsed = time() - timeBegin
print("Save model took %s seconds" % round(timeElapsed, 3))

In [30]:
from pyspark.ml.recommendation import ALSModel

timeBegin = time()

# Load the previously saved model
model = ALSModel.load("s3a://%s:%s@%s/%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME, "model"))

timeElapsed = time() - timeBegin
print("Load model took %s seconds" % round(timeElapsed, 3))