### Set up

In [None]:
#mounting drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Overview

In this project, we use collaborative filtering with a matrix factorization algorithm called `Alternating Least Squares (ALS)`, via the Spark APIs, to predict movie ratings in the [MovieLens Datasets](https://grouplens.org/datasets/movielens/latest/).

[Alternating Least Squares](https://endymecy.gitbooks.io/spark-ml-source-analysis/content/%E6%8E%A8%E8%8D%90/papers/Large-scale%20Parallel%20Collaborative%20Filtering%20the%20Netflix%20Prize.pdf) is a low-rank matrix approximation algorithm for collaborative filtering. ALS decomposes the user-item matrix into two low-dimensional matrices: a user matrix and an item matrix. In collaborative filtering, users and products are described by a small set of latent factors that can be used to predict missing entries, and ALS learns these latent factors by matrix factorization.

## Collaborative Filtering using ALS Matrix Factorization

**Q: What is matrix factorization?**

A: Matrix factorization is a set of algorithms that factor a matrix into the product of multiple matrices. 

**Q: Why is matrix factorization useful for recommender systems?**

A: We can factor the items-users interaction matrix into the product of two lower-dimensional rectangular matrices: an items matrix and a users matrix. Each row of the items matrix can be seen as a vector representation of an item, and each column of the users matrix as a vector representation of a user.

![matrix_factorization](https://raw.githubusercontent.com/wjlgatech/amazing_recommender/master/images/matrix_factorization.png)


The elements of an item vector are not necessarily interpretable on their own, but the vector as a whole represents how that item's features interact with users. These features are therefore called latent (i.e., hidden) features.
$$ \hat{R}_{u,i} = \sum_{f=1}^{n} U_{u,f} I_{f,i}$$

$\hat{R}_{u,i}$, the predicted rating of item `i` by user `u`, equals the dot product of the `user latent vector` $U_u$ and the `item latent vector` $I_i$. In this formula the rating matrix is indexed as users by items, so $U_u$ is row `u` of $U$ and $I_i$ is column `i` of $I$.
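
As a minimal sketch of the formula above, the snippet below computes predicted ratings from two small factor matrices in plain Python. The matrices `U` and `I`, their values, and the helper `predict_rating` are made up for illustration and are not part of the notebook's pipeline.

```python
# Toy factors: 2 users, 3 items, n = 2 latent factors (values are made up).
U = [[1.0, 0.5],         # latent vector of user 0 (one row per user)
     [0.2, 2.0]]         # latent vector of user 1
I = [[4.0, 1.0, 0.0],    # factor 0 for items 0..2 (one column per item)
     [0.0, 1.0, 2.0]]    # factor 1 for items 0..2


def predict_rating(U, I, u, i):
    """Dot product of row u of U with column i of I."""
    n_factors = len(I)
    return sum(U[u][f] * I[f][i] for f in range(n_factors))


# Full predicted rating matrix, users by items.
R_hat = [[predict_rating(U, I, u, i) for i in range(3)] for u in range(2)]
for row in R_hat:
    print(row)
```

Every cell of `R_hat` is filled in, including user-item pairs that were never rated, which is what lets a factorization model score unseen movies.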

**Q: How do you choose the number of latent factors `n` in the above formula?**

A: The number of latent factors is a tunable hyperparameter. This is similar to PCA, where the number of principal components you keep determines how much of the data's variance you capture.

As you increase the number of latent factors, personalization improves until `n` becomes too large and the model starts to overfit.

To limit overfitting, we add **regularization terms** to the objective function.

**Q: What's the final expression for the objective function?**

The **objective function** of matrix factorization minimizes the squared error between the true ratings and the predicted ratings, plus a penalty on the size of the factors:

$$\arg\min_{U,I} \|R - \hat{R}\|^2 + \alpha \|U\|^2 + \beta \|I\|^2$$

The last two terms are the regularization terms for the user factors ($\alpha \|U\|^2$) and for the item factors ($\beta \|I\|^2$).
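
To make the objective concrete, here is a small sketch that evaluates it for toy factor matrices. It assumes the usual least-squares form (squared error and squared norms) and sums the error only over observed ratings. The function name `objective`, the data, and the values of `alpha` and `beta` are illustrative assumptions.

```python
def objective(ratings, U, I, alpha, beta):
    """Squared error over observed ratings plus L2 penalties on the factors.

    ratings: dict mapping (user, item) -> observed rating
    U: users x factors (list of rows), I: factors x items (list of rows)
    """
    n_factors = len(I)
    sq_err = 0.0
    for (u, i), r in ratings.items():
        pred = sum(U[u][f] * I[f][i] for f in range(n_factors))
        sq_err += (r - pred) ** 2
    # Squared Frobenius norms of the two factor matrices.
    u_norm = sum(x * x for row in U for x in row)
    i_norm = sum(x * x for row in I for x in row)
    return sq_err + alpha * u_norm + beta * i_norm


# Toy data: three observed ratings out of a 2 x 3 matrix.
ratings = {(0, 0): 4.0, (0, 2): 1.0, (1, 1): 2.0}
U = [[1.0, 0.0], [0.0, 1.0]]
I = [[4.0, 0.0, 1.0], [0.0, 1.0, 0.0]]
loss = objective(ratings, U, I, alpha=0.1, beta=0.1)
print(loss)
```

Raising `alpha` or `beta` makes large factor values more expensive, which pulls the solution toward smaller factors and reduces overfitting.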

**Q: How is the optimization problem setup?**

Once we have the objective function, we need to design an algorithm to solve the optimization problem.

- If the data is small enough to fit on a single machine, you can use **Funk SVD**, an algorithm developed by Simon Funk in 2006 during the Netflix Prize competition.

- If the data is too big to fit on a single machine, you can use Alternating Least Squares (ALS). Its Apache Spark implementation runs in parallel across a cluster of machines.
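
The "alternating" idea can be sketched in a few lines for the simplest case of a single latent factor (rank 1), where each least-squares solve has a scalar closed form. This is a plain-Python illustration under that simplifying assumption, not Spark's implementation. The names `als_rank1` and `sq_loss` and the toy ratings are hypothetical.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, n_iters=20):
    """Rank-1 ALS: alternate closed-form ridge solutions for users and items."""
    u = [1.0] * n_users   # one latent number per user
    v = [1.0] * n_items   # one latent number per item
    for _ in range(n_iters):
        # Fix the item factors and solve a 1-D ridge regression per user.
        for a in range(n_users):
            num = sum(r * v[i] for (x, i), r in ratings.items() if x == a)
            den = reg + sum(v[i] ** 2 for (x, i) in ratings if x == a)
            u[a] = num / den
        # Fix the user factors and solve a 1-D ridge regression per item.
        for b in range(n_items):
            num = sum(r * u[x] for (x, i), r in ratings.items() if i == b)
            den = reg + sum(u[x] ** 2 for (x, i) in ratings if i == b)
            v[b] = num / den
    return u, v


def sq_loss(ratings, u, v, reg):
    """Regularized squared error over the observed ratings."""
    err = sum((r - u[a] * v[b]) ** 2 for (a, b), r in ratings.items())
    return err + reg * (sum(x * x for x in u) + sum(x * x for x in v))


ratings = {(0, 0): 5.0, (0, 1): 3.0, (1, 0): 4.0,
           (1, 2): 1.0, (2, 1): 2.0, (2, 2): 1.0}
for iters in (0, 1, 20):
    u, v = als_rank1(ratings, n_users=3, n_items=3, reg=0.1, n_iters=iters)
    print(iters, sq_loss(ratings, u, v, reg=0.1))
```

Each half-step solves its sub-problem exactly, so the loss never increases from one iteration to the next. The per-user and per-item solves are also independent of each other, which is why ALS parallelizes well.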

ALS has some important hyperparameters to tune during validation or cross-validation:

- **maxIter**: the maximum number of iterations to run (defaults to 10)

- **rank**: the number of latent factors in the model (defaults to 10)

- **regParam**: the regularization parameter in ALS (defaults to 0.1)

The following code shows how to tune these hyperparameters to select the best model:

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


def tune_ALS(train_data, validation_data, maxIter, regParams, ranks):
    """
    grid search function to select the best model based on RMSE of
    validation data
    Parameters
    ----------
    train_data: spark DF with columns ['userId', 'movieId', 'rating']
    
    validation_data: spark DF with columns ['userId', 'movieId', 'rating']
    
    maxIter: int, max number of learning iterations
    
    regParams: list of float, one dimension of hyper-param tuning grid
    
    ranks: list of int, one dimension of hyper-param tuning grid
    
    Return
    ------
    The best fitted ALS model with lowest RMSE score on validation data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in regParams:
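            # get ALS model; column names match the DataFrame schema, and
            # coldStartStrategy="drop" keeps NaN predictions out of the RMSE
            als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
                      coldStartStrategy="drop") \
                .setMaxIter(maxIter).setRank(rank).setRegParam(reg)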
            # train ALS model
            model = als.fit(train_data)
            # evaluate the model by computing the RMSE on the validation data
            predictions = model.transform(validation_data)
            evaluator = RegressionEvaluator(metricName="rmse",
                                            labelCol="rating",
                                            predictionCol="prediction")
            rmse = evaluator.evaluate(predictions)
            print('{} latent factors and regularization = {}: '
                  'validation RMSE is {}'.format(rank, reg, rmse))
            if rmse < min_error:
                min_error = rmse
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and '
          'regularization = {}'.format(best_rank, best_regularization))
    return best_model

**Q: How does matrix factorization improve our recommendations (compared to, for example, KNN)?**
1. The model learns to factor the rating matrix into a user matrix and an item matrix, which lets us predict a personalized rating of each item for each user.

1. Less-popular items can have latent representations as rich as those of popular items, which improves the recommender's ability to recommend less-popular items.



In [None]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
# RDD-based ALS; this rebinds the name ALS imported earlier from pyspark.ml
from pyspark.mllib.recommendation import ALS

# data science imports
import math
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
# spark config
spark = SparkSession \
    .builder \
    .appName("movie recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
# get spark context
sc = spark.sparkContext

## Project Content
1. Load Data
2. Spark SQL and OLAP
3. Spark ALS-based approach for training model
4. ALS Model Selection and Evaluation
5. Model testing
6. Make movie recommendations to me


## Load Data

In [None]:
i_file = 'movies.csv' # item file
iu_file = 'ratings.csv' #item user interaction file 

In [None]:
# name the path as per your folder's structure and nomenclature
items = spark.read.load('drive/MyDrive/amazing_recommender/movies.csv', format='csv', header=True, inferSchema=True)
iu = spark.read.load('drive/MyDrive/amazing_recommender/ratings.csv' , format='csv', header=True, inferSchema=True)
links = spark.read.load('drive/MyDrive/amazing_recommender/links.csv', format='csv', header=True, inferSchema=True)
tags = spark.read.load('drive/MyDrive/amazing_recommender/tags.csv', format='csv', header=True, inferSchema=True)

In [None]:
# to decouple code and data, define some standardized variables
u_id = 'userId'
i_id = 'movieId'
rating = 'rating'

### Basic inspection

In [None]:
items.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



In [None]:
iu.show(3)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
+------+-------+------+---------+
only showing top 3 rows



In [None]:
links.show(3)

+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
+-------+------+------+
only showing top 3 rows



In [None]:
tags.show(3)

+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
+------+-------+---------------+----------+
only showing top 3 rows



## Spark SQL and OLAP

Below are the EDA questions we would like to ask:
1. What are the ratings?
1. What is the minimum number of ratings per user and the minimum number of ratings per movie?
1. How many movies are rated by only one user?
1. What is the total number of users in the data set?
1. What is the total number of movies in the data set?
1. How many movies are rated by users? List the movies not rated yet.
1. List all movie genres
1. Find out the number of movies for each category
1. Calculate the total rating count for every movie
1. Get a count plot for each rating

What are the ratings?

In [None]:
print('Distinct values of ratings:')
print(sorted(iu.select(rating).distinct().rdd.map(lambda r: r[0]).collect()))

Distinct values of ratings:
[0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0]


What is the minimum number of ratings per user and the minimum number of ratings per movie?

In [None]:
tmp1 = iu.groupBy(u_id).count().toPandas()['count'].min()
tmp2 = iu.groupBy(i_id).count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

For the users that rated movies and the movies that were rated:
Minimum number of ratings per user is 20
Minimum number of ratings per movie is 1


How many movies are rated by only one user?

In [None]:
tmp1 = sum(iu.groupBy(i_id).count().toPandas()['count'] == 1)
tmp2 = iu.select(i_id).distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

3446 out of 9724 movies are rated by only one user


What is the total number of users in the data sets?

In [None]:
tmp = iu.select(u_id).distinct().count()
print('We have a total of {} distinct users in the data sets'.format(tmp))

We have a total of 610 distinct users in the data sets


What is the total number of movies in the data sets?

In [None]:
tmp = items.select(i_id).distinct().count()
print('We have a total of {} distinct movies in the data sets'.format(tmp))

We have a total of 9742 distinct movies in the data sets


How many movies are rated by users? List movies not rated yet?

In [None]:
tmp1 = items.select(i_id).distinct().count()
tmp2 = iu.select(i_id).distinct().count()
print('We have a total of {} distinct movies that are rated by users in ratings table'.format(tmp2))
print('We have {} movies that are not rated yet'.format(tmp1-tmp2))

We have a total of 9724 distinct movies that are rated by users in ratings table
We have 18 movies that are not rated yet


In [None]:
# create a temp SQL table view for easier query
items.createOrReplaceTempView("movies")
iu.createOrReplaceTempView("ratings")
print('List movies that are not rated yet: ')
# SQL query (NOTE: WHERE ... NOT IN ... == ... LEFT JOIN ... WHERE ... IS NULL)
# Approach 1
spark.sql(
    "SELECT movieId, title "
    "FROM movies "
    "WHERE movieId NOT IN (SELECT distinct(movieId) FROM ratings)"
).show(10)
# Approach 2
# spark.sql(
#     "SELECT m.movieId, m.title "
#     "FROM movies m LEFT JOIN ratings r ON m.movieId=r.movieId "
#     "WHERE r.movieId IS NULL"
# ).show(10)

List movies that are not rated yet: 
+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|   1076|Innocents, The (1...|
|   2939|      Niagara (1953)|
|   3338|For All Mankind (...|
|   3456|Color of Paradise...|
|   4194|I Know Where I'm ...|
|   5721|  Chosen, The (1981)|
|   6668|Road Home, The (W...|
|   6849|      Scrooge (1970)|
|   7020|        Proof (1991)|
|   7792|Parallax View, Th...|
+-------+--------------------+
only showing top 10 rows
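
The comment in the cell above says the `NOT IN` form and the `LEFT JOIN ... IS NULL` form return the same rows. The sketch below checks this on a tiny in-memory SQLite database rather than Spark, so it runs without a cluster. The table contents are made-up toy rows that only reuse the column names.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE movies (movieId INTEGER, title TEXT);
    CREATE TABLE ratings (userId INTEGER, movieId INTEGER, rating REAL);
    INSERT INTO movies VALUES (1, 'A'), (2, 'B'), (3, 'C');
    INSERT INTO ratings VALUES (10, 1, 4.0), (11, 1, 3.5), (10, 3, 5.0);
""")

# Approach 1: anti-join written with NOT IN.
not_in = conn.execute(
    "SELECT movieId, title FROM movies "
    "WHERE movieId NOT IN (SELECT DISTINCT movieId FROM ratings) "
    "ORDER BY movieId"
).fetchall()

# Approach 2: anti-join written with LEFT JOIN ... IS NULL.
left_join = conn.execute(
    "SELECT m.movieId, m.title "
    "FROM movies m LEFT JOIN ratings r ON m.movieId = r.movieId "
    "WHERE r.movieId IS NULL "
    "ORDER BY m.movieId"
).fetchall()

print(not_in)
print(left_join)
conn.close()
```

The two forms agree only when the subquery column has no `NULL` values. If `ratings.movieId` contained a `NULL`, standard SQL semantics would make `NOT IN` return no rows, while the `LEFT JOIN` form would still work.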



List all movie genres

In [None]:
# define a udf for splitting the genres string
splitter = UserDefinedFunction(lambda x: x.split('|'), ArrayType(StringType()))
# query
print('All distinct genres: ')
items.select(explode(splitter("genres")).alias("genres")).distinct().show()

All distinct genres: 
+------------------+
|            genres|
+------------------+
|             Crime|
|           Romance|
|          Thriller|
|         Adventure|
|             Drama|
|               War|
|       Documentary|
|           Fantasy|
|           Mystery|
|           Musical|
|         Animation|
|         Film-Noir|
|(no genres listed)|
|              IMAX|
|            Horror|
|           Western|
|            Comedy|
|          Children|
|            Action|
|            Sci-Fi|
+------------------+



Find out the number of movies for each category

In [None]:
print('Counts of movies per genre')
items.select(i_id, explode(splitter("genres")).alias("genres")) \
    .groupby('genres') \
    .count() \
    .sort(desc('count')) \
    .show()

Counts of movies per genre
+------------------+-----+
|            genres|count|
+------------------+-----+
|             Drama| 4361|
|            Comedy| 3756|
|          Thriller| 1894|
|            Action| 1828|
|           Romance| 1596|
|         Adventure| 1263|
|             Crime| 1199|
|            Sci-Fi|  980|
|            Horror|  978|
|           Fantasy|  779|
|          Children|  664|
|         Animation|  611|
|           Mystery|  573|
|       Documentary|  440|
|               War|  382|
|           Musical|  334|
|           Western|  167|
|              IMAX|  158|
|         Film-Noir|   87|
|(no genres listed)|   34|
+------------------+-----+
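
For readers less familiar with `explode`, the split-explode-count pipeline above can be mirrored on a few rows in plain Python. The rows below are made-up examples in the same pipe-separated format, not records copied from `movies.csv`.

```python
from collections import Counter

# (movieId, genres) rows in the same pipe-separated format (toy data).
rows = [
    (101, "Adventure|Animation|Children"),
    (102, "Adventure|Children"),
    (103, "Comedy|Romance"),
]

# "explode": emit one (movieId, genre) pair per genre in each row.
exploded = [(movie_id, genre)
            for movie_id, genres in rows
            for genre in genres.split("|")]

# "groupby('genres').count()": count how many movies carry each genre.
genre_counts = Counter(genre for _, genre in exploded)

for genre, count in genre_counts.most_common():
    print(genre, count)
```

Because a movie contributes one row per genre, the per-genre counts add up to more than the number of movies.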



## Spark ALS-based approach for training model
1. Reload data
2. Split data into train, validation, test
3. ALS model selection and evaluation
4. Model testing

### Reload data
We will use an RDD-based API from pyspark.mllib to predict the ratings, so let's reload "ratings.csv" using sc.textFile and then convert it to the form of (user, item, rating) tuples.

In [None]:
# load data
item_rating = sc.textFile('drive/MyDrive/amazing_recommender/ratings.csv')
# preprocess data -- only need ["userId", "movieId", "rating"]
header = item_rating.take(1)[0]
rating_data = item_rating \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))) \
    .cache()
# check three rows
rating_data.take(3)

[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]


### Split data
We split the data into training/validation/testing sets using a 6/2/2 ratio.

In [None]:
train, validation, test = rating_data.randomSplit([6, 2, 2], seed=99)
# cache data
train.cache()
validation.cache()
test.cache()

PythonRDD[1493] at RDD at PythonRDD.scala:53
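
`randomSplit` normalizes its weights when they do not sum to 1, so `[6, 2, 2]` behaves like `[0.6, 0.2, 0.2]`, and the resulting split sizes are only approximately proportional. The plain-Python sketch below illustrates that behavior. It is not how Spark implements the split internally, and the helper name `random_split` is hypothetical.

```python
import random


def random_split(records, weights, seed):
    """Assign each record to one split with probability proportional to weights."""
    total = float(sum(weights))
    # Cumulative boundaries, e.g. [6, 2, 2] -> roughly [0.6, 0.8, 1.0].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for rec in records:
        x = rng.random()
        for k, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding gap.
            if x < bound or k == len(bounds) - 1:
                splits[k].append(rec)
                break
    return splits


records = list(range(1000))
train_part, validation_part, test_part = random_split(records, [6, 2, 2], seed=99)
print(len(train_part), len(validation_part), len(test_part))
```

Fixing the seed makes the split reproducible, which matters when comparing models across runs of the grid search.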

### ALS model selection and evaluation
We use a grid search over the rank and the regularization parameter to find the ALS model with the lowest validation RMSE.

In [None]:
#export
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """
    Grid Search Function to select the best model based on RMSE of hold-out data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            # train ALS model
            model = ALS.train(
                ratings=train_data,    # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=99)
            # make prediction
            valid_data = validation_data.map(lambda p: (p[0], p[1]))
            predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
            # get the rating result
            ratesAndPreds = validation_data.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
            # get the RMSE
            MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
            error = math.sqrt(MSE)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model
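
The RMSE in `train_ALS` is computed on the result of a join between actual and predicted ratings keyed by `(user, item)`. One consequence, sketched below with dictionaries instead of RDDs, is that any pair without a prediction simply drops out of the score. This can happen, for example, when a user or movie is absent from the training split, although that behavior of `predictAll` is an assumption here. The helper `rmse_on_joined` and the data are illustrative.

```python
import math


def rmse_on_joined(actual, predicted):
    """RMSE over the (user, item) keys present in both dicts (an inner join)."""
    common = actual.keys() & predicted.keys()
    if not common:
        raise ValueError("no overlapping (user, item) pairs to score")
    mse = sum((actual[k] - predicted[k]) ** 2 for k in common) / len(common)
    return math.sqrt(mse), len(common)


actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
predicted = {(1, 10): 3.0, (1, 20): 3.0}   # no prediction for (2, 10)
score, n_scored = rmse_on_joined(actual, predicted)
print(score, n_scored)
```

Reporting the number of scored pairs next to the RMSE makes it visible when a noticeable share of the validation set was left out.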

In [None]:
# hyper-param config
num_iterations = 10
ranks = [16, 18, 20, 22]
reg_params = [0.01, 0.05, 0.1]

# grid search and select best model
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

16 latent factors and regularization = 0.01: validation RMSE is 1.3117787273189874
16 latent factors and regularization = 0.05: validation RMSE is 1.0099134143019566
16 latent factors and regularization = 0.1: validation RMSE is 0.9130907778579515
18 latent factors and regularization = 0.01: validation RMSE is 1.3154712703199365


### ALS model learning curve
As we increase the number of iterations in training ALS, we can see how RMSE changes and whether or not the model is overfitted.

In [None]:
#export
def plot_learning_curve(arr_iters, train_data, validation_data, reg, rank):
    """
    Plot function to show learning curve of ALS
    """
    errors = []
    for num_iters in arr_iters:
        # train ALS model
        model = ALS.train(
            ratings=train_data,    # (userID, productID, rating) tuple
            iterations=num_iters,
            rank=rank,
            lambda_=reg,           # regularization param
            seed=99)
        # make prediction
        valid_data = validation_data.map(lambda p: (p[0], p[1]))
        predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
        # get the rating result
        ratesAndPreds = validation_data.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
        # get the RMSE
        MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
        error = math.sqrt(MSE)
        # add to errors
        errors.append(error)

    # plot
    plt.figure(figsize=(12, 6))
    plt.plot(arr_iters, errors)
    plt.xlabel('number of iterations')
    plt.ylabel('RMSE')
    plt.title('ALS Learning Curve')
    plt.grid(True)
    plt.show()

In [None]:
# create an array of num_iters
iter_array = list(range(1, 11))
# create learning curve plot
plot_learning_curve(iter_array, train, validation, 0.05, 20)

After three iterations, ALS starts to converge at a validation RMSE of around 0.8.

### Model testing
Finally, we make predictions on the held-out test set and check the out-of-sample error.

In [None]:
# make prediction using test data
test_data = test.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result
rates_preds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# get the RMSE
MSE = rates_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('The out-of-sample RMSE of rating predictions is', round(error, 4))

### Make item recommendation
We need to define a function that takes a new user's list of favorite items and outputs the top 10 recommendations.

In [None]:
#export
def get_i_id(df_i, fav_item_list):
    """
    return all i_id (movieId) of user's favorite items (movies)
    
    Parameters
    ----------
    df_i: spark Dataframe, items (movies) data
    
    fav_item_list: list, e.g. user's list of favorite movies
    
    Return
    ------
    i_id_list: list of item id (e.g. movieId)
    """
    i_id_list = []
    for item in fav_item_list:
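        # match titles against the DataFrame passed in, not the global `items`
        i_ids = df_i \
            .filter(df_i.title.like('%{}%'.format(item))) \
            .select(i_id) \
            .rdd \
            .map(lambda r: r[0]) \
            .collect()
        # extend with the matched ids (i_ids), not the column name (i_id)
        i_id_list.extend(i_ids)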
    return list(set(i_id_list))


def add_new_user_to_data(train_data, i_id_list, spark_context):
    """
    add new rows with new user, user's movie and ratings to
    existing train data

    Parameters
    ----------
    train_data: spark RDD, ratings data
    
    i_id_list: list, list of item id (e.g. movieId(s))

    spark_context: Spark Context object
    
    Return
    ------
    new train data with the new user's rows
    """
    # get new user id
    new_id = train_data.map(lambda r: r[0]).max() + 1
    # get max rating
    max_rating = train_data.map(lambda r: r[2]).max()
    # create new user rdd
    user_rows = [(new_id, i_id, max_rating) for i_id in i_id_list]
    new_rdd = spark_context.parallelize(user_rows)
    # return new train data
    return train_data.union(new_rdd)


def get_inference_data(train_data, df_i, i_id_list):
    """
    return a rdd with the userid and all movies (except ones in i_id_list)

    Parameters
    ----------
    train_data: spark RDD, ratings data

    df_i: spark Dataframe, e.g. movies data
    
    i_id_list: list, list of i_id e.g. movieId(s)

    Return
    ------
    inference data: Spark RDD
    """
    # get new user id
    new_id = train_data.map(lambda r: r[0]).max() + 1
    # return inference rdd
    return df_i.rdd \
        .map(lambda r: r[0]) \
        .distinct() \
        .filter(lambda x: x not in i_id_list) \
        .map(lambda x: (new_id, x))


def make_recommendation(best_model_params, ratings_data, df_i, 
                        fav_item_list, n_recommendations, spark_context):
    """
    return top n item recommendation based on user's input list of favorite items


    Parameters
    ----------
    best_model_params: dict, {'iterations': iter, 'rank': rank, 'lambda_': reg}

    ratings_data: spark RDD, ratings data

    df_i: spark Dataframe, movies data

    fav_item_list: list, user's list of favorite items e.g. movies

    n_recommendations: int, top n recommendations

    spark_context: Spark Context object

    Return
    ------
    list of top n item recommendations
    """
    # modify train data by adding new user's rows
    i_id_list = get_i_id(df_i, fav_item_list)
    train_data = add_new_user_to_data(ratings_data, i_id_list, spark_context)
    
    # train best ALS
    model = ALS.train(
        ratings=train_data,
        iterations=best_model_params.get('iterations', None),
        rank=best_model_params.get('rank', None),
        lambda_=best_model_params.get('lambda_', None),
        seed=99)
    
    # get inference rdd
    inference_rdd = get_inference_data(ratings_data, df_i, i_id_list)
    
    # inference
    predictions = model.predictAll(inference_rdd).map(lambda r: (r[1], r[2]))
    
    # get top n movieId
    topn_rows = predictions.sortBy(lambda r: r[1], ascending=False).take(n_recommendations)
    topn_ids = [r[0] for r in topn_rows]
    
    # return movie titles
    return df_i.filter(df_i[i_id].isin(topn_ids)) \
                    .select('title') \
                    .rdd \
                    .map(lambda r: r[0]) \
                    .collect()
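
One detail worth noting about `make_recommendation`: filtering the movies DataFrame with `isin` is not guaranteed to return titles in ranking order, because the rows come back in the DataFrame's own order. A minimal plain-Python sketch of selecting the top n while keeping the ranking follows. The helper `top_n_titles`, the scores, and the titles are made up for illustration.

```python
import heapq


def top_n_titles(predictions, id_to_title, n):
    """Return the titles of the n highest-scored items, best first."""
    best = heapq.nlargest(n, predictions, key=lambda pair: pair[1])
    # Look titles up in ranking order so the output preserves the ranking.
    return [id_to_title[item_id] for item_id, _ in best if item_id in id_to_title]


# (movieId, predicted rating) pairs and a movieId -> title lookup (toy data).
predictions = [(1, 3.2), (2, 4.8), (3, 4.1), (4, 2.0)]
id_to_title = {1: "Movie A", 2: "Movie B", 3: "Movie C", 4: "Movie D"}
top2 = top_n_titles(predictions, id_to_title, 2)
print(top2)
```

In the Spark version, the same effect could be obtained by collecting an id-to-title mapping for the top n ids and then ordering the titles by `topn_ids`.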

Let's pretend I am a new user of this recommender system. I will input a handful of my all-time favorite movies, and the system should then output the top N movie recommendations for me to watch.

In [None]:
# my favorite movies
my_favorite_items = ['Iron Man']

# get recommends
recommends = make_recommendation(
    best_model_params={'iterations': 10, 'rank': 20, 'lambda_': 0.05}, 
    ratings_data=rating_data, 
    df_i=items, 
    fav_item_list=my_favorite_items, 
    n_recommendations=10, 
    spark_context=sc)

print('Recommendations for {}:'.format(my_favorite_items[0]))
for i, title in enumerate(recommends):
    print('{0}: {1}'.format(i+1, title))


**Comparison with KNN Model-Based Recommendations**

This list of movie recommendations looks completely different from the list produced by my previous **KNN** model recommender.

**Exploratory Recommendations**

Not only does it recommend movies outside the years 2007 to 2009, but it also recommends lesser-known movies. This can offer users an element of surprise, so they won't get bored by seeing the same popular movies all the time.

**Combining Recommendations**

To achieve a good mix, this list of recommendations can be blended into the previous list of recommendations from the **KNN** model recommender.