# Setup

In [70]:
import os

In [71]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q Findspark
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

In [72]:
os.environ["JAVA_HOME"]

'/usr/lib/jvm/java-8-openjdk-amd64'

In [73]:
os.environ["SPARK_HOME"]

'/content/spark-2.4.7-bin-hadoop2.7'

As we can see above, Java and Spark are configured properly and so we can now go ahead and start loading data.

In [74]:
import findspark
findspark.init()

In [75]:
#Start Apache Spark session and context
import pyspark
from pyspark.sql import SQLContext

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('BigDataProject').getOrCreate()

# Reading the data

In [77]:
from google.colab import drive
drive.mount('/content/drive');

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [78]:
!ls "drive/My Drive/Big_Data_Movie_Recommender"

Data  Results


In [8]:
DATA_PATH = "drive/My Drive/Big_Data_Movie_Recommender/Data"
RESULTS_PATH = "drive/My Drive/Big_Data_Movie_Recommender/Results"

In [79]:
ratings = spark.read.option("header", "true").csv(DATA_PATH+"/ratings.csv")
movies = spark.read.option("header", "true").csv(DATA_PATH+"/movies.csv")

# Recommendation System

### There are two main approaches to recommendation systems:
- Collaborative Filtering : Based on the assumption that people who agreed in the past will agree in the future, and that they will like similar kinds of items as they liked in the past. It does not rely on machine analyzable content and therefore, it is capable of accurately recommending complex items such as movies without requiring an "understanding" of the item itself.

- Content Based Filtering : Based on a description of the item and a profile of the user's preferences. These methods are best suited to situations where there is known domain knowledge, that is, data on an item (name, location, description, etc.), but not on the user. Content-based recommenders treat recommendation as a user-specific classification problem and learn a classifier for the user's likes and dislikes based on an item's features. In this system, keywords are used to describe the items and a user profile is built to indicate the type of item this user likes.



Our project only deals with the collaborative filtering approach. Content based filtering requires domain knowledge about the items at hand. The genome-scores.csv and genome-tags.csv files of the Movielens dataset contain this information. The relevance score of each movie with respect to each of the user-submitted 1129 tags has been given, which was precomputed with the help of machine learning, as described in [this paper](http://files.grouplens.org/papers/tag_genome.pdf). One could use an algorithm such as the tf–idf(Term Frequency - Inverse Document Frequency) representation (also called vector space representation) to create a content-based profile of users based on a weighted vector of item features, which here, are the tags. However, this is beyond the present scope and shall not be explored in this project.

## Collaborative Filtering

![image.png](attachment:image.png)

There are two kinds of collaborative filtering, as shown in the diagram:
- Memory-Based / Neighborhood-Based Approach: Rating data is used to compute the similarity between the users or items. It is a non-parametric approach, that is, we don't attempt to learn any parameter using an optimization algorithm. This algorithm  calculates the similarity between two users or items, and produces a prediction for the user by taking the weighted average of all the ratings. Similarity computation between items or users is an important part of this approach. Multiple measures, such as Pearson correlation and vector cosine based similarity are used for this.

    The Pearson correlation similarity of two users x, y is defined as:
    ![image.png](attachment:image.png)
    The cosine-based approach defines the cosine-similarity between two users x and y as:
    ![image-2.png](attachment:image-2.png)
    
    There are two subcategories to this approach are:
        1)Item-Item based filtering: “Users who liked this item also liked …”
        2)User-Item based Filtering: “Users who are similar to you also liked …”
        
    The disadvantage of this method is that its performance decreases for a sparse data, which hinders scalability of this approach for most of the real-world problems. 
    
    
- Model-Based Approach: Models are developed using machine learning algorithms to predict a user’s rating of unrated items. We can use matrix factorization algorithms such as Alternating Least Squares to split the ratings matrix into a user matrix and item matrix, where the preferences of the user and the properties of the item are defined by a number of hidden/latent factors.
    ![image-3.png](attachment:image-3.png)

In [9]:
from pyspark.ml.recommendation import ALS

In [None]:
from pyspark.sql.types import IntegerType
ratings = ratings.withColumn("userId",ratings['userId'].cast(IntegerType())) \
                   .withColumn("movieId",ratings['movieId'].cast(IntegerType())) \
                   .withColumn("rating",ratings['rating'].cast(IntegerType()))

In [None]:
df_train, df_test = (ratings.randomSplit([0.7, 0.3], seed = 1))

