In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
import pandas as pd
import numpy as np
import itertools
import os

In [2]:
# Starting a spark session locally
spark = SparkSession \
    .builder \
    .appName("als-reco") \
    .getOrCreate()

In [3]:
movies_file_path = "data/movies_metadata.csv"
df_movies = spark.read.csv(movies_file_path, header=True).select('id', 'title') \
            .withColumn('id', col('id').cast('int'))

df_movies = df_movies.filter(df_movies['id'].isNotNull())

In [4]:
# There are movie title which are corrupted. Let's find them
df_movies.filter(df_movies['title'].rlike("\[*\]")).toPandas().head()

Unnamed: 0,id,title
0,31357,"[{'iso_639_1': 'en', 'name': 'English'}]"
1,11443,"[{'iso_639_1': 'en', 'name': 'English'}]"
2,807,"[{'iso_639_1': 'en', 'name': 'English'}]"
3,32646,"[{'iso_3166_1': 'GB', 'name': 'United Kingdom'..."
4,139408,"[{'iso_639_1': 'hu', 'name': 'Magyar'}, {'iso_..."


In [5]:
# Filtering out the corrupted rows
df_movies = df_movies.filter(~df_movies['title'].rlike("\[*\]"))

In [6]:
def count_nans(df):
    return df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).toPandas()
    
def count_nulls(df):
    return df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).toPandas()

In [7]:
count_nans(df_movies)

Unnamed: 0,id,title
0,0,0


In [8]:
count_nulls(df_movies)

Unnamed: 0,id,title
0,0,0


In [6]:
# There are a few duplicate ids and titles
# movie_met.dropDuplicates(['id']).groupBy('id').count().orderBy('count', ascending=False).toPandas()
df_movies = df_movies.dropDuplicates(['id'])

In [6]:
df_movies.createOrReplaceTempView('movies')

In [8]:
ratings_file_path = "data/ratings_small.csv"
df_ratings = spark.read.csv(ratings_file_path, header=True).select('userId', 'movieId', 'rating') \
            .withColumn('userId', col('userId').cast('int')) \
            .withColumn('movieId', col('movieId').cast('int')) \
            .withColumn('rating', col('rating').cast('int')) \

df_ratings = df_ratings.na.drop()
# df_ratings = df_ratings.filter(df_ratings['userId'].isNotNull())
# df_ratings = df_ratings.filter(df_ratings['movieId'].isNotNull())
# df_ratings = df_ratings.filter(df_ratings['rating'].isNotNull())

In [11]:
df_ratings.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating
0,1,31,2
1,1,1029,3
2,1,1061,3
3,1,1129,2
4,1,1172,4


In [10]:
(df_training, df_test) = df_ratings.randomSplit([0.8, 0.2])

In [13]:
df_training.limit(5).toPandas()

Unnamed: 0,userId,movieId,rating
0,1,31,2
1,1,1029,3
2,1,1061,3
3,1,1129,2
4,1,1172,4


