In [13]:
import os
import time

# spark imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction, explode, desc
from pyspark.sql.types import StringType, ArrayType
from pyspark.mllib.recommendation import ALS

# data science imports
import math
import numpy as np
import pandas as pd



In [14]:
spark = SparkSession \
    .builder \
    .appName("movie recommendation") \
    .config("spark.driver.maxResultSize", "96g") \
    .config("spark.driver.memory", "96g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.master", "local[12]") \
    .getOrCreate()
# get spark context
sc = spark.sparkContext

In [15]:

movies = spark.read.load('movies.csv', format='csv', header=True, inferSchema=True)
ratings = spark.read.load('ratings.csv', format='csv', header=True, inferSchema=True)
links = spark.read.load('links.csv', format='csv', header=True, inferSchema=True)
tags = spark.read.load('tags.csv', format='csv', header=True, inferSchema=True)


In [14]:
import pyspark
from pyspark.mllib.recommendation import ALS,MatrixFactorizationModel,Rating
from pyspark.sql import Row
from collections import namedtuple
input_path='ratings.csv'
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('recommender').getOrCreate()
data = spark.read.format("csv").option("header","true").load(input_path)
parts = data.rdd
ratingsRDD = parts.map(lambda p:Row(userId=int(p[0]), movieId=int(p[1]),
                                    
                                    rating=float(p[2])))
ratings= ratingsRDD.toDF()
#print(type(ratings))
ratings.take(3)

[Row(movieId=296, rating=5.0, userId=1),
 Row(movieId=306, rating=3.5, userId=1),
 Row(movieId=307, rating=5.0, userId=1)]

In [5]:
movie_rating = sc.textFile( 'ratings.csv')
header = movie_rating.take(1)[0]

rating_data = movie_rating \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))) \
    .cache()
# check three rows
rating_data.take(3)

[(1, 296, 5.0), (1, 306, 3.5), (1, 307, 5.0)]

In [7]:
train, validation, test = rating_data.randomSplit([6, 2, 2], seed=99)
# cache data
train.cache()
validation.cache()
test.cache()

PythonRDD[47] at RDD at PythonRDD.scala:53

ALS model selection and evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [8]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    """
    Grid Search Function to select the best model based on RMSE of hold-out data
    """
    # initial
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            # train ALS model
            model = ALS.train(
                ratings=train_data,    # (userID, productID, rating) tuple
                iterations=num_iters,
                rank=rank,
                lambda_=reg,           # regularization param
                seed=99)
            # make prediction
            valid_data = validation_data.map(lambda p: (p[0], p[1]))
            predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
            # get the rating result
            ratesAndPreds = validation_data.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
            # get the RMSE
            MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
            error = math.sqrt(MSE)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model


num_iterations = 10
ranks = [14, 16, 18, 20]
reg_params = [0.001, 0.01, 0.05, 0.1, 0.2]

# grid search and select best model
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

14 latent factors and regularization = 0.001: validation RMSE is 0.8751816247548452
14 latent factors and regularization = 0.01: validation RMSE is 0.8269324555975437
14 latent factors and regularization = 0.05: validation RMSE is 0.793413480916171
14 latent factors and regularization = 0.1: validation RMSE is 0.8059576187209331
14 latent factors and regularization = 0.2: validation RMSE is 0.8575592844864997
16 latent factors and regularization = 0.001: validation RMSE is 0.8872740154732262
16 latent factors and regularization = 0.01: validation RMSE is 0.8324322208201042
16 latent factors and regularization = 0.05: validation RMSE is 0.7929360428269683
16 latent factors and regularization = 0.1: validation RMSE is 0.8074505448147691
16 latent factors and regularization = 0.2: validation RMSE is 0.8587026827943351
18 latent factors and regularization = 0.001: validation RMSE is 0.897715109950989
18 latent factors and regularization = 0.01: validation RMSE is 0.8361875034757653
18 late

I ran the tune_ALS function in two parts and in the first part I considered the latent factors like [8,10,12] with respective regularization terms and got the following results and yet found that best model has 18 latent factors and reg term of 0.05, as it has the lowest RMSE 

These are the results of the latent factors like [8,10,12]:


8 latent factors and regularization = 0.001: validation RMSE is 0.8410059570570684
8 latent factors and regularization = 0.01: validation RMSE is 0.8152786348189429
8 latent factors and regularization = 0.05: validation RMSE is 0.8028483085096276
8 latent factors and regularization = 0.1: validation RMSE is 0.8097231653772525
8 latent factors and regularization = 0.2: validation RMSE is 0.8570296996940661
10 latent factors and regularization = 0.001: validation RMSE is 0.850314971518225
10 latent factors and regularization = 0.01: validation RMSE is 0.818705066471433
10 latent factors and regularization = 0.05: validation RMSE is 0.7983040640059628
10 latent factors and regularization = 0.1: validation RMSE is 0.8078940255789443
10 latent factors and regularization = 0.2: validation RMSE is 0.8575775776051373
12 latent factors and regularization = 0.001: validation RMSE is 0.859409779870995
12 latent factors and regularization = 0.01: validation RMSE is 0.8214563848677076
12 latent factors and regularization = 0.05: validation RMSE is 0.7938363743722037
12 latent factors and regularization = 0.1: validation RMSE is 0.804776318537385




