In [1]:
# Required imports
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
from pyspark.sql import Row
from pyspark.sql.types import *

import math
import numpy as np
import pandas as pd
import time

In [2]:
# Location of our data
ratings_loc = '/FileStore/tables/ratings.csv'
movies_loc = '/FileStore/tables/movies.csv'

In [3]:
def getDataframe(file_location):
  """
  Generate a dataframe from a location in the databricks environment
  """
  # File location and type
  file_type = "csv"

  # CSV options
  infer_schema = "false"
  first_row_is_header = "true"
  delimiter = ","

  # The applied options are for CSV files. For other file types, these will be ignored.
  return spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .option("header", first_row_is_header) \
    .option("sep", delimiter) \
    .load(file_location)

In [4]:
# Load the data
ratings = getDataframe(ratings_loc)
movies = getDataframe(movies_loc)

### Preprocessing the data for the model

In [6]:
# Reload the data
movie_rating = sc.textFile(ratings_loc)
movies_list = sc.textFile(movies_loc)

# Remove the timestamp column from the ratings data
header = movie_rating.take(1)[0]
rating_data = movie_rating \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))) \
    .cache()

# Remove the genre column from the movies data
header = movies_list.take(1)[0]
movies_data = movies_list \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), str(tokens[1]))) \
    .cache()

In [7]:
# Splitting the dataframe into a training and a test part. 
training, validation, test = rating_data.randomSplit([0.8, 0.2, 0.2], seed=12345)

training.cache()
validation.cache()
test.cache()

### Train the Alternative Least Squares model including hyperparameter tuning

In [9]:
def train_ALS(train_data, validation_data, maxIters, regParams, ranks):
  
    # initial values
    min_error = float('inf')
    best_iters = -1
    best_rank = -1
    best_regularization = 0
    best_model = None
    
    for iteration in maxIters: # Loop through all iteration possibilities
      for rank in ranks: # Loop through all rank possibilities
        for reg in regParams: # Loop through all lambda possibilities
          
          # Record the start time of a run
          start = time.time() 
          
          # Train ALS model
          model = ALS.train(ratings=train_data, iterations=iteration, rank=rank, lambda_=reg, seed=12345)
          
          # Make predictions
          valid_data = validation_data.map(lambda p: (p[0], p[1]))
          predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))
          
          # Combine the ratings and the predictions
          ratings_predicts = validation_data.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
            
          # Get the Root Mean Square Error
          MSE = ratings_predicts.map(lambda r: (r[1][0] - r[1][1])**2).mean()
          error = math.sqrt(MSE)
        
          # Calculate the time elapsed in this run
          elapsed = time.time() - start
          
          print('{} latent factors with max iterations = {} and regularization = {}: validation RMSE is {}. Duration: {} seconds'\
                .format(rank, iteration, reg, error, elapsed))
          
          # Check if the current run is the best 
          if error < min_error:
            min_error = error
            best_iters = iteration
            best_rank = rank
            best_regularization = reg
            best_model = model
  
    print('\nThe best model has {} latent factors, {} max iterations and regularization = {}'\
          .format(best_rank, best_iters, best_regularization))
    
    return best_model

### Hyperparameter tuning for Alternative Least Squares
1. We define some parameter configurations for the function to run through
2. We train the ALS model for every possible combination of configurations
3. The function returns the model with the ideal combination of parameters
4. We save the model so we can easily use it later

In [11]:
# Hyper-param config
num_iterations = [10, 15, 20, 25]
ranks = [10, 15, 20, 25, 30]
reg_params = [0.01, 0.05, 0.1, 0.2, 0.3]

# Grid search and select best model
result = train_ALS(training, validation, num_iterations, reg_params, ranks)
best_model = result

# Save the model so we can use it later
best_model.save(sc, "/model/ALS")

### Test the model with the test dataset

In [13]:
# Load the model
model = MatrixFactorizationModel.load(sc, "/model/ALS")

# Make a prediction using test data
test_data = test.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))

# Combine the ratings and the predictions
ratings_predicts = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

# Get the Root Mean Square Error
MSE = ratings_predicts.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)

print('The out-of-sample RMSE of rating predictions is', round(error, 4))

### Recommending N movies for a specific user

In [15]:
def getRecommendation(userId, n_recommendations=10):
    model = MatrixFactorizationModel.load(sc, "/model/ALS")

    # Find movieIds of rated movies by user x
    rated_movies = ratings.filter(ratings.userId == userId).select("movieId").collect()
    
    # Get the list of movies without the rated movies by user x
    movieId_list = []
    for movie in rated_movies:
      movieIds = movies \
          .filter(movies.movieId == movie.movieId) \
          .select('movieId') \
          .rdd \
          .map(lambda r: r[0]) \
          .collect()
      movieId_list.extend(movieIds)
     
    movieId_list = list(set(movieId_list))
    
    candidates = movies.rdd \
        .map(lambda r: r[0]) \
        .distinct() \
        .filter(lambda x: x not in movieId_list) \
        .map(lambda x: (userId, x))
    
    # Predict the movies that should be recommended to user x
    predictions = model.predictAll(candidates).map(lambda r: (r[1], r[2]))

    # Get the top N
    topn_rows = predictions.sortBy(lambda r: r[1], ascending=False).take(n_recommendations)
    topn_ids = [r[0] for r in topn_rows]

    # Return the top N recommended movies
    return movies.filter(movies.movieId.isin(topn_ids)).collect()

In [16]:
display(getRecommendation(10))

movieId,title,genres
3216,"Vampyros Lesbos (Vampiras, Las) (1971)",Fantasy|Horror|Thriller
5059,Little Dieter Needs to Fly (1997),Documentary
7074,"Navigator, The (1924)",Comedy
8609,Our Hospitality (1923),Comedy
31547,Lessons of Darkness (Lektionen in Finsternis) (1992),Documentary|War
83318,"Goat, The (1921)",Comedy
83359,"Play House, The (1921)",Comedy
83411,Cops (1922),Comedy
92494,Dylan Moran: Monster (2004),Comedy|Documentary
97957,Excision (2012),Crime|Drama|Horror|Thriller
