# Spark Movie Recommendation
In this notebook, we use the Alternating Least Squares (ALS) algorithm with Spark APIs to predict the ratings for the movies in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

## Report
#### Part 0: Data ETL and data exploration

#### Part 1: OLAP via Spark SQL

#### Part 2: Spark ALS for model training
  Grid search with 5-fold cross-validation; best model RMSE: 0.59 on training data, 0.88 on test data.
  
#### Part 3: Recommend movies to users
  User-based approach; movies that a user has already rated are not recommended to that user.
  
#### Part 4: Find similar movies for a movie
  Find the k nearest movies in the ALS item-factor space using LSH (locality-sensitive hashing).
  LSH is a randomized hashing technique commonly used in large-scale machine learning tasks, including clustering and approximate nearest neighbor search.
  <ref> https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html

In [3]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
import os
os.environ["PYSPARK_PYTHON"] = "python3"
from pyspark.sql import SparkSession

## Part 0: Data ETL and Data Exploration

In [5]:
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .getOrCreate()

In [6]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [7]:
movies_df.show(5)

In [8]:
ratings_df.show(5)

In [9]:
links_df.show(5)

In [10]:
tags_df.show(5)

In [11]:
tmp1 = ratings_df.groupBy("userId").count().toPandas()['count'].min()
tmp2 = ratings_df.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [12]:
tmp1 = sum(ratings_df.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))
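The two cells above count ratings per user and per movie, then take the minimum and count the movies that have exactly one rating. A minimal plain-Python sketch of the same aggregation, using a few hypothetical `(userId, movieId, rating)` tuples rather than the MovieLens data, shows what the `groupBy(...).count()` step produces:

```python
from collections import Counter

# Hypothetical (userId, movieId, rating) rows standing in for ratings.csv
ratings = [
    (1, 10, 4.0), (1, 20, 3.5), (1, 30, 5.0),
    (2, 10, 2.0), (2, 40, 4.5),
    (3, 10, 3.0),
]

# Same role as ratings_df.groupBy("userId").count() and groupBy("movieId").count()
ratings_per_user = Counter(user for user, _, _ in ratings)
ratings_per_movie = Counter(movie for _, movie, _ in ratings)

min_per_user = min(ratings_per_user.values())
min_per_movie = min(ratings_per_movie.values())

# Movies with exactly one rating, out of all distinct rated movies
single_rated = sum(1 for c in ratings_per_movie.values() if c == 1)
print(f"{single_rated} out of {len(ratings_per_movie)} movies are rated by only one user")
```

In the Spark version, the counting runs distributed; `.toPandas()` only collects the already-aggregated table of counts to the driver.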

## Part 1: Spark SQL and OLAP

In [14]:
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")

In [15]:
%sql
select count(*) from ratings

count(1)
100836


### Q1: The number of Users

In [17]:
%sql
select count(distinct userId) as Number_of_users from ratings

Number_of_users
610


### Q2: The number of Movies

In [19]:
%sql
select count(distinct movieId) as Number_of_movies from ratings

Number_of_movies
9724


### Q3: How many movies have been rated by users? List the movies that have not been rated

In [21]:
%sql
select count(distinct movieId) as Number_movies_are_rated_by_users from ratings

Number_movies_are_rated_by_users
9724


In [22]:
%sql
select distinct title, genres from movies m
where m.movieId not in (select distinct movieId from ratings)

title,genres
This Gun for Hire (1942),Crime|Film-Noir|Thriller
"Chosen, The (1981)",Drama
"Innocents, The (1961)",Drama|Horror|Thriller
Niagara (1953),Drama|Thriller
Scrooge (1970),Drama|Fantasy|Musical
"Browning Version, The (1951)",Drama
I Know Where I'm Going! (1945),Drama|Romance|War
For All Mankind (1989),Documentary
Twentieth Century (1934),Comedy
Call Northside 777 (1948),Crime|Drama|Film-Noir


### Q4: List Movie Genres

In [24]:
%sql
select distinct genres from movies

genres
Comedy|Horror|Thriller
Adventure|Sci-Fi|Thriller
Action|Adventure|Drama|Fantasy
Action|Drama|Horror
Action|Animation|Comedy|Sci-Fi
Animation|Children|Drama|Musical|Romance
Action|Adventure|Drama
Adventure|Sci-Fi
Documentary|Musical|IMAX
Adventure|Children|Fantasy|Sci-Fi|Thriller


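Listing individual genres is the tricky step, because the `genres` column packs several genres into one pipe-separated string. `split(genres, "[|]")` turns that string into an array, and `explode` emits one row per array element. See the example below.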
<ref>https://sparkbyexamples.com/pyspark/pyspark-explode-nested-array-into-rows/
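Spark's `split` interprets its second argument as a regular expression, and a bare `|` is the regex alternation operator rather than a literal pipe; the character class `[|]` matches the pipe itself. The sketch below mimics `explode(split(genres, "[|]"))` in plain Python, using `re.split` as a stand-in (it also takes a regex) on the two titles shown in the output that follows:

```python
import re

# Two rows from movies.csv as (title, genres) pairs
movies = [
    ("Toy Story (1995)", "Adventure|Animation|Children|Comedy|Fantasy"),
    ("Jumanji (1995)", "Adventure|Children|Fantasy"),
]

def explode_genres(rows):
    """Mimic SELECT title, explode(split(genres, "[|]")) FROM movies."""
    out = []
    for title, genres in rows:
        # "[|]" is a character class matching a literal pipe
        for genre in re.split(r"[|]", genres):
            out.append((title, genre))
    return out

exploded = explode_genres(movies)
for row in exploded:
    print(row)
```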

In [26]:
%sql
select title, explode(split(genres,"[|]")) as genres from movies

title,genres
Toy Story (1995),Adventure
Toy Story (1995),Animation
Toy Story (1995),Children
Toy Story (1995),Comedy
Toy Story (1995),Fantasy
Jumanji (1995),Adventure
Jumanji (1995),Children
Jumanji (1995),Fantasy
Grumpier Old Men (1995),Comedy
Grumpier Old Men (1995),Romance


In [27]:
%sql
select distinct explode(split(genres,"[|]")) as genres from movies

genres
Crime
Romance
Thriller
Adventure
Drama
War
Documentary
Fantasy
Mystery
Musical


### Q5: Number of Movies in Each Category

In [29]:
%sql
select genres as category, count(distinct title) as count
from (select title, explode(split(genres,"[|]")) as genres from movies)
group by 1
order by 2 desc

category,count
Drama,4359
Comedy,3755
Thriller,1892
Action,1827
Romance,1595
Adventure,1263
Crime,1198
Sci-Fi,978
Horror,978
Fantasy,779
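The Q5 query groups the exploded `(title, genre)` rows by genre, counts distinct titles, and sorts by that count in descending order. A plain-Python sketch of the same aggregation on a few rows (the tie-break on genre name is an addition here, only to make the order deterministic):

```python
from collections import defaultdict

# (title, genre) rows as produced by explode(split(genres, "[|]"))
exploded = [
    ("Toy Story (1995)", "Adventure"), ("Toy Story (1995)", "Comedy"),
    ("Jumanji (1995)", "Adventure"),
    ("Grumpier Old Men (1995)", "Comedy"), ("Grumpier Old Men (1995)", "Romance"),
]

# group by genre, collecting distinct titles (count(distinct title))
titles_by_genre = defaultdict(set)
for title, genre in exploded:
    titles_by_genre[genre].add(title)

# order by count descending; ties broken by genre name
counts = sorted(
    ((genre, len(titles)) for genre, titles in titles_by_genre.items()),
    key=lambda pair: (-pair[1], pair[0]),
)
print(counts)
```

Because the query counts distinct titles, two different movies that happen to share a title would be counted once; `count(distinct movieId)` avoids that.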


## Part 2: Spark ALS-based approach for model training
We use Spark ML's ALS estimator to predict the ratings. Starting from `ratings_df`, we drop the `timestamp` column and cast `userId` and `movieId` to integers and `rating` to float, so that each row is a (user, item, rating) triple.

In [31]:
ratings_df.show()

In [32]:
movie_ratings=ratings_df.drop('timestamp')

In [33]:
# Convert data types: ALS expects integer user/item IDs and a numeric rating
from pyspark.sql.types import IntegerType, FloatType
movie_ratings = movie_ratings.withColumn("userId", movie_ratings["userId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("movieId", movie_ratings["movieId"].cast(IntegerType()))
movie_ratings = movie_ratings.withColumn("rating", movie_ratings["rating"].cast(FloatType()))

In [34]:
movie_ratings.show()

### ALS Model Selection and Evaluation

With the ALS model, we use a grid search over `maxIter`, `rank` and `regParam`, evaluated with 5-fold cross-validation, to find the best hyperparameters.
<ref>https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.recommendation.ALS
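For reference, the objective that ALS minimizes for explicit ratings can be written as follows, using the regularization scaling that the Spark ML collaborative-filtering guide describes (`regParam` multiplied by the number of ratings of each user or item, known as ALS-WR). Here $\Omega$ is the set of observed (user, item) pairs, $r_{ui}$ an observed rating, $\mathbf{x}_u, \mathbf{y}_i \in \mathbb{R}^k$ the user and item factors, $k$ is `rank`, $\lambda$ is `regParam`, and $n_u$, $n_i$ count the ratings of user $u$ and item $i$:

$$
\min_{X, Y} \sum_{(u,i) \in \Omega} \left( r_{ui} - \mathbf{x}_u^\top \mathbf{y}_i \right)^2 + \lambda \left( \sum_u n_u \lVert \mathbf{x}_u \rVert^2 + \sum_i n_i \lVert \mathbf{y}_i \rVert^2 \right)
$$

With the item factors fixed, each user's factor is a ridge-regression solution in closed form, where $Y_u$ stacks the factors of the items rated by $u$ and $\mathbf{r}_u$ holds those ratings; the item update is symmetric, and `maxIter` counts these alternating passes:

$$
\mathbf{x}_u = \left( Y_u^\top Y_u + \lambda n_u I_k \right)^{-1} Y_u^\top \mathbf{r}_u
$$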

In [36]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [37]:
# Create training and test sets (80/20 split; a fixed seed makes the split reproducible)
(training, test) = movie_ratings.randomSplit([0.8, 0.2], seed=42)

In [38]:
#Create ALS model
als = ALS(maxIter=5, rank=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
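To make the alternating updates concrete, here is a toy, rank-1 version in plain Python, so each least-squares solve reduces to a scalar division. It is an illustration under simplifying assumptions (tiny hypothetical ratings, rank 1, all factors initialized to 1.0), not Spark's implementation. Each half-step exactly minimizes the regularized squared error for one set of factors, so the objective never increases:

```python
# Toy rank-1 ALS with ALS-WR style regularization (lambda * n_u, lambda * n_i).
# Hypothetical ratings: {(user, item): rating}
ratings = {
    (0, 0): 5.0, (0, 1): 3.0, (0, 2): 4.0,
    (1, 0): 4.0, (1, 2): 5.0,
    (2, 1): 1.0, (2, 2): 2.0,
}
users = sorted({u for u, _ in ratings})
items = sorted({i for _, i in ratings})
lam = 0.1  # plays the role of regParam

x = {u: 1.0 for u in users}  # user factors (rank 1, so scalars)
y = {i: 1.0 for i in items}  # item factors

def objective():
    err = sum((r - x[u] * y[i]) ** 2 for (u, i), r in ratings.items())
    n_u = {u: sum(1 for (uu, _) in ratings if uu == u) for u in users}
    n_i = {i: sum(1 for (_, ii) in ratings if ii == i) for i in items}
    reg = sum(n_u[u] * x[u] ** 2 for u in users) + sum(n_i[i] * y[i] ** 2 for i in items)
    return err + lam * reg

def solve_users():
    # With y fixed: x_u = sum(r * y_i) / (sum(y_i^2) + lam * n_u)
    for u in users:
        rated = [(i, r) for (uu, i), r in ratings.items() if uu == u]
        num = sum(r * y[i] for i, r in rated)
        den = sum(y[i] ** 2 for i, _ in rated) + lam * len(rated)
        x[u] = num / den

def solve_items():
    # With x fixed: y_i = sum(r * x_u) / (sum(x_u^2) + lam * n_i)
    for i in items:
        raters = [(u, r) for (u, ii), r in ratings.items() if ii == i]
        num = sum(r * x[u] for u, r in raters)
        den = sum(x[u] ** 2 for u, _ in raters) + lam * len(raters)
        y[i] = num / den

history = [objective()]
for _ in range(10):  # analogous to maxIter
    solve_users()
    solve_items()
    history.append(objective())

print([round(v, 4) for v in history])
print("prediction for unrated pair (user 1, item 1):", round(x[1] * y[1], 3))
```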

In [39]:
print(als.explainParams())

In [40]:
#Tune model using ParamGridBuilder
paramGrid = (ParamGridBuilder()
             .addGrid(als.maxIter, [5, 10, 15])
             .addGrid(als.rank, [10, 15, 20])
             .addGrid(als.regParam, [0.01, 0.1, 0.5])
             .build())

In [41]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
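With `metricName="rmse"`, the evaluator computes $\sqrt{\frac{1}{N}\sum (r - \hat{r})^2}$ over the prediction rows. Because the ALS estimator was created with `coldStartStrategy="drop"`, rows whose user or movie did not appear in the training data are removed from the predictions; otherwise their prediction would be NaN and so would the RMSE. A plain-Python sketch of both behaviors, on hypothetical numbers:

```python
import math

def rmse(pairs):
    """Root-mean-squared error over (rating, prediction) pairs."""
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))

def drop_cold_start(pairs):
    """Mimic coldStartStrategy="drop": discard rows with a NaN prediction."""
    return [(r, p) for r, p in pairs if not math.isnan(p)]

# Hypothetical (rating, prediction) rows; the NaN stands for an unseen movie
rows = [(4.0, 3.5), (2.0, 3.0), (5.0, 4.5), (3.0, float("nan"))]

print(rmse(rows))                   # nan: one cold-start row poisons the metric
print(rmse(drop_cold_start(rows)))
```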

In [42]:
# 5-fold cross-validation over the parameter grid
cv = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
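`CrossValidator` splits `training` into `numFolds` folds; for each parameter map it fits on four folds, evaluates on the fifth, and averages the five RMSE values. It then picks the map with the lowest average (RMSE is a smaller-is-better metric) and refits on all of `training`. With the 3 × 3 × 3 grid above and 5 folds, that is 27 × 5 = 135 fits plus one final refit. The generic loop below is a plain-Python sketch of that procedure; the hypothetical "model" (predict the mean training rating plus an `offset` parameter) is deliberately trivial and stands in for ALS, and Spark's own fold assignment is random rather than an exact deal:

```python
import math
import random

def kfold_indices(n, k, seed=0):
    """Shuffle row indices and deal them into k roughly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[f::k] for f in range(k)]

def cross_validate(data, grid, fit, evaluate, k=5):
    """Return (best_params, avg_metrics) for a smaller-is-better metric."""
    folds = kfold_indices(len(data), k)
    avg_metrics = []
    for params in grid:
        scores = []
        for f in range(k):
            held_out = set(folds[f])
            train = [row for j, row in enumerate(data) if j not in held_out]
            valid = [row for j, row in enumerate(data) if j in held_out]
            model = fit(train, params)
            scores.append(evaluate(model, valid))
        avg_metrics.append(sum(scores) / k)
    best = min(range(len(grid)), key=lambda g: avg_metrics[g])
    return grid[best], avg_metrics

# Hypothetical stand-in model: predict the mean training rating plus an offset
def fit(train, params):
    return sum(train) / len(train) + params["offset"]

def evaluate(prediction, valid):
    return math.sqrt(sum((r - prediction) ** 2 for r in valid) / len(valid))

ratings = [3.0, 4.0, 5.0, 3.5, 4.5, 2.0, 4.0, 3.0, 5.0, 4.0]
grid = [{"offset": o} for o in (-1.0, 0.0, 1.0)]
best_params, avg_metrics = cross_validate(ratings, grid, fit, evaluate, k=5)
print(best_params, [round(m, 3) for m in avg_metrics])
```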

In [43]:
#Fit ALS model to training data
cvModel = cv.fit(training)

In [44]:
# Extract the best model found by cross-validation
best_model = cvModel.bestModel

### Model testing
Finally, we make predictions on the held-out test set and compute the test RMSE.

In [46]:
#Generate predictions and evaluate using RMSE
predictions = best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [47]:
# Print evaluation metrics and model parameters
print("RMSE = " + str(rmse))
print("**Best Model**")
print(" Rank:", best_model.rank)
print(" MaxIter:", best_model._java_obj.parent().getMaxIter())
print(" RegParam:", best_model._java_obj.parent().getRegParam())

In [48]:
predictions.show()

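### Apply the model to the full dataset
The full `movie_ratings` data includes the 80% used for training, so this RMSE is expected to be lower (more optimistic) than the test RMSE above.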

In [50]:
alldata = best_model.transform(movie_ratings)
rmse = evaluator.evaluate(alldata)
print("RMSE = " + str(rmse))

In [51]:
alldata.createOrReplaceTempView("alldata")

In [52]:
%sql select * from alldata

userId,movieId,rating,prediction
191,148,5.0,4.9203944
133,471,4.0,3.341171
597,471,2.0,3.4518747
385,471,4.0,3.383218
436,471,3.0,3.7310843
602,471,4.0,3.5697618
91,471,1.0,2.4314399
409,471,3.0,3.5237584
372,471,3.0,3.0369964
599,471,2.5,2.745125


In [53]:
%sql select * from movies join alldata on movies.movieId=alldata.movieId

movieId,title,genres,userId,movieId.1,rating,prediction
148,"Awfully Big Adventure, An (1995)",Drama,191,148,5.0,4.9203944
471,"Hudsucker Proxy, The (1994)",Comedy,133,471,4.0,3.341171
471,"Hudsucker Proxy, The (1994)",Comedy,597,471,2.0,3.4518747
471,"Hudsucker Proxy, The (1994)",Comedy,385,471,4.0,3.383218
471,"Hudsucker Proxy, The (1994)",Comedy,436,471,3.0,3.7310843
471,"Hudsucker Proxy, The (1994)",Comedy,602,471,4.0,3.5697618
471,"Hudsucker Proxy, The (1994)",Comedy,91,471,1.0,2.4314399
471,"Hudsucker Proxy, The (1994)",Comedy,409,471,3.0,3.5237584
471,"Hudsucker Proxy, The (1994)",Comedy,372,471,3.0,3.0369964
471,"Hudsucker Proxy, The (1994)",Comedy,599,471,2.5,2.745125


## Part 3: Recommend movies to users
We generate recommendations for every user and then look at the results for a few selected users.

In [55]:
# Recommend 5 movies for each user
als_user_recs = best_model.recommendForAllUsers(5)
als_user_recs.createOrReplaceTempView("als_user_recs")
display(als_user_recs)

userId,recommendations
471,"List(List(177593, 4.5729346), List(170705, 4.432933), List(170355, 4.3702784), List(55363, 4.3607764), List(3201, 4.328181))"
463,"List(List(177593, 5.0999756), List(80906, 4.8039575), List(69524, 4.789617), List(3200, 4.7765493), List(170355, 4.7352614))"
496,"List(List(7099, 4.85922), List(177593, 4.771958), List(51931, 4.5685368), List(101, 4.549831), List(5618, 4.498313))"
148,"List(List(160718, 4.3862305), List(177593, 4.31868), List(122918, 4.2864337), List(183897, 4.2264643), List(40629, 4.1998296))"
540,"List(List(177593, 5.659834), List(3200, 4.9527864), List(78836, 4.927608), List(123, 4.8638678), List(318, 4.8332086))"
392,"List(List(6380, 4.9689326), List(750, 4.886894), List(103984, 4.8670797), List(2599, 4.7894044), List(26131, 4.767499))"
243,"List(List(177593, 5.3528185), List(84273, 5.325616), List(26073, 5.325616), List(7071, 5.325616), List(117531, 5.325616))"
31,"List(List(932, 5.503551), List(177593, 5.1505103), List(899, 4.9961877), List(674, 4.89535), List(1939, 4.8482013))"
516,"List(List(177593, 4.978042), List(1250, 4.8788342), List(4429, 4.862645), List(1262, 4.8074036), List(3200, 4.7696185))"
580,"List(List(177593, 4.8197393), List(53123, 4.746355), List(71462, 4.633043), List(80906, 4.574043), List(84847, 4.5648212))"


In [56]:
# One row per (user, recommended movie); then drop movies the user has already rated
final_rec = spark.sql("""
    SELECT userId, rec.movieId AS movieId, rec.rating AS prediction
    FROM (SELECT userId, explode(recommendations) AS rec FROM als_user_recs)
""")
final_rec = final_rec.join(movie_ratings, ['userId', 'movieId'], 'left').filter(movie_ratings.rating.isNull())
display(final_rec)

userId,movieId,prediction,rating
471,177593,4.5729346,
471,170705,4.432933,
471,170355,4.3702784,
471,55363,4.3607764,
471,3201,4.328181,
463,177593,5.0999756,
463,80906,4.8039575,
463,69524,4.789617,
463,3200,4.7765493,
463,170355,4.7352614,


In [57]:
final_rec.createOrReplaceTempView("final_rec")
movies_df = movies_df.withColumn("movieId", movies_df["movieId"].cast(IntegerType()))
movies_df.createOrReplaceTempView("movies_df")

### Recommend movies to the user with id 575

In [59]:
%sql
select movieId, title from movies_df
where movieId in (select movieId from final_rec where userId = 575)

movieId,title
116897,Wild Tales (2014)
177593,"Three Billboards Outside Ebbing, Missouri (2017)"
123,Chungking Express (Chung Hing sam lam) (1994)
51931,Reign Over Me (2007)


### Recommend movies to the user with id 232

In [61]:
%sql
select movieId, title from movies_df
where movieId in (select movieId from final_rec where userId = 232)

movieId,title
170705,Band of Brothers (2001)
92259,Intouchables (2011)
177593,"Three Billboards Outside Ebbing, Missouri (2017)"
142020,Oscar (1967)
78836,Enter the Void (2009)


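## Part 4: Find similar movies for a movie
We find similar movies using the item factors learned by ALS: each movie is represented by its latent factor vector, and movies whose vectors are close in Euclidean distance are treated as similar. To avoid comparing every pair of movies exactly, we use `BucketedRandomProjectionLSH` for an approximate similarity join.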
<ref>https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html
<ref>https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.BucketedRandomProjectionLSH
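Bucketed random projection hashes a vector by projecting it onto a random unit vector $\mathbf{v}$ and bucketing the result, $h(\mathbf{x}) = \lfloor \mathbf{x} \cdot \mathbf{v} / \text{bucketLength} \rfloor$. Nearby vectors tend to land in the same bucket, so a similarity join only computes exact distances for pairs that share a bucket in at least one hash table and keeps those below the threshold. The plain-Python sketch below follows that recipe on hypothetical 3-dimensional factor vectors; the number of hash tables and the vectors are assumptions, and Spark's internals differ in detail:

```python
import math
import random
from collections import defaultdict
from itertools import combinations

def random_unit_vector(dim, rng):
    v = [rng.gauss(0.0, 1.0) for _ in range(dim)]
    norm = math.sqrt(sum(c * c for c in v))
    return [c / norm for c in v]

def make_hash_tables(dim, num_tables, seed):
    rng = random.Random(seed)
    return [random_unit_vector(dim, rng) for _ in range(num_tables)]

def bucket(x, v, bucket_length):
    """Bucketed random projection: floor(x . v / bucket_length)."""
    return math.floor(sum(a * b for a, b in zip(x, v)) / bucket_length)

def euclidean(a, b):
    return math.sqrt(sum((p - q) ** 2 for p, q in zip(a, b)))

def approx_self_join(vectors, threshold, bucket_length=1.0, num_tables=3, seed=12345):
    """Pairs (a < b) sharing a bucket in any hash table, kept if distance < threshold."""
    dim = len(next(iter(vectors.values())))
    tables = make_hash_tables(dim, num_tables, seed)
    candidates = set()
    for v in tables:
        buckets = defaultdict(list)
        for key, x in vectors.items():
            buckets[bucket(x, v, bucket_length)].append(key)
        # only vectors in the same bucket become candidate pairs
        for members in buckets.values():
            candidates.update(combinations(sorted(members), 2))
    result = []
    for a, b in sorted(candidates):
        d = euclidean(vectors[a], vectors[b])
        if d < threshold:
            result.append((a, b, d))
    return result

# Hypothetical item factors keyed by movie id
factors = {
    1: [0.90, 0.10, 0.40],
    2: [0.85, 0.15, 0.35],
    3: [-1.20, 2.00, 0.10],
}
pairs = approx_self_join(factors, threshold=3.0)
for pair in pairs:
    print(pair)
```

Unlike this sketch, Spark's self-join also returns each pair in both orders and pairs each movie with itself, which is why the notebook filters `idA != idB`.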

In [63]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf

In [64]:
item_factors = best_model.itemFactors

In [65]:
# itemFactors stores each factor vector as an array of floats; LSH needs an ML Vector column
to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())
item_factors = item_factors.select("id", to_vector("features").alias("features"))
item_factors.show(5)

In [66]:
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", seed=12345, bucketLength=1.0)
model_brp = brp.fit(item_factors)
item_factors = model_brp.transform(item_factors)

In [67]:
# Self-join: candidate pairs of movies whose factor vectors are within Euclidean distance 3.0
item_similarities = model_brp.approxSimilarityJoin(item_factors, item_factors, 3.0, distCol="EuclideanDistance").select(
    col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance"))
# Drop each movie's pairing with itself
item_similarities = item_similarities.filter(item_similarities["idA"] != item_similarities["idB"])

In [68]:
item_similarities.show(5)

In [69]:
item_similarities.createOrReplaceTempView("item_similarities")

### Find similar movies for the movie with id 463

In [71]:
%sql
with temp as (select idA, idB, EuclideanDistance as dist
from item_similarities
where idA = 463
order by dist asc
limit 5)

select movieId, title from movies_df
where movieId in (select idB from temp)

movieId,title


The query returns no rows. Let's check whether movie 463 exists in the movies table:

In [73]:
%sql
select * from movies_df where movieId = 463

movieId,title,genres


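Movie 463 does not appear in the movies table, so we have no information about it and no ratings from which ALS could learn item factors for it.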

### Find similar movies for the movie with id 471

In [76]:
%sql
with temp as (select idA, idB,  EuclideanDistance as sim 
from item_similarities 
where idA = 471
order by 3 desc
limit 5)

select movieId, title from movies_df
where movieId in (select idB from temp)

movieId,title
3566,"Big Kahuna, The (2000)"
4477,Big Top Pee-Wee (1988)
27251,"10th Kingdom, The (2000)"
39446,Saw II (2005)
70994,Halloween II (2009)
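The `temp` CTE in the movie-471 query sorts by `EuclideanDistance` in descending order. Despite the alias `sim`, that column is a distance, so `limit 5` keeps the five *farthest* candidates within the 3.0 threshold rather than the five closest; the outer `where movieId in (...)` also discards the ordering. Sorting ascending selects the nearest neighbors, and Spark's `approxNearestNeighbors(dataset, key, numNearestNeighbors)` on the fitted LSH model is a built-in alternative for a single query vector. A plain-Python sketch of the ordering difference, on hypothetical `(idA, idB, EuclideanDistance)` rows:

```python
# Hypothetical (idA, idB, EuclideanDistance) rows from the similarity join
pairs = [
    (471, 10, 0.4), (471, 11, 2.9), (471, 12, 1.1),
    (471, 13, 2.5), (471, 14, 0.7), (471, 15, 1.8), (99, 10, 0.1),
]

def neighbors(rows, movie_id, k, nearest=True):
    """Top-k partners of movie_id by distance (ascending = most similar)."""
    mine = [(b, d) for a, b, d in rows if a == movie_id]
    return sorted(mine, key=lambda pair: pair[1], reverse=not nearest)[:k]

print(neighbors(pairs, 471, 3))                 # three closest movies
print(neighbors(pairs, 471, 3, nearest=False))  # what "order by ... desc" keeps
```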