In [20]:
# get ALS model
maxIter = 10
setRank = 12
reg = 0.2
model = ALS.train(
    ratings=train,    # (userID, productID, rating) tuple
    iterations=maxIter,
    rank=setRank,
    lambda_=reg,           # regularization param
    seed=99)
# make prediction
valid_data = validation.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
# get the rating result
ratesAndPreds = validation.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
# get the RMSE
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('{} latent factors and regularization = {}: validation RMSE is {}'.format(setRank, reg, error))

12 latent factors and regularization = 0.2: validation RMSE is 0.8567586006292939


In [21]:
def get_movieId(df_movies, fav_movie_list):
    """
    return all movieId(s) of user's favorite movies
    
    Parameters
    ----------
    df_movies: spark Dataframe, movies data
    
    fav_movie_list: list, user's list of favorite movies
    
    Return
    ------
    movieId_list: list of movieId(s)
    """
    movieId_list = []
    for movie in fav_movie_list:
        movieIds = df_movies \
            .filter(movies.title.like('%{}%'.format(movie))) \
            .select('movieId') \
            .rdd \
            .map(lambda r: r[0]) \
            .collect()
        movieId_list.extend(movieIds)
    return list(set(movieId_list))


def add_new_user_to_data(train_data, movieId_list, spark_context):
    """
    add new rows with new user, user's movie and ratings to
    existing train data

    Parameters
    ----------
    train_data: spark RDD, ratings data
    
    movieId_list: list, list of movieId(s)

    spark_context: Spark Context object
    
    Return
    ------
    new train data with the new user's rows
    """
    # get new user id
    new_id = train_data.map(lambda r: r[0]).max() + 1
    # get max rating
    max_rating = train_data.map(lambda r: r[2]).max()
    # create new user rdd
    user_rows = [(new_id, movieId, max_rating) for movieId in movieId_list]
    new_rdd = spark_context.parallelize(user_rows)
    # return new train data
    return train_data.union(new_rdd)


def get_inference_data(train_data, df_movies, movieId_list):
    """
    return a rdd with the userid and all movies (except ones in movieId_list)

    Parameters
    ----------
    train_data: spark RDD, ratings data

    df_movies: spark Dataframe, movies data
    
    movieId_list: list, list of movieId(s)

    Return
    ------
    inference data: Spark RDD
    """
    # get new user id
    new_id = train_data.map(lambda r: r[0]).max() + 1
    # return inference rdd
    return df_movies.rdd \
        .map(lambda r: r[0]) \
        .distinct() \
        .filter(lambda x: x not in movieId_list) \
        .map(lambda x: (new_id, x))


def make_recommendation(best_model_params, ratings_data, df_movies, 
                        fav_movie_list, n_recommendations, spark_context):
    """
    return top n movie recommendation based on user's input list of favorite movies


    Parameters
    ----------
    best_model_params: dict, {'iterations': iter, 'rank': rank, 'lambda_': reg}

    ratings_data: spark RDD, ratings data

    df_movies: spark Dataframe, movies data

    fav_movie_list: list, user's list of favorite movies

    n_recommendations: int, top n recommendations

    spark_context: Spark Context object

    Return
    ------
    list of top n movie recommendations
    """
    # modify train data by adding new user's rows
    movieId_list = get_movieId(df_movies, fav_movie_list)
    train_data = add_new_user_to_data(ratings_data, movieId_list, spark_context)
    
    # train best ALS
    model = ALS.train(
        ratings=train_data,
        iterations=best_model_params.get('iterations', None),
        rank=best_model_params.get('rank', None),
        lambda_=best_model_params.get('lambda_', None),
        seed=99)
    
    # get inference rdd
    inference_rdd = get_inference_data(ratings_data, df_movies, movieId_list)
    
    # inference
    predictions = model.predictAll(inference_rdd).map(lambda r: (r[1], r[2]))
    
    # get top n movieId
    topn_rows = predictions.sortBy(lambda r: r[1], ascending=False).take(n_recommendations)
    topn_ids = [r[0] for r in topn_rows]
    
    # return movie titles
    return df_movies.filter(movies.movieId.isin(topn_ids)) \
                    .select('title') \
                    .rdd \
                    .map(lambda r: r[0]) \
                    .collect()

In [22]:

# my favorite movies
my_favorite_movies = ['Iron Man']

# get recommends
recommends = make_recommendation(
    best_model_params={'iterations': 10, 'rank': 20, 'lambda_': 0.05}, 
    ratings_data=rating_data, 
    df_movies=movies, 
    fav_movie_list=my_favorite_movies, 
    n_recommendations=10, 
    spark_context=sc)

print('Recommendations for {}:'.format(my_favorite_movies[0]))
for i, title in enumerate(recommends):
    print('{0}: {1}'.format(i+1, title))

Recommendations for Iron Man:
1: Jack-O (1995)
2: Dragon Fist (Long quan) (1979)
3: Enemies of Reason, The (2007)
4: Secrets Of State (2008)
5: Levitated Mass (2013)
6: Being in the World (2009)
7: Dara O'Briain Crowd Tickler (2015)
8: Midnight Diner (2014)
9: A Question of Faith (2017)
10: Towed in a Hole (1932)