After splitting the ratings dataset into training and testing sets, we build the Alternating Least Squares Model. 

In [None]:
als = ALS(rank=10, maxIter=5, seed=0, userCol= "userId", itemCol= "movieId", ratingCol="rating")
    # Rank is the number of latent/hidden factors

Collaborative Filtering suffers from the Cold Start Problem, which describes the difficulty of making recommendations when the items are new. This is due to the lack of past history of interaction of the users with the item.

Content-based filtering is less prone to this problem because the recommendations are made based on the feature the items possess, so even if no interaction exists for a new item, its features will allow for a recommendation to be made.

Since the split was random, the testing dataset could have movies that have never been seen in the training dataset, as a result of which no predictions can be made for them. This, in turn, won't let us compute the regression metric scores later to evalute the prediction on the test dataset. Possible workarounds are to remove the entries for the items in the test dataset which have not been seen in the train dataset, or to impute their prediction to some average value. We have opted for the former, by setting the ColdStartStrategy of our model to 'drop'.

In [None]:
als.setColdStartStrategy('drop')

We define the Root Mean Square Error metric to evaluate our predictions.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

We now fit our ALS model to the train dataset.

In [None]:
als_model = als.fit(df_train)

The latent factors of each item(movie) and each user has been stored into the internal state of our model, so that it can be used to serve predictions on any other dataset

In [None]:
import pandas as pd
pd.set_option('display.max_colwidth',None)

In [34]:
als_model.itemFactors.toPandas()

Unnamed: 0,id,features
0,10,"[0.025030001997947693, -0.15706470608711243, -0.7125052213668823, 0.5534553527832031, -0.4547816216945648, -0.6850728392601013, -0.02034790813922882, 0.7225993871688843, 0.3338954746723175, 0.5894351601600647]"
1,20,"[-0.016555173322558403, 0.20084144175052643, -0.7855822443962097, 0.32502689957618713, -0.43446433544158936, -0.8430556654930115, -0.2627929449081421, 0.32083219289779663, 0.00761453527957201, 0.4773748815059662]"
2,30,"[-0.5490902066230774, -0.4576345980167389, -0.45613765716552734, 0.8040887713432312, -0.35709503293037415, -0.8311018347740173, -0.18651041388511658, 0.6749392151832581, 0.06978724151849747, -0.2113349586725235]"
3,40,"[-0.26931577920913696, -0.7781033515930176, -0.6253662705421448, 0.6695196628570557, -0.627173900604248, -0.8440384864807129, -0.5844224095344543, 0.2798468768596649, 0.17067402601242065, -0.21850799024105072]"
4,50,"[-0.1771986037492752, -0.4208698868751526, -0.7888234257698059, 0.9405423998832703, -0.1371450275182724, -0.9908616542816162, -0.017619265243411064, 0.9093695282936096, 0.28146111965179443, 0.08991490304470062]"
...,...,...
55120,209119,"[0.05715878680348396, -0.029551083222031593, -0.26570358872413635, 0.7993663549423218, -0.05546528473496437, -0.4180419147014618, -0.34509679675102234, 0.5956115126609802, 0.2083600014448166, 0.165347158908844]"
55121,209129,"[-0.14975421130657196, 0.09460978209972382, 0.15954715013504028, 1.10838782787323, -0.035872142761945724, -0.8518288135528564, -0.1743745654821396, 1.0619900226593018, 0.30995625257492065, 0.2502687871456146]"
55122,209139,"[-0.03968313708901405, 0.024427734315395355, -0.14824500679969788, 0.09329432994127274, 0.021550556644797325, -0.24797090888023376, -0.06280148029327393, 0.2760414183139801, 0.05714036896824837, 0.035864830017089844]"
55123,209159,"[-0.11231565475463867, 0.07095733284950256, 0.11966035515069962, 0.8312908411026001, -0.02690410614013672, -0.6388716101646423, -0.13078093528747559, 0.7964924573898315, 0.2324671894311905, 0.18770159780979156]"


We backup our model so that we can load it for future use.

In [None]:
from pyspark.ml.recommendation import ALSModel
als_model.save(RESULTS_PATH+"ALS_MovieLens_1")
als_model = ALSModel.load(RESULTS_PATH+"ALS_MovieLens_1")

Our ratings matrix is very sparse, as a result of which neighborhood based collaborative filtering will not be efficient. But we can apply our similarity metrics to the newly obtained itemFactors and userFactors datasets, which are complete matrices, to obtain recommendations based on item-item similarity and user-user similarity.

