In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
from pyspark.sql.functions import col, explode

%matplotlib inline
import os
os.environ["PYSPARK_PYTHON"] = "python3"

import requests, zipfile, io
import urllib.request
import subprocess
import sys
from pyspark.sql.types import *

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder,CrossValidatorModel
from time import time

# Movie Recommendation System with ALS in Apache Spark

## Overview
In this project, I built a movie recommendation engine using collaborative filtering with the Alternating Least Squares [(ALS)](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) optimizer and Apache Spark APIs on the [MovieLens movie rating dataset](https://grouplens.org/datasets/movielens/latest/) (~0.8 MB). The goal is to predict users' ratings, recommend movies to users on request, and find movies similar to a specific movie of interest.

Outline of this report is as follows: Part 1 data ETL and OLAP, Part 2 model training and evaluation on sample data, Part 3 ETL and model deployment on full data, Part 4 model applications.

# Part 0: Data Loading

In [0]:
#load sample data
movies_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/movies-3.csv")
ratings_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/ratings-2.csv").drop('timestamp')
tags_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/tags-2.csv")
links_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/links-2.csv") 

In [0]:
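# registerTempTable is deprecated; createOrReplaceTempView is its replacement
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
tags_df.createOrReplaceTempView("tags")
links_df.createOrReplaceTempView("links")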



# Part 1: Data Preprocessing and OLAP
In this part, I built a data ETL pipeline to manipulate the movie rating dataset in DBFS and conducted online analytical processing (OLAP) on a sample dataset of size ~1MB with Spark SQL.

Of the MovieLens files, I mainly used the ratings and movies data. After preprocessing, the Spark DataFrames look as follows:

In [0]:
print("--ratings top 5 rows--")
display(ratings_df.limit(5) )
print("--movies top 5 rows--")
display(movies_df.limit(5) )
print("--tags top 5 rows--")
display(tags_df.limit(5) )
print("--links top 5 rows--")
display(links_df.limit(5) )

--ratings top 5 rows--


userId,movieId,rating
1,1,4.0
1,3,4.0
1,6,4.0
1,47,5.0
1,50,5.0


--movies top 5 rows--


movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy


--tags top 5 rows--


userId,movieId,tag,timestamp
2,60756,funny,1445714994
2,60756,Highly quotable,1445714996
2,60756,will ferrell,1445714992
2,89774,Boxing story,1445715207
2,89774,MMA,1445715200


--links top 5 rows--


movieId,imdbId,tmdbId
1,114709,862
2,113497,8844
3,113228,15602
4,114885,31357
5,113041,11862


**Statistics:**

In [0]:
tmp1 = ratings_df.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
tmp3 = ratings_df.count()
print('The sample data contains {} records of ratings.'.format(tmp3))
print('For the users that rated movies and the movies that were rated:')
print('- Minimum number of ratings per user is {}.'.format(tmp1))
print('- Minimum number of ratings per movie is {}.'.format(tmp2))

tmp4 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1) # movies that have only one rating
tmp5 = ratings_df.select('movieId').distinct().count() # how many movies are rated
# '{:.1%}' multiplies the ratio by 100 before formatting it as a percentage
print('{:.1%} ({} out of {}) movies are rated by only one user.'.format(tmp4/tmp5, tmp4, tmp5))

The sample data contains 100836 records of ratings.
For the users that rated movies and the movies that were rated:
- Minimum number of ratings per user is 20.
- Minimum number of ratings per movie is 1.
35.4% (3446 out of 9724) movies are rated by only one user.


## Spark SQL and OLAP

## Q1: The number of Users

In [0]:
%sql 
select count(distinct userId) as Number_of_users from ratings

Number_of_users
610


## Q2: The number of Movies

In [0]:
%sql
select count(distinct movieId) as Number_of_movies from movies

Number_of_movies
9742


## Q3:  How many movies are rated by users? List movies not rated before

In [0]:
%sql select count(distinct movieId) as Number_movies_rated_by_users from ratings

Number_movies_rated_by_users
9724


In [0]:
%sql select distinct userId, count(*) as count from ratings group by 1 order by count asc limit 1

userId,count
442,20


Lowest number of movies rated by a user is 20.

In [0]:
%sql
select title, genres from movies where movieId not in (select movieId from ratings)

title,genres
"Innocents, The (1961)",Drama|Horror|Thriller
Niagara (1953),Drama|Thriller
For All Mankind (1989),Documentary
"Color of Paradise, The (Rang-e khoda) (1999)",Drama
I Know Where I'm Going! (1945),Drama|Romance|War
"Chosen, The (1981)",Drama
"Road Home, The (Wo de fu qin mu qin) (1999)",Drama|Romance
Scrooge (1970),Drama|Fantasy|Musical
Proof (1991),Comedy|Drama|Romance
"Parallax View, The (1974)",Thriller


## Q4: List Movie Genres

In [0]:
%sql 
--first genre listed is most representative of the movie
select distinct explode(split(genres,'[|]')) as genres from movies order by genres

genres
(no genres listed)
Action
Adventure
Animation
Children
Comedy
Crime
Documentary
Drama
Fantasy


## Q5: Movie Count & List of 5 Movies for Each Genre

In [0]:
%sql
select genres, count(*) as number_of_movies from (select distinct explode(split(genres, '[|]')) as genres, movieId from movies) group by 1 order by 2 desc

genres,number_of_movies
Drama,4361
Comedy,3756
Thriller,1894
Action,1828
Romance,1596
Adventure,1263
Crime,1199
Sci-Fi,980
Horror,978
Fantasy,779


In [0]:
%sql
with tb as (
select genres, title, row_number() over (partition by genres order by title) as row_id from 
(select explode(split(genres, '[|]')) as genres, title from movies))
select genres, concat_ws(',', collect_set(title)) as list_of_first_5_movies from tb where row_id <=5 group by 1

genres,list_of_first_5_movies
(no genres listed),"A Midsummer Night's Dream (2016),Ali Wong: Baby Cobra (2016),A Christmas Story Live! (2017),A Cosmic Christmas (1977),Ben-hur (2016)"
Action,"'71 (2014),12 Rounds (2009),10th Victim, The (La decima vittima) (1965),'Hellboy': The Seeds of Creation (2004),13 Assassins (Jûsan-nin no shikaku) (2010)"
Adventure,"10,000 BC (2008),101 Dalmatians (One Hundred and One Dalmatians) (1961),10th Kingdom, The (2000),101 Dalmatians (1996),'Hellboy': The Seeds of Creation (2004)"
Animation,"101 Dalmatians (One Hundred and One Dalmatians) (1961),5 Centimeters per Second (Byôsoku 5 senchimêtoru) (2007),101 Dalmatians II: Patch's London Adventure (2003),A Detective Story (2003),9 (2009)"
Children,"101 Dalmatians (One Hundred and One Dalmatians) (1961),101 Dalmatians (1996),101 Dalmatians II: Patch's London Adventure (2003),102 Dalmatians (2000),*batteries not included (1987)"
Comedy,"(500) Days of Summer (2009),'burbs, The (1989),...All the Marbles (1981),'Hellboy': The Seeds of Creation (2004),*batteries not included (1987)"
Crime,"10 Cent Pistol (2015),11:14 (2003),10th & Wolf (2006),00 Schneider - Jagd auf Nihil Baxter (1994),12 Angry Men (1997)"
Documentary,"20 Feet from Stardom (Twenty Feet from Stardom) (2013),11th Hour, The (2007),'Hellboy': The Seeds of Creation (2004),13th (2016),20,000 Days on Earth (2014)"
Drama,"'Salem's Lot (2004),'71 (2014),""11'09""""01 - September 11 (2002)"",'Round Midnight (1986),'Til There Was You (1997)"
Fantasy,"10th Kingdom, The (2000),'Hellboy': The Seeds of Creation (2004),13 Going on 30 (2004),13th Warrior, The (1999),*batteries not included (1987)"


# Part 2: Model Training and Evaluation

In this part, I built the ALS model and tuned the hyperparameters by Cross Validation with the Grid Search method on the training set, and evaluated the performance of the best model on the test data by computing RMSE of model inference.
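The grid search with k-fold cross-validation described above can be sketched in plain Python, without Spark. This is only an illustration of how the work multiplies: the helper `k_fold_indices` and the 10-row toy dataset are hypothetical, and the grid values are simply two lists of three candidates each.

```python
from itertools import product


def k_fold_indices(n_rows, k):
    """Split row indices 0..n_rows-1 into k disjoint validation folds."""
    return [list(range(i, n_rows, k)) for i in range(k)]


# Hypothetical grid: two hyperparameters with three candidate values each.
grid = list(product([0.1, 0.01, 0.001], [0.1, 0.01, 0.001]))
folds = k_fold_indices(n_rows=10, k=5)

# Every combination is fitted once per fold, with that fold held out for validation.
fits = [(params, held_out) for params in grid for held_out in range(len(folds))]

print(len(grid), "combinations x", len(folds), "folds =", len(fits), "fits")
```

The metric for each combination is then averaged over its folds, and the combination with the best average is kept. Spark's `CrossValidator` additionally refits that best combination on the whole training set.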

## 2.1 Cross Validation on sample data for Matrix Factorization

I used a 5-fold Cross Validation with Grid Search method to tune three hyperparameters of ALS: **`rank`** , **`regParam`**  , and **`alpha`**. 

**`maxIter`**: the maximum number of iterations to run (defaults to 10).  
This hyperparameter was not tuned.

**`rank`**: the number of latent factors in the model (defaults to 10).  
The number of latent factors is one of the most important parameters of the ALS model and needs to be tuned, since it has a great effect on the model's performance. 
A higher rank generally lets the model fit the data more closely, but too high a rank may cause the model to overfit, and the computation cost also increases. If more data is added to training, increasing the rank may improve overall results.


**`regParam`**: the regularization parameter in ALS (defaults to 0.1).  
Regularization helps ALS avoid overfitting. When updating user (or movie) factors, the Spark ALS API scales `regParam` by the number of ratings the user generated (or the movie received). This approach is named ALS-WR. It makes `regParam` less dependent on the scale of the dataset, so the best value learned from a sample subset can be applied to the full dataset with similar performance expected.
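As a rough sketch of what this weighting means, the regularized objective is commonly written as below. The notation is my own assumption and not taken from the Spark source: $r_{ui}$ is an observed rating, $x_u$ and $y_i$ are the user and movie factor vectors, $n_u$ and $n_i$ are the numbers of ratings the user gave and the movie received, and $\lambda$ plays the role of `regParam`. Spark's internal implementation may differ in detail.

$$
\min_{X,\,Y} \sum_{(u,i)\ \text{observed}} \left(r_{ui} - x_u^{\top} y_i\right)^2 + \lambda \left( \sum_u n_u \lVert x_u \rVert^2 + \sum_i n_i \lVert y_i \rVert^2 \right)
$$

Because each penalty term grows with the number of ratings behind it, the balance between fit and regularization stays roughly the same when the dataset grows, which is the property relied on when reusing the sample-tuned value on the full data.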


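**`alpha`**: the alpha in ALS (defaults to 1.0).  
This parameter applies only to the implicit feedback variant of ALS (`implicitPrefs=True`), where it governs the baseline confidence in preference observations. With explicit ratings, as used here, it does not affect the fitted model.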


By default, Spark assigns NaN predictions during ALSModel.transform when a user and/or item factor is not present in the model. This can be useful in a production system, since it indicates a new user or item, and so the system can make a decision on some fallback to use as the prediction. However, this is undesirable during cross-validation, since any NaN predicted values will result in NaN results for the evaluation metric (for example when using RegressionEvaluator). This makes model selection impossible. 

Spark allows users to set the coldStartStrategy parameter to “drop” in order to drop any rows in the DataFrame of predictions that contain NaN values. The evaluation metric will then be computed over the non-NaN data and will be valid.
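A minimal plain-Python sketch of the problem, not using Spark: a single NaN prediction turns the whole RMSE into NaN, while dropping that row gives a valid metric. The `rmse` helper and the three (rating, prediction) pairs are hypothetical.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (label, prediction) pairs."""
    return math.sqrt(sum((y - p) ** 2 for y, p in pairs) / len(pairs))


# Hypothetical (rating, prediction) pairs; the last one mimics a cold-start row.
scored = [(4.0, 3.5), (3.0, 3.5), (5.0, float("nan"))]

with_nan = rmse(scored)
dropped = rmse([(y, p) for y, p in scored if not math.isnan(p)])

print(with_nan)  # nan
print(dropped)   # 0.5
```

Note that dropping rows also means the metric says nothing about users or movies unseen during training.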

In [0]:
#train-test split
train, test = ratings_df.randomSplit([0.9, 0.1], seed=12345)

In [0]:
 
train = train.withColumn('userId', train['userId'].cast(IntegerType()))
train = train.withColumn('movieId', train['movieId'].cast(IntegerType()))
train = train.withColumn('rating', train['rating'].cast(DoubleType()))
test = test.withColumn('userId', test['userId'].cast(IntegerType()))
test = test.withColumn('movieId', test['movieId'].cast(IntegerType()))
test = test.withColumn('rating', test['rating'].cast(DoubleType()))

Ranks 5, 10, and 15 were compared in an earlier run, and cross-validation chose rank 5 as the best.

In [0]:
# Create ALS model.
# Note: cold start strategy is set to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(rank=5, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed=0)

# Tune model using ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.1, 0.01, 0.001])
             .addGrid(als.alpha, [0.1, 0.01, 0.001])
             .build())
print("Num models to be tested: ", len(paramGrid))

# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build 5-fold cross validation
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5, seed=0)
cv_model = cv.fit(train)
# Extract the best model found by the grid search
best_model = cv_model.bestModel


Num models to be tested:  9


In [0]:
als = ALS(rank=5, regParam=.1, alpha=.001, maxIter=10, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed = 0)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
cv_mo

In [0]:
predictions_train = cv_model.transform(train)
rmse_train = evaluator.evaluate(predictions_train)

In [0]:
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

Below is a table containing CV rmse for markdown visualization:

In [0]:
a = iter(list(cv_model.avgMetrics[:]))
res = []
 
for y in [0.1, 0.01, 0.001]:
    for z in [0.1, 0.01, 0.001]:
      res.append(( y, z, next(a)))
for item in res:
    print('|' + '|'.join(list(map(str, item))) + '|')

|0.1|0.1|0.8875165812702344|
|0.1|0.01|0.8875165812702356|
|0.1|0.001|0.8875165812702349|
|0.01|0.1|1.0425327162402644|
|0.01|0.01|1.0425327162402644|
|0.01|0.001|1.0425327162402644|
|0.001|0.1|1.206740005563284|
|0.001|0.01|1.2067400055632829|
|0.001|0.001|1.2067400055632835|


| regParam | alpha | CV RMSE |
| ------------- | ------------- | ------------- |
|0.1|0.1|0.8875165812702344|
|0.1|0.01|0.8875165812702356|
|0.1|0.001|0.8875165812702349|
|0.01|0.1|1.0425327162402644|
|0.01|0.01|1.0425327162402644|
|0.01|0.001|1.0425327162402644|
|0.001|0.1|1.206740005563284|
|0.001|0.01|1.2067400055632829|
|0.001|0.001|1.2067400055632835|
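To read the table above more easily, the plain-Python sketch below picks the row with the lowest CV RMSE and measures how much the RMSE varies across `alpha` for each `regParam`. The numbers are copied from the table; the variable names are my own.

```python
# (regParam, alpha, CV RMSE) rows copied from the table above.
cv_results = [
    (0.1, 0.1, 0.8875165812702344),
    (0.1, 0.01, 0.8875165812702356),
    (0.1, 0.001, 0.8875165812702349),
    (0.01, 0.1, 1.0425327162402644),
    (0.01, 0.01, 1.0425327162402644),
    (0.01, 0.001, 1.0425327162402644),
    (0.001, 0.1, 1.206740005563284),
    (0.001, 0.01, 1.2067400055632829),
    (0.001, 0.001, 1.2067400055632835),
]

# Row with the smallest RMSE.
best = min(cv_results, key=lambda row: row[2])

# For each regParam, the gap between the largest and smallest RMSE over alpha.
alpha_effect = {}
for reg in sorted({row[0] for row in cv_results}):
    scores = [row[2] for row in cv_results if row[0] == reg]
    alpha_effect[reg] = max(scores) - min(scores)

print("best row:", best)
print("RMSE spread across alpha:", alpha_effect)
```

The spread across `alpha` is at the level of floating-point noise, so the ranking is decided by `regParam` alone.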

The final best parameters are:

In [0]:
#Generate predictions, then evaluate using RMSE
print(" RMSE of train data: {}".format(rmse_train))
print ("RMSE on test data = "+str(rmse))
best_params = cv_model.getEstimatorParamMaps()[np.argmin(cv_model.avgMetrics)]
print('**Best ALS model**')
for i,j in best_params.items():
  print('  '+i.name+': '+str(j))
 

 RMSE of train data: 0.648725288054045
RMSE on test data = 0.8702025071137041
**Best ALS model**
  regParam: 0.1
  alpha: 0.1


The test RMSE (0.870) is higher than the training RMSE (0.649). This gap indicates some overfitting, which can be eased by training the model on more data.

# Part 3: Model Deployment on full data

In this part, I extracted and loaded **full MovieLens ratings data** using my self-defined `MovieLensDataETL` class into Spark DBFS and trained the ALS model with the best parameters found by CV from the last step. 

Note: The smallest alpha (**.001**) is used because the cross-validation error was the same for all three alphas. This is expected, since `alpha` only takes effect in implicit-feedback ALS (`implicitPrefs=True`) and the model here is trained on explicit ratings. In the implicit-feedback setting, a smaller alpha makes the confidence matrix grow more slowly, so the model does not become overly confident too quickly about a preference a user has for a movie.
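For intuition on how alpha controls confidence, the sketch below uses the confidence formula commonly given in the implicit-feedback literature, `c = 1 + alpha * r`, where `r` is the observed interaction strength. That Spark's implicit ALS follows exactly this form is an assumption here, and the function name `confidence` is hypothetical.

```python
def confidence(alpha, r):
    """Confidence weight c = 1 + alpha * r for an observed interaction strength r."""
    return 1.0 + alpha * r


# Compare how fast confidence grows with r for the three alphas in the grid.
for alpha in (0.001, 0.01, 0.1):
    weights = [round(confidence(alpha, r), 3) for r in (1, 10, 100)]
    print("alpha =", alpha, "->", weights)
```

With a small alpha, even a strong observation raises the confidence only slightly above the baseline of 1.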

In [0]:
# set parameters from cv process
max_iter = 10
best_rank = 5
best_regParam = 0.1
best_alpha = .001
seed = 42

In [0]:
data_path = "dbfs:/FileStore/ml-full-dataset/"

In [0]:
class MovieLensDataETL(object):
    def extract(self, data_path): 
        urllib.request.urlretrieve("https://files.grouplens.org/datasets/movielens/ml-latest.zip", "/tmp/ml-latest.zip")     
        with zipfile.ZipFile("/tmp/ml-latest.zip", 'r') as zip_ref:
            zip_ref.extractall("/databricks/driver/")
#          dbutils.fs.put("/tmp/ml-latest-full", "/databricks/driver/ml-latest")         
        dbutils.fs.mv("file:/databricks/driver/ml-latest/movies.csv", data_path + "movies.csv") 
        dbutils.fs.mv("file:/databricks/driver/ml-latest/ratings.csv", data_path + "ratings.csv")
        print("Selected files saved at DBFS!")
          
    def transform_load(self,data_path):
      #4. load file into Spark DataFrame
      print("4. Loading file into Spark DataFrame...")
      # data_path already ends with "/", so no extra slash is added (matches extract())
      movies_full = spark.read.load(data_path + "movies.csv", format='csv', header = True)
      ratings_full = spark.read.load(data_path + "ratings.csv", format='csv', header = True).drop('timestamp')

      #5. data type convert
      print("5. Converting data type...")
      ratings_full = ratings_full.withColumn("rating", ratings_full["rating"].cast(FloatType()))
      ratings_full = ratings_full.withColumn("userId", ratings_full["userId"].cast(IntegerType()))
      ratings_full = ratings_full.withColumn("movieId", ratings_full["movieId"].cast(IntegerType()))
      movies_full = movies_full.withColumn("movieId", movies_full["movieId"].cast(IntegerType()))

      #6. present dataframe
      print("--ratings top 5 rows--")
      display(ratings_full.limit(5).toPandas())
      print(ratings_full.schema)
      print("--movies top 5 rows--")
      display(movies_full.limit(5).toPandas())
      print(movies_full.schema)
      print("--ratings, movies are returned respectively--")
      return ratings_full, movies_full

In [0]:
full_data = MovieLensDataETL()
full_data.extract(data_path)

Selected files saved at DBFS!


In [0]:
full_df = full_data.transform_load(data_path)

4. Loading file into Spark DataFrame...
5. Converting data type...
--ratings top 5 rows--


userId,movieId,rating
1,307,3.5
1,481,3.5
1,1091,1.5
1,1257,4.5
1,1449,4.5


StructType(List(StructField(userId,IntegerType,true),StructField(movieId,IntegerType,true),StructField(rating,FloatType,true)))
--movies top 5 rows--


movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy


StructType(List(StructField(movieId,IntegerType,true),StructField(title,StringType,true),StructField(genres,StringType,true)))
--ratings, movies are returned respectively--


In [0]:
ratings_full = full_df[0]
movies_full = full_df[1]

## 3.1 Fit model on full data and make recommendations

`FitALSModel` class is for model training and giving recommendations.  

 
1. How `fit_ALS` works:  
When calling `fit_ALS`, the user must pass `data_path` to indicate where the model should be saved.   
The user can also pass `max_iter`, `rank`, `regParam`, and `alpha`, which default to the best values found by CV, and `seed`, which defaults to 0.  
The user can choose whether to evaluate the model by RMSE with `evaluate`, whether to save the model with `save`, and which file name to save under with `save_name`.  
**ALS is a matrix factorization algorithm that can run in parallel. It also copes with sparse rating data and scales well to very large datasets.**


2. How `recommend_user` works:  
The user must specify `userId`, and can set `numItems` to get a specific number of recommendations for that user (default 10). It returns a Pandas DataFrame of movie recommendations as the output.
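Conceptually, a top-N recommendation from a factorized model scores every movie by the dot product of the user's factor vector with the movie's factor vector and keeps the highest scores. The plain-Python sketch below illustrates that idea only; the helper names and the rank-2 factor values are hypothetical and are not taken from the trained model or from Spark's implementation.

```python
def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def top_n(user_factor, item_factors, n):
    """Score each item against the user factor and return the n highest-scoring items."""
    scored = [(item_id, dot(user_factor, f)) for item_id, f in item_factors.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:n]


# Hypothetical rank-2 factors for one user and three movies.
user_factor = [1.5, 0.5]
item_factors = {101: [2.0, 1.0], 102: [0.5, 3.0], 103: [4.0, 0.2]}

recommendations = top_n(user_factor, item_factors, n=2)
print(recommendations)
```

Because the score is an unconstrained dot product, it is not limited to the 0.5 to 5 rating scale, so a predicted rating can be larger than 5.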

In [0]:

class FitALSModel(object):
  def __init__(self, ratings_full, movies_full):
    self.ratings_full = ratings_full
    self.movies_full = movies_full
    self.full_model = None
    
  def fit_ALS(self, data_path, max_iter=max_iter, rank=best_rank, regParam=best_regParam, alpha=best_alpha, seed=0, evaluate=True, save=True, save_name='full_model'):
    # fit ALS model with best parameters from cross validation on sample dataset
    print("1. Training ALS model with input ratings data...")
    full_train, full_test = self.ratings_full.randomSplit([0.8, 0.2], seed = seed)
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", maxIter=max_iter, rank = rank, regParam = regParam, alpha=alpha, seed = seed)
    self.full_model = als.fit(full_train)
    print("Successfully trained!")
    
    if evaluate:
      # Define evaluator as RMSE
      print("2. Evaluate model: computing RMSE on test data...")
      evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
      predictions = self.full_model.transform(full_test)
      rmse = evaluator.evaluate(predictions)
      print ("RMSE on test data = "+str(rmse))
      
    if save:
      # save model
      print("3. Saving model...")
      self.full_model.write().overwrite().save(data_path + save_name)
      print("Model saved at ", data_path + save_name)
    return self.full_model
  
   
  def recommend_user(self, userId, numItems=10, full_model=None):
    '''
    Inputs:
    - userId: an integer defining ID of user of interest.
    - numItems: an integer defining max number of recommendations for each user.
    - full_model: the ALS model trained and saved
 
    Returns:
    - a DataFrame of movieId, userId, rating, title, genres.
    '''
    #check inputs
    if not isinstance(userId, int):
      return print('Data type should be integer!') 
    if not full_model:
      full_model = self.full_model
    df = spark.createDataFrame([{'userId': userId}])
    userRecs = full_model.recommendForUserSubset(df, numItems)
    if not len(userRecs.head(1))>0:
      return print('There is no user with id = ', userId)
    userRecs = userRecs\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))\
    .join(self.movies_full, on='movieId')\
    .where(userRecs['userId'] == userId)\
    .toPandas()
    print("Top %s recommended movie(s) for user with Id %s: " % (numItems, userId))
    return userRecs


In [0]:
#fit the model
fit = FitALSModel(ratings_full, movies_full)
fit.fit_ALS(data_path, rank=best_rank, regParam=best_regParam, seed=seed, save=True, save_name='full_model')

1. Training ALS model with input ratings data...
Successfully trained!
2. Evaluate model: computing RMSE on test data...
RMSE on test data = 0.825027790347957
3. Saving model...
Model saved at  dbfs:/FileStore/tables/full_model
Out[16]: ALSModel: uid=ALS_6a5e2d3b4d38, rank=5

# Part 4: Model Applications

## 4.1 Movie Recommendations for Users

In [0]:
#make top 5 recommendations for userId=232
fit.recommend_user(232,5)

Top 5 recommended movie(s) for user with Id 232: 


Unnamed: 0,movieId,userId,rating,title,genres
0,175625,232,6.879239,The Dragon Spell (2016),Adventure|Animation|Children|Comedy|Fantasy
1,43567,232,6.814394,Sweet November (1968),Drama
2,77344,232,6.477338,Chizuko's Younger Sister (Futari) (1991),Drama
3,188113,232,6.468568,A Very Old Story (1968),Children|Fantasy
4,177209,232,6.395944,Acı Aşk (2009),Drama


In [0]:
#the message for a nonexistent userId
fit.recommend_user(0,1)

There is no user with id =  0


## 4.2 Find similar movies by ALS results

`ALSMovieFactor` class is where results from the ALS process (i.e. the factorized matrix) are used to find similar movies.  
 
1. How `similar_movie` works:  
To define the similarity metric, the user must set `method` to either 'cosine_similarity' (the default) or 'euclidean_distance'.        
  

2. How `_cosine_similarity` and `_euclidean_distance` work:  

When should one use cosine similarity over euclidean distance?

Example: 
- User 1 bought 1x books, 1x CDs and 1x coffee.
- User 2 bought 100x books, 100x pens and 100x flowers.
- User 3 bought 1x books, 2x apples and 2x oranges. 

By cosine similarity, user 1 and user 2 are more similar. If each user's three purchase quantities are treated as a vector in 3D space, the vectors of user 1 and user 2 are parallel; cosine similarity normalizes away the difference in scale.  
By Euclidean distance, user 3 is more similar to user 1. It measures the distance between two points (the smaller the distance, the more similar).

As a rule of thumb, cosine similarity is suitable when the direction of the vectors (the ratio between their components) is what matters. Euclidean distance is more useful when the magnitude is important. 

For recommending movies, which similarity metric to use depends on how ALS computes its matrix factorization. One guess is that the factorization assigns each movie a vector of scores, so the closer the scores of two movies are, the more similar the movies are.

The smaller the `_euclidean_distance` between the factors, the more similar the movies.  
The larger the `_cosine_similarity`, the smaller the angle between the two feature vectors and the more similar the movies.
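The user example can be checked numerically with a small plain-Python sketch. It assumes that each user's three quantities are coordinates on the same three axes, giving the vectors (1, 1, 1), (100, 100, 100), and (1, 2, 2); the function names here are my own and are separate from the class methods used later.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between vectors a and b."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def euclidean_distance(a, b):
    """Straight-line distance between points a and b."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


# Assumption: the three purchase counts of each user share the same three axes.
user1, user2, user3 = [1, 1, 1], [100, 100, 100], [1, 2, 2]

print("cosine    u1-u2:", cosine_similarity(user1, user2))
print("cosine    u1-u3:", cosine_similarity(user1, user3))
print("euclidean u1-u2:", euclidean_distance(user1, user2))
print("euclidean u1-u3:", euclidean_distance(user1, user3))
```

Under this assumption, cosine similarity ranks user 2 closest to user 1, while Euclidean distance ranks user 3 closest, which matches the reasoning above.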

In [0]:
class ALSMovieFactor(object):
  
  def __init__(self, item_matrix, movies_full):
    '''Note: Input item matrix must be generated by ALSModel.itemFactors, and the movies table generated by transform_load()'''
    self.factor_matrix = item_matrix
    self.movies_full = movies_full
    
  def _cosine_similarity(self, df, array):  
    # compute cosine similarity
    # inputs: spark df, array of feature of interest
    # output: rdd
    out = df.rdd.map(lambda x: (x[0], float(np.dot(array,x[1])/(np.linalg.norm(array) * np.linalg.norm(x[1])))))
    return out
  
  def _euclidean_distance(self, df, array):
    # compute euclidean distance
    # inputs: spark df, array of feature of interest
    # output: rdd
    out = df.rdd.map(lambda x: (x[0], float(np.linalg.norm(np.array(array)-np.array(x[1])))))
    return out
  
  def similar_movie(self, movieId, numItems=10, method = 'cosine_similarity'):
    '''
    This function finds the most similar movies to a certain movie.
    inputs:
    - movieId: an int id of movie in question
    - numItems: an int indicating how many similar movies to find
    - method: a str define which similarity metric to use, must choose from cosine_similarity or euclidean_distance
    output:
    - pandas dataframe consisting of similar movies' movieId, similarity score, title and genres
    '''
    # check if numItems valid
    if not isinstance(numItems, int):
      raise TypeError('Must input integer number of similar movies to recommend')
    # check if movie id valid: collect() returns an empty list when the id is absent
    try:
      movie_factor = self.factor_matrix.where(self.factor_matrix.id==movieId).collect()[0][1]
    except IndexError:
      return print('There is no movie with id =', movieId)
    # check if method valid
    methods = {'cosine_similarity': self._cosine_similarity, 'euclidean_distance': self._euclidean_distance}
    if method not in methods:
      raise Exception("Method %s not implemented" % method)
      
    #get similarity score
    similar_score = methods[method](self.factor_matrix, movie_factor).toDF(['movieId', method]) 
    #sort df, remove first row same movie
    if method == 'cosine_similarity':
      similar_score = similar_score.orderBy(similar_score[1].desc()).limit(numItems+1).tail(numItems)
    else:
      similar_score = similar_score.orderBy(similar_score[1].asc()).limit(numItems+1).tail(numItems)
    similar_score = spark.createDataFrame(similar_score)
    #join df
    movie_name = self.movies_full[self.movies_full.movieId==movieId].collect()[0][1]
    # use the movies table stored on the instance rather than the notebook-level variable
    out = similar_score.join(self.movies_full, on ='movieId').toPandas()
    print('The top %s similar movie(s) for movie: "%s" (id = %s) :' % (numItems, movie_name, movieId))
    return out


In [0]:
# get factorized matrix
full_model = ALSModel.load("dbfs:/FileStore/tables/full_model")
item_matrix = full_model.itemFactors



In [0]:
movie_factor = ALSMovieFactor(item_matrix, movies_full)

In [0]:
movie_factor.similar_movie(68954,5)

The top 5 similar movie(s) for movie: "Up (2009)" (id = 68954) :


Unnamed: 0,movieId,cosine_similarity,title,genres
0,5829,0.999635,Men with Brooms (2002),Comedy|Drama|Romance
1,98491,0.999617,Paperman (2012),Animation|Comedy|Romance
2,6312,0.999193,"Private Function, A (1984)",Comedy
3,78499,0.999145,Toy Story 3 (2010),Adventure|Animation|Children|Comedy|Fantasy|IMAX
4,165075,0.99897,London Town (2016),Drama


In [0]:
movie_factor.similar_movie(68954,5, method='euclidean_distance')

The top 5 similar movie(s) for movie: "Up (2009)" (id = 68954) :


Unnamed: 0,movieId,euclidean_distance,title,genres
0,98491,0.095326,Paperman (2012),Animation|Comedy|Romance
1,78499,0.103536,Toy Story 3 (2010),Adventure|Animation|Children|Comedy|Fantasy|IMAX
2,134853,0.111304,Inside Out (2015),Adventure|Animation|Children|Comedy|Drama|Fantasy
3,91233,0.124606,Lifted (2006),Animation|Comedy|Sci-Fi
4,8961,0.136013,"Incredibles, The (2004)",Action|Adventure|Animation|Children|Comedy


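In this example, **Euclidean distance produces more relevant results than cosine similarity**: its top 5 are all animated films, while the cosine list includes unrelated titles such as Men with Brooms (2002). Overall, the ALS algorithm is producing relevant results.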

## 4.3 Add new user and Recommend

In this part, I add myself as user id = 0 (a userId that does not exist in the original data), together with my ratings for several movies, into the full data to imitate real-life data updates.  
To get movieIds, I searched for each movie by its title.

I collected the movieIds of the movies I wanted to rate; my final rating list for training is below.  
I restructured the list into a Spark DataFrame (remember to cast the ratings to float).

In [0]:
new_user_ID = 0

# The format of each line is (userId, movieId, rating)
new_user_ratings = [
     (0,7153,4), # Lord of the Rings: The Return of the King, The (2003)
     (0,55052,5), # Atonement (2007)
     (0,116797,5), # The Imitation Game (2014)
     (0,76093,5), # How to Train Your Dragon (2010)
     (0,8368,5), # Harry Potter and the Prisoner of Azkaban (2004)
     (0,164179,5), # Arrival (2016)
     (0,54001,3), # Harry Potter and the Order of the Phoenix (2007)
     (0,109374,4), # Grand Budapest Hotel, The (2014)
     (0,89745,4) , # Avengers, The (2012)
     (0,117176,4), # The Theory of Everything (2014)
     (0,110102,4), # Captain America: The Winter Soldier (2014)
     (0,79132,4), # Inception (2010)
     (0,109487,3), # Interstellar (2014)
     (0,89470,4), # Contagion (2011)
     (0,68954,5) # Up (2009)
    ]
# Create data frame
schema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('rating', StringType(), False)
])
new_user_ratings_df = spark.createDataFrame(new_user_ratings,schema)
new_user_ratings_df = new_user_ratings_df.withColumn('rating',new_user_ratings_df['rating'].cast(FloatType()))
# add ratings of the new user to our data
# both DataFrames have the columns userId, movieId, rating, so the new rows can simply be appended
new_ratings = ratings_full.unionByName(new_user_ratings_df)

In [0]:
#retrain
new_fit = FitALSModel(new_ratings, movies_full)
new_fit.fit_ALS(data_path, rank=best_rank, regParam=best_regParam, alpha=best_alpha, seed=seed, evaluate = False, save=True, save_name='new_ratings_model')

1. Training ALS model with input ratings data...
Successfully trained!
3. Saving model...
Model saved at  dbfs:/FileStore/tables/new_ratings_model
Out[50]: ALSModel: uid=ALS_47d9093dc26c, rank=5

The model is retrained in ~6 mins.
ALS is not designed for real-time recommendation. Any time the dataset changes (for example, a user rates a new movie or a new user is added), ALS needs to be retrained on the whole dataset, so there is a lag due to the computation cost.  
One should consider **hybrid models** that manage the tradeoff between latency and accuracy and can deliver useful results efficiently.

In [0]:
#load the new model
new_ratings_model = ALSModel.load("dbfs:/FileStore/tables/new_ratings_model")

In [0]:
new_fit.recommend_user(0, 20, new_ratings_model)

Top 20 recommended movie(s) for user with Id 0: 


Unnamed: 0,movieId,userId,rating,title,genres
0,151989,0,7.120394,The Thorn (1971),Comedy
1,177209,0,6.773423,Acı Aşk (2009),Drama
2,98595,0,6.362688,Peppermint Soda (Diabolo menthe) (1977),Comedy|Drama
3,193369,0,6.11898,Starie znakomie (1956),Animation
4,192089,0,6.109591,"National Theatre Live: One Man, Two Guvnors (2...",Comedy
5,157791,0,6.065419,.hack Liminality In the Case of Kyoko Tohno,(no genres listed)
6,120134,0,5.985489,Doggiewoggiez! Poochiewoochiez! (2012),Comedy
7,184299,0,5.954202,Freedom on My Mind (1994),Documentary
8,183947,0,5.945487,NOFX Backstage Passport 2,(no genres listed)
9,190707,0,5.882737,1968 (2018),(no genres listed)


An issue with ALS, and with collaborative filtering in general, is the handling of first-time users; for them, the program could instead give the top-rated movies over all users. Since we only rated 15 movies for this new user 0, which is fewer than the minimum number of ratings given by any user in the sample data (20), the system tends to recommend the same generic movies, such as the first two movies in the list above. I ran another set of movies for user id 0, and the same top two movies appeared again in the recommended list.
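One simple way to produce "top rated movies over all users" for a first-time user is a popularity fallback: rank movies by mean rating, but only among movies with enough ratings to make the mean trustworthy. The plain-Python sketch below illustrates this under my own assumptions; the function `top_rated`, the `min_count` threshold, and the toy rows are hypothetical and are not part of the pipeline above.

```python
from collections import defaultdict


def top_rated(ratings, n, min_count):
    """Rank movies by mean rating, ignoring movies with fewer than min_count ratings."""
    totals = defaultdict(lambda: [0.0, 0])  # movieId -> [sum of ratings, number of ratings]
    for _, movie_id, rating in ratings:
        totals[movie_id][0] += rating
        totals[movie_id][1] += 1
    means = [(m, s / c) for m, (s, c) in totals.items() if c >= min_count]
    return sorted(means, key=lambda pair: pair[1], reverse=True)[:n]


# Hypothetical (userId, movieId, rating) rows.
ratings = [
    (1, 10, 5.0), (2, 10, 4.0), (3, 10, 4.5),
    (1, 20, 5.0),
    (2, 30, 3.0), (3, 30, 3.5),
]

fallback = top_rated(ratings, n=2, min_count=2)
print(fallback)
```

The `min_count` filter keeps a movie with a single 5.0 rating (movie 20 in the toy data) from topping the list, which is the kind of obscure title that can otherwise dominate.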