Building a Movie Recommendation System using the ALS algorithm

The data set chosen for the analysis is the Movie Lens Data

In [69]:
# importing required libraries
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder,CrossValidator
from pyspark.ml.recommendation import ALS
import pandas as pd


In [2]:
# creating a spark session
sparksession = SparkSession.builder.appName("Movie").getOrCreate()

In [4]:
# reading the movielens dataset
movielensdata = sparksession.read.csv('movielens_ratings.csv',inferSchema=True,header=True)

In [32]:
# schema of the dataset
movielensdata.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- userId: integer (nullable = true)



In [35]:
# top 20 rows of the movielens data
movielensdata.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|   3.0|     0|
|      3|   1.0|     0|
|      5|   2.0|     0|
|      9|   4.0|     0|
|     11|   1.0|     0|
|     12|   2.0|     0|
|     15|   1.0|     0|
|     17|   1.0|     0|
|     19|   1.0|     0|
|     21|   1.0|     0|
|     23|   1.0|     0|
|     26|   3.0|     0|
|     27|   1.0|     0|
|     28|   1.0|     0|
|     29|   1.0|     0|
|     30|   1.0|     0|
|     31|   1.0|     0|
|     34|   1.0|     0|
|     37|   1.0|     0|
|     41|   2.0|     0|
+-------+------+------+
only showing top 20 rows



In [None]:
#Number of rows in the dataset
movielensdata.count()

In [28]:
# Number of unique users
movielensdata.toPandas()['userId'].nunique()

30

In [30]:
# Number of unique movies
movielensdata.toPandas()['movieId'].nunique()

100

In [91]:
# calculating average rating for each movie
avgmovierating=movielensdata.select(["movieID","rating"]).groupBy("movieID").mean().select(["movieID","avg(rating)"]).toPandas()
avgmovierating=avgmovierating.rename({"avg(rating)":"Average_Rating"},axis=1)
avgmovierating["Average_Rating"] =avgmovierating["Average_Rating"].round(2)
avgmovierating.sort_values("Average_Rating",ascending =False).head(10)


Unnamed: 0,movieID,Average_Rating
77,32,2.92
76,90,2.81
90,30,2.5
49,23,2.47
26,94,2.47
51,49,2.44
96,18,2.4
72,29,2.4
18,52,2.36
3,53,2.25


In [102]:
# calculating average rating for each movie using SPARK SQL
movielensdata.createOrReplaceTempView("movie")
sparksession.sql("SELECT movieID, ROUND(AVG(rating),2) as Average_Rating FROM movie GROUP BY movieID ORDER BY Average_Rating DESC LIMIT 10").show()


+-------+--------------+
|movieID|Average_Rating|
+-------+--------------+
|     32|          2.92|
|     90|          2.81|
|     30|           2.5|
|     94|          2.47|
|     23|          2.47|
|     49|          2.44|
|     29|           2.4|
|     18|           2.4|
|     52|          2.36|
|     53|          2.25|
+-------+--------------+



In [138]:
# Summary Statistics of the Movielens data
movielensdata.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [103]:
# Splitting the data into 70% training data and 30% Test Data
(traindata, testdata) = movielensdata.randomSplit([0.7, 0.3])

In [122]:
# Buidling the ALS model
alsmodel =  ALS(userCol="userId", itemCol="movieId", ratingCol="rating",coldStartStrategy="drop",nonnegative=True)


In [125]:
# setting the hyper parameter grid 
paramgrid =  ParamGridBuilder().addGrid(alsmodel.maxIter,[5,10,15]).addGrid(alsmodel.regParam,[0.01,0.1,0.2]).build()

In [126]:
# The model evaluation metric is Root Mean Square Error
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")


In [127]:
# to perform cross-validation on the grid hyper parameters
cross_validation = CrossValidator(estimator=alsmodel,estimatorParamMaps=paramgrid,evaluator=evaluator,numFolds=5) 

In [128]:
# fitting the ALS model
mod = cross_validation.fit(traindata)

In [129]:
# Selecting the best model with least RMSE
bestmodel = mod.bestModel


In [131]:
# Getting the predictions from the model on the test set
predictions = bestmodel.transform(testdata)

In [132]:
# displaying some predictions 
predictions.show()

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|     31|   1.0|    26|0.59006816|
|     31|   4.0|    12| 1.2104212|
|     85|   1.0|    26| 1.1737769|
|     85|   3.0|     1|  1.246295|
|     85|   1.0|    13| 1.9065279|
|     85|   1.0|    15| 0.8273883|
|     85|   1.0|    23|  1.019778|
|     85|   1.0|    25| 1.8364308|
|     65|   1.0|    28|  1.672488|
|     65|   2.0|     3| 1.2472758|
|     65|   2.0|     5| 3.4569218|
|     65|   1.0|     4|0.74044275|
|     53|   3.0|    13| 2.4844565|
|     53|   1.0|     6| 2.5893838|
|     53|   3.0|    20| 1.4866754|
|     53|   3.0|    14| 3.5209095|
|     78|   1.0|    12| 0.6664252|
|     78|   1.0|     1| 0.7142437|
|     78|   1.0|    19| 0.6469874|
|     78|   1.0|    17|0.71163535|
+-------+------+------+----------+
only showing top 20 rows



In [134]:
# Viewing the predictions of rating for a particular user 
movielensdata.createOrReplaceTempView("movie")
user =sparksession.sql("SELECT movieID, userId FROM movie where userId =2").toPandas()
reccomendations = bestmodel.transform(testdata)
reccomendations.orderBy('prediction',ascending=False).show()

+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|     25|   2.0|    14| 4.4706593|
|     32|   3.0|    24| 4.4344606|
|     32|   5.0|    23|  4.403323|
|     52|   5.0|     8|  4.150547|
|     49|   4.0|     5|  3.907705|
|     68|   3.0|    23| 3.7907603|
|     63|   4.0|    24| 3.6452858|
|     27|   5.0|    11| 3.6237674|
|     64|   4.0|    23|   3.53715|
|     53|   3.0|    14| 3.5209095|
|     49|   3.0|    17| 3.5192595|
|     49|   3.0|    26| 3.5027378|
|     65|   2.0|     5| 3.4569218|
|     12|   1.0|    11| 3.2897742|
|     96|   4.0|    14|  3.237337|
|     18|   3.0|    22| 3.1240072|
|     64|   3.0|     5| 3.1161447|
|     72|   4.0|    14| 2.9740045|
|     29|   4.0|     4| 2.9495337|
|      7|   1.0|    28| 2.9177337|
+-------+------+------+----------+
only showing top 20 rows



In [137]:
# The RMSE of the ALS model 
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(round(rmse,2)))

Root-mean-square error = 1.07
