# Collaborative Filtering for Movie Recommendation
#### Use Alternating Least Squares (ALS) to minimize the loss function
Dataset from https://grouplens.org/datasets/movielens/

In [1]:
import pyspark
import findspark
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
from functools import reduce
from IPython.core.display import display, HTML


#### Initialize the Spark context

In [2]:
# Let findspark set up the environment for the local Spark installation.
findspark.init()
# Executor memory has to be configured before the SparkContext is created.
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '4g')
# getOrCreate() returns the already running context when this cell is executed
# again, instead of failing with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName("Movie Recommandation"))

#### load rating data from csv

In [3]:
# Schema applied to ratings.csv: one nullable field per (name, Spark type) pair.
schema = StructType([
    StructField(column, dtype)
    for column, dtype in (
        ("userId", IntegerType()),
        ("movieId", IntegerType()),
        ("rating", FloatType()),
    )
])
# Read the ratings with the explicit schema; the first line of the file is a header.
ratings = (
    SQLContext(sc)
    .read
    .csv("ml-latest-small/ratings.csv", header=True, schema=schema)
)
# Preview the first 5 rows.
ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



#### load movie title data

In [4]:
# Schema applied to movies.csv (movie id, title and genre list as plain strings).
schema_movies = StructType(fields=[
    StructField(name="movieId", dataType=IntegerType()),
    StructField(name="title", dataType=StringType()),
    StructField(name="genres", dataType=StringType()),
])
# Read the movie titles with the explicit schema; the file starts with a header line.
movies = SQLContext(sc).read.csv(
    path="ml-latest-small/movies.csv",
    schema=schema_movies,
    header=True,
)
# Preview the first 5 rows.
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



#### load movie description link data

In [5]:
# Schema applied to links.csv. imdbId is read as a string, which keeps the
# leading zeros visible in the preview below.
schema_links = StructType([
    StructField(*field)
    for field in [
        ("movieId", IntegerType()),
        ("imdbId", StringType()),
        ("tmdbId", IntegerType()),
    ]
])
# Read the external-site ids with the explicit schema; the file has a header line.
links = SQLContext(sc).read.csv(
    "ml-latest-small/links.csv", schema=schema_links, header=True)
# Preview the first 5 rows.
links.show(5)

+-------+-------+------+
|movieId| imdbId|tmdbId|
+-------+-------+------+
|      1|0114709|   862|
|      2|0113497|  8844|
|      3|0113228| 15602|
|      4|0114885| 31357|
|      5|0113041| 11862|
+-------+-------+------+
only showing top 5 rows



#### Load movie YouTube trailer links data

In [6]:
# Schema applied to ml-youtube.csv, declared as an ordered name -> type mapping.
schema_youtube = StructType([
    StructField(column, dtype)
    for column, dtype in {
        "youtubeId": StringType(),
        "movieId": IntegerType(),
        "title": StringType(),
    }.items()
])
# Read the YouTube ids with the explicit schema; the file has a header line.
youtubes = (
    SQLContext(sc)
    .read
    .csv("ml-latest-small/ml-youtube.csv", header=True, schema=schema_youtube)
)
# Preview the first 5 rows.
youtubes.show(5)

+-----------+-------+--------------------+
|  youtubeId|movieId|               title|
+-----------+-------+--------------------+
|K26_sDKnvMU|      1|    Toy Story (1995)|
|3LPANjHlPxo|      2|      Jumanji (1995)|
|rEnOoWs3FuA|      3|Grumpier Old Men ...|
|j9xml1CxgXI|      4|Waiting to Exhale...|
|ltwvKLnj1B4|      5|Father of the Bri...|
+-----------+-------+--------------------+
only showing top 5 rows



## Build ALS model
##### Split the ratings into training and test data

In [7]:
# Hold out 10% of the ratings for evaluation. The fixed seed makes the split,
# and therefore the RMSE reported further down, reproducible across runs.
training, test = ratings.randomSplit([0.9, 0.1], seed=42)
# Build the ALS model on the training data.
# coldStartStrategy="drop" removes every row of the predictions DataFrame that
# contains a NaN value, so the evaluation metric is computed over the non-NaN
# rows only and stays valid.
# The seed fixes the random initialisation of the factor matrices.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", seed=42)
als_model = als.fit(training)

#### Predict the rating and store it in the new column "prediction" of the test data


