In [0]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
from pyspark.sql.functions import col, explode
%matplotlib inline
import os
os.environ["PYSPARK_PYTHON"] = "python3"

import urllib.request
import subprocess
import sys
from pyspark.sql.types import *

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder,CrossValidatorModel
from time import time

## Spark Project:

## Movie Recommendation Engine Development with ALS in Apache Spark

# Overview
In this project, I built a movie recommendation engine using Alternating Least Squares [(ALS)](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html) algorithm in model-based collaborative filtering and Apache Spark APIs on [MovieLens movie rating dataset ](https://grouplens.org/datasets/movielens/latest/) of size ~762MB, to predict the ratings by users, give recommendations accordingly to users on request, and find similar movies for a specific movie of interest.

The outline of this report is organized as follows: Part 1 data ETL and OLAP, Part 2 model training and evaluation on sample data, Part 3 model deployment on full data, Part 4 model applications, Part 5 complete walkthrough of the code.

Note: The first four parts only contain a subset of relevant codes just for the display purpose; the full Python code is in the last part.

# Part 1: Data Ingestion, Preprocessing, and OLAP
In this part, I built a data ETL pipeline to manipulate the movie rating dataset in DBFS and conducted online analytical processing (OLAP) on a sample dataset of size ~3MB with Spark SQL.

The MovieLens dataset I used is the ratings and movies dataset. After preprocessing, the Spark dataframe is shown as below:

In [0]:
print("--ratings top 5 rows--")
display(ratings_df.limit(5).toPandas())
print("--movies top 5 rows--")
display(movies_df.limit(5).toPandas())

userId,movieId,rating
1,1,4.0
1,3,4.0
1,6,4.0
1,47,5.0
1,50,5.0


movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy


**Facts:**

In [0]:
tmp1 = ratings_df.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
tmp3 = ratings_df.count()
print('The sample data contains {} records of ratings.'.format(tmp3))
print('For the users that rated movies and the movies that were rated:')
print('- Minimum number of ratings per user is {}.'.format(tmp1))
print('- Minimum number of ratings per movie is {}.'.format(tmp2))

tmp4 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1) #that movie has only one rating
tmp5 = ratings_df.select('movieId').distinct().count() #how many movies are rated
print('{} out of {} movies are rated by only one user.'.format(tmp4, tmp5))

##Spark SQL and OLAP

In [0]:
movies_df.registerTempTable("movies")
ratings_df.registerTempTable("ratings")

## Q1: The number of Users

In [0]:
%sql select count(distinct userId) as Number_of_users from ratings

Number_of_users
610


## Q2: The number of Movies

In [0]:
%sql
select count(distinct movieId) as Number_of_movies from movies

Number_of_movies
9742


## Q3:  How many movies are rated by users? List movies not rated before

In [0]:
%sql select count(distinct movieId) as Number_movies_rated_by_users from ratings

Number_movies_rated_by_users
9724


In [0]:
%sql select title, genres from movies where movieId not in (select movieId from ratings)

title,genres
"Innocents, The (1961)",Drama|Horror|Thriller
Niagara (1953),Drama|Thriller
For All Mankind (1989),Documentary
"Color of Paradise, The (Rang-e khoda) (1999)",Drama
I Know Where I'm Going! (1945),Drama|Romance|War
"Chosen, The (1981)",Drama
"Road Home, The (Wo de fu qin mu qin) (1999)",Drama|Romance
Scrooge (1970),Drama|Fantasy|Musical
Proof (1991),Comedy|Drama|Romance
"Parallax View, The (1974)",Thriller


## Q4: List Movie Genres

In [0]:
%sql select distinct explode(split(genres,'[|]')) as genres from movies order by genres

genres
(no genres listed)
Action
Adventure
Animation
Children
Comedy
Crime
Documentary
Drama
Fantasy


## Q5: Movie Count & List for Each Category

In [0]:
%sql select genres as Category, count(*) as number from (select explode(split(genres,'[|]')) as genres, movieId from movies) group by Category order by 2 desc

Category,number
Drama,4361
Comedy,3756
Thriller,1894
Action,1828
Romance,1596
Adventure,1263
Crime,1199
Sci-Fi,980
Horror,978
Fantasy,779


In [0]:
%sql with full_table as (select genres, title, row_number() over (partition by genres order by title) as row_id from (select explode(split(genres,'[|]')) as genres, title from movies)),
select genres as Category, concat_ws(',',collect_set(title)) as list_of_movies from full_table where row_id <= 5 group by 1 