We opt for the cosine similarity metric which is a part of the scipy package. The output of this metric is 0 when the vectors being compared are the same. We register our user defined 'distance' function with the sparkContext in order to use it on pyspark dataframes.

In [69]:
from pyspark.sql.types import DoubleType
import numpy as np
import scipy
import scipy.spatial

def distance(v1, v2):
    v1 = np.array(v1)
    v2 = np.array(v2)
    return float(scipy.spatial.distance.cosine(v1, v2))

spark.udf.register("distance", distance, DoubleType())

We implement a function to recommend movies similar to a movie, by selecting those movies whose vectors in 10-D space are closest to the movie vector for which we want to make the recommendation.

In [None]:
def recommendation_by_i2i(movie_id):
    return (als_model
     .itemFactors
     .filter(F.col("id") == movie_id)
     .alias("t1")
     .crossJoin(als_model.itemFactors.alias("t2"))
     .withColumn("similarity", F.expr("distance(t1.features, t2.features)")) 
     .join(movies, F.col("t2.id") == F.col("movieId"))
     .orderBy(F.asc("similarity"))
     .select("movieId", "title", "similarity")
    )

Suppose a user really likes the movie _Blade Runner (1982)_(movieId=541). We now try to recommend movies similar to it. 

In [73]:
recommendation_by_i2i(541).show(20, False)

+-------+----------------------------------------------+--------------------+
|movieId|title                                         |similarity          |
+-------+----------------------------------------------+--------------------+
|541    |Blade Runner (1982)                           |0.0                 |
|1080   |Monty Python's Life of Brian (1979)           |0.012101173928981468|
|73914  |Sometimes a Great Notion (1970)               |0.01240904214166283 |
|6104   |Monty Python Live at the Hollywood Bowl (1982)|0.013113274085467697|
|5965   |Duellists, The (1977)                         |0.013625690509695643|
|1218   |Killer, The (Die xue shuang xiong) (1989)     |0.014535267911132355|
|148288 |Who Am I This Time? (1982)                    |0.015166192408404777|
|1136   |Monty Python and the Holy Grail (1975)        |0.015209555968390687|
|145755 |The Dark Glow of the Mountain (1985)          |0.015227092839303014|
|152292 |Mojin: The Lost Legend (2015)                 |0.015743

We observe that the recommendations are fairly relevant. For example, both _1984_ and _Blade Runner_ are movies set in future dystopias and _2001: A Space Odyssey_ and _Blade Runner_ are works of science fiction.

Now, we implement a function to find users similar to a particular user, i.e, the users for whom the vectors in 10-D space are closest to the vector of the user in question. All users who are within a 0.03 similarity are included and recommendations are made for the current user based on the top movies that the users similar to him have watched.

In [72]:
top_rated_movies_by_user = (ratings
                            .filter("rating = 4 or rating = 5 or rating = 4.5")
                            .groupBy("userId")
                            .agg(F.collect_set("movieId").alias("top_movies")))
top_rated_movies_by_user.show()