In [8]:
# Score the held-out ratings: the result keeps the test columns and adds the
# model's estimate in a "prediction" column (see the preview below).
predictions = als_model.transform(test)
# Preview the first 5 predicted rows.
predictions.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   462|    471|   2.5| 3.1241457|
|   448|    471|   4.0| 3.6321914|
|   373|    471|   5.0| 3.7751102|
|   104|    471|   4.5|  3.216738|
|   463|   1088|   3.5|  3.062744|
+------+-------+------+----------+
only showing top 5 rows



#### Evaluate the performance of the model
Use the default metric, RMSE (root-mean-square deviation):
$$\sqrt{\frac{1}{n}\sum_{i=1}^{n}(rating_i - prediction_i)^2}$$

In [9]:
# Compare the true "rating" column with the model's "prediction" column.
# predictionCol and metricName are spelled out here but equal the evaluator's
# defaults, so the computed value is the RMSE.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
# Last expression of the cell: the notebook displays the returned RMSE.
evaluator.evaluate(predictions)

0.8806013443477815

In [10]:
def get_recom_movies(userId, nb=5):
    """
    Get the recommended movies for the specified users as a DataFrame.

    Parameters
    ----------
    userId : a single user id (int) or a list of user ids

    nb : number of movies to recommend per user

    Returns
    -------
    DataFrame with the columns userId, movieId, title, imdbId and youtubeId,
    one row per recommended movie. Recommended movies without a matching row
    in `movies`, `links` or `youtubes` are dropped by the inner joins. The
    DataFrame is empty when none of the given ids appears in `ratings`.
    """
    # Imported locally so the function does not need a new file-level import.
    from pyspark.sql.functions import explode

    # Accept a bare id as well as a list of ids.
    if isinstance(userId, int):
        userId = [userId]
    user_col = als.getUserCol()
    # Keep only the requested users that actually occur in the ratings.
    df_users = ratings.select(user_col).distinct().filter(ratings[user_col].isin(userId))
    rec_movies = als_model.recommendForUserSubset(df_users, nb)
    # Flatten each user's array of recommendations into one (userId, movieId)
    # row per movie. This stays inside Spark and also works when no user
    # matched, where the result is simply an empty DataFrame.
    df_rec_movie_ids = (
        rec_movies
        .select("userId", explode("recommendations").alias("rec"))
        .select("userId", "rec.movieId")
    )
    # Both `movies` and `youtubes` carry a "title" column; drop the one coming
    # from `youtubes` so the final select is unambiguous.
    df_rec_movies = (
        df_rec_movie_ids
        .join(movies, 'movieId')
        .join(links, 'movieId')
        .join(youtubes, 'movieId')
        .drop(youtubes.title)
        .select("userId", "movieId", "title", "imdbId", "youtubeId")
    )
    return df_rec_movies

In [11]:
def display_rec_movies(userId, nb=5):
    """
    Display the recommended movies as HTML, grouped under one heading per user.

    Each movie is rendered as a link to its IMDb page followed by a link to
    its trailer on YouTube.

    Parameters
    ----------
    userId : a single user id or a list of user ids

    nb : number of movies to recommend per user

    """
    from html import escape

    # The joined DataFrame has no guaranteed row order. A stable sort by user
    # groups each user's rows together so the heading is emitted once per user.
    rec_movies = sorted(get_recom_movies(userId, nb).collect(),
                        key=lambda row: row.userId)
    user_id = None
    for m in rec_movies:
        if m.userId != user_id:
            user_id = m.userId
            display(HTML(f"<h3>Recommended movies for User ID: {user_id}</h3>"))
        # Titles come straight from the CSV data; escape them so characters
        # such as "&" or "<" cannot break the generated markup.
        title = escape(str(m.title))
        display(HTML(f"<a href='https://www.imdb.com/title/tt{m.imdbId}' target='_blank'>{title}</a>  <a href='http://youtube.com/watch?v={m.youtubeId}' target='_blank'>trailer on YouTube</a>"))
            

# Input the user ids to get the list of Recommended movies
(example: 100, 200, 300, 400)

In [12]:
# Read a comma-separated list of user ids, e.g. "100, 200, 300, 400".
userIdInput = input("Input the user ID (sigle or list of ids) : ")
# Skip blank entries so a trailing or doubled comma does not make int() fail;
# int() itself ignores whitespace around each id.
display_rec_movies([int(e) for e in userIdInput.split(',') if e.strip()])


