# Matrix Factorization

We will experiment with the recent MovieLens 25M Dataset and build a recommender system using two approaches:
* Factorizing the user-item matrix using Spark ALS implementation
* Factorizing the item-item PMI maatrix using randomized SVD

In both settings we will index the item embeddings and inspect their quality using KNN queries.

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!curl -O https://dlcdn.apache.org/spark/spark-3.2.4/spark-3.2.4-bin-hadoop3.2.tgz
!tar xf spark-3.2.4-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark==3.2.4

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.4-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as F
sf = F

conf = SparkConf().set('spark.ui.port', '4050')
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master('local[*]').getOrCreate()
ss = spark

In [None]:
%matplotlib inline

### Download the dataset

In [None]:
!wget http://files.grouplens.org/datasets/movielens/ml-25m.zip
!unzip ml-25m

### Loading the ratings dataset

In [None]:
movies_df = spark.read.csv('ml-25m/movies.csv', header=True, inferSchema=True).cache()
ratings_df = spark.read.csv('ml-25m/ratings.csv', header=True, inferSchema=True)

### Useful imports

In [None]:
import numpy as np
import panda as pd
import pyspark

# Part 1 : Alternating least squares

### Split the dataset
We want to randomly split the dataset into train and test parts

In [None]:
training_df, validation_df = #TODO

### Build ALS model
Using the Spark ALS implementation described here https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html
Build a model using the ml-25m dataset.

How long does the training take, change the rank (i.e. the dimension of the vectors) from 10 to 20. How does that affect training speed ?

### Evaluation
Using the code described in the Spark documentation, evaluate how good your model is doing on the test set.
The goal is to predict the held out ratings.
A good metric could be RMSE or MAE.

### Inspecting the results

Retrieve the movie vectors from the learned model object (the property is called itemFactors) and create a spark dataset `[title, feature]` by joining with the `movies_df` dataset in the movie id

In [None]:
movie_vectors_df = #TODO

In [None]:
title_vector_array = movie_vectors_df.collect()
titles = [r['title'] for r in title_vector_array]
vectors = np.array([r['features'] for r in title_vector_array])
normalized_vectors = vectors/(LA.norm(vectors, axis=1)[:, np.newaxis])

### Using Nearest neighbours

Pick a few movies, and for each of them, find-out the top 5 nearest neighbours. Make sure your KNN algorithm is fast enough. Try to understand why some results are not so good.

In [None]:
def knn(query_vec: np.ndarray, k:int, titles:list[str], vectors:np.ndarray) -> list[tuple[str, float]]:
    """ Perform KNN algorithm on an array of vectors 
    
    Parameters
    ----------
    query_vec: np.ndarray, shape = (d,)
        Input query vector
    k: int
        nb neighbors to query
    titles: list[str]
        list of movies titles
    vectors: np.ndarray
        Array of embeddings, in the same order as the title list

    Return
    ------
    list of top k (title, affinity score) sorted by affinity score
    """
    #TODO
    pass

In [None]:
GOLDEN_EYE_ID = 0

def analyze(i:int):
  print(f"Query title : {titles[i]}")
  query_vec = normalized_vectors[i]
  ret = knn(query_vec, 10, titles, normalized_vectors)
  for res in ret:
    print(res)

analyze(GOLDEN_EYE_ID)

# Part 2 : PMI and RSVD


We now are going to factorize the item-item PMI matrix using randomized SVD.

## Size estimation

Let's first estimate the size of the matrix we are about to build.

Reminder : we will generate the co-occurence matrix $C$ from all the pairs of
movies that we find in users ratings. 
This matrix can be big.

Namely, if a user has rated movies `(1, 2, 3, 4)`, we will generate 6 pairs :
`(1, 2), (1, 3), (2, 3), (1, 4), (2, 4), (3, 4)`.

Formally, if a user has rated $n$ movies, he will generate $n \cdot (n - 1) \; / \; 2$ pairs. We should be careful of users that have rated a lot of movies.


### Create function to compute the number of pair generated

In [None]:
def number_of_pairs_to_be_generated(ratings_df: pyspark.sql.DataFrame) -> int:
  return #TODO

print(
    f"Number of positive ratings : {ratings_df.count():,}"
    f", that should generate {number_of_pairs_to_be_generated(ratings_df):,} pairs"
)

### Keep positive interactions

In [None]:
# first things first we only keep movies liked by user.
positive_ratings_df = #TODO

# positive_ratings_df Schema
# root
#  |-- userId: integer (nullable = true)
#  |-- movieId: integer (nullable = true)
#  |-- rating: double (nullable = true)
#  |-- timestamp: integer (nullable = true)

print(
    f"Number of positive ratings : {positive_ratings_df.count():,}"
    f", that should generate {number_of_pairs_to_be_generated(positive_ratings_df):,} pairs"
)

### Keep meaningful movies

We will keep only movies having a sufficient amount of positive ratings.
First, that will make the computations lighter, second, it will prevent us from
computing embeddings for movies we have very little information on.

In [None]:
# There are not that many movies in the dataset, we can safely bring their rating counts to pandas
movie_counts_df = #TODO

# movie_counts_df Schema
# root
#  |-- movieId: integer (nullable = true)
#  |-- count: integer (nullable = true)

movie_counts_pdf = movie_counts_df.toPandas()
print(f"Number of Movies in original dataset : {len(movie_counts_pdf):,}")
movie_counts_pdf.head(3)

In [None]:
import matplotlib.pyplot as plt

movie_counts_pdf = #TODO

