### Spark Movie Recommendation
In this notebook, the Alternating Least Squares (ALS) algorithm is used with Spark APIs to predict the ratings for the movies in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [2]:
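# Install MLflow first: restartPython() clears the Python state,
# so any imports made before it would be lost (ideally run these two lines in their own cell)
dbutils.library.installPyPI("mlflow")
dbutils.library.restartPython()

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
import mlflow
%matplotlib inline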

In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part 1: Data ETL and Data Exploration

In [5]:
from pyspark.sql import SparkSession
# In Databricks `spark` already exists; getOrCreate() returns that session
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .getOrCreate()

In [6]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [7]:
type(movies_df)

In [8]:
movies_df.count()

In [9]:
movies_df.show(5)

In [10]:
ratings_df.show(5)

In [11]:
links_df.show(5)

In [12]:
tags_df.show(5)

In [13]:
tmp1 = ratings_df.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [14]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))
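The two cells above count ratings per user and per movie with `groupBy(...).count()`. The plain-Python sketch below does the same bookkeeping on a hypothetical handful of (userId, movieId) pairs, which are not MovieLens data, so the logic is easy to see:

```python
from collections import Counter

# Hypothetical toy ratings as (userId, movieId) pairs
ratings = [(1, 10), (1, 20), (2, 10), (2, 30), (3, 10), (3, 20)]

# Equivalent of groupBy("userId").count() and groupBy("movieId").count()
per_user = Counter(user for user, _ in ratings)
per_movie = Counter(movie for _, movie in ratings)

min_per_user = min(per_user.values())
min_per_movie = min(per_movie.values())

# Movies rated by exactly one user
rated_once = sum(1 for c in per_movie.values() if c == 1)

print(min_per_user, min_per_movie, rated_once, len(per_movie))  # 2 1 1 3
```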

## Part 1 (continued): Spark SQL and OLAP

In [16]:
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")

### Q1: The number of Users

In [18]:
# %sql
num_users = spark.sql("SELECT count (distinct userID) as num_users FROM ratings")
display(num_users)

num_users
10532


In [19]:
ratings_df.select("userId").distinct().count()

In [20]:
type(ratings_df.select("userId"))

### Q2: The number of Movies

In [22]:
#%sql 
num_movies = spark.sql("SELECT count (distinct movieID) as num_movies FROM movies")
display(num_movies)

num_movies
58098


In [23]:
movies_df.select('movieID').distinct().count()

In [24]:
movies_df.select('movieID').count()

### Q3: How many movies have been rated by users? List the movies that have never been rated

In [26]:
rated_by_users = ratings_df.select('movieID').distinct().count()
print('How many movies are rated by users?', rated_by_users)

In [27]:
%sql
SELECT movies.title, movies.genres, ratings.rating FROM movies LEFT JOIN ratings ON ratings.movieId = movies.movieId WHERE ratings.rating IS NULL LIMIT 10

title,genres,rating
One-Eyed Monster (2008),Comedy|Horror|Sci-Fi,
Deep in the Valley (American Hot Babes) (2009),Comedy,
Love and Lemons (Små citroner gula) (2013),Romance,
Deathsport (1978),Action|Sci-Fi,
Narrien illat (1970),Comedy|Drama|Musical,
Tortured (2008),Crime|Thriller,
World War II: When Lions Roared (1994),Drama|War,
Crocodile (2000),Horror|Thriller,
"Wind Journeys, The (Viajes del viento, Los) (2009)",Drama,
Apartment 1303 (2007),Horror,


### Q4: List Movie Genres

In [29]:
%sql
SELECT DISTINCT(genres) FROM movies LIMIT 10

genres
Comedy|Horror
Adventure|Children
Action|Crime|Thriller
Adventure|Animation|Children|Comedy|Fantasy
Comedy|Romance
Action|Adventure|Thriller
Comedy
Action
Comedy|Drama|Romance
Adventure|Children|Fantasy


In [30]:
%sql
SELECT SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 1), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 2), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 3), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 4), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 5), '|', -1) as genre FROM movies
UNION
SELECT  SUBSTRING_INDEX(SUBSTRING_INDEX(genres, '|', 6), '|', -1) as genre FROM movies
ORDER BY genre;

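-- This method works but is clumsy: it hard-codes at most six genres per movie, so a genre that only ever appears in position seven or later is missed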

genre
(no genres listed)
Action
Adventure
Animation
Children
Comedy
Crime
Documentary
Drama
Fantasy
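The SQL above emulates splitting with six `SUBSTRING_INDEX` branches, while splitting on `|` and flattening (as the UDF cell below does) has no cap. The sketch below uses hypothetical genre strings in the `movies.csv` format. It mimics both approaches in plain Python to show which genres the six-branch version misses. The helper `first_n_genres` is illustrative only:

```python
# Hypothetical pipe-delimited genre strings in the movies.csv format
genre_strings = [
    "Adventure|Animation|Children|Comedy|Fantasy",
    "Comedy|Romance",
    "Action|Adventure|Comedy|Crime|Drama|Fantasy|Mystery|Thriller",
    "(no genres listed)",
]


def first_n_genres(s, n=6):
    """Mimic SUBSTRING_INDEX(SUBSTRING_INDEX(s, '|', k), '|', -1) for k = 1..n."""
    parts = s.split("|")
    # When k exceeds the number of parts, SUBSTRING_INDEX returns the last part again
    return {parts[min(k, len(parts)) - 1] for k in range(1, n + 1)}


# Split-and-flatten: every genre, however many a movie has
all_genres = sorted({g for s in genre_strings for g in s.split("|")})
# Union of the six SUBSTRING_INDEX branches
six_branch = set().union(*(first_n_genres(s) for s in genre_strings))
missed = sorted(set(all_genres) - six_branch)
print(missed)  # ['Mystery', 'Thriller']
```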


In [31]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, mean, udf, lit, current_timestamp, unix_timestamp, array_contains
extract_genres = udf(lambda x: x.split("|"), ArrayType(StringType()))
movies_df_clean = movies_df.select("movieId", "title", extract_genres("genres").alias("genres"))
#display(movies_df_clean)

movies_df_clean.createOrReplaceTempView("movies_df_clean")

display (spark.sql("SELECT * FROM movies_df_clean limit 5"))

movieId,title,genres
1,Toy Story (1995),"List(Adventure, Animation, Children, Comedy, Fantasy)"
2,Jumanji (1995),"List(Adventure, Children, Fantasy)"
3,Grumpier Old Men (1995),"List(Comedy, Romance)"
4,Waiting to Exhale (1995),"List(Comedy, Drama, Romance)"
5,Father of the Bride Part II (1995),List(Comedy)


In [32]:
genres_result = list(set(movies_df_clean.select('genres').rdd.flatMap(tuple).flatMap(tuple).collect()))
genres_result

### Q5: Movie for Each Category

In [35]:
movie_pdf = movies_df.toPandas()
movie_pdf['genres'].str.get_dummies(sep='|').head()

,(no genres listed),Action,Adventure,Animation,Children,Comedy,Crime,Documentary,Drama,Fantasy,Film-Noir,Horror,IMAX,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,0,0,1,1,1,1,0,0,0,1,0,0,0,0,0,0,0,0,0,0
1,0,0,1,0,1,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
2,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0
3,0,0,0,0,0,1,0,0,1,0,0,0,0,0,0,1,0,0,0,0
4,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0
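Q5 asks for the number of movies in each category. Summing each column of the one-hot table above gives exactly that: in pandas, `movie_pdf['genres'].str.get_dummies(sep='|').sum()`. The plain-Python sketch below counts the same way for the five movies shown earlier, and its counts match the column sums of the table above:

```python
from collections import Counter

# The first five movies from the movies_df_clean output above
movies = [
    ("Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
    ("Jumanji (1995)", "Adventure|Children|Fantasy"),
    ("Grumpier Old Men (1995)", "Comedy|Romance"),
    ("Waiting to Exhale (1995)", "Comedy|Drama|Romance"),
    ("Father of the Bride Part II (1995)", "Comedy"),
]

# Count each genre once per movie (same as summing the one-hot columns)
genre_counts = Counter(g for _, genres in movies for g in genres.split("|"))
for genre, n in genre_counts.most_common():
    print(genre, n)
```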


In [36]:
list_of_movie = list(movie_pdf['title'])

## Part2: Spark ALS based approach for training model
We will use Spark ML's ALS to predict the ratings. ALS expects (user, item, rating) triples with numeric IDs, so we drop the `timestamp` column from `ratings_df` and cast the remaining columns to numeric types.
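ALS factorizes the user-item rating matrix as R ≈ X Yᵀ. It alternates between two steps: fix the item factors Y and solve a regularized least-squares problem for each user, then fix X and do the same for each item. Each step minimizes a convex problem exactly, so the objective never increases. The NumPy sketch below shows only that idea on a small dense toy matrix. It is not Spark's implementation: Spark distributes the solves in blocks and scales `regParam` by each user's or item's number of ratings, and neither is done here. The function `als` and the toy data are hypothetical.

```python
import numpy as np


def als(R, mask, rank=2, reg=0.1, n_iters=20, seed=0):
    """Explicit-feedback ALS on a small dense matrix (illustration only).

    R    : (n_users, n_items) array; entries where mask is False are ignored
    mask : boolean array of the same shape, True where a rating is observed
    Returns user factors X, item factors Y and the objective after each sweep.
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    X = rng.normal(scale=0.1, size=(n_users, rank))  # user factors
    Y = rng.normal(scale=0.1, size=(n_items, rank))  # item factors
    eye = np.eye(rank)
    losses = []
    for _ in range(n_iters):
        # Y fixed: each user's factors solve a ridge regression over the items they rated
        for u in range(n_users):
            Yu = Y[mask[u]]
            X[u] = np.linalg.solve(Yu.T @ Yu + reg * eye, Yu.T @ R[u, mask[u]])
        # X fixed: each item's factors solve a ridge regression over the users who rated it
        for i in range(n_items):
            Xi = X[mask[:, i]]
            Y[i] = np.linalg.solve(Xi.T @ Xi + reg * eye, Xi.T @ R[mask[:, i], i])
        # Squared error on observed entries plus the L2 penalty on both factor matrices
        err = (R - X @ Y.T)[mask]
        losses.append(float(err @ err + reg * (np.sum(X ** 2) + np.sum(Y ** 2))))
    return X, Y, losses


# Hypothetical toy data: a rank-2 "rating" matrix with about 80% of entries observed
rng = np.random.default_rng(42)
R = rng.normal(size=(6, 2)) @ rng.normal(size=(5, 2)).T
mask = rng.random(R.shape) < 0.8

X, Y, losses = als(R, mask, rank=2, reg=0.01, n_iters=50)
train_rmse = np.sqrt(np.mean((X @ Y.T - R)[mask] ** 2))
baseline_rmse = np.sqrt(np.mean((R[mask] - R[mask].mean()) ** 2))
print(f"train RMSE {train_rmse:.4f} vs. global-mean baseline {baseline_rmse:.4f}")
```

The predicted rating for user u and item i is simply the dot product `X[u] @ Y[i]`. This is also why Spark's ALS predictions are not bounded to the rating scale.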

In [38]:
ratings_df.show(10)

In [39]:
movie_ratings=ratings_df.drop('timestamp')

In [40]:
movie_ratings.show(5)

In [41]:
# Cast the columns to numeric types (the CSV was read without a schema, so every column is a string)
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [42]:
movie_ratings.show(10)

### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.
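`ParamGridBuilder` builds the Cartesian product of the listed values. `CrossValidator` then trains one ALS model per combination per fold and refits the best combination once on the whole training set. The quick count below shows why the full grid that is commented out in the tuning cell was too slow to run, compared with the single-combination, 2-fold setup actually used:

```python
from itertools import product

# The full grid that is commented out in the tuning cell
full_grid = {
    "regParam": [0.01, 0.5, 1, 1.5],
    "rank": [10, 15, 20, 25],
    "maxIter": [1, 5, 10, 15],
}
combos = list(product(*full_grid.values()))  # every parameter combination

# One fit per combination per fold, plus one refit of the best combination
total_full = len(combos) * 5 + 1  # full grid with 5 folds
total_used = 1 * 2 + 1            # grid actually run: one combination, 2 folds
print(len(combos), total_full, total_used)  # 64 321 3
```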

In [44]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [45]:
#Create test and train set
(training,test)=movie_ratings.randomSplit([0.8,0.2])

In [46]:
# Create ALS model
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
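Why `coldStartStrategy="drop"` matters here: `randomSplit` can put every rating of a user or movie into the test set. Many movies have only one rating, as shown in Part 1. Such a user or movie gets no learned factors, so its prediction would be NaN and the RMSE would become NaN too. The sketch below uses hypothetical (userId, movieId, rating) rows and reproduces the filtering that `"drop"` performs:

```python
# Hypothetical train/test rows as (userId, movieId, rating)
train = [(1, 10, 4.0), (1, 20, 3.0), (2, 10, 5.0), (2, 30, 2.0)]
test = [(1, 30, 3.5), (2, 20, 4.0), (3, 10, 4.5), (1, 40, 1.0)]

seen_users = {u for u, _, _ in train}
seen_items = {i for _, i, _ in train}

# "drop" removes test rows whose user or item has no factors,
# instead of returning a NaN prediction for them
kept = [row for row in test if row[0] in seen_users and row[1] in seen_items]
dropped = [row for row in test if row not in kept]
print(kept, dropped)
```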

In [47]:
# Print every ALS parameter with its documentation and current value
print(als.explainParams())

In [48]:
# Tune the model with ParamGridBuilder
# Cross-validation is slow, so only one parameter combination is tried here; the full grid is commented out below

paramGrid = (ParamGridBuilder()
             .addGrid(als.regParam, [0.01])
             .addGrid(als.rank, [10])
             .addGrid(als.maxIter, [15])
             .build())

# paramGrid = (ParamGridBuilder()
#              .addGrid(als.regParam, [0.01, 0.5, 1, 1.5])
#              .addGrid(als.rank, [10, 15, 20, 25])
#              .addGrid(als.maxIter, [1, 5, 10, 15])
#              .build())

In [49]:
# Define evaluator as RMSE

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [50]:
# Build the cross-validator (CrossValidator is already imported above).
# 5 folds would be preferable, but training is slow, so only 2 folds are used here.
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)

# Run cross-validation
cvModel = cv.fit(training)
# This can take a while: one model is trained per parameter combination per fold

In [51]:
# Extract the best model selected by CV
best_model = cvModel.bestModel

In [52]:
# Print the parameters of the best model selected by CV
print("**Best Model**")
print(" Rank:", best_model.rank)
print(" MaxIter:", best_model._java_obj.parent().getMaxIter())
print(" RegParam:", best_model._java_obj.parent().getRegParam())

### Model testing
Finally, we generate predictions for the test set and compute the test error (RMSE).
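`RegressionEvaluator(metricName="rmse")` reports the root mean squared error, sqrt(mean((rating - prediction)²)), over the rows it is given. After `coldStartStrategy="drop"`, those are the test rows that have a prediction. A plain-Python version of the formula, with hypothetical numbers:

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equal-length sequences."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))


# Errors 0.5, 0.0 and 1.0 give sqrt((0.25 + 0 + 1) / 3)
print(rmse([4.0, 3.0, 5.0], [3.5, 3.0, 4.0]))
```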

In [54]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [55]:
#Print RMSE 
print ("RMSE = "+str(rmse))

In [56]:
# Refit ALS on the training data with the best parameters found above
# (maxIter=15, rank=10, regParam=0.01). Note: the later cells use best_model, not this model.
als_best = ALS(maxIter=15, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
               coldStartStrategy="drop")
model = als_best.fit(training)

In [57]:
predictions.show(10)

### Apply the model to the full dataset and check its performance

In [59]:
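# Score every rating, including those used for training, so this RMSE is
# optimistic compared with the held-out test RMSE above
alldata = best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print("RMSE = " + str(rmse))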

In [60]:
alldata.createOrReplaceTempView("alldata")

In [61]:
%sql SELECT * FROM alldata LIMIT 10

userId,movieId,rating,prediction
8350,148,4.0,3.3800094
8264,148,1.0,1.1569343
6826,148,3.0,2.296391
6328,148,5.0,0.3956195
5751,148,1.5,1.3959177
165,148,3.0,3.305573
9051,148,1.0,1.6596844
8697,148,2.0,2.9162
7051,148,5.0,2.638159
4539,148,2.0,2.0745254


In [62]:
%sql SELECT * FROM movies JOIN alldata ON movies.movieId=alldata.movieId LIMIT 10

movieId,title,genres,userId,movieId.1,rating,prediction
148,"Awfully Big Adventure, An (1995)",Drama,8350,148,4.0,3.3800094
148,"Awfully Big Adventure, An (1995)",Drama,8264,148,1.0,1.1569343
148,"Awfully Big Adventure, An (1995)",Drama,6826,148,3.0,2.296391
148,"Awfully Big Adventure, An (1995)",Drama,6328,148,5.0,0.3956195
148,"Awfully Big Adventure, An (1995)",Drama,5751,148,1.5,1.3959177
148,"Awfully Big Adventure, An (1995)",Drama,165,148,3.0,3.305573
148,"Awfully Big Adventure, An (1995)",Drama,9051,148,1.0,1.6596844
148,"Awfully Big Adventure, An (1995)",Drama,8697,148,2.0,2.9162
148,"Awfully Big Adventure, An (1995)",Drama,7051,148,5.0,2.638159
148,"Awfully Big Adventure, An (1995)",Drama,4539,148,2.0,2.0745254


## Recommend movies to users with ids 575 and 273
You can choose any users to generate recommendations for.

In [64]:
# Recommend the top 10 movies for each user
user_recs = best_model.recommendForAllUsers(10)
user_recs.show(10)

In [65]:
user_recs.first()

In [66]:
user_recs.createOrReplaceTempView("als_recs_temp")

In [67]:
# Split the 'recommendations' array in user_recs into one row per recommended movie

explode_rec = spark.sql('SELECT userId,\
                                explode(recommendations) AS MovieRec\
                                FROM als_recs_temp')
explode_rec.show(10)

In [68]:
final_recs = spark.sql("SELECT userId,\
                               movieIds_and_ratings.movieId AS movieId,\
                               movieIds_and_ratings.rating AS prediction\
                               FROM als_recs_temp\
                               LATERAL VIEW explode(recommendations) exploded_table AS movieIds_and_ratings")

In [69]:
final_recs.show(10)

In [70]:
# Recommend only movies the user has not rated yet: left-join the recommendations
# with movie_ratings and keep the rows where rating is null
final_rec = final_recs.join(movie_ratings, ['userId', 'movieId'], 'left').filter(movie_ratings.rating.isNull())
#display(final_rec)

final_rec.createOrReplaceTempView("final_rec")

display(spark.sql("SELECT * FROM final_rec LIMIT 5"))

userId,movieId,prediction,rating
1580,6528,8.929522,
1580,26603,8.626689,
1580,3903,7.6779,
1580,2880,7.333238,
1580,3531,7.215636,
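`recommendForAllUsers` ranks every movie, including movies the user has already rated. The left join plus `rating IS NULL` filter above therefore acts as an anti-join, and a user can end up with fewer than 10 recommendations. Scores such as 8.93 are raw factor dot products; ALS does not bound them to the rating scale. The anti-join logic in plain Python, on hypothetical IDs and scores:

```python
# Hypothetical top-N output of recommendForAllUsers: userId -> [(movieId, score), ...]
recs = {1: [(10, 8.9), (20, 4.2), (30, 6.1)], 2: [(10, 4.8), (40, 3.9)]}

# Hypothetical (userId, movieId) pairs that already have a rating in movie_ratings
rated = {(1, 20), (2, 40)}


def unseen_only(recs, rated):
    """Keep recommendations that have no existing rating (left join + rating IS NULL)."""
    return {
        user: [(movie, score) for movie, score in items if (user, movie) not in rated]
        for user, items in recs.items()
    }


final = unseen_only(recs, rated)
print(final)  # {1: [(10, 8.9), (30, 6.1)], 2: [(10, 4.8)]}
```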


In [71]:
# final_rec is already registered above; register movies_df under the name used in the queries below
movies_df.createOrReplaceTempView("movies_df")

### Recommended films for userId = 575

In [73]:
%sql
SELECT userId,
       title
FROM final_rec t1
LEFT JOIN movies_df t2
ON t1.movieId = t2.movieId
WHERE t1.userId=575
LIMIT 10

userId,title
575,Oklahoma! (1955)
575,Wild Bill (1995)
575,Gentlemen of Fortune (Dzhentlmeny udachi) (1972)
575,Brother (Hermano) (2010)
575,Trouble in Paradise (1932)
575,Possession (1981)
575,Kill List (2011)
575,"Beast of War, The (Beast, The) (1988)"
575,Jump Tomorrow (2001)
575,Before the Rain (Pred dozhdot) (1994)


### Recommended films for userId = 273

In [75]:
%sql
SELECT userId,
       title
FROM final_rec t1
LEFT JOIN movies_df t2
ON t1.movieId = t2.movieId
WHERE t1.userId=273
LIMIT 5

userId,title
273,Start the Revolution Without Me (1970)
273,Mina Tannenbaum (1994)
273,"Boost, The (1988)"
273,"Ordet (Word, The) (1955)"
273,All the Vermeers in New York (1990)


## Find similar movies for the movies with ids 463 and 471
Similar movies can be found by comparing the item (movie) factor vectors learned by ALS.
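Two notions of "similar" appear below. The MLlib cells rank movies by cosine similarity of their factor vectors. The LSH cells use `BucketedRandomProjectionLSH`, which searches by Euclidean distance. These can disagree because Euclidean distance also depends on vector length. The NumPy sketch below brute-forces both rankings on hypothetical 3-dimensional factors. In it, a vector pointing the same way but three times longer wins under cosine, while a nearby vector wins under Euclidean distance. The function `top_k` is illustrative, not part of the notebook:

```python
import numpy as np


def top_k(item_ids, factors, target_id, k=5, metric="cosine"):
    """Rank items by similarity of their factor vectors to target_id's vector."""
    ids = np.asarray(item_ids)
    F = np.asarray(factors, dtype=float)
    target = F[ids == target_id][0]
    if metric == "cosine":
        score = F @ target / (np.linalg.norm(F, axis=1) * np.linalg.norm(target))
    else:  # "euclidean": a smaller distance is more similar, so negate it
        score = -np.linalg.norm(F - target, axis=1)
    order = np.argsort(-score, kind="stable")
    # Skip the target itself, which is always its own best match
    return [int(ids[j]) for j in order if ids[j] != target_id][:k]


# Hypothetical item factors
ids = [1, 2, 3, 4]
factors = [
    [1.0, 0.0, 0.0],   # target movie
    [3.0, 0.0, 0.0],   # same direction, three times longer
    [0.9, 0.3, 0.0],   # close in Euclidean distance
    [-1.0, 0.0, 0.0],  # opposite direction
]
print(top_k(ids, factors, 1, k=2))                      # [2, 3]
print(top_k(ids, factors, 1, k=2, metric="euclidean"))  # [3, 2]
```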

In [77]:
# Train an RDD-based MLlib ALS model to get the product (movie) feature matrix,
# which is used below for item-item similarity. It is imported under another name
# so it does not shadow pyspark.ml.recommendation.ALS.
from pyspark.mllib.recommendation import ALS as MLlibALS

# ALS.train also accepts a DataFrame of (user, product, rating) rows
model_a = MLlibALS.train(movie_ratings, rank=10, iterations=15, lambda_=0.01)
model_a.productFeatures().count()

In [78]:
# Look up the feature vector of movie 471
movie_feature = model_a.productFeatures().lookup(471)[0]

In [79]:
# Cosine similarity between two vectors, used to measure movie similarity
def cosineSimilarity(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))

In [80]:
# Load the movie titles as (movieId, title) pairs.
# Titles can contain quoted commas (e.g. "Wind Journeys, The (...)"), so each line
# is parsed with the csv module instead of a plain split(",").
import csv

movies_file = os.path.join("/FileStore/tables/", 'movies.csv')
movies_sc = sc.textFile(movies_file)

movies_sc_header = movies_sc.take(1)[0]

movies_data = movies_sc.filter(lambda line: line != movies_sc_header)\
    .map(lambda line: next(csv.reader([line]))).map(lambda tokens: (tokens[0], tokens[1])).cache()

movies_titles = movies_data.map(lambda x: (int(x[0]), x[1]))

In [81]:
movies_sc_header

In [82]:
movies_data

In [83]:
movies_titles

In [84]:
# Cosine similarity of every movie's feature vector to movie 471, joined with the titles
# and reshaped to (title, similarity, movieId)
similarMovies = model_a.productFeatures().map(lambda products: (products[0],
                                        cosineSimilarity(np.asarray(products[1]), movie_feature))).join(movies_titles).map(lambda r: (r[1][1], r[1][0], r[0]))

# Top 11 by descending cosine similarity; the first entry is movie 471 itself
similarMovies.takeOrdered(11, key=lambda x: -x[1])

In [85]:
from pyspark.sql.functions import col
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT

In [86]:
a = best_model.itemFactors
display(a.cache())

id,features
10,"List(-0.7070604, -0.3937841, -0.7354741, -0.5316492, -0.23054606, -0.9362612, 0.4878864, 0.181011, 0.12492501, -0.32780373)"
20,"List(-0.2865907, -0.38920316, -0.757129, -0.41535524, -0.33387962, -0.78734565, 0.74777204, -0.42470917, -0.5602824, -0.50083333)"
30,"List(-0.41119733, -0.6554364, -1.120149, -0.05089784, 0.2785565, -1.4204834, -0.45307264, -0.91303414, 0.8519034, -0.24168478)"
40,"List(-0.50368834, -0.6089162, -0.34873757, -0.42284888, 0.29128578, -1.0997701, 0.53955245, -0.5642491, 1.1535795, 0.067440234)"
50,"List(-0.60370475, -1.0662308, -0.853213, -0.5079328, -0.010333406, -1.0317204, 0.23053925, 0.22187883, 1.0357325, -0.16144118)"
60,"List(0.023746999, -0.26130533, -1.4601542, -0.63913494, 0.05059811, -0.39546755, 0.2780687, -0.9694489, 0.35018995, -0.61133385)"
70,"List(-1.1970209, -0.3363997, -0.99509525, -0.11204849, -0.4582682, -1.0267098, -0.15598853, -0.05811761, 0.20191139, 0.015026275)"
80,"List(0.19165394, 0.39662528, -1.5484391, -0.7053556, -0.2435736, -1.2047055, -0.6639407, -1.044383, 0.61491716, -0.38580477)"
90,"List(0.18562326, -0.23243228, -0.82413024, -0.49526966, -0.6618156, -0.94704396, 0.9132888, -1.2705051, 0.042536464, -0.45922598)"
100,"List(-0.5800655, -0.8253391, -1.2167127, -0.053193044, -0.39434132, -0.5472712, 0.28391334, -0.18534313, 0.03229926, -0.7044343)"


In [87]:
a.createOrReplaceTempView("movie_on_movie")

In [88]:
%sql
SELECT features FROM movie_on_movie WHERE id = 471

features
"List(-0.73946416, -1.03179, -0.83905196, -0.6525196, -0.3816911, -0.88358724, -0.47698575, -0.15836999, 0.36126232, -0.6475737)"


In [89]:
%sql
SELECT * FROM ratings WHERE movieId = 463 LIMIT 10

userId,movieId,rating,timestamp
114,463,4.0,973377486
758,463,3.0,874853655
1927,463,2.0,900074111
2481,463,3.0,899667729
2629,463,2.5,1070005270
3246,463,3.0,862157039
3832,463,3.0,953511244
4796,463,2.0,887467693
5276,463,4.0,945487923
5323,463,3.0,961007786


In [90]:
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes",seed=12345, bucketLength=1.0)
#a.printSchema()
#change features columns into dense vector
to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
data = a.select("id", to_vector("features").alias("features"))
#data.printSchema()
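Per the Spark documentation, `BucketedRandomProjectionLSH` is an LSH family for Euclidean distance. Each hash table projects a vector x onto a random unit vector v and assigns bucket floor(x·v / r), where r is `bucketLength`. Nearby factor vectors therefore tend to share buckets, and `approxNearestNeighbors` only compares candidates that share a bucket. The NumPy sketch below computes such bucket ids for three hash tables. The notebook leaves `numHashTables` at its default of 1. The sketch's random vectors do not reproduce Spark's `seed=12345`, and the 10-dimensional vectors are hypothetical.

```python
import numpy as np


def random_projection_buckets(X, n_tables=3, bucket_length=1.0, seed=12345):
    """Bucket ids floor(x . v / bucket_length), one column per random unit vector v."""
    rng = np.random.default_rng(seed)
    V = rng.normal(size=(n_tables, X.shape[1]))
    V /= np.linalg.norm(V, axis=1, keepdims=True)  # normalize to unit length
    return np.floor(X @ V.T / bucket_length).astype(int)


# Hypothetical 10-dimensional item factors: rows 0 and 1 are close, row 2 is far away
rng = np.random.default_rng(0)
base = rng.normal(size=10)
X = np.vstack([base, base + 0.01, base + 5.0])
H = random_projection_buckets(X)
print(H)
```

Because |x·v - y·v| ≤ ‖x - y‖ for a unit vector v, two points' bucket ids can differ by at most floor(‖x - y‖ / r) + 1. Close points therefore land in the same or an adjacent bucket, while distant points may be spread apart.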

In [91]:
model = brp.fit(data)
model.transform(data)

In [92]:
model.approxNearestNeighbors(data, Vectors.dense([-0.73946416, -1.03179, -0.83905196, -0.6525196, -0.3816911, -0.88358724, -0.47698575, -0.15836999, 0.36126232, -0.6475737]), 6).collect()

In [93]:
# Similar movies for the movie with id 471 (the first neighbor returned is movie 471 itself)

In [94]:
%sql
SELECT * FROM movies
WHERE movieId IN (6296,97057,3476,1059,4346)

movieId,title,genres
1059,William Shakespeare's Romeo + Juliet (1996),Drama|Romance
3476,Jacob's Ladder (1990),Horror|Mystery
4346,Bride of the Wind (2001),Drama|Musical|Romance
6296,"Mighty Wind, A (2003)",Comedy|Musical
97057,Kon-Tiki (2012),Adventure|Documentary|Drama


In [95]:
%sql
SELECT features FROM movie_on_movie WHERE id = 463

features
"List(0.64177823, -0.22756293, -1.3410357, 0.8559743, -0.5571711, -1.2529395, 1.1197073, -1.0625343, 0.92473656, 0.6378647)"


In [96]:
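# Look up movie 463's factor vector from `data` instead of pasting numbers by hand:
# the vector previously pasted here did not match the features shown for id 463 above,
# so the neighbors listed below may change when this cell is re-run
key_463 = data.filter(col("id") == 463).first()["features"]
model.approxNearestNeighbors(data, key_463, 6).collect()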

In [97]:
# Similar movies for the movie with id 463

In [98]:
%sql
SELECT * FROM movies
WHERE movieId IN (5321,49007,554,7276,7224)

movieId,title,genres
554,Trial by Jury (1994),Crime|Drama|Thriller
5321,"Triumph of Love, The (2001)",Comedy
7224,"Boy with Green Hair, The (1948)",Children|Drama
7276,Hell's Kitchen (1998),Drama
49007,Arabesque (1966),Adventure|Drama|Romance|Thriller


Based on the above, the 5 movies most similar to the movie with id 471 are:
- William Shakespeare's Romeo + Juliet (1996)
- Jacob's Ladder (1990)
- Bride of the Wind (2001)
- Mighty Wind, A (2003)
- Kon-Tiki (2012)

## The Report 
### Motivation: 
I explore the ALS collaborative filtering recommendation engine in Spark MLlib. I use it to make user-based movie recommendations with a scalable matrix factorization technique.

### Step1

First, I load four datasets (movies, ratings, links and tags) and explore them to get basic information. This includes the number of users, the number of movies, the number of ratings per user and per movie, and the distribution of movies across genres.

### Step2

After preprocessing the data, I build an ALS model on the rating data to predict ratings, which are treated as each user's degree of preference for each movie. The parameters (maxIter, rank, regParam) are meant to be tuned by grid search with cross-validation, selecting the model with the smallest RMSE on the validation folds. Because training is slow, the run shown here uses 2-fold cross-validation over a single combination (maxIter=15, rank=10, regParam=0.01). The selected model is used as the best model for prediction.

### Step3

Using the best model from the previous step, I predict ratings for the movies in the test set and compute the RMSE to evaluate model performance before moving on to the next step.

### Step4  

In this step, I use the best model's predictions to recommend movies for userId 575 and 273. I also find the 5 movies most similar to movieId 471 and 463 with an approximate nearest neighbor search (BucketedRandomProjectionLSH) on the ALS movie feature vectors.

### Conclusion
The RMSE of the best ALS model on the test data is 0.71, which suggests reasonably accurate rating predictions on the 0.5 to 5 star scale. The ALS model can provide both movie recommendations based on a user's preferences and movies similar to a given movie. This shows why it is one of the most widely used techniques in recommendation systems. Further work could improve the model in several ways:
- using information from the other datasets, such as movie genres and tags;
- building an ALS model that incorporates both explicit and implicit feedback;
- trying other techniques such as KNN, deep learning, or ensembles of several methods.