Input the user ID (sigle or list of ids) : 100, 200, 300, 400


## other tests

In [13]:
# Generate the top 5 movie recommendations for each user
userRecs = als_model.recommendForAllUsers(5)
# Show 5 rows; False turns off truncation so the whole recommendations array is printed
userRecs.show(5, False)

+------+----------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                     |
+------+----------------------------------------------------------------------------------------------------+
|471   |[[6818, 5.1971087], [89904, 4.836401], [58301, 4.807467], [51931, 4.789848], [8477, 4.7536807]]     |
|463   |[[3379, 4.8360953], [171495, 4.8099704], [5075, 4.7839384], [33649, 4.7730894], [78836, 4.7462564]] |
|496   |[[89904, 5.220061], [6818, 4.8951616], [25771, 4.7698693], [176371, 4.7178593], [100714, 4.6885977]]|
|148   |[[98491, 4.602793], [183897, 4.5978146], [25906, 4.5014005], [77846, 4.5014005], [93008, 4.5014005]]|
|540   |[[26171, 5.312468], [32892, 5.261033], [3925, 5.2275634], [177593, 5.1534095], [60943, 5.0078473]]  |
+------+----------------------------------------------------------------------------------------------------+
only showing top 5 rows

In [14]:
# Generate the top 5 user recommendations for each movie
movieRecs = als_model.recommendForAllItems(5)
# Show 5 rows; False turns off truncation so the whole recommendations array is printed
movieRecs.show(5, False)

+-------+----------------------------------------------------------------------------------------+
|movieId|recommendations                                                                         |
+-------+----------------------------------------------------------------------------------------+
|1580   |[[53, 5.097166], [543, 4.7331495], [267, 4.622299], [452, 4.58258], [276, 4.5718994]]   |
|4900   |[[99, 4.6421227], [539, 4.440361], [574, 4.392588], [493, 4.31025], [73, 4.2963686]]    |
|5300   |[[236, 4.302127], [224, 4.0200305], [250, 4.0109334], [59, 3.9718616], [53, 3.9529269]] |
|6620   |[[360, 5.199749], [518, 4.869224], [393, 4.726391], [418, 4.710321], [430, 4.653871]]   |
|7340   |[[543, 4.4401903], [53, 4.3066406], [43, 4.1246996], [276, 3.9683414], [558, 3.8822365]]|
+-------+----------------------------------------------------------------------------------------+
only showing top 5 rows



In [15]:
# Generate the top 5 movie recommendations for a specified set of users
# Take 3 distinct user ids from the ratings; no ordering is applied before
# limit(), so which 3 users are picked is not pinned down by this code.
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = als_model.recommendForUserSubset(users, 5)
# Show up to 5 rows without truncating the recommendations column
userSubsetRecs.show(5, False)

+------+----------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                     |
+------+----------------------------------------------------------------------------------------------------+
|471   |[[6818, 5.1971087], [89904, 4.836401], [58301, 4.807467], [51931, 4.789848], [8477, 4.7536807]]     |
|463   |[[3379, 4.8360953], [171495, 4.8099704], [5075, 4.7839384], [33649, 4.7730894], [78836, 4.7462564]] |
|148   |[[98491, 4.602793], [183897, 4.5978146], [25906, 4.5014005], [77846, 4.5014005], [93008, 4.5014005]]|
+------+----------------------------------------------------------------------------------------------------+



In [16]:
# Generate the top 5 user recommendations for a specified set of movies.
# The subset gets its own name: rebinding `movies` here would replace the
# title DataFrame that get_recom_movies() joins against and break every later
# call to it.
movieSubset = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = als_model.recommendForItemSubset(movieSubset, 5)
# Show up to 5 rows without truncating the recommendations column
movieSubSetRecs.show(5, False)

+-------+---------------------------------------------------------------------------------------+
|movieId|recommendations                                                                        |
+-------+---------------------------------------------------------------------------------------+
|1580   |[[53, 5.097166], [543, 4.7331495], [267, 4.622299], [452, 4.58258], [276, 4.5718994]]  |
|3175   |[[53, 4.9045753], [558, 4.6209235], [452, 4.524556], [246, 4.4747305], [276, 4.445344]]|
|2366   |[[236, 4.931757], [53, 4.6494646], [275, 4.470407], [276, 4.46451], [122, 4.4399204]]  |
+-------+---------------------------------------------------------------------------------------+

