## Movie Recommender System with Apache Spark

### Abstract
In this project, we use the Alternating Least Squares (ALS) algorithm through the Spark APIs to predict ratings for the movies in the [MovieLens dataset](https://grouplens.org/datasets/movielens/latest/). The recommendation model is trained in Spark by matrix factorization, with the factors fitted by alternating least squares.
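
To make the idea concrete before the Spark code, here is a minimal NumPy sketch of alternating least squares on a made-up 4x4 rating matrix. It is only an illustration of the technique under simple assumptions (dense toy data, plain L2 regularization); the function name `als_factorize` and all values are hypothetical, and Spark's own implementation differs in its details.

```python
import numpy as np

def als_factorize(R, mask, rank=2, reg=0.1, iters=20, seed=0):
    """Factor R ~= U @ V.T using only the entries where mask is True."""
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    ridge = reg * np.eye(rank)
    for _ in range(iters):
        # Hold V fixed and solve a small ridge regression for each user.
        for u in range(n_users):
            idx = mask[u]
            if idx.any():
                Vu = V[idx]
                U[u] = np.linalg.solve(Vu.T @ Vu + ridge, Vu.T @ R[u, idx])
        # Hold U fixed and solve a small ridge regression for each item.
        for i in range(n_items):
            idx = mask[:, i]
            if idx.any():
                Ui = U[idx]
                V[i] = np.linalg.solve(Ui.T @ Ui + ridge, Ui.T @ R[idx, i])
    return U, V

# Toy rating matrix (made-up data); 0 marks a missing rating.
R = np.array([[5, 4, 0, 1],
              [4, 0, 1, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
U, V = als_factorize(R, mask)
pred = U @ V.T  # predicted rating for every (user, item) pair
train_rmse = np.sqrt(np.mean((R[mask] - pred[mask]) ** 2))
print(round(float(train_rmse), 3))
```

Each inner step is an ordinary least-squares problem of size `rank`, which is why the method alternates between the two factor matrices instead of optimizing both at once.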

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math

In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part1: Data ETL and Data Exploration

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
movies = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [7]:
movies.show(5)

In [8]:
ratings.show(5)

In [9]:
tags.show(5)

In [10]:
tmp1 = ratings.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [11]:
tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

## Part 1: Spark SQL and OLAP

In [13]:
# createOrReplaceTempView replaces the deprecated registerTempTable
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
links.createOrReplaceTempView("links")
tags.createOrReplaceTempView("tags")

### 1: The number of Users

In [15]:
%sql
select count(distinct userId) as numUsers from ratings 

numUsers
283228


### 2: The number of Movies

In [17]:
%sql 
select count(distinct movieId) as numMovies from ratings

numMovies
53889


### 3: How many movies are rated by users? List the movies that have not been rated

In [19]:
%sql
select count(distinct movieId) as numMovies from movies

numMovies
58098


In [20]:
%sql
select distinct title from movies where movies.movieId not in (select movies.movieId from (movies inner join ratings on ratings.movieId==movies.movieId)) limit 10

title
Agency (1980)
Blood Link (1982)
Night Eyes II (1991)
Younger and Younger (1993)
The Moonlighter (1953)
Battle of Rogue River (1954)
Siege at Red River (1954)
No Minor Vices (1948)
Musical Chairs (2012)
Zubeidaa (2001)


### 4: List Movie Genres

In [22]:
import pyspark.sql.functions as f
from functools import reduce

df = movies.withColumn('MovieGenres', f.split(movies['genres'], r'\|'))
df.select(f.explode('MovieGenres').alias('MovieGenres')).groupby('MovieGenres').count().show()
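
As a plain-Python illustration of what the `split` + `explode` + `count` chain computes, the sketch below does the same thing on three sample rows in the `genres` format. The rows are illustrative only. One difference worth noting: `f.split` takes a regular expression, which is why the pipe is escaped as `r'\|'` in the Spark code, whereas Python's `str.split` takes a literal separator.

```python
from collections import Counter

# Illustrative (title, genres) rows in the movies.csv format.
rows = [
    ("Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
    ("Heat (1995)", "Action|Crime|Thriller"),
    ("Sabrina (1995)", "Comedy|Romance"),
]

# split + explode: emit one genre per (movie, genre) pair, then count them.
genre_counts = Counter(
    genre
    for _title, genres in rows
    for genre in genres.split("|")
)
print(genre_counts.most_common(3))
```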

### 5: Movies for Each Category

In [24]:
import pyspark.sql.functions as f
from functools import reduce

df = movies.withColumn('MovieGenres', f.split(movies['genres'], r'\|'))
allcategories = df.select('title', f.explode('MovieGenres').alias('MovieGenres')).groupby('MovieGenres').agg(f.collect_set('title').alias('Movies'))
display(df.select('title', f.explode('MovieGenres').alias('MovieGenres')).groupby('MovieGenres').count())

MovieGenres,count
Crime,5105
Romance,7412
Thriller,8216
Adventure,4067
Drama,24144
War,1820
Documentary,5118
Fantasy,2637
Mystery,2773
Musical,1113


## Part 2: Spark ALS-based approach for training the model
We will use the ALS implementation in Spark MLlib to predict the ratings, so let's reload "ratings.csv" using ``sc.textFile`` and convert each line into a (user, item, rating) tuple.

In [26]:
from pyspark.mllib.recommendation import ALS

In [27]:
movie_rating = sc.textFile("/FileStore/tables/ratings.csv")

In [28]:
header = movie_rating.take(1)[0]
rating_data = movie_rating.filter(lambda line: line!=header).map(lambda line: line.split(",")).map(lambda tokens:(int(tokens[0]), int(tokens[1]), float(tokens[2]))).cache()

In [29]:
# create dataframe from rdd
df_rating_data = spark.createDataFrame(rating_data)
rating_data = df_rating_data \
                    .withColumnRenamed('_1', 'userId') \
                    .withColumnRenamed('_2', 'movieId') \
                    .withColumnRenamed('_3', 'rating')

In [30]:
rating_data.show(5)


Now we split the data into training/validation/testing sets using a 6/2/2 ratio.

In [32]:
train, validation, test = rating_data.randomSplit([0.6,0.2,0.2],seed = 7856)
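
One practical consequence of a random split is that the 6/2/2 ratio holds only approximately, because each row is assigned independently at random. The sketch below shows that behaviour in plain Python; `random_split` is a hypothetical helper written for illustration, not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by drawing one uniform number per row."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total          # cumulative, normalized weights
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for k, bound in enumerate(bounds):
            # The last split also catches any floating-point leftovers.
            if x < bound or k == len(bounds) - 1:
                splits[k].append(row)
                break
    return splits

rows = list(range(10000))
train_rows, val_rows, test_rows = random_split(rows, [0.6, 0.2, 0.2], seed=7856)
print(len(train_rows), len(val_rows), len(test_rows))
```

Fixing the seed makes the split reproducible, which matters when comparing models across runs.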

In [33]:
train.cache()

In [34]:
validation.cache()

In [35]:
test.cache()

### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [37]:
from math import sqrt
from pyspark.ml.evaluation import RegressionEvaluator

def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
  min_error = float('inf')
  best_rank = -1
  best_regularization = 0
  best_model = None
  for rank in ranks:
    for reg in reg_param:
      # train ALS model  
      model = ALS.train(train_data, rank, num_iters, reg)
      # Evaluate the model on the validation data
      # make prediction
      pred = model.predictAll(validation_data.rdd.map(lambda x:(x[0],x[1]))) 
      true_reorg = validation_data.rdd.map(lambda x:((x[0],x[1]), x[2]))
      # get the rating result
      pred_reorg = pred.map(lambda x:((x[0],x[1]), x[2]))
      # get the RMSE
      error = sqrt(true_reorg.join(pred_reorg).map(lambda r: (r[1][0] - r[1][1])**2).mean())
      print ('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
      if error < min_error:
          min_error = error
          best_rank = rank
          best_regularization = reg
          best_model = model
  print ('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
  return best_model, best_rank, best_regularization

In [38]:
num_iterations = 10
ranks = [6, 8, 10, 12, 14]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

import time
start_time = time.time()
final_model, best_rank, best_regularization = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))
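
Stripped of the Spark calls, the grid search above is a loop over every (rank, regularization) pair that keeps the pair with the lowest validation error. The sketch below shows that skeleton; `fake_validation_rmse` is a made-up stand-in for "train ALS and return the validation RMSE", and its numbers do not come from the real model.

```python
from itertools import product

def grid_search(score_fn, ranks, reg_params):
    """Return (best_score, best_rank, best_reg) for the lowest score_fn value."""
    best = (float("inf"), None, None)
    for rank, reg in product(ranks, reg_params):
        score = score_fn(rank, reg)
        if score < best[0]:
            best = (score, rank, reg)
    return best

# Hypothetical scoring function, used only so the example runs without Spark.
def fake_validation_rmse(rank, reg):
    return 0.85 + 0.001 * (rank - 10) ** 2 + (reg - 0.1) ** 2

demo_score, demo_rank, demo_reg = grid_search(
    fake_validation_rmse, [6, 8, 10, 12, 14], [0.05, 0.1, 0.2, 0.4, 0.8]
)
print(demo_rank, demo_reg, demo_score)
```

With 5 ranks and 5 regularization values this evaluates 25 candidates, so the runtime of the real search grows with the product of the two list lengths.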

In [39]:
# plot the learning curve
from math import sqrt
import matplotlib.pyplot as plt

def rmse_on(model, data):
    # RMSE of the model's predictions on the (user, movie, rating) rows in data
    pred = model.predictAll(data.rdd.map(lambda x: (x[0], x[1])))
    true_reorg = data.rdd.map(lambda x: ((x[0], x[1]), x[2]))
    pred_reorg = pred.map(lambda x: ((x[0], x[1]), x[2]))
    return sqrt(true_reorg.join(pred_reorg).map(lambda r: (r[1][0] - r[1][1])**2).mean())

def calculate_learning_curve(iter_array, train_data, validation_data, rank, reg_param):
    train_rmse = []
    validation_rmse = []
    for n_iter in iter_array:
        # train once per iteration count and score the same model on both sets
        model = ALS.train(train_data, rank, n_iter, reg_param)
        train_rmse.append(rmse_on(model, train_data))
        validation_rmse.append(rmse_on(model, validation_data))
    return train_rmse, validation_rmse

def plot_learning_curve(iter_array, train_data, validation_data, rank, reg_param):
    train_rmse, validation_rmse = calculate_learning_curve(iter_array, train_data, validation_data, rank, reg_param)
    fig, ax = plt.subplots()
    ax.plot(iter_array, train_rmse, label='Training')
    ax.plot(iter_array, validation_rmse, label='Validation')
    ax.set_xlabel('iterations')
    ax.set_ylabel('RMSE')
    ax.legend(loc='best')
    return fig

# numbers of iterations to train for at each step of the learning curve
iter_array = [1, 2, 5, 10]
display(plot_learning_curve(iter_array, train, validation, best_rank, best_regularization))

### Model testing
And finally, make a prediction and check the testing error.

In [41]:
n_iter = 2
pred = ALS.train(train, best_rank, n_iter, best_regularization).predictAll(test.rdd.map(lambda x:(x[0],x[1]))) 
true_reorg = test.rdd.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = pred.map(lambda x:((x[0],x[1]), x[2]))
rmse_test = sqrt(true_reorg.join(pred_reorg).map(lambda r: (r[1][0] - r[1][1])**2).mean())
print("RMSE = "+str(rmse_test))
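
The RMSE in this cell is computed by joining true and predicted ratings on the (user, movie) key, so any pair without a prediction drops out of the average. The plain-Python sketch below mirrors that join on made-up values and also computes the mean absolute error for comparison; all numbers are hypothetical.

```python
from math import sqrt

# Hypothetical ((userId, movieId) -> rating) data.
true_ratings = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0, (2, 30): 2.0}
predicted = {(1, 10): 3.5, (1, 20): 3.5, (2, 10): 4.0}  # nothing for (2, 30)

# Inner join on the key: only pairs present on both sides are scored.
common = true_ratings.keys() & predicted.keys()
errors = [true_ratings[k] - predicted[k] for k in common]

demo_rmse = sqrt(sum(e ** 2 for e in errors) / len(errors))
demo_mae = sum(abs(e) for e in errors) / len(errors)
print(len(common), demo_rmse, demo_mae)
```

Because squaring weights large errors more heavily, RMSE is never smaller than the mean absolute error, so an RMSE value is best read as a typical error size rather than an exact average distance.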

### Apply the Model

In [43]:
all_model = ALS.train(rating_data, best_rank, n_iter, best_regularization)
all_pred = all_model.predictAll(rating_data.rdd.map(lambda x:(x[0],x[1])))
true_reorg = rating_data.rdd.map(lambda x:((x[0],x[1]), x[2]))
pred_reorg = all_pred.map(lambda x:((x[0],x[1]), x[2]))
rmse = sqrt(true_reorg.join(pred_reorg).map(lambda r: (r[1][0] - r[1][1])**2).mean())
print ("RMSE = "+str(rmse))
alldata_df = spark.createDataFrame(all_pred)
alldata_df.createOrReplaceTempView("alldata")

In [44]:
 %sql  select * from alldata limit 5

user,product,rating
228384,4225,3.402714317791509
228384,48385,2.670572914865115
228384,4641,3.278594937608744
228384,2497,2.81785506852576
228384,1569,2.9942789406179133


In [45]:
rating_data.createOrReplaceTempView("rating_data")

In [46]:
%sql select * from movies join alldata on movies.movieId=alldata.product limit 10

movieId,title,genres,user,product,rating
4225,"Dish, The (2001)",Comedy,228384,4225,3.402714317791509
48385,Borat: Cultural Learnings of America for Make Benefit Glorious Nation of Kazakhstan (2006),Comedy,228384,48385,2.670572914865115
4641,Ghost World (2001),Comedy|Drama,228384,4641,3.278594937608744
2497,Message in a Bottle (1999),Romance,228384,2497,2.81785506852576
1569,My Best Friend's Wedding (1997),Comedy|Romance,228384,1569,2.9942789406179133
4033,Thirteen Days (2000),Drama|Thriller|War,228384,4033,3.285891326032298
88129,Drive (2011),Crime|Drama|Film-Noir|Thriller,228384,88129,3.084874894477505
4993,"Lord of the Rings: The Fellowship of the Ring, The (2001)",Adventure|Fantasy,228384,4993,3.3148891744977624
2433,"Civil Action, A (1998)",Drama,228384,2433,2.932668727559661
3681,For a Few Dollars More (Per qualche dollaro in più) (1965),Action|Drama|Thriller|Western,228384,3681,3.6481557235988618


## Recommend movies to users with id: 575, 232
You can choose any users to recommend movies to.

In [48]:
movieTitle = movies.select('movieId','title').rdd.collectAsMap()

recommandP = all_model.recommendProducts(575, 5)  # Recommend 5 movies to user 575
for p in recommandP:
  print("Recommend to user", str(p[0]), "with movie: ", movieTitle[str(p[1])], " and recommendation score:", p[2])

In [49]:
recommandP = all_model.recommendProducts(232, 5)  # Recommend 5 movies to user 232
for p in recommandP:
  print("Recommend to user", str(p[0]), "with movie: ", movieTitle[str(p[1])], " and recommendation score:", p[2])
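
Conceptually, a top-N recommendation from a matrix factorization model scores every movie by the dot product of the user's latent vector with the movie's latent vector and keeps the highest scores. The NumPy sketch below shows that step on made-up factors; the vectors and ids are hypothetical and are not taken from the trained model.

```python
import numpy as np

# Hypothetical latent factors of rank 3: one user and five movies.
user_vec = np.array([0.9, 0.1, 0.4])
item_ids = np.array([101, 102, 103, 104, 105])
item_factors = np.array([
    [1.0, 0.0, 0.5],
    [0.2, 0.9, 0.1],
    [0.8, 0.1, 0.9],
    [0.0, 0.3, 0.2],
    [0.5, 0.5, 0.5],
])

scores = item_factors @ user_vec      # predicted rating for each movie
top_k = np.argsort(-scores)[:3]       # indices of the 3 highest scores
for item_id, score in zip(item_ids[top_k], scores[top_k]):
    print(int(item_id), round(float(score), 3))
```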

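## Find the similar movies for movies with id: 463, 471
You can also query the ALS results by movie. Note that `recommendUsers`, used below, returns the users a movie is most strongly recommended to rather than a list of similar movies.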

In [51]:
recommandU = all_model.recommendUsers(463, 5)  # Recommend movie 463 to 5 users
for u in recommandU:
  print("Recommend movie ", movieTitle[str(u[1])], "to user ", str(u[0]), " and recommendation score:", u[2])

In [52]:
recommandU = all_model.recommendUsers(471, 5)  # Recommend movie 471 to 5 users
for u in recommandU:
  print("Recommend movie ", movieTitle[str(u[1])], "to user ", str(u[0]), " and recommendation score:", u[2])
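
One common way to find movies similar to a given movie from ALS results is to compare the movies' latent factor vectors by cosine similarity. The sketch below assumes the factors are already available as a NumPy array (with `pyspark.mllib` they could be collected from the model's `productFeatures()`); the helper `most_similar` and all factor values are hypothetical.

```python
import numpy as np

def most_similar(item_id, item_ids, item_factors, k=2):
    """Return the k (id, similarity) pairs closest to item_id by cosine similarity."""
    unit = item_factors / np.linalg.norm(item_factors, axis=1, keepdims=True)
    target = unit[item_ids.index(item_id)]
    sims = unit @ target
    # Sort by decreasing similarity and skip the query movie itself.
    order = [i for i in np.argsort(-sims) if item_ids[i] != item_id]
    return [(item_ids[i], float(sims[i])) for i in order[:k]]

# Made-up rank-2 factors for four movie ids.
item_ids = [463, 471, 500, 600]
item_factors = np.array([
    [1.0, 0.1],
    [0.9, 0.2],
    [0.1, 1.0],
    [-1.0, 0.0],
])
similar = most_similar(463, item_ids, item_factors)
print(similar)
```

Cosine similarity ignores vector length, so it compares the direction of the taste profile rather than how strongly a movie is rated overall.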

## Write the report

Motivation: In this project, we build a movie recommendation system on the MovieLens latest dataset (the 265 MB full version). We use ALS to predict users' movie ratings so that we can recommend to each customer the movies they are predicted to rate highly. Recommendation systems provide personalized recommendations based on a user's profile and previous behavior. They not only help users find the products they like, but can also bring profit to online service providers.

1. step1:
First, we run SQL OLAP queries on the data. For example, we list the movie genres and count the movies in each genre.

2. step2:
Next, we train an ALS model for the recommender. After preprocessing the ratings data, we split it into training, validation, and test sets. A grid search over rank and regularization picks the hyperparameters with the lowest validation root mean square error (RMSE). With those hyperparameters, we plot the learning curve over different numbers of iterations.

3. step3:
Then, using the model with the best hyperparameters and the chosen number of iterations, we run the final test on the hold-out set. The test RMSE is 0.91, meaning the predictions are typically about 0.91 rating units away from the true ratings (in the root-mean-square sense).

4. step4:
Finally, we use the final model to recommend movies to specified users, and to find the users to whom a specified movie is most strongly recommended.

Output and conclusion: The output is a list of recommendations for the specified user ids and movie ids. In conclusion, ALS is used to predict users' movie ratings. The MovieLens dataset used here (283,228 users and 53,889 rated movies, per the queries above) is divided into training, validation, and test sets, and RMSE is used for evaluation. With a test RMSE of 0.91, the recommender predicts ratings reasonably well.