+------+--------------------+
|userId|          top_movies|
+------+--------------------+
|100010|[647, 1047, 1, 60...|
|100140|[2348, 2313, 1189...|
|100227|[6, 3, 662, 62, 7...|
|100263|[2105, 5995, 3300...|
|100320|[6873, 1719, 2160...|
|100553|[5995, 2160, 5679...|
|100704|[74458, 4886, 412...|
|100735|[110, 2000, 74458...|
|100768|[41, 306, 1450, 1...|
| 10096|[784, 832, 1, 839...|
|100964|[62081, 48744, 13...|
|101021|[2859, 858, 1950,...|
|101122|[60069, 64614, 40...|
|101205|[1, 352, 141, 307...|
|101261|[33794, 37380, 43...|
|101272|[189333, 140956, ...|
|102113|[74458, 81845, 33...|
|102521|[110, 350, 356, 2...|
|102536|[3, 783, 1, 141, ...|
|102539|[110, 185, 2231, ...|
+------+--------------------+
only showing top 20 rows



In [103]:
def recommendation_by_u2u(user_id):
    return (als_model
     .userFactors
     .filter(F.col("id") == user_id)
     .alias("top_rated_movies_by_user = (ratings
                            .filter("rating = 4 or rating = 5")
                            .groupBy("userId")
                            .agg(F.collect_set("movieId").alias("top_movies")))
top_rated_movies_by_user.show()t1")
     .crossJoin(als_model.itemFactors.alias("t2"))
     .withColumn("similarity", F.expr("distance(t1.features, t2.features)"))
     .filter("similarity < 0.03 and similarity > 0.0") # 0.02 Similarity threshold - a hyper parameter. We can perform tuning to find the suitable value
     .join(top_rated_movies_by_user.alias("t3"), F.col("t2.id") == F.col("t3.userId"))
     .select("t1.id", F.explode("top_movies").alias("movieId"))
     .join(movies, on = "movieId")
     .select("movieId", "title")
    )

In [72]:
top_rated_movies_by_user = (ratings
                            .filter("rating = 4 or rating = 5")
                            .groupBy("userId")
                            .agg(F.collect_set("movieId").alias("top_movies")))
top_rated_movies_by_user.show()

+------+--------------------+
|userId|          top_movies|
+------+--------------------+
|100010|[647, 1047, 1, 60...|
|100140|[2348, 2313, 1189...|
|100227|[6, 3, 662, 62, 7...|
|100263|[2105, 5995, 3300...|
|100320|[6873, 1719, 2160...|
|100553|[5995, 2160, 5679...|
|100704|[74458, 4886, 412...|
|100735|[110, 2000, 74458...|
|100768|[41, 306, 1450, 1...|
| 10096|[784, 832, 1, 839...|
|100964|[62081, 48744, 13...|
|101021|[2859, 858, 1950,...|
|101122|[60069, 64614, 40...|
|101205|[1, 352, 141, 307...|
|101261|[33794, 37380, 43...|
|101272|[189333, 140956, ...|
|102113|[74458, 81845, 33...|
|102521|[110, 350, 356, 2...|
|102536|[3, 783, 1, 141, ...|
|102539|[110, 185, 2231, ...|
+------+--------------------+
only showing top 20 rows



We now make recommendations for the userId=1.

In [112]:
recommendation_by_u2u(1).show(20, False)

+-------+---------------------------------------------------------+
|movieId|title                                                    |
+-------+---------------------------------------------------------+
|2160   |Rosemary's Baby (1968)                                   |
|1207   |To Kill a Mockingbird (1962)                             |
|1215   |Army of Darkness (1993)                                  |
|79132  |Inception (2010)                                         |
|589    |Terminator 2: Judgment Day (1991)                        |
|7445   |Man on Fire (2004)                                       |
|87192  |Attack the Block (2011)                                  |
|6385   |Whale Rider (2002)                                       |
|179135 |Blue Planet II (2017)                                    |
|2011   |Back to the Future Part II (1989)                        |
|57274  |[REC] (2007)                                             |
|64839  |Wrestler, The (2008)                   

In [106]:
ratings.join(movies,['movieId']).show(50,False)

+-------+------+------+----------+---------------------------------------------------------------------------------+-------------------------------------+
|movieId|userId|rating|timestamp |title                                                                            |genres                               |
+-------+------+------+----------+---------------------------------------------------------------------------------+-------------------------------------+
|296    |1     |5.0   |1147880044|Pulp Fiction (1994)                                                              |Comedy|Crime|Drama|Thriller          |
|306    |1     |3.5   |1147868817|Three Colors: Red (Trois couleurs: Rouge) (1994)                                 |Drama                                |
|307    |1     |5.0   |1147868828|Three Colors: Blue (Trois couleurs: Bleu) (1993)                                 |Drama                                |
|665    |1     |5.0   |1147878820|Underground (1995)                  

When compared to the movies that userId 1 has watched, these are fairly relevant recommendations, beloging to the drama/comedy/scifi genres.

We now use the model to obtain predictions on the train dataset and evaluate the predictions.

In [64]:
training_predictions_df = als_model.transform(df_train)
reg_eval.evaluate(training_predictions_df)

0.8193309774106281

In [40]:
training_predictions_df.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
| 32855|    148|     4|1029309135| 2.3990471|
| 26480|    148|     2| 915406133| 1.9864883|
| 38199|    148|     2| 835601960| 2.4336302|
|159730|    148|     3| 842162037|  2.716928|
| 33354|    148|     3| 938886119| 2.6935844|
| 47989|    148|     2| 833173771| 2.9645228|
| 72337|    148|     2| 944246202|  2.777789|
|151614|    148|     1| 878170956|  2.731505|
|  5055|    148|     3| 842463284| 2.8496578|
|108767|    148|     3|1276969740| 2.5595648|
| 21531|    148|     3| 834035555| 3.0218282|
| 38679|    148|     3| 853421750| 2.5657732|
| 99684|    148|     3|1027645782| 2.9732146|
| 35969|    148|     2| 835094487|  2.794639|
| 54331|    148|     2| 954702916| 2.9416816|
| 77130|    148|     1| 831284829| 1.1798544|
| 29943|    148|     3|1049216998| 2.8596456|
|117168|    148|     4| 835820190| 3.2516797|
| 28229|    148|     1| 833850593|

In [65]:
validation_predictions_df = als_model.transform(df_test)

Since we configured our model to not predict the user ratings for the movies it hasn't encountered before, there are no NaN values in the prediction column, which lets us compute the RMSE metric to evaulate our model's performance.

In [66]:
validation_predictions_df.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in validation_predictions_df.columns]).show()

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     0|      0|     0|        0|         0|
+------+-------+------+---------+----------+



In [68]:
reg_eval.evaluate(validation_predictions_df)

0.8576732625272104

We observe that our model performs slightly worse on the test dataset but this is expected.

In [67]:
validation_predictions_df.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|     1|   1250|     4|1147868414| 3.3892672|
|     1|   2161|     3|1147868609| 2.9195266|
|     1|  27266|     4|1147879365| 3.3538353|
|     1|   2843|     4|1147868891|   3.80054|
|     1|    306|     3|1147868817| 4.1096406|
|     1|   3448|     4|1147868480|  3.211211|
|     1|   4308|     3|1147868534| 3.3079157|
|     1|   4973|     4|1147869080|  3.985975|
|     1|   5767|     5|1147878729| 3.6437132|
|     1|   5912|     3|1147878698| 3.2279365|
|     1|   5952|     4|1147868053| 3.3478596|
|     1|   6016|     5|1147869090| 3.8530746|
|     1|   6377|     4|1147868469|  3.213149|
|     1|   6539|     3|1147868461| 2.9053173|
|     1|   7234|     4|1147868869|  3.627199|
|     1|   7361|     5|1147880055|  3.841207|
|     1|   7938|     2|1147878063|  3.478118|
|     1|   8154|     5|1147868865| 3.4568613|
|    10|   1962|     3|1227570828|

Now, we make predictions for the ratings given by the user with Id = 1

In [None]:
predictions=als_model.transform(ratings)
predictions.createOrReplaceTempView("predictions_sql")
movies.createOrReplaceTempView("movies_sql")

We've converted the predictions and movies dataframes to SQL tables, which allows us to use Spark SQL in order to query the tables for the required data.

In [111]:
spark.sql(
"""select p.userId, p.movieId, p.rating, p.prediction, m.title, m.genres
from predictions_sql p
join movies_sql m on p.movieId=m.movieId
where p.userId == 1
order by p.prediction desc
""").show(20,False)

+------+-------+------+----------+----------------------------------------------------------------------------------------+-------------------------------------+
|userId|movieId|rating|prediction|title                                                                                   |genres                               |
+------+-------+------+----------+----------------------------------------------------------------------------------------+-------------------------------------+
|1     |306    |3     |4.1096406 |Three Colors: Red (Trois couleurs: Rouge) (1994)                                        |Drama                                |
|1     |307    |5     |4.0995245 |Three Colors: Blue (Trois couleurs: Bleu) (1993)                                        |Drama                                |
|1     |296    |5     |4.0422907 |Pulp Fiction (1994)                                                                     |Comedy|Crime|Drama|Thriller          |
|1     |4973   |4     |3.985

We now try to make recommendations for a user based on his IMDb ratings.

One can export his/her IMDb ratings by choosing the Export option as shown below. This results in a download prompt for a CSV file which contains all ratings with other details.
![image.png](attachment:image.png)

In [None]:
# Default Packages (available by Default in Google Colab)
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import seaborn as sns
import random
from pprint import pprint
from matplotlib.lines import Line2D

# PySpark Utilities
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics

# Random Seed
SEED = 1492

We have to preprocess the data to include the new user ratings in the dataset. There is no userId=0, so we assign 0 to be the new user. We also binarize the movie ratings, i.e, all ratings greater than 3 are set to 1, which indicates that the user enjoyed the movie and such movies are included for recommendation, while those below 3 are set to 0, which indicates that user disliked the movie.

# Preprocessing

In [8]:
class MovieLensDatasets(object):
    """
    Class for loading and preprocesing MovieLens Dataset
    """
    def __init__(self, ratings, movies, links, personalRatings, debug=True, debugLimit=10000):
        # Load Existing Data
        if debug:
            debugLimit = debugLimit
            ratings = ratings.limit(debugLimit)
        else:
            ratings = ratings      

        self.ratings = ratings
        self.movies = movies
        self.links = links
        self.personalRatings = personalRatings

        # Create New DataFrame
        users = ratings.select('userId').distinct()
        self.users = users

    def preprocessing(self):
        # Preprocess MovieLens Ratings
        self.ratings = self.ratings.withColumn('rating',
        F.col('rating').cast('double')).drop('timestamp') \
        .withColumn('userId', F.col('userId').cast('int')) \
        .withColumn('movieId', F.col('movieId').cast('int'))

        # Preprocess Personal IMDb Ratings To MovieLens Ratings
        self.personalRatings = self.personalRatings.select(['Const',
                                                'Your Rating']) \
        .withColumnRenamed('Const', 'imdbId') \
        .withColumnRenamed('Your Rating', 'personalRating')

        self.personalRatings = self.personalRatings \
        .withColumn('personalRating', F.col('personalRating').cast('double')*0.5) \
        .withColumn('imdbId', F.expr("substr(imdbId, 3)"))

        self.personalRatings = self.personalRatings.join(
        self.links.select('movieId', 'imdbId'), ['imdbId'], how='inner')

        # Insert IMDb Ratings into MovieLens Ratings Dataset
        self.personalRatings = self.personalRatings \
                    .withColumn('userId', F.lit('0'))
        self.personalRatings = self.personalRatings \
                    .select(['userId', 'movieId', 'personalRating']) \
                    .toDF('userId', 'movieId', 'rating')
        self.ratings = self.ratings.union(self.personalRatings)

        # Binarize MovieLens Ratings (if rating >= 3.0, then 1.0, else 0.0)
        udf_scale_ratings = F.udf(lambda x: x - 2.5, DoubleType())
        udf_binary_ratings = F.udf(lambda x: 1.0 if x > 0.0 else 0.0, DoubleType())

        self.ratings = self.ratings \
        .withColumn('ratingsScaled', udf_scale_ratings(F.col('rating'))) \
        .withColumn('ratingsBinary', udf_binary_ratings(F.col('ratingsScaled')))

    def get_ratings(self):
        return self.ratings

    def get_movies(self):
        return self.movies

    # Displaying Null Values
    def spark_df_display_null_values(sparkDf):
        print('NaN values ?')
        sparkDf.select([F.count(F.when(F.isnan(c), c)).alias(c) for c in sparkDf.columns]).show()

        print('Null values ?')
        sparkDf.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in sparkDf.columns]).show()

In [11]:
%%time
# Set to True if only testing
# For testing: use debug=True option (subset of all ratings)
debug = False

# Load the additional Datasets

links = sqlContext \
            .read.format('csv').option("header", "true") \
            .load(DATA_PATH + "/links.csv")

personalRatings = sqlContext \
            .read.format('csv').option("header", "true") \
            .load(DATA_PATH + "/personal_IMDB_ratings.csv")


# Data Preprocessing
movieLensDatasets = MovieLensDatasets(ratings=ratings, movies=movies,
                                      links=links,
                                      personalRatings=personalRatings,
                                      debug=debug)

movieLensDatasets.preprocessing()
dfRatings = movieLensDatasets.get_ratings() 
dfMovies = movieLensDatasets.get_movies()

CPU times: user 17 ms, sys: 6.39 ms, total: 23.3 ms
Wall time: 3.54 s


In [12]:
ratingsPrepare = dfRatings.withColumn("userId", F.col("userId").cast('int')) \
                          .withColumn("movieId", F.col("movieId").cast('int'))

dfRatingsTrain, dfRatingsTest = ratingsPrepare.randomSplit([0.7, 0.3],
                                                           seed=SEED)

In [14]:
class PopularityRecommenderModel(object):
    """
    Class for generating movie item recommendations based on movie popularity
    (from average ratings with logarithmic scaling factor that penalizes movies
    with few ratings). By default, only the top 10 rated movies are selected.
    """
    def __init__(self, ratingsTrain, ratingsTest):
        self.ratingsTrain = ratingsTrain
        self.ratingsTest = ratingsTest

    def generate_recommendations_for_all_users(self, topk=10):
        # Get list of top k rated movies
        topRated = self._get_top_k_best_movies(topk=topk)

        # Compare most popular movies with user's personal preferences
        # Important: only keep recommendations with rating over 3/5
        results = self.ratingsTest \
                 .filter(F.col("ratingsBinary")==1.0) \
                 .withColumn('movieId', F.col('movieId').cast('int')) \
                 .orderBy('rating', ascending=False) \
                 .groupby("userId") \
                 .agg(F.collect_list("movieId").alias('groundTruth')) \
                 .withColumn('predictions', F.array([F.lit(el)
                        for el in topRated])) \
                 .select(['userId', 'predictions', 'groundTruth'])

        return results

    def _get_top_k_best_movies(self, topk):

        # Function
        def _get_mean_rating_w_log_penalty(arr):
            sumRatings, nbRatings = arr[0], arr[1]
            grade = (sumRatings / nbRatings) * math.log(nbRatings)
            return grade

        # UDF
        udf_mean_rating_w_log_penalty = F.udf(
            lambda arr: _get_mean_rating_w_log_penalty(arr),
            DoubleType()
        )

        # Get top k Best Rated Movies
        ranking = self.ratingsTrain.groupBy('movieId') \
              .agg({'rating': 'sum', 'userId': 'count'}) \
              .toDF('movieId', 'sumRating', 'nbRatings')

        ranking = ranking \
        .withColumn("meanLogUserRating",
         udf_mean_rating_w_log_penalty(F.array("sumRating", "nbRatings"))
         ) \
        .sort(['meanLogUserRating', 'nbRatings'], ascending=[False, True]) \
        .limit(topk)

        self.topRatedMovies = ranking
        ratingsTop = [int(row.movieId) for row in ranking.collect()]

        return ratingsTop

In [None]:
def format_recommendations(rowPreds):
    # From recommendations column extract only items
    # recommended and ignore ratings
    rowPredsList = [row.movieId for row in rowPreds]
    return rowPredsList

udf_format_recommendations = F.udf(lambda x: format_recommendations(x),
                                   ArrayType(IntegerType()))

In [15]:
%%time
pbrModel = PopularityRecommenderModel(dfRatingsTrain, dfRatingsTest)
resultsPop = pbrModel.generate_recommendations_for_all_users(topk=20)

CPU times: user 69.4 ms, sys: 12.4 ms, total: 81.8 ms
Wall time: 32.1 s


In [16]:
%%time
resultsPop.limit(5).orderBy('userId').show(5)

+------+--------------------+--------------------+
|userId|         predictions|         groundTruth|
+------+--------------------+--------------------+
|   148|[318, 296, 858, 5...|[858, 1089, 1136,...|
|   463|[318, 296, 858, 5...|[32, 648, 780, 78...|
|   471|[318, 296, 858, 5...|[95167, 2571, 521...|
|   496|[318, 296, 858, 5...|[3910, 44555, 122...|
|   833|[318, 296, 858, 5...|[45431, 46578, 98...|
+------+--------------------+--------------------+

CPU times: user 55.8 ms, sys: 30 ms, total: 85.7 ms
Wall time: 1min 33s


In [19]:
%%time
_ = pbrModel.generate_recommendations_for_all_users(topk=20)
pbrModel.topRatedMovies \
    .join(dfMovies.select(['movieId', 'title']), 'movieId') \
    .orderBy('meanLogUserRating', ascending=False) \
    .show(20, truncate=False)

+-------+---------+---------+------------------+------------------------------------------------------------------------------+
|movieId|sumRating|nbRatings|meanLogUserRating |title                                                                         |
+-------+---------+---------+------------------+------------------------------------------------------------------------------+
|318    |251444.5 |56981    |48.32200642241223 |Shawshank Redemption, The (1994)                                              |
|296    |233476.0 |55743    |45.77335501425202 |Pulp Fiction (1994)                                                           |
|858    |159121.0 |36803    |45.45532488277934 |Godfather, The (1972)                                                         |
|50     |165948.5 |38720    |45.27630330905071 |Usual Suspects, The (1995)                                                    |
|527    |179594.0 |42294    |45.23353708263301 |Schindler's List (1993)                                 

The popularity model recommender can be used to solve the cold start problem caused by new users with no previous user interaction to give recommendations.

# ALS
We perform ALS as we've already done above.

In [20]:
%%time
tempALS = ALS(maxIter=10, rank=10, regParam=0.1, nonnegative=True,
              userCol='userId', itemCol='movieId', ratingCol='rating',
              coldStartStrategy='drop', implicitPrefs=False, seed=SEED)

mlALSFitted = tempALS.fit(dfRatingsTrain)

CPU times: user 63.7 ms, sys: 44.8 ms, total: 108 ms
Wall time: 59.6 s


In [21]:
mlALSFitted.save(RESULTS_PATH+"/ALS_MovieLens_25M")

In [23]:
mlALSFitted = ALSModel.load(RESULTS_PATH+"/ALS_MovieLens_25M")

### RMSE

In [24]:
%%time
predictions = mlALSFitted.transform(dfRatingsTest)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rating',
                                predictionCol='prediction')
rmse = evaluator.evaluate(predictions)
print('RMSE (Test Set):', rmse)

RMSE (Test Set): 0.8147259683304076
CPU times: user 66.4 ms, sys: 25.4 ms, total: 91.8 ms
Wall time: 36.1 s


Now, we generate top 20 movie recommendations for all the users.

In [25]:
resultsALS = mlALSFitted.recommendForAllUsers(20)

resultsALS = resultsALS.withColumn('recommendations',
              udf_format_recommendations(F.col("recommendations"))) \
              .toDF('userId', 'predictions')

In [27]:
# Downloaded Packages (not available by Default)
import databricks.koalas


In [28]:
resultsALSExpanded = resultsALS \
                        .withColumn("movieId", F.explode("predictions")) \
                        .drop('predictions') \
                        .join(dfMovies, "movieId")

resultsALSKdf = resultsALSExpanded.to_koalas()

MostRecommendedMoviesForAllUsers = resultsALSKdf.groupby(["movieId", "title"])['userId'].count()
MostRecommendedMoviesForAllUsers = MostRecommendedMoviesForAllUsers.sort_values(ascending=False)

In [29]:
%%time
# These are the movies that have been recommended the most number of times to the users.
MostRecommendedMoviesForAllUsers.head(20)

CPU times: user 5.85 ms, sys: 744 µs, total: 6.59 ms
Wall time: 20.6 ms


movieId  title                                             
203882   Dead in the Water (2006)                              144255
183947   NOFX Backstage Passport 2                             142233
194434   Adrenaline (1990)                                     136780
196787   The Law and the Fist (1964)                           119244
165689   Head Trauma (2006)                                     98235
192089   National Theatre Live: One Man, Two Guvnors (2011)     96258
143422   2 (2007)                                               90441
166812   Seeing Red: Stories of American Communists (1983)      86752
117352   A Kind of America 2 (2008)                             84742
194334   Les Luthiers: El Grosso Concerto (2001)                82590
121919   The Good Mother (2013)                                 74576
128667   Wiseguy (1996)                                         74542
197355   Once Upon a Ladder (2016)                              72643
165559   Ο Θανάσης στη χώρα τη

In [37]:
%%time
predictionsPerso = resultsALS.filter(F.col("userId")==0) \
                             .select(F.explode("predictions") \
                             .alias("movieId")) \
                             .join(dfMovies.select(["movieId", "title"]),
                                   "movieId") \
                             .join(dfRatings.filter(F.col("userId")==0),
                                   ['movieId'], how='left')

predictionsPerso.select(["title"]).show(10, truncate=False)

+-------------------------------------------+
|title                                      |
+-------------------------------------------+
|The Country Cousin (1936)                  |
|Foster (2018)                              |
|Cássia (2015)                              |
|Insane (2016)                              |
|Olga (2004)                                |
|Argo (2004)                                |
|.hack Liminality In the Case of Yuki Aihara|
|NOFX Backstage Passport 2                  |
|.hack Liminality In the Case of Kyoko Tohno|
|Red, Honest, in Love (1984)                |
+-------------------------------------------+
only showing top 10 rows

CPU times: user 75.4 ms, sys: 36.3 ms, total: 112 ms
Wall time: 2min 43s


These are the predictions for the new user according to our recommendation model.

# Potential Improvements to the Recommender System:

- __Hybrid recommender systems__: The major weakness of collaborative filtering is that forgoing the actual characteristics of items (for movies, meta-information such as genres, actor/actresses, director, country of origin) hurts both recommender accuracy and interpretability of recommendations. In practice, industrial recommender systems use hybrid approaches that combine both user similarity (collaborative filtering) and item characteristics (content-based approach).
- __Reducing dataset size and user-item segmentation__: One reason why our collaborative filtering model struggled to generate sound recommendations for all users is that our model training included every user (e.g. single rating users, users with few ratings or many ratings). A more ideal strategy would be to segment our ratings data into seperate but homogeneous user datasets and train different recommender systems on those separate data chunks, potentially improving our results.