#Abstract#

In this era of entertainment, companies such as Netflix and Amazon Prime provide huge platforms for streaming movies, series, documentaries, and other visual entertainment online. Users of these platforms often face a dilemma in choosing what to watch among the enormous number of titles on offer. This is where big data and recommendation systems come in: they analyze user behaviour and suggest titles, making it simple for users to find movies in the genres they are interested in.

#Motivation#

This project uses a movie-ratings dataset of the kind popularized by the Netflix recommendation contest. It offers both an opportunity and a challenge to learn the different filtering techniques used in recommendation systems. The project is carried out on the Databricks platform using PySpark.

#Introduction to the Data#

This dataset is provided by MovieLens. We use two of its files for this project: ratings and movies. movies.csv has the columns movieId, title, and genres, whereas ratings.csv has userId, movieId, rating, and timestamp.

Data source and dictionary: https://grouplens.org/datasets/movielens/

Data consistency: The data is mostly clean. Rows with missing values have been dropped, as have the few duplicate rows.

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import explode,split
from pyspark.sql.window import Window
from IPython.display import Image 
from IPython.core.display import HTML 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [0]:
# File location of MOVIE dataset
movie_file_location = "/FileStore/tables/movies.csv"

In [0]:
#defining schema for Movie dataset
movie_schema = StructType([StructField('movieId', DoubleType(), True),       
                     StructField('title', StringType(), True),
                     StructField('genres', StringType(), True)])

In [0]:
#reading the data using schema definition 
moviedf = spark.read.csv(movie_file_location, header=True , schema=movie_schema)
display(moviedf)

movieId,title,genres
1.0,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2.0,Jumanji (1995),Adventure|Children|Fantasy
3.0,Grumpier Old Men (1995),Comedy|Romance
4.0,Waiting to Exhale (1995),Comedy|Drama|Romance
5.0,Father of the Bride Part II (1995),Comedy
6.0,Heat (1995),Action|Crime|Thriller
7.0,Sabrina (1995),Comedy|Romance
8.0,Tom and Huck (1995),Adventure|Children
9.0,Sudden Death (1995),Action
10.0,GoldenEye (1995),Action|Adventure|Thriller


In [0]:
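#Function to separate the year from the movie title with a regex

import re

def sepYear(title):
  # Raw string so that "\(" reaches the regex engine unchanged
  year = re.search(r'\(([0-9]{4})', title)
  if year is not None:
    year = year.group(1)
  else:
    # No four-digit year after an opening parenthesis: fall back to 0
    year = 0

  return int(year)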
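
As a quick sanity check, the same regular expression can be exercised outside Spark on a few titles taken from the tables in this notebook. This is a minimal sketch in plain Python; `extract_year` and `YEAR_PATTERN` are hypothetical stand-alone names that mirror `sepYear` and are not part of the original pipeline.

```python
import re

# Hypothetical stand-alone copy of the year-extraction logic, for illustration only.
YEAR_PATTERN = re.compile(r"\(([0-9]{4})")

def extract_year(title):
    """Return the first four-digit number that follows '(' in the title, or 0."""
    match = YEAR_PATTERN.search(title)
    return int(match.group(1)) if match else 0

samples = [
    "Toy Story (1995)",
    "Seventh Seal, The (Sjunde inseglet, Det) (1957)",
    "به نام پدر",  # no year in the title
]
years = [extract_year(t) for t in samples]
print(years)
```

Titles without a parenthesized year fall back to 0, so a later filter on `year > 0` would be one way to exclude them if needed.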

In [0]:
# Registering Functions
sqlContext.udf.register("yearCleansing", sepYear)

In [0]:
# Using The Functions

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

year_udf = udf(sepYear, IntegerType())
moviedf = moviedf.select("movieId", "title", "genres", year_udf("title").alias("year"))

In [0]:
moviedf.printSchema()

In [0]:
display(moviedf)

movieId,title,genres,year
1.0,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1995
2.0,Jumanji (1995),Adventure|Children|Fantasy,1995
3.0,Grumpier Old Men (1995),Comedy|Romance,1995
4.0,Waiting to Exhale (1995),Comedy|Drama|Romance,1995
5.0,Father of the Bride Part II (1995),Comedy,1995
6.0,Heat (1995),Action|Crime|Thriller,1995
7.0,Sabrina (1995),Comedy|Romance,1995
8.0,Tom and Huck (1995),Adventure|Children,1995
9.0,Sudden Death (1995),Action,1995
10.0,GoldenEye (1995),Action|Adventure|Thriller,1995


In [0]:
#checking if there are any duplicate values of the title.
display(moviedf.groupby("title").count().orderBy("title", ascending=False))

title,count
줄탁동시 (2012),1
貞子3D (2012),1
チェブラーシカ (2010),1
キサラギ (2007),1
…And the Fifth Horseman Is Fear (1965),1
‘Rameau’s Nephew’ by Diderot (Thanx to Dennis Young) by Wilma Schoen (1974),1
काशी - In Search of Ganga (2018),1
به نام پدر,1
أهواك (2015),1
ארבינקא (1967),1


In [0]:
# Counting the rows of the movies dataframe.
moviedf.count()

In [0]:
#Summary of the statistics of Movie dataset.
display(moviedf.select('movieId').summary())

summary,movieId
count,62423.0
mean,122220.38764557937
stddev,63264.74484425327
min,1.0
25%,82121.0
50%,138010.0
75%,173215.0
max,209171.0


In [0]:
# File location of MOVIE RATING dataset
rating_file_location = "/FileStore/tables/ratings.csv"

In [0]:
#Defining the schema for the Rating dataframe.
rating_schema = StructType([StructField('userId', DoubleType(), True),        
                     StructField('movieId', DoubleType(), True),
                     StructField('rating', DoubleType(), True),
                     StructField('timestamp', DoubleType(), True)])

In [0]:
#Reading the dataset using schema definition.
ratingdf = spark.read.csv(rating_file_location, header=True, schema=rating_schema )
display(ratingdf)

userId,movieId,rating,timestamp
1.0,296.0,5.0,1147880044.0
1.0,306.0,3.5,1147868817.0
1.0,307.0,5.0,1147868828.0
1.0,665.0,5.0,1147878820.0
1.0,899.0,3.5,1147868510.0
1.0,1088.0,4.0,1147868495.0
1.0,1175.0,3.5,1147868826.0
1.0,1217.0,3.5,1147878326.0
1.0,1237.0,5.0,1147868839.0
1.0,1250.0,4.0,1147868414.0


In [0]:
#No. of rows in the dataframe.
ratingdf.count()

In [0]:
#Checking the statistical summary for the Ratings dataset
display(ratingdf.select('movieId').summary())

summary,movieId
count,25000095.0
mean,21387.981943268616
stddev,39198.86210105984
min,1.0
25%,1197.0
50%,2947.0
75%,8623.0
max,209171.0


In [0]:
#Dropping the columns not required for the current project.
ratingdf = ratingdf.drop('timestamp')

In [0]:
#grouping movies according to its genres
display(moviedf.groupby("genres").count().orderBy("count", ascending=False))

genres,count
Drama,9056
Comedy,5674
(no genres listed),5062
Documentary,4731
Comedy|Drama,2386
Drama|Romance,2126
Horror,1661
Comedy|Romance,1577
Comedy|Drama|Romance,1044
Drama|Thriller,933


In [0]:
#joining the dataframes.
movie_rating = (ratingdf.select('userId','movieId','rating')).join(moviedf,['movieId'])
display(movie_rating)

movieId,userId,rating,title,genres,year
296.0,1.0,5.0,Pulp Fiction (1994),Comedy|Crime|Drama|Thriller,1994
306.0,1.0,3.5,Three Colors: Red (Trois couleurs: Rouge) (1994),Drama,1994
307.0,1.0,5.0,Three Colors: Blue (Trois couleurs: Bleu) (1993),Drama,1993
665.0,1.0,5.0,Underground (1995),Comedy|Drama|War,1995
899.0,1.0,3.5,Singin' in the Rain (1952),Comedy|Musical|Romance,1952
1088.0,1.0,4.0,Dirty Dancing (1987),Drama|Musical|Romance,1987
1175.0,1.0,3.5,Delicatessen (1991),Comedy|Drama|Romance,1991
1217.0,1.0,3.5,Ran (1985),Drama|War,1985
1237.0,1.0,5.0,"Seventh Seal, The (Sjunde inseglet, Det) (1957)",Drama,1957
1250.0,1.0,4.0,"Bridge on the River Kwai, The (1957)",Adventure|Drama|War,1957


In [0]:
#caching the dataframes (cache is a method, so it must be called).
movie_rating.cache()
ratingdf.cache()

In [0]:
#printing the joined dataframe schema.
movie_rating.printSchema()

In [0]:
#Dropping null values
movie_rating = movie_rating.dropna()

In [0]:
#Dropping Duplicate entries.
movie_rating = movie_rating.dropDuplicates() 

In [0]:
#checking the number of rows.
movie_rating.count()

In [0]:
from pyspark.sql.functions import explode,split
genres = moviedf.withColumn("genres",explode(split("genres","[|]")))
display(genres)

movieId,title,genres,year
1.0,Toy Story (1995),Adventure,1995
1.0,Toy Story (1995),Animation,1995
1.0,Toy Story (1995),Children,1995
1.0,Toy Story (1995),Comedy,1995
1.0,Toy Story (1995),Fantasy,1995
2.0,Jumanji (1995),Adventure,1995
2.0,Jumanji (1995),Children,1995
2.0,Jumanji (1995),Fantasy,1995
3.0,Grumpier Old Men (1995),Comedy,1995
3.0,Grumpier Old Men (1995),Romance,1995


In [0]:
genre = genres.select('genres').drop_duplicates()
display(genre)

genres
Crime
Romance
Thriller
Adventure
Drama
War
Documentary
Fantasy
Mystery
Musical


As we can see, the same movieId and title appear once for each of their genres. Next, we assign a rank to each user's ratings of a title, ordered by rating, so that a single rating per user and title can be selected.
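
To make the ranking step concrete, here is a minimal plain-Python sketch of what a row number partitioned by (userId, title) and ordered by rating computes. The sample rows are made up for illustration, and the helper name `add_row_number` is hypothetical.

```python
from itertools import groupby

# Made-up sample rows: (userId, title, rating). One pair appears twice.
rows = [
    (1, "Heat (1995)", 4.0),
    (1, "Heat (1995)", 3.0),
    (1, "Sabrina (1995)", 5.0),
    (2, "Heat (1995)", 2.5),
]

def add_row_number(rows):
    """Number rows 1..n within each (userId, title) partition, ordered by rating."""
    ordered = sorted(rows, key=lambda r: (r[0], r[1], r[2]))
    ranked = []
    for _, group in groupby(ordered, key=lambda r: (r[0], r[1])):
        for rank, row in enumerate(group, start=1):
            ranked.append(row + (rank,))
    return ranked

ranked = add_row_number(rows)
# Keeping rank == 1 leaves one row per (userId, title): the lowest rating,
# because the ordering is ascending.
first_per_pair = [r for r in ranked if r[3] == 1]
print(first_per_pair)
```

Because the ordering is ascending, rank 1 is the lowest rating in each partition; ordering by descending rating instead would keep the highest.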

In [0]:
from pyspark.sql.window import Window
movie_rating= movie_rating.withColumn("rank",row_number().over(Window.partitionBy("userId","title").orderBy("rating")))
movie_rating.show()

In [0]:
movie_rating.filter(movie_rating.rank==1).show()

In [0]:
display(movie_rating.sort("Rating").select("title","Rating"))

title,Rating
Terminator 2: Judgment Day (1991),0.5
"Princess Diaries, The (2001)",0.5
No Direction Home: Bob Dylan (2005),0.5
"Hobbit: The Desolation of Smaug, The (2013)",0.5
Sirens (1994),0.5
Kung Fu Hustle (Gong fu) (2004),0.5
"Day After Tomorrow, The (2004)",0.5
Die Another Day (2002),0.5
Batman (1989),0.5
Who Framed Roger Rabbit? (1988),0.5


In [0]:
import pyspark.sql.functions as F

(movie_rating
.groupBy(F.col('title'))
.agg(F.count('title').alias('No_of_person_viewed'))
.sort('No_of_person_viewed',ascending=False)
.show())

In [0]:
#Dividing the dataset for Train and Test with random seed.
(train, test) = ratingdf.randomSplit([0.7, 0.3], seed = 2020)

In [0]:
Image(url= "https://miro.medium.com/max/700/1*EIBIiW2YiakP1ftxwPF8LA.png")

In [0]:
Image(url= "https://miro.medium.com/max/700/1*7_JHQ6-1nyHoB2ux1h0ZKw.png")

Collaborative filtering (CF) aggregates the past behaviour of all users and recommends items to a user based on the items liked by other users whose likes and dislikes are similar. This is called user-user CF.
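
The idea of user-user CF can be sketched in a few lines of plain Python. This is only an illustration on made-up ratings, assuming cosine similarity over co-rated movies and a single nearest neighbour; the names `cosine_similarity` and `recommend` are hypothetical, and it is not the method used later in this notebook.

```python
from math import sqrt

# Made-up ratings: user -> {movie: rating}. All names and values are illustrative.
ratings = {
    "alice": {"Heat": 5.0, "Sabrina": 1.0, "GoldenEye": 4.5},
    "bob":   {"Heat": 4.5, "Sabrina": 1.5, "GoldenEye": 5.0, "Jumanji": 4.0},
    "carol": {"Heat": 1.0, "Sabrina": 5.0, "Toy Story": 4.5},
}

def cosine_similarity(a, b):
    """Cosine similarity over the movies both users have rated."""
    common = set(a) & set(b)
    if not common:
        return 0.0
    dot = sum(a[m] * b[m] for m in common)
    norm_a = sqrt(sum(a[m] ** 2 for m in common))
    norm_b = sqrt(sum(b[m] ** 2 for m in common))
    return dot / (norm_a * norm_b)

def recommend(user, ratings):
    """Suggest unseen movies rated by the most similar other user."""
    others = [u for u in ratings if u != user]
    nearest = max(others, key=lambda u: cosine_similarity(ratings[user], ratings[u]))
    unseen = {m: r for m, r in ratings[nearest].items() if m not in ratings[user]}
    return nearest, sorted(unseen, key=unseen.get, reverse=True)

nearest, suggestions = recommend("alice", ratings)
print(nearest, suggestions)
```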

#Recommendation using ALS.#

Alternating Least Squares (ALS) matrix factorization estimates the ratings matrix R as the product of two lower-rank matrices X and Y, such that X * Yt ≈ R, where Yt is the transpose of Y.


The method is iterative: in each iteration one of the factor matrices is held constant while the other is solved for using least squares. The newly solved matrix is then held constant while the other factor matrix is solved for.
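
The alternation can be illustrated with a tiny rank-1 example in plain Python, where each least-squares update has a closed form. This is a sketch under stated assumptions (a made-up 3x3 ratings matrix, rank 1, and an illustrative regularization value `LAMBDA`); the helper names are hypothetical, and it is not how Spark implements ALS internally.

```python
# Made-up ratings matrix (rows = users, columns = movies); None marks a missing rating.
R = [
    [5.0, 4.0, None],
    [4.0, None, 1.0],
    [None, 5.0, 2.0],
]
LAMBDA = 0.1  # illustrative regularization strength (an assumption)

def solve_factor(R, fixed, by_rows):
    """Regularized least-squares update of one rank-1 factor, the other held constant."""
    n = len(R) if by_rows else len(R[0])
    updated = []
    for i in range(n):
        num, den = 0.0, LAMBDA
        for j, f in enumerate(fixed):
            r = R[i][j] if by_rows else R[j][i]
            if r is not None:  # only observed ratings contribute
                num += r * f
                den += f * f
        updated.append(num / den)
    return updated

def objective(R, x, y):
    """Squared error on observed entries plus the L2 penalty on both factors."""
    error = sum((r - x[u] * y[i]) ** 2
                for u, row in enumerate(R)
                for i, r in enumerate(row) if r is not None)
    penalty = LAMBDA * (sum(v * v for v in x) + sum(v * v for v in y))
    return error + penalty

x = [1.0] * len(R)      # user factors
y = [1.0] * len(R[0])   # movie factors
history = []
for _ in range(10):
    x = solve_factor(R, y, by_rows=True)   # hold y constant, solve for x
    y = solve_factor(R, x, by_rows=False)  # hold x constant, solve for y
    history.append(objective(R, x, y))
print(history[0], history[-1])
```

Each half-step minimizes the regularized objective exactly with respect to one factor, so the recorded objective never increases from one iteration to the next.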

Because we are building the model explicitly, we specify the user, item, and rating columns. We set nonnegative to True because the ratings we are modelling are all greater than zero. Since we are working only with explicit ratings, we set implicitPrefs to False.

In [0]:
# Importing the required functions.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ALS model
Als_model = ALS(
         userCol="userId", 
         itemCol="movieId",
         ratingCol="rating", 
         nonnegative = True, 
         implicitPrefs = False,
         coldStartStrategy="drop"
)
# To confirm that the model was created.
type(Als_model)

Defining the tuning parameters in param_grid using ParamGridBuilder. Feel free to experiment with these values.
We have chosen 4 values for each of the 2 hyperparameters, which results in 16 models to train.

In [0]:
# Adding hyperparameters and respective values to the grid.
param_grid = ParamGridBuilder() \
            .addGrid(Als_model.rank, [10, 50, 100, 150]) \
            .addGrid(Als_model.regParam, [.01, .05, .1, .15]) \
            .build()

Defining the evaluator with RMSE as the metric and printing the number of models in the parameter grid.

In [0]:
Evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="rating", 
           predictionCol="prediction") 
print ("Num models to be tested: ", len(param_grid))

Passing the ALS model, the param_grid, and the evaluator to the cross validator. The number of folds we have chosen is 2. Feel free to experiment.

In [0]:
# Build cross validation using CrossValidator
Crossvalidator = CrossValidator(estimator=Als_model, estimatorParamMaps=param_grid, evaluator=Evaluator, numFolds=2)
Crossvalidator

The grid defines 16 candidate models, so the cross validator fits each of them and gives us the best model parameters.
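
The amount of training work implied by the grid and the fold count can be tallied in plain Python. This is a small sketch that reuses the candidate values and the fold count from this notebook; the variable names are hypothetical.

```python
from itertools import product

# Same candidate values as the parameter grid in this notebook.
ranks = [10, 50, 100, 150]
reg_params = [0.01, 0.05, 0.1, 0.15]
num_folds = 2  # the fold count passed to the cross validator

# Every (rank, regParam) pair is one candidate model.
combinations = list(product(ranks, reg_params))
fits_during_cv = len(combinations) * num_folds

print(len(combinations))  # number of parameter combinations
print(fits_during_cv)     # model fits during cross validation
```

Each combination is trained once per fold, so widening the grid or raising the fold count multiplies the training time accordingly.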

In [0]:
#Fit cross validator to the 'train' dataset
model = Crossvalidator.fit(train)

#Extract best model from the cv model above
best_model = model.bestModel

Printing the best model and its parameters

In [0]:
# Print the type of best_model
print(type(best_model))

# Extract the ALS model parameters
print("**Best Model**")

# Print "Rank"
print("  Rank:", best_model._java_obj.parent().getRank())

# Print "MaxIter"
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())

# Print "RegParam"
print("  RegParam:", best_model._java_obj.parent().getRegParam())

Calculating and evaluating by the metric RMSE with the best model.
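
For reference, RMSE is the square root of the mean squared difference between actual and predicted ratings. The following plain-Python sketch computes it on made-up values; the function name `rmse` is hypothetical and independent of the Spark evaluator.

```python
from math import sqrt

def rmse(actual, predicted):
    """Root mean squared error between two equal-length, non-empty sequences."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared) / len(squared))

# Made-up ratings and predictions, for illustration only.
actual = [5.0, 3.5, 4.0, 1.0]
predicted = [4.5, 3.5, 3.0, 2.0]
score = rmse(actual, predicted)
print(score)
```

A lower RMSE means the predicted ratings are closer to the actual ones, in the same units as the rating scale.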

In [0]:
# View the predictions
test_data_prediction = best_model.transform(test)
RMSE = Evaluator.evaluate(test_data_prediction)
print(RMSE)

Displaying the predicted values from the test dataset.

In [0]:
test_data_prediction.show()

Generating recommendations for all users.

In [0]:
# Generate n Recommendations for all users
n_recommendations = best_model.recommendForAllUsers(10)
n_recommendations.show()

Transforming the dataframe into a more readable format.
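
The reshaping amounts to turning one row per user, holding a list of recommendations, into one row per user and recommended movie. A minimal plain-Python sketch of that flattening, on made-up data with hypothetical variable names, looks like this:

```python
# Made-up nested recommendations: one row per user, each holding a list of
# (movieId, predicted rating) pairs.
nested = [
    {"userId": 1, "recommendations": [(296, 4.8), (306, 4.6)]},
    {"userId": 2, "recommendations": [(899, 4.9)]},
]

# "Explode": emit one flat row per (user, recommended movie) pair.
flat = [
    {"userId": row["userId"], "movieId": movie_id, "rating": rating}
    for row in nested
    for movie_id, rating in row["recommendations"]
]
for record in flat:
    print(record)
```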

In [0]:
n_recommendations = n_recommendations\
    .withColumn("rec_exp", explode("recommendations"))\
    .select('userId', col("rec_exp.movieId"), col("rec_exp.rating"))
n_recommendations.limit(10).show()

Let's check whether these recommendations make sense. We will print the genres of the movies rated by userId 50 and check whether the recommended movies are of similar genres.

In [0]:
n_recommendations.join(moviedf, on='movieId').filter('userId = 50').show()

In [0]:
ratingdf.join(moviedf, on='movieId').filter('userId = 50').sort('rating', ascending=False).limit(10).show()

#Conclusion and Results#

We can see that the movies originally rated by userId 50 and the movies recommended by this model are of similar genres, such as War and Drama. This suggests that the model's recommendations are consistent with the user's tastes.

#Future Work and Limitations#

In this project we used a single param_grid that yields 16 models. More parameter combinations could be tried in search of a better model, and their results compared. We have also used only ALS, whereas there are several other recommendation methods that could be explored.

#References and Related work#

1. https://grouplens.org/datasets/movielens/
2. http://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html
3. https://www.data-stats.com/spark-analytics-on-movielens-dataset/
4. https://intellipaat.com/community/5161/how-do-i-add-a-new-column-to-a-spark-dataframe-using-pyspark
5. https://en.wikipedia.org/?title=Collaborative_filtering#/media/File:Collaborative_filtering.gif
6. https://github.com/snehalnair/als-recommender-pyspark/blob/master/Recommendation_Engine_MovieLens.ipynb 
7. https://dl.acm.org/doi/10.1109/MC.2009.263
8. https://forums.databricks.com/questions/29012/how-to-show-an-image-in-a-notebook-using-html.html 
9. https://developers.google.com/machine-learning/recommendation/collaborative/matrix