In [14]:
def vary_max_iter(max_iter_list):
    rmse_dict = dict.fromkeys(max_iter_list)
    for max_iter in max_iter_list:
        print(f"Max iter: {max_iter}")
        als = ALS(rank=10, maxIter=max_iter, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
        model = als.fit(df_training)
        predictions = model.transform(df_test)
        rmse_dict[max_iter] = evaluator.evaluate(predictions)
    return rmse_dict

In [15]:
def vary_rank(rank_list):
    rmse_dict = dict.fromkeys(rank_list)
    for rank in rank_list:
        print(f"Rank: {rank}")
        als = ALS(rank=rank, maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
        model = als.fit(df_training)
        predictions = model.transform(df_test)
        rmse_dict[rank] = evaluator.evaluate(predictions)
    return rmse_dict

In [16]:
def vary_reg_param(reg_param_list):
    rmse_dict = dict.fromkeys(reg_param_list)
    for reg_param in reg_param_list:
        print(f"Reg param: {reg_param}")
        als = ALS(rank=10, maxIter=5, regParam=reg_param, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
        model = als.fit(df_training)
        predictions = model.transform(df_test)
        rmse_dict[reg_param] = evaluator.evaluate(predictions)
    return rmse_dict

In [8]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

def validate_get_rmse(model, test):
    predictions = model.transform(df_test)
    rmse = evaluator.evaluate(predictions)
    return rmse

In [9]:
def optimise_params(numIter_list, rank_list, regParam_list):
    rmse_min = float('inf')
    for rank, regParam, numIter in itertools.product(rank_list, regParam_list, numIter_list):
        als = ALS(rank=rank, maxIter=numIter, regParam=regParam, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
        model = als.fit(df_training)
        rmse = validate_get_rmse(model, df_test)
        print(f"Rank = {rank}, Reg param = {regParam}, numIter = {numIter}: RMSE = {rmse}")
        if rmse < rmse_min:
            rmse_min = rmse
            rank_best = rank
            regParam_best = regParam
            numIter_best = numIter

    print(f"Best rank: {rank_best}\nBest Reg param: {regParam_best}\nBest iter max: {numIter_best}")

In [51]:
numIter_list = [1,5,10,20]
rank_list = [5,10,20,30]
regParam_list = [0.001, 0.01, 0.1, 0.5, 0.9]

optimise_params(numIter_list, rank_list, regParam_list)

In [54]:
numIter_list = [5, 10, 20]
rank_list = [10, 20, 30, 40]
regParam_list = [0.01, 0.05, 0.1, 0.25, 0.5]

optimise_params(numIter_list, rank_list, regParam_list)

Rank = 10, Reg param = 0.01, numIter = 5: RMSE = 1.1617932978547596
Rank = 10, Reg param = 0.01, numIter = 10: RMSE = 1.2005721146645645
Rank = 10, Reg param = 0.01, numIter = 20: RMSE = 1.2339603937404398
Rank = 10, Reg param = 0.05, numIter = 5: RMSE = 1.014909873227744
Rank = 10, Reg param = 0.05, numIter = 10: RMSE = 1.01817212652199
Rank = 10, Reg param = 0.05, numIter = 20: RMSE = 1.011609243336826
Rank = 10, Reg param = 0.1, numIter = 5: RMSE = 0.9479697106211431
Rank = 10, Reg param = 0.1, numIter = 10: RMSE = 0.9479587065233994
Rank = 10, Reg param = 0.1, numIter = 20: RMSE = 0.9471767379658366
Rank = 10, Reg param = 0.25, numIter = 5: RMSE = 0.9524489863980619
Rank = 10, Reg param = 0.25, numIter = 10: RMSE = 0.9469375500772483
Rank = 10, Reg param = 0.25, numIter = 20: RMSE = 0.9460415790003626
Rank = 10, Reg param = 0.5, numIter = 5: RMSE = 1.0441125386217338
Rank = 10, Reg param = 0.5, numIter = 10: RMSE = 1.0485777865060522
Rank = 10, Reg param = 0.5, numIter = 20: RMSE =

In [19]:
numIter_list = [10, 20, 25]
rank_list = [30, 40, 50]
regParam_list = [0.05, 0.1, 0.25]

optimise_params(numIter_list, rank_list, regParam_list)

Rank = 30, Reg param = 0.05, numIter = 10: RMSE = 1.0363285416493484
Rank = 30, Reg param = 0.05, numIter = 20: RMSE = 1.021219894540249
Rank = 30, Reg param = 0.05, numIter = 25: RMSE = 1.0169836757081714
Rank = 30, Reg param = 0.1, numIter = 10: RMSE = 0.9514801948347762
Rank = 30, Reg param = 0.1, numIter = 20: RMSE = 0.9502141100558108
Rank = 30, Reg param = 0.1, numIter = 25: RMSE = 0.9501671022369136
Rank = 30, Reg param = 0.25, numIter = 10: RMSE = 0.9559648332228158
Rank = 30, Reg param = 0.25, numIter = 20: RMSE = 0.9540615341537515
Rank = 30, Reg param = 0.25, numIter = 25: RMSE = 0.9540077048648247
Rank = 40, Reg param = 0.05, numIter = 10: RMSE = 1.0370421683408315
Rank = 40, Reg param = 0.05, numIter = 20: RMSE = 1.0139594412839907
Rank = 40, Reg param = 0.05, numIter = 25: RMSE = 1.0077092684812672
Rank = 40, Reg param = 0.1, numIter = 10: RMSE = 0.9509024613491506
Rank = 40, Reg param = 0.1, numIter = 20: RMSE = 0.9496179626541684
Rank = 40, Reg param = 0.1, numIter = 25

In [11]:
# Optimal model

als = ALS(rank=40, maxIter=20, regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(df_training)

In [12]:
# Generate top 10 movie recommendations for each user
# userRecs = model.recommendForAllUsers(10).toPandas().set_index('userId')
userRecs = model.recommendForAllUsers(10)

In [18]:
df_links = spark.read.csv("data/links_small.csv", header=True).select('movieId', 'tmdbId') \
            .withColumn('movieId', col('movieId').cast('int')) \
            .withColumn('tmdbId', col('tmdbId').cast('int'))

df_links = df_links.na.drop()

In [14]:
def movieId_to_title_sql(movieId):
    query = f"""
            SELECT title
            FROM movies
                JOIN links
                ON movies.id = links.tmdbId
            WHERE links.movieId = {movieId}
            """
    movie_name = spark.sql(query).collect()[0].title
    return movie_name

# This is really really slow.

In [20]:
def movieId_to_title(movieId):
    tmdbId = df_links.where(df_links['movieId'] == movieId).collect()[0].tmdbId
    try:
        title = df_movies.where(df_movies['id'] == tmdbId).collect()[0].title
    except IndexError:
        title = ''
    return title

In [15]:
reco_mov = [row.movieId for row in userRecs.select('recommendations').where(userRecs['userId'] == 1).collect()[0][0]]

In [68]:
# reco_mov = [row.movieId for row in userRecs.loc[2]['recommendations']]

In [16]:
reco_mov

[1172, 73290, 83411, 67504, 83318, 2105, 1221, 1953, 1956, 551]

In [62]:
movieId_to_title(83318)

'The Goat'

In [21]:
# Turns out some movies were removed in the cleaning stage
# These were therefore present in the rating DF but not in the movies DF
# It's only corrupted datapoints
for mov in map(movieId_to_title, reco_mov):
    print(mov)

Cinema Paradiso
Hachi: A Dog's Tale
Cops
Land of Silence and Darkness
The Goat

The Godfather: Part II

Ordinary People
The Nightmare Before Christmas