Category,list_of_movies
(no genres listed),"A Midsummer Night's Dream (2016),Ali Wong: Baby Cobra (2016),A Christmas Story Live! (2017),A Cosmic Christmas (1977),Ben-hur (2016)"
Action,"'71 (2014),12 Rounds (2009),10th Victim, The (La decima vittima) (1965),'Hellboy': The Seeds of Creation (2004),13 Assassins (Jûsan-nin no shikaku) (2010)"
Adventure,"10,000 BC (2008),101 Dalmatians (One Hundred and One Dalmatians) (1961),10th Kingdom, The (2000),101 Dalmatians (1996),'Hellboy': The Seeds of Creation (2004)"
Animation,"101 Dalmatians (One Hundred and One Dalmatians) (1961),5 Centimeters per Second (Byôsoku 5 senchimêtoru) (2007),101 Dalmatians II: Patch's London Adventure (2003),A Detective Story (2003),9 (2009)"
Children,"101 Dalmatians (One Hundred and One Dalmatians) (1961),101 Dalmatians (1996),101 Dalmatians II: Patch's London Adventure (2003),102 Dalmatians (2000),*batteries not included (1987)"
Comedy,"(500) Days of Summer (2009),'burbs, The (1989),...All the Marbles (1981),'Hellboy': The Seeds of Creation (2004),*batteries not included (1987)"
Crime,"10 Cent Pistol (2015),11:14 (2003),10th & Wolf (2006),00 Schneider - Jagd auf Nihil Baxter (1994),12 Angry Men (1997)"
Documentary,"20 Feet from Stardom (Twenty Feet from Stardom) (2013),11th Hour, The (2007),'Hellboy': The Seeds of Creation (2004),13th (2016),20,000 Days on Earth (2014)"
Drama,"'Salem's Lot (2004),'71 (2014),""11'09""""01 - September 11 (2002)"",'Round Midnight (1986),'Til There Was You (1997)"
Fantasy,"10th Kingdom, The (2000),'Hellboy': The Seeds of Creation (2004),13 Going on 30 (2004),13th Warrior, The (1999),*batteries not included (1987)"


# Part 2: Model Training and Evaluation

In this part, I built the ALS model and tuned the hyperparameters by Cross Validation with the Grid Search method on the training set, and evaluated the performance of the best model on the test data by computing RMSE of model inference.

In [0]:
#Create ALS model: 
# note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed = 0)
#Tune model using ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(als.rank, [4,6,8,12])
             .addGrid(als.regParam, [0.1, 0.15, 0.20])
             .build())
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
# Build 4-fold Cross validation 
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4, seed = 0)
cv_model = cv.fit(train)
best_model = cv_model.bestModel

In [0]:
print("cross validation RMSE of the best model: {}".format(min(cv_model.avgMetrics)))
print ("RMSE on test data = "+str(rmse))
best_params = cv_model.getEstimatorParamMaps()[np.argmin(cv_model.avgMetrics)]
print('**Best ALS model**')
for i,j in best_params.items():
  print('  '+i.name+': '+str(j))

# Part 3: Model Deployment on full data

In this part, I loaded full MovieLens ratings data into Spark DBFS and trained the ALS model with the best parameters found by CV from the last step.

In [0]:
# The full code can be found at the walk through part
predictions = full_model.transform(full_test)
rmse = evaluator.evaluate(predictions)
print ("RMSE on test data = "+str(rmse))

#Part 4: Model Applications

##4.1 Movies Recommendations to user
Now let's make some recommendations based on our model!

**Case 1: Recommend 10 movies for user id = 575**

In [0]:
fit.recommend_user(575,10)

Unnamed: 0,movieId,userId,rating,title,genres
0,185645,575,5.293878,Stone Cold Steve Austin: The Bottom Line on th...,Documentary
1,185659,575,5.266957,Macho Madness - The Randy Savage Ultimate Coll...,(no genres listed)
2,188111,575,4.897672,Norman Lear: Just Another Version of You (2016),Documentary
3,122222,575,4.783478,The Bride Goes Wild (1948),Comedy|Romance
4,151989,575,4.774616,The Thorn (1971),Comedy
5,140351,575,4.746531,Richard Jeni: A Big Steaming Pile of Me (2005),Comedy
6,153010,575,4.689323,L'ange (1983),(no genres listed)
7,180293,575,4.649055,The Damned Don't Cry (1950),Action|Drama|Romance
8,177921,575,4.640456,"Gesuzza, La Sposa Garibaldina (1934)",(no genres listed)
9,178881,575,4.640456,The Visitor (1964),Comedy|Romance


We can also compare the recommendations by the engine with the top 10 previous ratings from user id = 575 on their viewing history to see how well it performs:

In [0]:
#compare history
df_575 = ratings_full.where(ratings_full.userId==575).orderBy(ratings_full[2].desc()).limit(10).join(movies_full, on='movieId').toPandas().sort_values(by='rating', ascending=False)

display(df_575)

movieId,userId,rating,title,genres
6016,575,5.0,City of God (Cidade de Deus) (2002),Action|Adventure|Crime|Drama|Thriller
68347,575,5.0,Sin Nombre (2009),Crime|Drama|Thriller
7160,575,4.5,Monster (2003),Crime|Drama
8645,575,4.5,"Maria Full of Grace (Maria, Llena eres de gracia) (2004)",Crime|Drama
61240,575,4.5,Let the Right One In (Låt den rätte komma in) (2008),Drama|Fantasy|Horror|Romance
1096,575,4.0,Sophie's Choice (1982),Drama
4378,575,4.0,Sexy Beast (2000),Crime|Drama
31410,575,4.0,"Downfall (Untergang, Der) (2004)",Drama|War
2857,575,3.5,Yellow Submarine (1968),Adventure|Animation|Comedy|Fantasy|Musical
5388,575,3.5,Insomnia (2002),Action|Crime|Drama|Mystery|Thriller


**Case 2: Recommend 5 movies for user id = 232**

In [0]:
fit.recommend_user(232,5)

Unnamed: 0,movieId,userId,rating,title,genres
0,151989,232,6.665139,The Thorn (1971),Comedy
1,177209,232,6.396969,Acı Aşk (2009),Drama
2,127252,232,5.766646,The Veil of Twilight (2014),Crime|Fantasy|Mystery
3,107434,232,5.72748,Diplomatic Immunity (2009– ),Comedy
4,66389,232,5.459282,AmericanEast (2008),Drama


**Case 3: Unexisting user id**

In [0]:
fit.recommend_user(0,1)

##4.2 Find Similar Movies to a movie

Here I utilized the results from ALS (which is the factorized matrix) for movies' features and defined similarity metrics to find similar movies to the movie of our interest.  
In the functions I defined, I included two common similarity metrics: cosine similarity (by default) and euclidean distance. 

If interested, here's [my article](https://z-cheng.medium.com/a-comparison-of-cosine-similarity-vs-euclidean-distance-in-als-recommendation-engine-51898f9025e7) explaining in details on how these two metrics works differently, comparing their results and some suggestion on which metric to choose under different cases in general and tests and analysis for this specific project, as well as some thoughts and reflections on further A/B testing.
*(Comments are welcome!)*  
In short, if the magnitude of features is important, use Euclidean distance; if not important or may even disrupt the outputs, use cosine similarity.

Here I display sample results using both distances. I recommend using cosine similarity in this case based on my experiment in the above [article](https://z-cheng.medium.com/a-comparison-of-cosine-similarity-vs-euclidean-distance-in-als-recommendation-engine-51898f9025e7). But for some further reasons in real-life deployment, I suggest using A/B testing in deciding which method will better meet one's purpose.

**Case 1: Find 10 similar movies to movie id = 463 by cosine similarity **

In [0]:
movie_factor.similar_movie(463,10)

Unnamed: 0,movieId,cosine_similarity,title,genres
0,248,0.999857,Houseguest (1994),Comedy
1,494,0.999681,Executive Decision (1996),Action|Adventure|Thriller
2,90312,0.999406,Quiet Flows the Don (Tikhiy Don) (1957),Drama|War
3,99857,0.999372,Jesse Stone: Stone Cold (2005),Crime|Drama|Mystery
4,489,0.999321,Made in America (1993),Comedy
5,3,0.999186,Grumpier Old Men (1995),Comedy|Romance
6,108311,0.999176,Nobody Lives Forever (1946),Crime|Drama|Film-Noir
7,138300,0.998861,Welcome to the North (2012),Comedy
8,73991,0.998861,Pyrates (1991),Comedy|Romance
9,170279,0.998838,Darkman III: Die Darkman Die (1996),Action|Sci-Fi|Thriller


**Case 2: Find 2 similar movies of movie id = 471 by Euclidean distance**

In [0]:
movie_factor.similar_movie(471,2,'euclidean_distance')

Unnamed: 0,movieId,euclidean_distance,title,genres
0,94423,3.878288,Disney Princess Collection: Jasmine's Enchante...,Animation
1,188925,3.679992,8 Murders a Day (2011),(no genres listed)


**Case 3: Unexisting movie id**

In [0]:
movie_factor.similar_movie(0,2)

##4.3 Add New User

Now it's time to add a new user (in this case, me ^^) to our dataset and see the results of retraining the model.

This is very common for real-life datasets when it's being updated continuously, and it's of great importance to ensure our code functions smoothly within a reasonable time with new input data.

It has to be pointed out that the ALS model is a batch process and thus not suitable for getting instant results, which is one of its deficiencies. Any time the dataset change (for example, user rates on a new movie or add a new user), ALS need to retrain on the whole dataset again, so there's a layback due to computation costs.  
In the case of movie recommendation, e-commerce recommendation, or other items with long consumption time, ALS could work; in other cases like short video recommendation, one should consider other methods that could handle the whole process within an instance.


After adding my ratings as `userId = 0` into the rating data, I retrained the model on the whole dataset.

In [0]:
# this are the movies I chose and my ratings for them
display(new_user_ratings_df.join(movies_full, on='movieId').toPandas().sort_values(by='rating',ascending=False))

movieId,userId,rating,title,genres
78499,0,5.0,Toy Story 3 (2010),Adventure|Animation|Children|Comedy|Fantasy|IMAX
4896,0,5.0,Harry Potter and the Sorcerer's Stone (a.k.a. Harry Potter and the Philosopher's Stone) (2001),Adventure|Children|Fantasy
5816,0,5.0,Harry Potter and the Chamber of Secrets (2002),Adventure|Fantasy
8368,0,5.0,Harry Potter and the Prisoner of Azkaban (2004),Adventure|Fantasy|IMAX
184651,0,5.0,Detective Chinatown 2 (2018),(no genres listed)
68954,0,5.0,Up (2009),Adventure|Animation|Children|Drama
7153,0,4.0,"Lord of the Rings: The Return of the King, The (2003)",Action|Adventure|Drama|Fantasy
89745,0,4.0,"Avengers, The (2012)",Action|Adventure|Sci-Fi|IMAX
88140,0,4.0,Captain America: The First Avenger (2011),Action|Adventure|Sci-Fi|Thriller|War
110102,0,4.0,Captain America: The Winter Soldier (2014),Action|Adventure|Sci-Fi|IMAX


In [0]:
#retrain the model
new_fit = FitALSModel(new_ratings, movies_full)
new_fit.fit_ALS(data_path, rank=best_rank, regParam=best_regParam,  seed=seed, evaluate = False, save=True, save_name='new_ratings_model')

The model is retrained in less than 6 minutes, which is acceptable. 

Now it's time to see whether the recommendations are satisfying! Can't wait to see my results :)

In [0]:
new_fit.recommend_user(0, 20)

Unnamed: 0,movieId,userId,rating,title,genres
0,151989,0,6.765066,The Thorn (1971),Comedy
1,177209,0,6.757873,Acı Aşk (2009),Drama
2,193257,0,6.485928,Familie Brasch (2018),Documentary
3,192089,0,6.069542,"National Theatre Live: One Man, Two Guvnors (2...",Comedy
4,66389,0,6.065567,AmericanEast (2008),Drama
5,184299,0,6.041394,Freedom on My Mind (1994),Documentary
6,159467,0,5.882512,Fifi Howls from Happiness (2013),Documentary
7,107434,0,5.860322,Diplomatic Immunity (2009– ),Comedy
8,190707,0,5.860061,1968 (2018),(no genres listed)
9,171849,0,5.858974,Without Family (1984),Children|Drama


Hmm... I have to say there's something unexpected with my recommendations. To see my further analysis and attempts to interpret the results, here's [my article](https://z-cheng.medium.com/further-analysis-on-movie-recommendation-result-what-happened-to-my-als-model-30bc234081c4) on trying to figure out what happened. 

In conclusion, one of the deficiencies of ALS model is that the latent factors derived through machine learning process are not interpretable or make sense directly to human's mind. Therefore, it's hard to explain why these recommendations are made compared to other user-based or item-based collaborative filtering methods.

#Part 5: Walk Through Codes

In [0]:
#check spark is running
spark

##5.1 Cross Validation on sample data

I used a 4-fold Cross Validation with Grid Search method to tune two hyperparameters of ALS: **`rank`** and **`regParam`**.  

**`rank`**: the number of latent factors in the model (defaults to 10).  
Number of latent factors is the most important parameters in ALS model and definitely need to be tuned, since it has a great effect on the model's performance.  
Though generally it will be more accurate with higher rank on the training set, it's not always the best for test set as the model might be overfitting, and also the computation costs will increase as well.

**`regParam`**: the regularization parameter in ALS (defaults to 1.0).  
Regularization is one of the advantages of ALS to avoid overfitting problem.  
Spark ALS API scale the regParam by the number of ratings the user generated (or the movie received) in updating user (or movie) factors. The approach is named ALS-WR, which makes regParam less dependent on the scale of the dataset, so we can apply the best parameter learned from a sample subset to the full dataset and expect similar performance.

In [0]:
#load sample data
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True).drop('timestamp')

In [0]:
#Create test and train set
movie_ratings = ratings_df
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))
(train, test)=movie_ratings.randomSplit([0.8, 0.2],seed=0)

I started with rank = 5, 10, 15, 20 and regParam = 0.01, 0.05, 0.1, 0.15.
The best model from CV was rank = 5 and regParam = 0.15.  
It turns out that the cv tuning process chose the upperbound or lowerbound of possible values, so I try to further tuning by setting new ranges: rank = 3, 4, 5; regParam = 0.15, 0.20. 0.25, where it chose rank = 4 and regParam = 0.15.

(Later I tried several sets of parameter values during the process of analyzing ALS as described in [my article](https://z-cheng.medium.com/further-analysis-on-movie-recommendation-result-what-happened-to-my-als-model-30bc234081c4) mentioned above. It kept choosing the same best values.)  
The following code is one of those tuning processes where I set rank = 4, 6, 8, 12 and regParam = 0.15, 1.0, 10.

In [0]:
#Create ALS model: 
# note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", seed = 0)

#Tune model using ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(als.rank, [4,6,8,12])
             .addGrid(als.regParam, [0.15, 1.0, 10])
             .build())
print ("Num models to be tested: ", len(paramGrid))

# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Build 4-fold Cross validation 
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=4, seed = 0)
cv_model = cv.fit(train)
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = cv_model.bestModel


Here's how I created a table containing the CV rmse for markdown visualization:

In [0]:
a = iter(list(cv_model.avgMetrics[:]))
res = []
for x in [4,6,8,12]:
    for y in [0.15,1.0,10]:
        res.append((x, y, next(a)))
for item in res:
    print('|' + '|'.join(list(map(str, item))) + '|')

| rank | regParam | CV RMSE |
| ------------ | ------------- | ------------- |
|4|0.15|0.8878537199214767|
|4|1.0|1.3248333634395033|
|4|10|3.664643882934559|
|6|0.15|0.8926391412241598|
|6|1.0|1.3248205431910358|
|6|10|3.664643882934559|
|8|0.15|0.892729343479868|
|8|1.0|1.3248269491666145|
|8|10|3.664643882934559|
|12|0.15|0.893684770546905|
|12|1.0|1.3248391808064497|
|12|10|3.664643882934559|

These are the final best parameters I got:

In [0]:
#Generate predictions, then evaluate using RMSE
print("cross validation RMSE of the best model: {}".format(min(cv_model.avgMetrics)))
print ("RMSE on test data = "+str(rmse))
best_params = cv_model.getEstimatorParamMaps()[np.argmin(cv_model.avgMetrics)]
print('**Best ALS model**')
for i,j in best_params.items():
  print('  '+i.name+': '+str(j))

##5.2 Load full data and deploy ALS model

In [0]:
# set parameters from cv process
best_rank = 4
best_regParam = 0.15
seed = 0

In general, the following classes I built to encapsulate different stages or functions will require the user to input a data path, where the functions inside the classes can save and load data or model in the DBFS.

However, there are some important rules to keep in mind when defining the path:
1. need to check if the path already exists: if so, unexpected error may occured when moving temp files to dbfs;
2. need to check if the files already unzipped and exist at file:/databricks/driver/ml-latest/: if so, unexpected running time will occur when unzipping again.

To check for a path/file in dbfs, use `display(dbutils.fs.ls(your_data_path))`.

In [0]:
data_path = "dbfs:/FileStore/tables/ml-full/"

Here is the first class `MovieLensDataETL` that I built for encapsulating the data ETL process. Since it may take a while to run the functions, I also added detailed information to print out to indicate each stage in progress and its time consumption.  
When creating the object, nothing needs to be passed. 

Notes for explanation:  
1. How the `extract` function works:  
When calling the function, the user must input the `data_path`.  
First, it retrieves the zip file from the MovieLens website URL to a temporary path. Sometimes it may confront SSL certificate error, so I include the solution suggested by [here](https://stackoverflow.com/questions/50236117/scraping-ssl-certificate-verify-failed-error-for-http-en-wikipedia-org) to solve the problem. Nothing would change after the process since it restores the same behavior immediately after retrieving to reduce the possible hazard mentioned in the link.  
Then it will unzip the files and moved them to DFBS defined by the input path when a list of all the unzipped files will be printed for reference. Since I only use the ratings.csv and movies.csv, only the two files will be moved to DBFS, and a display of file storage will occur when all the processes are finished.

2. How the `transform_load` function works:  
If you have already extracted the files and just want to load data for usage the next time you come back to work, `transform_load` enables you to directly load data from your input `data_path` so that you don't need to go over again all the fuss in the extraction process.  
Since Spark dataframe has the problem of automatically recognizing the type of data, it will take all data as String and therefore we need to transform the types manually. Also, we need to select only the features of interest and drop others.  
`transform_load` saves all the troubles and enables the user to simply call the function to get the preprocessed dataframe of ratings and movies as outputs. I also include an automatic display of the tables' first row for understanding the data.

In [0]:
class MovieLensDataETL(object):
  def extract(self, data_path):
    t0 = time()
    #1. retrive
    print("1. Retriving the zip file...")
    try:
      urllib.request.urlretrieve("https://files.grouplens.org/datasets/movielens/ml-latest.zip", "/tmp/ml-latest.zip")
    except:
      print("Solving ssl certificate failure...")
      import ssl
      # This restores the same behavior as before.
      orig_https_context = ssl._create_default_https_context 
      ssl._create_default_https_context = ssl._create_unverified_context
      urllib.request.urlretrieve("https://files.grouplens.org/datasets/movielens/ml-latest.zip", "/tmp/ml-latest.zip")
      ssl._create_default_https_context = orig_https_context
    tt = time() - t0 
    print("Zip file retrieved in %s seconds!" % round(tt,3))  

    #2. unzip
    t0 = time()
    print("2. Unzipping the files...")
    command = subprocess.run(["unzip","/tmp/ml-latest.zip"], capture_output=True)
    #print output to see list of unzipped files
    sys.stdout.write(command.stdout)
    sys.stderr.write(command.stderr)
    tt = time() - t0 
    print("All files unzipped in %s seconds!" % round(tt,3))
    
    #3. save temp files to dbfs
    print("3. Moving selected temp files to ", data_path, " ...")
    dbutils.fs.mv("file:/databricks/driver/ml-latest/ratings.csv", data_path + "ratings.csv") 
    dbutils.fs.mv("file:/databricks/driver/ml-latest/movies.csv", data_path + "movies.csv")
    print("Selected files saved at DBFS!")
    display(dbutils.fs.ls(data_path))
        
  def transform_load(self, data_path):
    #4. load file into Spark DataFrame
    print("4. Loading file into Spark DataFrame...")
    movies_full = spark.read.load(data_path + "movies.csv", format='csv', header = True)
    ratings_full = spark.read.load(data_path + "ratings.csv", format='csv', header = True).drop('timestamp')
    
    #5. data type convert
    print("5. Converting data type...")
    ratings_full = ratings_full.withColumn("userId", ratings_full["userId"].cast(IntegerType()))
    ratings_full = ratings_full.withColumn("movieId", ratings_full["movieId"].cast(IntegerType()))
    ratings_full = ratings_full.withColumn("rating", ratings_full["rating"].cast(FloatType()))
    movies_full = movies_full.withColumn("movieId", movies_full["movieId"].cast(IntegerType()))
    
    #6. present dataframe
    print("--ratings top 5 rows--")
    display(ratings_full.limit(5).toPandas())
    print(ratings_full.schema)
    print("--movies top 5 rows--")
    display(movies_full.limit(5).toPandas())
    print(movies_full.schema)
    print("**ratings, movies are returned respectively**")
    return ratings_full, movies_full

In [0]:
full_data = MovieLensDataETL()

In [0]:
full_data.extract(data_path)

path,name,size
dbfs:/FileStore/tables/ml-full/movies.csv,movies.csv,2858223
dbfs:/FileStore/tables/ml-full/ratings.csv,ratings.csv,759200511


In [0]:
full_df = full_data.transform_load(data_path)

userId,movieId,rating
1,307,3.5
1,481,3.5
1,1091,1.5
1,1257,4.5
1,1449,4.5


movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy


In [0]:
ratings_full = full_df[0]
movies_full = full_df[1]

##5.3 Fit model on full data and make recommendations

Here's the second class `FitALSModel` I built to encapsulate model training and giving recommendations.  
When creating the object, user must pass in the preprocessed ratings and movies dataframe from the previous stage.

Notes of explanations:  
1. How `fit_ALS` works:  
When calling `fit_ALS`, the user must input `data_path` to indicate where they want to save the model.   
User can also choose to input `rank` and `regParam` as best values tuned by CV or by default values of Spark ALS API, set `seed` or by default = 0, name the output model `save_name` in saving directory or by default = 'full_model'.
By default, after training the model, it will also evaluate the model on test set and print the test rmse. But user can also skip the process by setting `evaluate=False`.  
The same as well with the saving process, where user can skip the saving process by `save=False`, since the function will still return the model as the output and can be saved later if desired.

2. How `recommend_user` works:  
The user must specify the integer of `userId` for recommendation, and set `numItems` to get a specific number of recommendations to return, or just by default = 10. If no such user id exists, a message will be printed.  
If the `recommend_user` is called right after `fit_ALS`, there's no need to input model manually since the result from last process has already saved inside the class properties.  
However, if someone has already got a model at hand and ready to use without going over the whole training process, they can also make use of their model by input as `full_model`, just make sure the input is a valid object of class pyspark.ALSModel.  
The function will return a Pandas dataframe of movie recommendations as the output.

In [0]:

class FitALSModel(object):
  def __init__(self, ratings_full, movies_ful):
    self.ratings_full = ratings_full
    self.movies_full = movies_full
    self.full_model = None
    
  def fit_ALS(self, data_path, rank=10, regParam=1.0, seed=0, evaluate=True, save=True, save_name='full_model'):
    t0 = time()
    # fit ALS model with best parameters from cross validation on sample dataset
    print("1. Training ALS model with input ratings data...")
    full_train, full_test = self.ratings_full.randomSplit([0.8, 0.2], seed = seed)
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop", rank = rank, regParam = regParam, seed = seed)
    self.full_model = als.fit(full_train)
    tt = time() - t0
    print("Successfully trained in %s seconds!" % round(tt,3))
    
    if evaluate:
      # Define evaluator as RMSE
      print("2. Evaluate model: computing RMSE on test data...")
      evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
      predictions = self.full_model.transform(full_test)
      rmse = evaluator.evaluate(predictions)
      print ("RMSE on test data = "+str(rmse))
      
    if save:
      # save model
      print("3. Saving model...")
      self.full_model.save(data_path + save_name)
      print("Model saved at ", data_path + save_name)
    return self.full_model
  
   
  def recommend_user(self, userId, numItems=10, full_model=None):
    '''
    Inputs:
    - userId: an integer defining ID of user of interest.
    - numItems: an integer defining max number of recommendations for each user.
    - full_model: the ALS model trained and saved
 
    Returns:
    - a DataFrame of movieId, userId, rating, title, genres.
    '''
    #check inputs
    if not isinstance(userId, int):
      return print('Oops: Id must be integer!') 
    if not full_model:
      full_model = self.full_model
    df = spark.createDataFrame([{'userId': userId}])
    userRecs = full_model.recommendForUserSubset(df, numItems)
    if not len(userRecs.head(1))>0:
      return print('There is no user with id = ', userId)
    userRecs = userRecs\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))\
    .join(self.movies_full, on='movieId')\
    .where(userRecs['userId'] == userId)\
    .toPandas()
    print("Top %s recommended movie(s) for user with Id %s: " % (numItems, userId))
    return userRecs


In [0]:
#fit the model
fit = FitALSModel(ratings_full, movies_full)
fit.fit_ALS(data_path, rank=best_rank, regParam=best_regParam, seed=seed, save=True, save_name='full_model')

In [0]:
#make top 5 recommendation for uerId=232
fit.recommend_user(232,5)

Unnamed: 0,movieId,userId,rating,title,genres
0,151989,232,6.665139,The Thorn (1971),Comedy
1,177209,232,6.396969,Acı Aşk (2009),Drama
2,127252,232,5.766646,The Veil of Twilight (2014),Crime|Fantasy|Mystery
3,107434,232,5.72748,Diplomatic Immunity (2009– ),Comedy
4,66389,232,5.459282,AmericanEast (2008),Drama


In [0]:
#the message for unexisting userId
fit.recommend_user(0,1)

##5.4 Find similar movies by ALS results

Here's the third class `ALSMovieFactor`, where I make use of the results from the ALS process (i.e. the factorized matrix) to find similar movies.  
The idea is to compute similarity scores based on each movie's features and sort the results. The very same idea could be applied to finding similar users simply by adjusting some relevant code from below.  
When building the object, the user must input the item factors matrix they get from their ALS model, and the movies Spark dataframe they get from the ETL process.

Notes of explanations:  
1. How `similar_movie` works:  
This is the only function the user should call to get the results. Users must input an integer `movieId` to indicate the movie of interest, and define `numItems` to indicate how many similar movies to find (by default = 10).  
To define the similarity metric, the user must choose from 'cosine_similarity'(by default) or 'euclidean_distance' to input for `method`. Other methods should not work and an error will be raised.  
 First, it will select the features of the specified movie from the factor matrix, which will be an array, and save it for the later computation.  
 Second, according to method inputs, it will call the relevant functions and compute the similarity scores across the whole movie features.  
 Third, it will sort the dataframe by descending scores to get the number of most similar movies required and saved it as a new Spark dataframe.  
 Finally, it will join with the movies dataframe by movieId to get movies' titles and genres, and transformed it to Pandas dataframe to return.        
The whole process contains several transformations of type because I need to choose the best form to achieve the tasks with such a big data size. Most operations are done by Spark df because it can conduct distributed operations and thus give returns in a reasonable time. Rdd is used for computing similarity score only (reasons below), and Pandas df only for better display. (I have tested and compared their time performance of each type, and would suggest avoiding using Pandas df on such operations.)

1. How `_cosine_similarity` and `_euclidean_distance` works:  
These are the similarity metrics defined and should be used **only** inside the class, where they take the item factors matrix and the features of the specific movie of interest as inputs to compute the similarity scores.  
I used `rdd.map()` function for this operation because Spark udf cannot take two arguments or do complicated computation like this. So the output of these functions will be rdd and should be transformed back to Spark dataframe in `similar_movie`.  
It should also be noted that I need to apply `float()` to the computation because the computation is done on np.arrary. If not doing so, when you want to transfer back to Spark dataframe and try to correct its type from string to float by setting a schema, it won't work because Spark cannot deal with dtypes object, so we need to manually set it to the type of float.

In [0]:
class ALSMovieFactor(object):
  
  def __init__(self, item_matrix, movies_full):
    print('Note: Please make sure the input item matrix must be generated by ALSModel.itemFactors, and the movies table generated by MovieLensDataETL.transform_load()[1]')
    self.factor_matrix = item_matrix
    self.movies_full = movies_full
    
  def _cosine_similarity(self, df, array):  
    # compute cosine similarity
    # inputs: spark df, array of feature of interest
    # output: rdd
    out = df.rdd.map(lambda x: (x[0], float(np.dot(array,x[1])/(np.linalg.norm(array) * np.linalg.norm(x[1])))))
    return out
  
  def _euclidean_distance(self, df, array):
    # compute euclidean distance
    # inputs: spark df, array of feature of interest
    # output: rdd
    out = df.rdd.map(lambda x: (x[0], float(np.linalg.norm(np.array(array)-np.array(x[1])))))
    return out
  
  def similar_movie(self, movieId, numItems=10, method = 'cosine_similarity'):
    '''
    This function is to find the most similar movies to a certain movie.
    inputs:
    - movieId: an int id of movie in question
    - numItems: an int indicating how many similar movies to find
    - method: a str define which similarity metric to use, must choose from cosine_similarity or euclidean_distance
    output:
    - pandas dataframe consisting of similar movies' movieId, similarity score, title and genres
    '''
    # check if numItems valid
    if not isinstance(numItems, int):
      raise TypeError('Oops! Please input an integer number of similar movies to find')
    # check if movie id valid
    try: movie_factor=self.factor_matrix.where(self.factor_matrix.id==movieId).collect()[0][1]
    except:  return print('There is no movie with id =', movieId)
    # check if method valid
    methods = {'cosine_similarity': self._cosine_similarity, 'euclidean_distance': self._euclidean_distance}
    if method not in methods:
      raise Exception("Method %s not implemented" % method)
      
    movie_factor = self.factor_matrix.where(self.factor_matrix.id==movieId).collect()[0][1]
    #get similarity score
    similar_score = methods[method](self.factor_matrix, movie_factor).toDF(['movieId', method]) 
    #sort df, remove first row
    similar_score = similar_score.orderBy(similar_score[1].desc()).limit(numItems+1).tail(numItems)
    similar_score = spark.createDataFrame(similar_score)
    #join df
    movie_name = self.movies_full[self.movies_full.movieId==movieId].collect()[0][1]
    out = similar_score.join(movies_full, on ='movieId').toPandas()
    print('The top %s similar movie(s) for movie: "%s" (id = %s) :' % (numItems, movie_name, movieId))
    return out


In [0]:
# get factorized matrix
full_model = ALSModel.load("dbfs:/FileStore/tables/ml-full/full_model")
item_matrix = full_model.itemFactors

In [0]:
movie_factor = ALSMovieFactor(item_matrix, movies_full)

In [0]:
movie_factor.similar_movie(471,2)

Unnamed: 0,movieId,cosine_similarity,title,genres
0,41880,0.99994,House of Strangers (1949),Drama|Film-Noir
1,145120,0.999802,Monopol (1996),Comedy


##5.5 Add new user

In this part, I add myself as user id = 0 (remember that userId unexists in the original data) and my ratings for several movies into the full data to imitate the real-life data updates.  
To get movieIds, I used a simple line below to search by movie's title:

In [0]:
#search for movie id by title example: the animation movie 'Up' by Pixar in 2009
movies_full[movies_full.title.rlike('^Up.+2009')].collect()

With the above code, I managed to get the movieIds for movies I would like to rate, and here's my final rating list for training.  
I restructured the list into a Spark dataframe (remember to correct the data types), and add by`.union` function to the original dataset.

In [0]:
new_user_ID = 0

# The format of each line is (userId, movieId, rating)
new_user_ratings = [
     (0,7153,4), # Lord of the Rings: The Return of the King, The (2003)
     (0,78499,5), # Toy Story 3 (2010)
     (0,4896,5), # Harry Potter and the Sorcerer's Stone (a.k.a. Harry Potter and the Philosopher's Stone) (2001)
     (0,5816,5), # Harry Potter and the Chamber of Secrets (2002)
     (0,8368,5), # Harry Potter and the Prisoner of Azkaban (2004)
     (0,40815,3), # Harry Potter and the Goblet of Fire (2005)
     (0,54001,3), # Harry Potter and the Order of the Phoenix (2007)
     (0,69844,3), # Harry Potter and the Half-Blood Prince (2009)
     (0,89745,4) , # Avengers, The (2012)
     (0,88140,4), # Captain America: The First Avenger (2011)
     (0,110102,4), # Captain America: The Winter Soldier (2014)
     (0,79132,4), # Inception (2010)
     (0,109487,3), # Interstellar (2014)
     (0,184651,5), # Detective Chinatown 2 (2018)
     (0,68954,5) # Up (2009)
    ]
# Create data frame
schema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('movieId', IntegerType(), True),
    StructField('rating', StringType(), False)
])
new_user_ratings_df = spark.createDataFrame(new_user_ratings,schema)
new_user_ratings_df = new_user_ratings_df.withColumn('rating',new_user_ratings_df['rating'].cast(FloatType()))
# add ratings of the new user to our data
new_ratings = ratings_full.union(new_user_ratings_df)


In [0]:
#retrain
new_fit = FitALSModel(new_ratings, movies_full)
new_fit.fit_ALS(data_path, rank=best_rank, regParam=best_regParam,  seed=seed, evaluate = False, save=True, save_name='new_ratings_model')

The model is retrained in ~6 mins, which is acceptable.  
Remember that ALS is not designed for a real-time recommendation, so we can think of the situation that after a user watches and rates a movie, it won't be necessary to recommend the next movie to watch immediately since the user might take a rest or do other things after viewing.

In [0]:
#load the new model
new_ratings_model = ALSModel.load("dbfs:/FileStore/tables/ml-full/new_ratings_model")

In [0]:
new_fit.recommend_user(0, 20, new_ratings_model)

Unnamed: 0,movieId,userId,rating,title,genres
0,151989,0,6.765066,The Thorn (1971),Comedy
1,177209,0,6.757873,Acı Aşk (2009),Drama
2,193257,0,6.485928,Familie Brasch (2018),Documentary
3,192089,0,6.069542,"National Theatre Live: One Man, Two Guvnors (2...",Comedy
4,66389,0,6.065567,AmericanEast (2008),Drama
5,184299,0,6.041394,Freedom on My Mind (1994),Documentary
6,159467,0,5.882512,Fifi Howls from Happiness (2013),Documentary
7,107434,0,5.860322,Diplomatic Immunity (2009– ),Comedy
8,190707,0,5.860061,1968 (2018),(no genres listed)
9,171849,0,5.858974,Without Family (1984),Children|Drama


#We are done! Congratulations!
Thank you for your time and effort! Comments are welcome.  
*More further readings by me (as mentioned before):* 
- [How I try to figure out the unexpected results of my recommendations](https://z-cheng.medium.com/further-analysis-on-movie-recommendation-result-what-happened-to-my-als-model-30bc234081c4)
-  [How to understand the different results of cosine similarity and Euclidean distance and how to choose between](https://z-cheng.medium.com/a-comparison-of-cosine-similarity-vs-euclidean-distance-in-als-recommendation-engine-51898f9025e7)