# Collaborative Filtering with Spark

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
from pyspark.mllib.recommendation import ALS, Rating
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

from datetime import datetime
import itertools

import pandas as pd
import matplotlib.pyplot as plt

**Spark**
- Transformations vs Actions
- Distributed
- Huge datasets

## 0. Loading data: MovieLens ratings
***

In [None]:
# Load the MovieLens ratings with an explicit, typed schema. Without a schema
# every CSV column is read as a string, so e.g. summary() reports
# lexicographic min/max for the id columns ("99" > "610") and numeric work
# relies on implicit casts.
# NOTE(review): field order assumes the MovieLens ratings.csv layout
# userId,movieId,rating,timestamp — confirm if the dataset version changes.
ratings_schema = StructType([
    StructField('userId', IntegerType()),
    StructField('movieId', IntegerType()),
    StructField('rating', DoubleType()),
    StructField('timestamp', LongType()),
])
ratings = spark.read.csv("../data/movielens/ratings.csv",
                         header=True,
                         schema=ratings_schema).cache()

## 1. EDA
***

In [None]:
# Peek at the first five rows (returned as a list of Row objects).
ratings.take(5)

In [None]:
# Print the column names and Spark SQL types of the ratings DataFrame.
ratings.printSchema()

In [None]:
# Number of rating rows in the dataset.
total_ratings = ratings.count()
print("Total amount of ratings:", total_ratings)

In [None]:
# Descriptive statistics (count, mean, stddev, min, quartiles, max) per column.
ratings_summary = ratings.summary()
ratings_summary.show(n=20, truncate=True)

In [None]:
# Number of distinct users that gave at least one rating.
n_unique_users = ratings.select('userId').distinct().count()
print("Amount of unique users:", n_unique_users)

In [None]:
# Number of distinct movies that received at least one rating.
n_unique_movies = ratings.select('movieId').distinct().count()
print("Amount of unique movies:", n_unique_movies)

In [None]:
# Density  = fraction of the user x movie matrix that holds a rating.
# Sparsity = fraction that is empty, i.e. 1 - density.
# (The ratio ratings / (users * movies) on its own is the density, not the
# sparsity.) Each count is computed once and reused.
n_ratings = ratings.count()
n_users = ratings.select('userId').distinct().count()
n_movies = ratings.select('movieId').distinct().count()
density = n_ratings / (n_users * n_movies)
print("Density: {:0.5f}".format(density))
print("Sparsity: {:0.5f}".format(1 - density))

In [None]:
# Add a readable 'ts' column ('yyyy-MM-dd HH:mm:ss' text) derived from the
# unix 'timestamp' column, then show a sample.
ts_col = F.from_unixtime(F.col('timestamp'))
ratings = ratings.withColumn('ts', ts_col)
ratings.show(n=10, truncate=False)

In [None]:
# Date range covered by the ratings. Both bounds come from a single
# aggregation (one Spark job) instead of separate max and min jobs.
# 'ts' holds 'yyyy-MM-dd HH:mm:ss' strings, which sort chronologically as text.
ts_bounds = ratings.agg(F.min('ts').alias('min_ts'),
                        F.max('ts').alias('max_ts')).first()
min_ts = datetime.strptime(ts_bounds['min_ts'], '%Y-%m-%d %H:%M:%S')
max_ts = datetime.strptime(ts_bounds['max_ts'], '%Y-%m-%d %H:%M:%S')
# 365.25 accounts for leap years when converting the span in days to years.
print("Min datum: {}\nMax datum {}\nVerschil in jaren: {:0.1f}".format(min_ts, max_ts, (max_ts - min_ts).days / 365.25))

### 1.1 Movie perspective

In [None]:
# Show the top_n most rated movies (number of ratings per movieId).
# (The previous comment said "top 20" while the code showed 10.)
top_n = 10
rating_cnt = ratings.groupby('movieId').count()
rating_cnt.orderBy('count', ascending=False).show(top_n, truncate=False)

In [None]:
# Plot the number of ratings per movie, most rated first (x axis = rank).
# Only the 'count' column is plotted: pandas DataFrame.plot() draws every
# numeric column, so a numeric movieId would otherwise appear as a second line
# and flatten the count curve.
(rating_cnt
 .orderBy('count', ascending=False)
 .select('count')
 .toPandas()
 .plot())

In [None]:
# One row per movie and one column per distinct rating value, holding how
# often that value was given; missing combinations become 0 instead of null.
rating_cnt_2 = (ratings
                .groupby('movieId')
                .pivot('rating')
                .agg(F.count('rating'))
                .fillna(0))
rating_cnt_2.show(n=10, truncate=False)

In [None]:
# Total number of ratings per rating value, summed over all movies.
# The aggregations are derived from the pivoted columns instead of
# hard-coding the ten half-star values. The output is the same for the full
# dataset, but a rating value that never occurs no longer raises, and every
# column is treated identically. Backticks are required because the column
# names contain a dot (e.g. "4.5").
agg_functions = [
    F.sum(F.col('`{}`'.format(col_name))).alias(col_name)
    for col_name in rating_cnt_2.columns
    if col_name != 'movieId'
]

agg_ratings = rating_cnt_2.agg(*agg_functions)
agg_ratings.show()

In [None]:
# Collect the single-row aggregate to the driver as a pandas DataFrame for display.
agg_ratings.toPandas()

In [None]:
# Bar chart of the rating distribution. The one-row frame is transposed so
# that each rating value becomes a bar; pandas is used for the plotting.
agg_ratings.toPandas().transpose().plot(kind='bar')

### 1.2 User perspective

In [None]:
# Number of ratings each user has given; show the ten most active users.
user_rating_cnt = ratings.groupBy('userId').count()
user_rating_cnt.orderBy(F.col('count').desc()).show(n=10, truncate=False)

In [None]:
# Plot the number of ratings per user, most active first (x axis = rank).
# Only the 'count' column is plotted so that a numeric userId is never drawn
# as a second line next to the counts.
(user_rating_cnt
 .orderBy('count', ascending=False)
 .select('count')
 .toPandas()
 .plot())

## 2. Data prep
***

The algorithm we choose determines what the input data must look like, so it is important to settle on the model (or models) before preparing the data.

We choose Alternating Least Squares (ALS), which comes out of the box with Spark. ALS is a Matrix Factorization (MF) algorithm and belongs to the family of model-based collaborative filtering recommenders.

### Matrix Factorization
Matrix Factorization (MF) is closely related to dimensionality-reduction techniques such as Principal Component Analysis (PCA) and Singular Value Decomposition (SVD). In principle, MF tries to find hidden (latent) features that relate, in this case, the users and the items, and represents them in smaller matrices.

The image below shows the original rating matrix R, the user-feature matrix U and the feature-item matrix V; the product U·V approximates R.

![Matrix Factorization](images/mf2.png)

### Model intuition

The ALS model is iterative. The matrices U and V are first initialized randomly. In each iteration, least squares is used to optimize U while V is kept fixed, and then V while U is kept fixed. Each alternating step lowers the (regularized) squared error between U·V and the observed ratings, so the product gets closer to the known entries of the rating matrix.

### Data structure
The ALS model requires users and items to be represented as integer identifiers, and the rating as a float (decimal number). The timestamp/date column is not needed.

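We can turn the ratings into relevance scores. How to do this is subjective. Here every rating is shifted by −2.5 (relevance = rating − 2.5), so that movies rated 3 or higher get a positive score and movies rated 2 or lower get a negative score (2.5 maps to 0). Half-star ratings are shifted the same way: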
- 5 -> 2.5
- 4 -> 1.5
- 3 -> 0.5
- 2 -> -0.5
- 1 -> -1.5

In [None]:
# Convert each (userId, movieId, rating, ...) row into an mllib Rating whose
# score is the relevance rating - 2.5 (e.g. 5 -> 2.5, 3 -> 0.5, 1 -> -1.5).
def _to_relevance(row):
    user_id, movie_id, raw_rating = row[0], row[1], row[2]
    return Rating(int(user_id), int(movie_id), float(raw_rating) - 2.5)

rating_prep = ratings.rdd.map(_to_relevance)

In [None]:
# Inspect the first five Rating(user, product, rating) records.
rating_prep.take(5)

## Model Training
***

In [None]:
# Baseline model: train ALS on the user-movie relevance scores.
latent_factors = 10     # rank: number of latent features per user / movie
iterations = 10         # number of ALS iterations
regularization = 0.01   # lambda_: regularization parameter
seed = 123              # fixed seed for a reproducible random initialization

als_kwargs = dict(rank=latent_factors,
                  iterations=iterations,
                  lambda_=regularization,
                  seed=seed)
model_1 = ALS.train(rating_prep, **als_kwargs)

## Evaluation
***

In [None]:
# The (user, movie) pairs to score: exactly the pairs that were rated.
testData = rating_prep.map(lambda rated: (rated.user, rated.product))

In [None]:
# Predict a relevance score for every (user, movie) pair in testData, i.e.
# only the pairs that were actually rated, not all possible combinations.
# The result is keyed by (user, movie) so it can be joined with the observations.
predictions = model_1.predictAll(testData).map(
    lambda pred: ((pred.user, pred.product), pred.rating))

In [None]:
# Inspect ten ((user, movie), predicted score) pairs.
predictions.take(10)

In [None]:
# Observed relevance scores keyed by (user, movie), for joining with the predictions.
ratingsTuple = rating_prep.map(lambda obs: ((obs.user, obs.product), obs.rating))

In [None]:
# Inspect ten ((user, movie), observed relevance score) pairs.
ratingsTuple.take(10)

In [None]:
# Join on (user, movie) and keep only the (predicted, observed) value pairs.
scoreAndLabels = predictions.join(ratingsTuple).values()

In [None]:
# Regression metrics over the (prediction, observation) pairs built above.
metrics = RegressionMetrics(scoreAndLabels)

In [None]:
# In-sample fit: the scored pairs are the same ones the model was trained on.
print("R-squared = {}".format(metrics.r2))

## Finding the right parameters: grid search

In [None]:
# Grid search over rank, iterations and regularization.
# Models are trained on a training split and scored on a held-out validation
# split. Scoring on the training data itself rewards overfitting: more latent
# factors and less regularization always look better in-sample.
latent_factors = [5, 10, 20]
iterations = [5, 10, 15]
regularization = [0.005, 0.01, 0.05]
seed = 123

train_prep, valid_prep = rating_prep.randomSplit([0.8, 0.2], seed=seed)
train_prep.cache()
valid_prep.cache()

# R-squared on unseen data can be negative, so start below any attainable score.
r2_best = float('-inf')
lf_best = 0
it_best = 0
rg_best = 0

# Validation pairs to predict, and their observed relevance scores.
testData = valid_prep.map(lambda p: (p.user, p.product))
ratingsTuple = valid_prep.map(lambda r: ((r.user, r.product), r.rating))

for (lf, it, rg) in itertools.product(latent_factors, iterations, regularization):
    model = ALS.train(train_prep,
                      rank=lf,
                      iterations=it,
                      lambda_=rg,
                      seed=seed
                      )
    pred = model.predictAll(testData).map(lambda r: ((r.user, r.product), r.rating))
    # (prediction, observation) pairs; validation users/movies absent from
    # the training split get no prediction and drop out of the join.
    scoreAndLabels = pred.join(ratingsTuple).values()
    metrics = RegressionMetrics(scoreAndLabels)
    r2 = metrics.r2
    print("Lf: {}, Iter: {}, Reg: {}, R-squared: {}".format(lf, it, rg, r2))
    if r2 > r2_best:
        r2_best, lf_best, it_best, rg_best = r2, lf, it, rg

print("Best parameters:\n\tLatent Factors: {}\n\tIterations: {}\n\tRegularization: {}\n\tR-squared: {}".format(lf_best, it_best, rg_best, r2_best))

In [None]:
# Attempt 2: keep iterations and regularization fixed and scan a wider range
# of latent factors.
# NOTE(review): 15 / 0.005 are presumably the best values from the first
# search — confirm against its output.
# As in the first search, models are scored on a held-out validation split
# rather than on their own training data. The same seed gives the same split.
latent_factors = [5, 10, 20, 30, 40, 50]
iterations = 15
regularization = 0.005
seed = 123

train_prep, valid_prep = rating_prep.randomSplit([0.8, 0.2], seed=seed)
train_prep.cache()
valid_prep.cache()

# R-squared on unseen data can be negative, so start below any attainable score.
r2_best = float('-inf')
lf_best = 0

# Validation pairs to predict, and their observed relevance scores.
testData = valid_prep.map(lambda p: (p.user, p.product))
ratingsTuple = valid_prep.map(lambda r: ((r.user, r.product), r.rating))

for lf in latent_factors:
    model = ALS.train(train_prep,
                      rank=lf,
                      iterations=iterations,
                      lambda_=regularization,
                      seed=seed
                      )
    pred = model.predictAll(testData).map(lambda r: ((r.user, r.product), r.rating))
    scoreAndLabels = pred.join(ratingsTuple).values()
    metrics = RegressionMetrics(scoreAndLabels)
    r2 = metrics.r2
    print("Lf: {}, R-squared: {}".format(lf, r2))
    if r2 > r2_best:
        r2_best, lf_best = r2, lf

print("Best parameters:\n\tLatent Factors: {}\n\tR-squared: {}".format(lf_best, r2_best))

## Prediction
***

In [None]:
# Final model: retrain on all relevance scores with the rank selected by the
# latent-factor search, 15 iterations and regularization 0.005.
final_params = {
    'rank': lf_best,
    'iterations': 15,
    'lambda_': 0.005,
    'seed': seed,
}
model_final = ALS.train(rating_prep, **final_params)

In [None]:
# Get the top N movie recommendations for one user from the tuned final
# model (previously the untuned baseline model_1 was used by mistake).
N = 10
userId = 1

model_final.recommendProducts(userId, N)

In [None]:
# recommendForAllUsers / recommendForAllItems belong to pyspark.ml's
# ALSModel. The pyspark.mllib MatrixFactorizationModel returned by ALS.train
# exposes recommendProductsForUsers / recommendUsersForProducts instead.
# Use the tuned final model, not the leftover model from the grid-search loop.

# Generate top 10 movie recommendations for each user (RDD keyed by user id)
userRecs10 = model_final.recommendProductsForUsers(10)
# Generate top 10 user recommendations for each movie (RDD keyed by movie id)
movieRecs10 = model_final.recommendUsersForProducts(10)

## References
- Data: https://grouplens.org/datasets/movielens/latest/
- https://datasciencemadesimpler.wordpress.com/tag/alternating-least-squares/
- https://medium.com/radon-dev/als-implicit-collaborative-filtering-5ed653ba39fe
- https://spark.apache.org/docs/latest/ml-collaborative-filtering.html
- https://jessesw.com/Rec-System/
- https://www.blabladata.com/2014/12/20/simple-multimodal-design-recommender/
- http://fastml.com/evaluating-recommender-systems/
- http://yifanhu.net/PUB/cf.pdf
-