### Spark Movie Recommendation
In this notebook, we use the Alternating Least Squares (ALS) algorithm from Spark ML to predict movie ratings in the [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import os

In [3]:
os.environ["PYSPARK_PYTHON"] = "python3"

## Part 0: Data Import

In [5]:
spark = SparkSession \
    .builder \
    .appName("movie analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
movies_df = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings_df = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links_df = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags_df = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

## Part 1: Exploratory Data Analysis

In [8]:
tmp1 = ratings_df.groupBy('userId').count().select(F.min('count')).collect()[0][0]
print('Minimum number of ratings per user is {}'.format(tmp1))

In [9]:
tmp2 = ratings_df.groupBy('movieId').count().select(F.min('count')).collect()[0][0]
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [10]:
tmp1 = ratings_df.groupBy("movieId").count()
tmp1 = tmp1.filter(tmp1['count']==1).count()
tmp2 = ratings_df.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

In [11]:
tmp1 = ratings_df.groupBy("movieId").count().select(F.avg('count')).collect()[0][0]
print('Average number of ratings per movie is {0:.2f}'.format(tmp1))

In [12]:
tmp2 = ratings_df.groupBy("userId").count().select(F.avg('count')).collect()[0][0]
print('Average number of ratings per user is {0:.2f}'.format(tmp2))

In [13]:
movie_num = movies_df.select('movieId').distinct().count()
user_num = ratings_df.select('userId').distinct().count()
rating_num = ratings_df.select('rating').count()
print('Total number of users is {}. '.format(user_num))
print('Total number of movies is {}. '.format(movie_num))
print('Total number of ratings is {}. '.format(rating_num))

In [14]:
# The sparsity of the movie ratings:
# sparsity = 1 - rating_num / (movie_num * user_num)
denominator = movie_num*user_num
numerator = rating_num
sparsity = (1-numerator/denominator)*100
print("The ratings dataframe is {:.2f}% empty.".format(sparsity))
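To make the sparsity formula concrete, here is a small worked example in plain Python. The counts are made-up illustrative values, not the MovieLens numbers, and the variable names are hypothetical.

```python
# Hypothetical toy counts (not taken from the dataset).
toy_users = 4
toy_movies = 5
toy_ratings = 6  # number of observed (user, movie) pairs

# Share of the user x movie matrix that has no rating, as a percentage.
toy_sparsity = (1 - toy_ratings / (toy_users * toy_movies)) * 100
print("{:.2f}% empty".format(toy_sparsity))
```

With 6 ratings out of 20 possible cells, 70% of the toy matrix is empty.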

In [15]:
movie_num_rated = ratings_df.select('movieId').distinct().count()
print('Total number of movies rated is {}.'.format(movie_num_rated))

In [16]:
movies_df.createOrReplaceTempView("movies")
ratings_df.createOrReplaceTempView("ratings")
links_df.createOrReplaceTempView("links")
tags_df.createOrReplaceTempView("tags")

### Movies not rated before.

In [18]:
spark.sql(
  '''
SELECT m.*,r.rating
FROM movies m 
    LEFT JOIN ratings r ON m.movieId = r.movieId
WHERE r.rating IS NULL
'''
).toPandas().head()

Unnamed: 0,movieId,title,genres,rating
0,1076,"Innocents, The (1961)",Drama|Horror|Thriller,
1,2939,Niagara (1953),Drama|Thriller,
2,3338,For All Mankind (1989),Documentary,
3,3456,"Color of Paradise, The (Rang-e khoda) (1999)",Drama,
4,4194,I Know Where I'm Going! (1945),Drama|Romance|War,


### Movie Genres

A movie can belong to more than one genre, as shown below, so we need to separate the genres.

In [21]:
spark.sql(
  '''
SELECT DISTINCT title, genres
FROM movies
  '''
).toPandas().head()

Unnamed: 0,title,genres
0,Color of Night (1994),Drama|Thriller
1,Surviving the Game (1994),Action|Adventure|Thriller
2,Little Big League (1994),Comedy|Drama
3,Whatever (1998),Drama
4,Ronin (1998),Action|Crime|Thriller


In [22]:
## Data processing to separate the genres for a movie
genres_pd_df = spark.sql("SELECT DISTINCT title, genres FROM movies").toPandas()
genres_pd_df['genres'] = genres_pd_df['genres'].apply(lambda x:x.split('|'))
genres_pd_df = pd.concat([genres_pd_df['title'],genres_pd_df['genres'].apply(pd.Series)],axis = 1).set_index('title')
genres_sep_pd_df = genres_pd_df.stack().reset_index(level=0)
genres_sep_pd_df.columns = ['title','genre']
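As an aside, pandas 0.25 and later offer `DataFrame.explode`, which does the same one-row-per-genre reshaping more directly. The sketch below runs on a hand-made frame whose two rows are copied from the output above. The variable names are illustrative and this is not the code the notebook uses.

```python
import pandas as pd

toy_movies = pd.DataFrame({
    "title": ["Whatever (1998)", "Ronin (1998)"],
    "genres": ["Drama", "Action|Crime|Thriller"],
})

# Split the pipe-delimited string into a list, then emit one row per list element.
toy_genres = (
    toy_movies.assign(genre=toy_movies["genres"].str.split("|"))
    .explode("genre")[["title", "genre"]]
    .reset_index(drop=True)
)
print(toy_genres)
```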

We store all the movie genres in a list called movie_genres_list.

In [24]:
movie_genres_list = genres_sep_pd_df['genre'].unique().tolist()
print('Here are all the movie genres:'+'\n', movie_genres_list)

### Movies in Each Genre

In [26]:
genres_sep_df = spark.createDataFrame(genres_sep_pd_df)
genres_sep_df.createOrReplaceTempView('genres_sep')

In [27]:
spark.sql(
  '''
SELECT genre,count(*) AS count
FROM genres_sep
GROUP BY 1
ORDER BY 2 DESC
'''
).toPandas().head()

Unnamed: 0,genre,count
0,Drama,4361
1,Comedy,3756
2,Thriller,1894
3,Action,1828
4,Romance,1596


We group the movies by genre in a dictionary called movie_genre_dict, where each key is a genre and each value is the list of movies belonging to that genre.

In [29]:
movie_genre_dict = {genre: [] for genre in movie_genres_list}
for _, row in genres_sep_pd_df.iterrows():
  movie_genre_dict[row["genre"]].append(row["title"])

Let's take a look at the Film-Noir genre as an example.

In [31]:
print('Here are movies that belong to the Film-Noir genre:'+'\n',movie_genre_dict['Film-Noir'])

## Part 2: Spark ALS based approach for training model
We will use Spark ML to predict the ratings. We reuse `ratings_df`, drop the timestamp column, and cast the remaining columns to numeric types so that each row has the form (user, item, rating).

In [33]:
ratings_df.toPandas().head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [34]:
movie_ratings_df=ratings_df.drop('timestamp')
movie_ratings_df.toPandas().head()

Unnamed: 0,userId,movieId,rating
0,1,1,4.0
1,1,3,4.0
2,1,6,4.0
3,1,47,5.0
4,1,50,5.0


In [35]:
# Cast the columns from strings to numeric types
from pyspark.sql.types import IntegerType, FloatType
movie_ratings_df = movie_ratings_df.withColumn("userId", movie_ratings_df["userId"].cast(IntegerType()))
movie_ratings_df = movie_ratings_df.withColumn("movieId", movie_ratings_df["movieId"].cast(IntegerType()))
movie_ratings_df = movie_ratings_df.withColumn("rating", movie_ratings_df["rating"].cast(FloatType()))

In [36]:
movie_ratings_df.toPandas().head()

Unnamed: 0,userId,movieId,rating
0,1,1,4.0
1,1,3,4.0
2,1,6,4.0
3,1,47,5.0
4,1,50,5.0


### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.
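Before tuning, it may help to see what "alternating least squares" means. The NumPy sketch below is a simplified illustration on a made-up 4x3 rating matrix, not Spark's implementation. It holds the movie factors fixed and solves a small ridge regression for each user, then swaps roles. The names `rank` and `reg` play the roles of the `rank` and `regParam` hyperparameters, and all other names are hypothetical.

```python
import numpy as np

# Made-up rating matrix; 0 marks a missing rating.
R = np.array([[5.0, 3.0, 0.0],
              [4.0, 0.0, 1.0],
              [0.0, 1.0, 5.0],
              [1.0, 0.0, 4.0]])
observed = R > 0
rank, reg = 2, 0.1  # analogous to ALS rank and regParam

rng = np.random.default_rng(0)
U = rng.normal(scale=0.1, size=(R.shape[0], rank))  # user factors
V = rng.normal(scale=0.1, size=(R.shape[1], rank))  # movie factors

def objective(U, V):
    """Squared error on observed entries plus an L2 penalty on the factors."""
    err = (R - U @ V.T)[observed]
    return float((err ** 2).sum() + reg * ((U ** 2).sum() + (V ** 2).sum()))

def solve_side(fixed, ratings, mask):
    """Ridge-regress each row's factors while the other side's factors stay fixed."""
    out = np.zeros((ratings.shape[0], rank))
    for i in range(ratings.shape[0]):
        F = fixed[mask[i]]                      # factors of the items this row rated
        A = F.T @ F + reg * np.eye(rank)
        out[i] = np.linalg.solve(A, F.T @ ratings[i, mask[i]])
    return out

start = objective(U, V)
for _ in range(10):
    U = solve_side(V, R, observed)        # update users with movies fixed
    V = solve_side(U, R.T, observed.T)    # update movies with users fixed
end = objective(U, V)
print("objective before: {:.3f}, after: {:.3f}".format(start, end))
```

Each half-step solves its least-squares subproblem exactly, so the regularized objective cannot increase from one step to the next.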

In [38]:
# import package
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [39]:
#Create test and train set
(training,test)=movie_ratings_df.randomSplit([0.8,0.2],seed = 42)

## We will tune the hyperparameters using ParamGridBuilder and CrossValidator.

In [41]:
#Create ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy = 'drop',nonnegative = True, implicitPrefs = False)
# Confirm that a model called "als" was created
type(als)

In [42]:
#Tune model using ParamGridBuilder
# To limit run time we tune only rank and regParam; once we have the best combination, we will retrain with more iterations.
param_grid = ParamGridBuilder() \
            .addGrid(als.rank, [3,5,10]) \
            .addGrid(als.maxIter, [10]) \
            .addGrid(als.regParam, [0.05,0.15,0.25]) \
            .build()
print ("Num models to be tested: ", len(param_grid))
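The amount of work this grid implies can be checked with a quick sketch in plain Python. The lists mirror the grid above, and `num_folds` matches the `numFolds=3` passed to the `CrossValidator` in this notebook. During cross-validation every combination is fitted once per fold.

```python
from itertools import product

ranks = [3, 5, 10]
max_iters = [10]
reg_params = [0.05, 0.15, 0.25]
num_folds = 3  # same value as numFolds in the CrossValidator cell

# Every (rank, maxIter, regParam) combination in the grid.
combos = list(product(ranks, max_iters, reg_params))
print("Parameter combinations:", len(combos))
print("Model fits during cross-validation:", len(combos) * num_folds)
```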

In [43]:
# Define evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction") 
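For reference, RMSE is the square root of the mean squared difference between the actual and predicted ratings. Here is a plain-Python sketch with made-up numbers. The helper `rmse_of` is hypothetical and is not part of Spark.

```python
import math

def rmse_of(actual, predicted):
    """Root-mean-square error between two equal-length sequences."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

toy_actual = [4.0, 3.0, 5.0, 2.0]
toy_predicted = [3.5, 3.0, 4.0, 3.0]
toy_rmse = rmse_of(toy_actual, toy_predicted)
print(toy_rmse)
```

Because the errors are squared before averaging, the two predictions that miss by a full star dominate the result.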

In [44]:
# Build Cross validation 
cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

In [45]:
#Fit ALS model to training data
model = cv.fit(training)

In [46]:
#Extract best model from the tuning exercise using ParamGridBuilder
best_model = model.bestModel

In [47]:
#Generate predictions and evaluate using RMSE
predictions=best_model.transform(test)
rmse = evaluator.evaluate(predictions)

In [48]:
#Print evaluation metrics and model parameters
print ("**Best Model**")
print ("RMSE = "+str(rmse))
print (" Rank:",best_model._java_obj.parent().getRank())   # parent() returns the estimator that produced the model, from which the best params can be read
print (" MaxIter:",best_model._java_obj.parent().getMaxIter())
print (" RegParam:",best_model._java_obj.parent().getRegParam()) 

### Increase iteration number and Model testing

In [50]:
#Increase the iterations for the best ALS model
#coldStartStrategy = 'drop' is important; otherwise, you will receive rmse = nan
#Setting coldStartStrategy to 'drop' tells Spark to drop any rows in the predictions DataFrame that contain NaN values.
als_50 = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",rank = 10, maxIter = 50,regParam = 0.15, nonnegative = True, coldStartStrategy = 'drop',implicitPrefs = False)
#fit the model to training data
best_model_50 = als_50.fit(training)
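A single NaN prediction spoils the metric because NaN propagates through the sum and the mean. Here is a small plain-Python illustration with made-up values. The helper `rmse_of_pairs` is hypothetical, and the filtering step only mimics the effect of `coldStartStrategy = 'drop'`.

```python
import math

toy_actual = [4.0, 3.0, 5.0]
toy_predicted = [3.5, float("nan"), 4.5]  # NaN: user or movie unseen during training

def rmse_of_pairs(pairs):
    """RMSE over a list of (actual, predicted) pairs."""
    return math.sqrt(sum((a - p) ** 2 for a, p in pairs) / len(pairs))

all_pairs = list(zip(toy_actual, toy_predicted))
# Mimic 'drop': discard rows whose prediction is NaN before evaluating.
kept_pairs = [(a, p) for a, p in all_pairs if not math.isnan(p)]

print(rmse_of_pairs(all_pairs))   # NaN, because one prediction is NaN
print(rmse_of_pairs(kept_pairs))  # finite once the NaN row is dropped
```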

In [51]:
#generate predictions on test data
prediction_50 = best_model_50.transform(test)

In [52]:
#tell spark how to evaluate predictions
evaluator_50 = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
#obtain rmse
rmse_50 = evaluator_50.evaluate(prediction_50)
#print rmse
print('RMSE=',rmse_50)

In [53]:
prediction_50_pd_df = prediction_50.toPandas()

In [54]:
fig, ax = plt.subplots(figsize=(7, 5))
sns.scatterplot(x='movieId',y='rating', data=prediction_50_pd_df,label='rating',marker = 'o',s=20,ax=ax)
sns.scatterplot(x='movieId',y='prediction', data=prediction_50_pd_df,label='prediction',marker ='+',s=20,ax=ax)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.xlabel("MovieId", fontsize=14)
plt.ylabel("Rating", fontsize=14)
plt.title("Prediction vs movieId", fontsize=14)
plt.legend(loc='best', fontsize=10);
#display()

In [55]:
# Round the prediction to the nearest half star and clip it to the 0.5-5 range
def round_to_5scale(x):
  if x<=round(x) and 0.5<=x<=5:
    return round(x)-0.5 if round(x)-x>0.25 else round(x)
  if x>round(x) and 0.5<=x<=5:
    return round(x)+0.5 if x-round(x)>0.25 else round(x)
  if x>5:
    return 5
  if x<0.5:
    return 0.5
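A more compact way to snap a prediction to the nearest half star is to double it, round, halve it, and clip the result to the valid range. This is a sketch of an alternative, not the function used in this notebook. The name `round_to_half_star` is hypothetical, NaN inputs are not handled, and results at exact ties depend on Python's round-half-to-even behaviour.

```python
def round_to_half_star(x, low=0.5, high=5.0):
    """Snap x to the nearest multiple of 0.5, clipped to [low, high]."""
    return min(high, max(low, round(x * 2) / 2))

for value in (0.1, 2.3, 3.6, 4.9, 5.7):
    print(value, "->", round_to_half_star(value))
```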

In [56]:
prediction_50_pd_df['prediction'] = prediction_50_pd_df['prediction'].apply(lambda x: round_to_5scale(x))

In [57]:
#After rounding the predictions to the 0.5-5 half-star scale
fig, ax = plt.subplots(figsize=(7, 5))
sns.scatterplot(x='movieId',y='rating', data=prediction_50_pd_df,label='rating',marker = 'o',s=20,ax=ax)
sns.scatterplot(x='movieId',y='prediction', data=prediction_50_pd_df,label='prediction',marker = '+',s=20,ax=ax)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.xlabel("MovieId", fontsize=14)
plt.ylabel("Rating", fontsize=14)
plt.title("Prediction vs movieId", fontsize=14)
plt.legend(loc='best', fontsize=10);
#display()

In [58]:
fig, ax = plt.subplots(figsize=(7,5))
sns.scatterplot(x='userId',y='rating', data=prediction_50_pd_df,label='rating',marker = 'o',s=20,ax=ax)
sns.scatterplot(x='userId',y='prediction', data=prediction_50_pd_df,label='prediction',marker = '+',s=20,ax=ax)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.xlabel("UserId", fontsize=14)
plt.ylabel("Rating", fontsize=14)
plt.title("Prediction vs UserId", fontsize=14)
plt.legend(loc='best', fontsize=10)
display()


### Apply model to all data and see the performance

In [60]:
#Best_model RMSE
alldata=best_model.transform(movie_ratings_df)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

In [61]:
#Best_model_50 RMSE
alldata=best_model_50.transform(movie_ratings_df)
rmse = evaluator.evaluate(alldata)
print ("RMSE = "+str(rmse))

In [62]:
alldata.createOrReplaceTempView("alldata")

In [63]:
all_movie_data = spark.sql(
  '''
SELECT *
FROM movies
	LEFT JOIN alldata ON movies.movieId = alldata.movieId
  '''
)

In [64]:
alldata_pd_df = alldata.toPandas()

In [65]:
alldata_pd_df['prediction'] = alldata_pd_df['prediction'].apply(lambda x: round_to_5scale(x))

In [66]:
fig, ax = plt.subplots(figsize=(7, 5))
sns.scatterplot(x='movieId',y='rating', data=alldata_pd_df,label='rating',marker = 'o',s=20,ax=ax)
sns.scatterplot(x='movieId',y='prediction', data=alldata_pd_df,label='prediction',marker = '+',s=20,ax=ax)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.xlabel("MovieId", fontsize=14)
plt.ylabel("Rating", fontsize=14)
plt.title("Prediction vs MovieId", fontsize=14)
plt.legend(loc='best', fontsize=10);
display()

In [67]:
fig, ax = plt.subplots(figsize=(7,5))
sns.scatterplot(x='userId',y='rating', data=alldata_pd_df,label='rating',marker = 'o',s=20,ax=ax)
sns.scatterplot(x='userId',y='prediction', data=alldata_pd_df,label='prediction',marker = '+',s=20,ax=ax)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.xlabel("UserId", fontsize=14)
plt.ylabel("Rating", fontsize=14)
plt.title("Prediction vs UserId", fontsize=14)
plt.legend(loc='best', fontsize=10);
display()

### Let's look more closely at the user and movie data to decide which users to make recommendations for.

In [69]:
ratings_info_df = movie_ratings_df.groupBy('movieId').avg('rating')
movie_ratings_count= movie_ratings_df.groupBy('movieId').count()
ratings_info_df = ratings_info_df.join(movie_ratings_count,'movieId','left').join(movies_df,'movieId','left')
ratings_info_df.toPandas().head()

Unnamed: 0,movieId,avg(rating),count,title,genres
0,1580,3.487879,165,Men in Black (a.k.a. MIB) (1997),Action|Comedy|Sci-Fi
1,2366,3.64,25,King Kong (1933),Action|Adventure|Fantasy|Horror
2,3175,3.58,75,Galaxy Quest (1999),Adventure|Comedy|Sci-Fi
3,1088,3.369048,42,Dirty Dancing (1987),Drama|Musical|Romance
4,32460,4.25,4,Knockin' on Heaven's Door (1997),Action|Comedy|Crime|Drama


### UserId order by rating count

In [71]:
ratings_df.groupBy("userId").count().toPandas().sort_values(by = 'count',ascending=False).head()

Unnamed: 0,userId,count
345,414,2698
472,599,2478
280,474,2108
28,448,1864
193,274,1346


We will make recommendations for users 414 and 599, who have the highest rating counts. With more ratings to learn from, the recommendations for these users should be more reliable.

### Histogram of rating counts

In [74]:
fig, ax = plt.subplots(figsize=(7, 5))
sns.distplot(ratings_info_df.toPandas()['count'],bins=50,ax = ax)
display()

### Histogram of the ratings

In [76]:
fig, ax = plt.subplots(figsize=(7, 5))
sns.distplot(ratings_info_df.toPandas()['avg(rating)'],bins=50,ax=ax)
display()

In [77]:
# jointplot creates its own figure, so no plt.subplots() call is needed
sns.jointplot(x='avg(rating)', y='count', data=ratings_info_df.toPandas())
display()

## Recommend movies to users with IDs 414 and 599

In [79]:
# use the recommendation function of ALS
ALS_recommendations = best_model.recommendForAllUsers(10)
ALS_recommendations.filter(ALS_recommendations['userId'] == 599).toPandas().head()

Unnamed: 0,userId,recommendations
0,599,"[(96004, 4.2339396476745605), (3379, 4.2339396..."


We need to process the above dataframe for readability.

In [81]:
# Flatten the ALS_recommendations dataframe: one row per (user, recommended movie)
from pyspark.sql.functions import explode, col
recommendations_df = (
    ALS_recommendations
    .select("userId", explode("recommendations").alias("recommendation"))
    .select("userId", "recommendation.movieId",
            col("recommendation.rating").alias("prediction"))
)
recommendations_df.toPandas().head()

Unnamed: 0,userId,movieId,prediction
0,471,6818,5.00462
1,471,8477,5.004135
2,471,3379,4.82935
3,471,96004,4.82935
4,471,25771,4.736907


In [82]:
# Only recommend movies that the user has not rated yet
recommendations_df = recommendations_df.join(movies_df,["movieId"],"left").join(ratings_df,['movieId','userId'],'left')
recommendations_df = recommendations_df.drop('timestamp')
recommendations_df = recommendations_df.filter(F.col('rating').isNull())
recommendations_df.toPandas().head()

Unnamed: 0,movieId,userId,prediction,title,genres,rating
0,6818,471,5.00462,Come and See (Idi i smotri) (1985),Drama|War,
1,8477,471,5.004135,"Jetée, La (1962)",Romance|Sci-Fi,
2,3379,471,4.82935,On the Beach (1959),Drama,
3,96004,471,4.82935,Dragon Ball Z: The History of Trunks (Doragon ...,Action|Adventure|Animation,
4,25771,471,4.736907,"Andalusian Dog, An (Chien andalou, Un) (1929)",Fantasy,


We will only recommend movies that the users haven't watched/rated.
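This filtering amounts to an anti-join: a (user, movie) recommendation is kept only if that pair does not appear among the existing ratings. Here is a plain-Python sketch with made-up IDs. All names are illustrative.

```python
# Made-up (userId, movieId) pairs that already have a rating.
already_rated = {(1, 10), (1, 20), (2, 10)}

# Made-up recommendations: (userId, movieId, predicted rating).
toy_recommendations = [
    (1, 10, 4.8),
    (1, 30, 4.6),
    (2, 10, 4.9),
    (2, 20, 4.1),
]

# Keep only recommendations whose (userId, movieId) pair is not already rated.
unseen = [rec for rec in toy_recommendations if (rec[0], rec[1]) not in already_rated]
print(unseen)
```

In Spark, a join with `how='left_anti'` expresses the same filter without the null check, though this notebook does not use that variant.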

Movie recommendations for user 414:

In [84]:
recommendations_df.filter(recommendations_df['userId'] == 414).toPandas().head()

Unnamed: 0,movieId,userId,prediction,title,genres,rating
0,8477,414,5.228586,"Jetée, La (1962)",Romance|Sci-Fi,
1,3379,414,5.190521,On the Beach (1959),Drama,
2,96004,414,5.190521,Dragon Ball Z: The History of Trunks (Doragon ...,Action|Adventure|Animation,
3,6818,414,5.188003,Come and See (Idi i smotri) (1985),Drama|War,
4,102217,414,4.964223,Bill Hicks: Revelations (1993),Comedy,


Movie recommendations for user 599:

In [86]:
recommendations_df.filter(recommendations_df['userId'] == 599).toPandas().head()

Unnamed: 0,movieId,userId,prediction,title,genres,rating
0,33649,599,3.841577,Saving Face (2004),Comedy|Drama|Romance,
1,60943,599,3.827816,Frozen River (2008),Drama,
2,5915,599,3.827767,Victory (a.k.a. Escape to Victory) (1981),Action|Drama|War,
3,5490,599,3.827767,The Big Bus (1976),Action|Comedy,
4,132333,599,3.827767,Seve (2014),Documentary|Drama,


In [87]:
# Another way is to use the recommendForUserSubset function
#users = ALS_recommendations.filter(ALS_recommendations['userId'] == 575)
#ALS_recommendations_target = best_model.recommendForUserSubset(users,1)
#ALS_recommendations_target.show()

## Find similar movies for the movies with IDs 464 and 471

The similarity of two movies can be estimated from the correlation of their user ratings. For example, if users A and B both gave movies 1 and 2 a 5-star rating, movies 1 and 2 might be highly similar.

However, movies with very few ratings may end up with a high correlation simply because one or two people gave them a 5-star rating. We can mitigate this by setting a threshold on the number of ratings. In the histogram of rating counts above, we saw a sharp decline at around 100 ratings, so we use 100 as the threshold.
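To illustrate the idea, the sketch below computes the Pearson correlation between two movies over the users who rated both, in plain Python. The ratings are made up and the helper `pearson` is hypothetical. The notebook itself relies on pandas `corrwith`.

```python
import math

# Made-up ratings: {movie: {user: rating}}
toy_ratings = {
    "movie_1": {"A": 5.0, "B": 4.0, "C": 1.0, "D": 2.0},
    "movie_2": {"A": 4.5, "B": 4.0, "C": 1.5, "E": 3.0},
}

def pearson(ratings_x, ratings_y):
    """Pearson correlation over users who rated both movies (None if undefined)."""
    common = sorted(set(ratings_x) & set(ratings_y))
    if len(common) < 2:
        return None
    xs = [ratings_x[u] for u in common]
    ys = [ratings_y[u] for u in common]
    mean_x, mean_y = sum(xs) / len(xs), sum(ys) / len(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return None
    return cov / math.sqrt(var_x * var_y)

# Three users (A, B, C) rated both movies.
r_three = pearson(toy_ratings["movie_1"], toy_ratings["movie_2"])
# With only two common raters, any consistent ordering yields a perfect correlation.
r_two = pearson({"A": 5.0, "B": 4.0}, {"A": 3.0, "B": 2.0})
print(r_three, r_two)
```

The two-rater case shows why a minimum number of ratings matters: a correlation computed from two points says very little about how similar the movies really are.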

In [90]:
movie_matrix = movie_ratings_df.toPandas().pivot_table(index='userId', columns='movieId', values='rating')

In [91]:
def find_similar_movie(x):
  # Ratings of movie x by every user (NaN where the user did not rate it)
  movie_x_rating = movie_matrix[x]
  # Correlation of every movie's ratings with those of movie x
  similar_to_x=movie_matrix.corrwith(movie_x_rating).reset_index(level=0)
  similar_to_x.dropna(axis = 0,how = 'any',inplace=True)
  similar_to_x.columns = ['movieId','correlation']

  # Attach titles and rating counts, then keep only movies with more than 100 ratings
  similar_to_x_df = spark.createDataFrame(similar_to_x)
  similar_to_x_movie = similar_to_x_df.join(ratings_info_df,'movieId','left').toPandas()[['movieId','correlation','title','count']]
  res = similar_to_x_movie[similar_to_x_movie['count']>100].sort_values(by = 'correlation',ascending = False)
  return similar_to_x,res

We will only find similar movies that have been rated more than 100 times.

Movies similar to movie 471:

In [93]:
# Movies similar to 471
corr_471,similar_to_471_movie = find_similar_movie(471)

In [94]:
fig, ax = plt.subplots(figsize=(7, 5))
sns.scatterplot(x = 'movieId',y='correlation',data=corr_471,ax=ax)
display()

In [95]:
similar_to_471_movie.head()

Unnamed: 0,movieId,correlation,title,count
2426,79132,0.973124,Inception (2010),143
1788,48516,0.724861,"Departed, The (2006)",107
2576,33794,0.67743,Batman Begins (2005),116
1350,1222,0.608101,Full Metal Jacket (1987),102
1229,364,0.570549,"Lion King, The (1994)",172


Movies similar to movie 464:

In [97]:
# Movies similar to 464
corr_464,similar_to_464_movie = find_similar_movie(464)

In [98]:
fig, ax = plt.subplots(figsize=(7, 5))
sns.scatterplot(x='movieId',y='correlation',data=corr_464,ax=ax)
display()

In [99]:
similar_to_464_movie.head()

Unnamed: 0,movieId,correlation,title,count
362,5816,1.0,Harry Potter and the Chamber of Secrets (2002),102
528,4896,0.904534,Harry Potter and the Sorcerer's Stone (a.k.a. ...,107
1465,58559,0.866025,"Dark Knight, The (2008)",149
450,253,0.853913,Interview with the Vampire: The Vampire Chroni...,109
389,7153,0.845154,"Lord of the Rings: The Return of the King, The...",185


## Report
In this project, we analyzed the MovieLens small dataset, which covers about 600 users and 9,500 movies, to recommend movies to users and to find similar movies. We first calculated the sparsity of the ratings matrix, which is 98.3%. The matrix is therefore mostly empty, which is why predicting the missing ratings from the observed ones is valuable.

To achieve this, the data was processed on Spark, from data cleaning and preparation through model training with the Alternating Least Squares (ALS) algorithm. Grid search and cross-validation were applied to tune the hyperparameters. We found that a larger rank and more iterations helped lower the RMSE. The final model uses a rank of 10, 50 iterations, and regParam = 0.15, and achieves an RMSE of 0.69. Roughly speaking, its predictions deviate from the actual ratings by about 0.69 stars.

With the ratings predicted by the best model, we can fill in the ratings matrix and recommend movies that users have not yet rated. Separately, correlations between user ratings let us find similar movies. Both capabilities bring substantial business value to the company.