# Display Number of movies per number of ratings bucket with matplotlib
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
axes[0].hist(movie_counts_pdf["count"], log=True, rwidth=0.9)
axes[0].set_xlabel("positive ratings count")
axes[0].set_ylabel("number of movies")
axes[0].set_title("Number of movies per number of ratings bucket")

axes[1].hist(movie_counts_pdf["count"][movie_counts_pdf["count"] < 100], log=True, rwidth=0.9)
axes[1].set_xlabel("positive ratings count")
axes[1].set_ylabel("number of movies")
axes[1].set_title("Number of movies per number of ratings bucket\nZoom on least rated movies")
None

In [None]:
MIN_COUNT = #TODO
kept_movies_df = #TODO

filtered_ratings_df = positive_ratings_df.join(kept_movies_df, on="movieId")

print(
    f"Number of positive ratings : {filtered_ratings_df.count():,}"
    f", that should generate {number_of_pairs_to_be_generated(filtered_ratings_df):,} pairs"
)

### Limit the number of pairs

We have way less movies but still almost all ratings and a lot of pairs ! 

In [None]:
# Let's look at how many ratings are done user by user
# When user has scored a lots of movies, the number of pairs he generates increases quadratically !
ratings_count_by_user_df = #TODO

# ratings_count_by_user_df Schema
# root
#  |-- userId: integer (nullable = true)
#  |-- count: long (nullable = false)

ratings_count_by_user_df.show(5)

In [None]:
# We will sample user ratings to limit the maximum number of ratings per user (in expectation)
# You can use ".filter(F.rand() < (EXPECTED_MAX_USER_RATINGS / F.col('count')))"
EXPECTED_MAX_USER_RATINGS = 40

# ratings_sampled_df Schema
# root
#  |-- userId: integer (nullable = true)
#  |-- movieId: integer (nullable = true)

ratings_sampled_df =  #TODO

In [None]:
print(
    f"Number of positive ratings after sampling : {ratings_sampled_df.count():,}"
    f", that should generate {number_of_pairs_to_be_generated(ratings_sampled_df):,} event pairs"
)

### Creating the PMI matrix

Reminder, the PMI matrix writes
$$
PMI(i, j) = \log\left(\frac{p(i, j)}{p(i)p(j)}\right)
$$
that we estimate with
$$
\widehat{PMI}(i, j) = \log\left(\frac{C_{i, j} \cdot n}{C_i \cdot  C_j}\right)
$$
where
* $C_{i, j}$ is the number of pairs (i, j) (i.e. the number of users that have
  given a positive feedback for both movie i and movie j.
* $C_{i}$ is total the number of pairs containing i
* $C_{j}$ is total the number of pairs containing j
* $n$ is the total number of pairs

#### Step 1 : compute co-occurence matrix $C_{i,j}$

In [None]:
# Compute the coocurrence matrix from the ratings_sampled_df DataFrame

# cooccurence_df Schema
# root
#  |-- movieId1: integer (nullable = true)
#  |-- movieId2: integer (nullable = true)
#  |-- pair_count: long (nullable = false)

cooccurence_df = ratings_sampled_df. ... #TODO

In [None]:
total_generated_pairs = int(cooccurence_df.select(F.sum("pair_count")).collect()[0]["sum(pair_count)"])
print(f"We have generated {total_generated_pairs:,} event pairs for {cooccurence_df.count():,} movie pairs (i, j)")
cooccurence_df.show(5)

#### Step 2 : Compute total number of pairs per movies $C_i$, $C_j$

In [None]:
# movie_pair_counts_df Schema
# root
#  |-- movieId: integer (nullable = true)
#  |-- count: long (nullable = true)

movie_pair_counts_df = #TODO

#### Step 3 : compute $\widehat{PMI}(i, j)$

Using the movie counts and the pair counts, compute the PMI dataframe.

In [None]:
# pmi_df Schema
# root
#  |-- movieId1: integer (nullable = true)
#  |-- movieId2: integer (nullable = true)
#  |-- pmi: double (nullable = true)

pmi_df = #TODO

### Factorizing the PMI matrix with RSVD

First, we need to create a mapping between movie ids and position in the matrix that we call vocabulary.

In [None]:
count_ordering_window = Window().orderBy(F.col("count").desc())
vocabulary_df = movie_counts_df.select(
    "movieId",
    (F.row_number().over(count_ordering_window) - F.lit(1)).alias("index"),  # row number starts at 1
).cache()

vocabulary_df.show(3)

Now we need to build a scipy sparse matrix from the PMI dataframe. 
Thanks to our filtering, it is small enough to be collected into memory.

Still this might take a minute or two.

In [None]:
pmi_pdf = (
    #TODO
    .select("i", "j", "pmi")
).toPandas()

In [None]:
from scipy.sparse import coo_matrix

pmi_matrix = coo_matrix((pmi_pdf["pmi"].values, (pmi_pdf["i"].values, pmi_pdf["j"].values)))
# We have built only the triangular matrix 
pmi_matrix = pmi_matrix + pmi_matrix.transpose()
print(
    f"We have a sparse {pmi_matrix.shape} PMI matrix with {len(pmi_matrix.nonzero()[0]):,} entries"
)

Use the scikit-learn implementation of SVD https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.TruncatedSVD.html to factorize the PMI matrix. It uses the randomized SVD algorithm presented as a default.

### Knn Index on movies embeddings from SVD

In [None]:
movie_embeddings = svd.components_.transpose().astype('float32')

#### Using the function defined above, compute the k nearest neighbor of a movie embedding

#### Faiss index version