

## Movie Recommender System - Collaborative Filtering

Dataset:

https://www.kaggle.com/rounakbanik/the-movies-dataset

Source:

http://www.3leafnodes.com/apache-spark-introduction-recommender-system

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Create (or reuse) the Spark session for this notebook
spark = SparkSession.builder.appName('MovieRecommender').getOrCreate()


### Import Data

In [23]:
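# Small sample (100,004 ratings)
ratings = spark.read.csv("gs://msca-bdp-student-gcs/the-movies-dataset/ratings_small.csv", inferSchema=True, header=True)
movies = spark.read.csv("gs://msca-bdp-student-gcs/the-movies-dataset/movies_metadata.csv", inferSchema=True, header=True)
# Full dataset (26,024,289 ratings). This reassignment replaces the small sample loaded above;
# comment it out to keep working with ratings_small.csv.
ratings = spark.read.csv("gs://msca-bdp-student-gcs/the-movies-dataset/ratings.csv", inferSchema=True, header=True)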


                                                                                

In [4]:
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



### Data Exploration

In [5]:
ratings.columns

['userId', 'movieId', 'rating', 'timestamp']

In [6]:
movies.columns

['adult',
 'belongs_to_collection',
 'budget',
 'genres',
 'homepage',
 'id',
 'imdb_id',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

In [7]:
ratings = ratings.select(['userId', 'movieId', 'rating'])

In [8]:
ratings.head(5)

[Row(userId=1, movieId=31, rating=2.5),
 Row(userId=1, movieId=1029, rating=3.0),
 Row(userId=1, movieId=1061, rating=3.0),
 Row(userId=1, movieId=1129, rating=2.0),
 Row(userId=1, movieId=1172, rating=4.0)]

In [9]:
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
|     1|   1263|   2.0|
|     1|   1287|   2.0|
|     1|   1293|   2.0|
|     1|   1339|   3.5|
|     1|   1343|   2.0|
|     1|   1371|   2.5|
|     1|   1405|   1.0|
|     1|   1953|   4.0|
|     1|   2105|   4.0|
|     1|   2150|   3.0|
|     1|   2193|   2.0|
|     1|   2294|   2.0|
|     1|   2455|   2.5|
|     1|   2968|   1.0|
|     1|   3671|   3.0|
+------+-------+------+
only showing top 20 rows



In [10]:
ratings.describe().show()

+-------+------------------+------------------+------------------+
|summary|            userId|           movieId|            rating|
+-------+------------------+------------------+------------------+
|  count|            100004|            100004|            100004|
|   mean| 347.0113095476181|12548.664363425463| 3.543608255669773|
| stddev|195.16383797819535|26369.198968815268|1.0580641091070326|
|    min|                 1|                 1|               0.5|
|    max|               671|            163949|               5.0|
+-------+------------------+------------------+------------------+



                                                                                

In [11]:
training, test = ratings.randomSplit([0.8,0.2])

### ALS

[Alternating Least Squares (ALS)](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html) is the model we’ll use to fit our data and find similarities. ALS is an iterative optimization process: it alternates between holding the item factors fixed while solving for the user factors and holding the user factors fixed while solving for the item factors, so that each iteration moves closer to a factorized representation of the original rating data.
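The alternating idea can be sketched in plain Python. This is a toy rank-1 version on made-up data, not Spark's blocked implementation: the ratings, the regularization value `reg`, and all function names are hypothetical and chosen only for illustration. With one factor per user and per item, each least-squares step has a closed-form scalar solution.

```python
# Toy rank-1 ALS on a tiny explicit-ratings dictionary (hypothetical data).
# Keys are (user index, item index); values are ratings.
toy_ratings = {
    (0, 0): 5.0, (0, 1): 3.0,
    (1, 0): 4.0, (1, 2): 1.0,
    (2, 1): 2.0, (2, 2): 5.0,
}
n_users, n_items = 3, 3
reg = 0.1  # L2 regularization strength (hypothetical value)


def loss(user_f, item_f):
    """Regularized squared error over the observed ratings only."""
    err = sum((r - user_f[u] * item_f[i]) ** 2 for (u, i), r in toy_ratings.items())
    penalty = reg * (sum(x * x for x in user_f) + sum(y * y for y in item_f))
    return err + penalty


def solve_users(item_f):
    """Hold item factors fixed and solve each user's factor in closed form."""
    out = []
    for u in range(n_users):
        seen = [(i, r) for (uu, i), r in toy_ratings.items() if uu == u]
        num = sum(r * item_f[i] for i, r in seen)
        den = reg + sum(item_f[i] ** 2 for i, _ in seen)
        out.append(num / den)
    return out


def solve_items(user_f):
    """Hold user factors fixed and solve each item's factor in closed form."""
    out = []
    for i in range(n_items):
        seen = [(u, r) for (u, ii), r in toy_ratings.items() if ii == i]
        num = sum(r * user_f[u] for u, r in seen)
        den = reg + sum(user_f[u] ** 2 for u, _ in seen)
        out.append(num / den)
    return out


user_f = [1.0] * n_users
item_f = [1.0] * n_items
losses = [loss(user_f, item_f)]
for _ in range(10):
    user_f = solve_users(item_f)   # user step
    losses.append(loss(user_f, item_f))
    item_f = solve_items(user_f)   # item step
    losses.append(loss(user_f, item_f))
```

Because each half-step exactly minimizes the objective over one set of factors, the recorded `losses` never increase. Spark's `ALS` generalizes this to rank-`rank` factor vectors and distributes the work across partitions.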

For implicit preference data, the algorithm used is based on “Collaborative Filtering for Implicit Feedback Datasets”, adapted for the blocked approach used here.

Essentially, instead of finding low-rank approximations to the rating matrix R, this variant finds approximations for a preference matrix P whose elements are 1 if r > 0 and 0 if r <= 0. The ratings then act as ‘confidence’ values related to the strength of the indicated user preferences rather than as explicit ratings given to items. This notebook uses explicit ratings, so the ALS model below keeps the default <code>implicitPrefs=False</code>.
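As a small illustration of the preference and confidence idea, the sketch below derives both from hypothetical interaction counts. The linear confidence form `1 + alpha * r` is taken from the cited paper; the data, the `alpha` value, and the helper names are assumptions made for this example. In Spark, the implicit variant is enabled with `implicitPrefs=True` and tuned with the `alpha` parameter of `ALS`.

```python
# Hypothetical implicit-feedback counts (e.g. number of plays); 0 means no interaction.
interactions = {("u1", "m1"): 3.0, ("u1", "m2"): 0.0, ("u2", "m1"): 1.0}
alpha = 1.0  # confidence scaling (hypothetical value)


def preference(r):
    """Binary preference: 1 if any positive interaction was observed, else 0."""
    return 1.0 if r > 0 else 0.0


def confidence(r, alpha):
    """Linear confidence from the cited paper: grows with the interaction strength."""
    return 1.0 + alpha * r


P = {key: preference(r) for key, r in interactions.items()}
C = {key: confidence(r, alpha) for key, r in interactions.items()}
```

The factorization then targets `P`, while `C` weights how strongly each entry's error counts.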

### Cold Start Predictions

When predictions are requested for cold-start users or items (ones the model did not see during training), ALS produces NaN predictions. These NaNs also cause evaluation metrics such as the root-mean-square error to come out as NaN. To solve this problem, the affected rows can be dropped with <code>predictions.na.drop()</code>. A more streamlined way is to set <code>coldStartStrategy="drop"</code> as a model parameter.
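The effect of a single NaN prediction on the metric can be reproduced without Spark. The rows below are hypothetical (rating, prediction) pairs, and the `rmse` helper is written only for this illustration:

```python
import math

# Hypothetical (rating, prediction) pairs; NaN stands for a cold-start user or item.
rows = [(4.0, 3.5), (3.0, float("nan")), (5.0, 4.0)]


def rmse(pairs):
    """Root-mean-square error over (rating, prediction) pairs."""
    return math.sqrt(sum((r - p) ** 2 for r, p in pairs) / len(pairs))


# One NaN prediction is enough to make the whole metric NaN.
rmse_with_nan = rmse(rows)

# Dropping the NaN rows first yields a usable number.
kept = [(r, p) for r, p in rows if not math.isnan(p)]
rmse_dropped = rmse(kept)
```

Dropping rows means the metric is computed only over users and items the model has seen, which is worth keeping in mind when comparing runs.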

In [24]:
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=10, regParam=0.01, rank=10, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True)

In [25]:
# Fit the model on the training split, then predict ratings for the test split
model = als.fit(training)
predictions = model.transform(test)

                                                                                

In [26]:
#explain parameters of the model
model.explainParams()

"blockSize: block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data. (default: 4096)\ncoldStartStrategy: strategy for dealing with unknown or new users/items at prediction time. This may be useful in cross-validation or production scenarios, for handling user/item ids the model has not seen in the training data. Supported values: 'nan', 'drop'. (default: nan, current: drop)\nitemCol: column name for item ids. Ids must be within the integer value range. (default: item, current: movieId)\npredictionCol: prediction column name. (default: prediction)\nuserCol: column name for user ids. Ids must be within the integer value range. (default: user, current: userId)"

In [27]:
#item factors 
model.itemFactors.show(10, truncate = False)

+---+-----------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                         |
+---+-----------------------------------------------------------------------------------------------------------------+
|10 |[1.5238638, 0.83908147, 1.149138, 1.2575643, 0.30058798, 0.79920846, 0.49613717, 0.383721, 0.75089955, 1.4196552]|
|20 |[0.0, 2.3232348, 0.4780416, 0.24904971, 1.4564697, 0.44559446, 0.9844715, 0.0, 1.0125227, 0.4016392]             |
|30 |[1.0938011, 2.4810016, 1.870443, 0.77797836, 0.0, 0.3354503, 0.0, 0.6334316, 0.0, 1.9481325]                     |
|40 |[2.306192, 1.0666863, 1.5224135, 1.798652, 0.045223873, 0.5488121, 0.5036729, 1.137273, 0.26715937, 1.33093]     |
|50 |[1.5835184, 1.3895707, 0.36872786, 0.51045555, 1.1541736, 0.0675079, 1.0428332, 1.177898, 1.3729986, 2.689678]   |
|60 |[0.8427902, 1.031159, 0.38933897, 1

In [28]:
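# Keep only the columns needed to label the predictions
movies = movies.select('id','title','genres')
# Caveat: ratings.movieId holds MovieLens ids, whereas movies_metadata.id holds TMDB ids.
# Joining them directly can attach the wrong title; the dataset's links.csv maps movieId to tmdbId.
predictions = predictions.join(movies, movies.id == predictions.movieId)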

In [29]:
predictions = predictions.na.drop()
predictions.show(10, truncate = False)

                                                                                

+------+-------+------+----------+----+--------------------+----------------------------------------------------------------------------------------------------------------------------+
|userId|movieId|rating|prediction|id  |title               |genres                                                                                                                      |
+------+-------+------+----------+----+--------------------+----------------------------------------------------------------------------------------------------------------------------+
|148   |329    |4.0   |2.808783  |329 |Jurassic Park       |[{'id': 12, 'name': 'Adventure'}, {'id': 878, 'name': 'Science Fiction'}]                                                   |
|148   |492    |3.5   |4.4056106 |492 |Being John Malkovich|[{'id': 14, 'name': 'Fantasy'}, {'id': 18, 'name': 'Drama'}, {'id': 35, 'name': 'Comedy'}]                                  |
|148   |590    |3.5   |4.1352367 |590 |114.0               |[{'id': 18

### Prediction Performance 

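In a run on the small dataset (100,004 ratings), the RMSE was 1.1244220. The exact value printed below varies between runs because the train/test split is random and unseeded, and it depends on which ratings file was loaded.

Training on the full dataset (26,024,289 ratings) is expected to improve prediction accuracy, that is, to lower the RMSE. Run this notebook with the full dataset to see the difference.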

In [30]:
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating')
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

                                                                                

Root-mean-square error = 1.0390645705770263


### Predictions

In [31]:
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show(10)



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{7008, 6.8439064...|
|     3|[{6216, 9.520026}...|
|     5|[{2290, 7.371956}...|
|     6|[{3090, 9.517046}...|
|     9|[{5114, 7.3316774...|
|    12|[{1150, 7.3790197...|
|    13|[{3090, 6.4471984...|
|    15|[{1590, 8.017656}...|
|    16|[{5617, 6.8824153...|
|    17|[{188, 6.165929},...|
+------+--------------------+
only showing top 10 rows
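Each entry in `recommendations` pairs a movie id with a predicted score, which is the dot product of the user's factor vector and the movie's factor vector. The sketch below mimics that ranking in plain Python. The rank-2 factors and the `recommend` helper are hypothetical; in the notebook the real vectors live in `model.userFactors` and `model.itemFactors`.

```python
# Hypothetical rank-2 factors for one user and three movies.
user_factors = {1: [1.0, 0.5]}
item_factors = {10: [2.0, 1.0], 20: [0.5, 3.0], 30: [1.0, 1.0]}


def score(u_vec, i_vec):
    """Predicted rating: dot product of a user vector and an item vector."""
    return sum(a * b for a, b in zip(u_vec, i_vec))


def recommend(user_id, n):
    """Score every item for the user and keep the n highest-scoring ones."""
    u_vec = user_factors[user_id]
    scored = [(item_id, score(u_vec, vec)) for item_id, vec in item_factors.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:n]


top2 = recommend(1, 2)
```

Since the score is a raw dot product, nothing clips it to the 0.5 to 5.0 rating scale, which is why some scores in the output above exceed 5.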



                                                                                

In [32]:
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

movieRecs.show(10, truncate=False)



+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                   |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1      |[{337, 5.6704106}, {473, 5.5976715}, {617, 5.4034953}, {217, 5.34975}, {357, 5.2569284}, {4, 5.2108073}, {225, 5.1925864}, {128, 5.1544576}, {232, 5.126755}, {287, 5.094413}]    |
|12     |[{629, 12.973161}, {289, 9.611979}, {375, 9.51248}, {20, 8.343504}, {477, 8.069855}, {317, 7.975387}, {410, 7.8204756}, {389, 7.3303685}, {611, 7.1621103}, {244, 7.0875344}]     |
|13     |[{336, 7.1685023}, {290, 6.9668965}, {288, 6.8

                                                                                

In [33]:
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)

userSubsetRecs.show(10, truncate=False)



+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                          |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|360   |[{3414, 7.6182785}, {2946, 7.324641}, {3266, 7.269505}, {4062, 6.939024}, {121231, 6.740911}, {1554, 6.6971397}, {4914, 6.579885}, {41285, 6.48893}, {38038, 6.485614}, {3083, 6.379064}]|
|246   |[{2946, 8.88727}, {1683, 8.264869}, {67255, 7.782461}, {2285, 7.6378}, {176, 7.400325}, {3730, 7.1559973}, {506, 7.049885}, {90439, 6.9712877}, {1411, 6.9615927}, {3159, 6.941833}]     |
|346   |[{2929, 8.580865}

                                                                                

In [34]:
# Generate top 10 user recommendations for a specified set of movies
# Use a separate name so the movies metadata DataFrame loaded earlier is not overwritten
movieSubset = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movieSubset, 10)

movieSubSetRecs.show(10, truncate=False)



+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|movieId|recommendations                                                                                                                                                                |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|6620   |[{375, 9.0726185}, {336, 8.100001}, {151, 7.347838}, {228, 6.849182}, {498, 6.802775}, {290, 6.6789436}, {477, 6.6496553}, {586, 6.5666456}, {314, 6.5135317}, {116, 6.504161}]|
|44022  |[{356, 8.431391}, {260, 8.297907}, {568, 7.1829295}, {145, 6.93806}, {451, 6.8901567}, {317, 6.810429}, {415, 6.677039}, {337, 6.513944}, {112, 6.378457}, {46, 6.378193}]     |
|3918   |[{568, 5.2515535}, {46, 4.8189974}, {144, 4.7611456}, {629, 4

                                                                                