In this notebook, we will use the RDD-based API from [pyspark.mllib](https://spark.apache.org/docs/2.1.1/mllib-collaborative-filtering.html) instead of the DataFrame-based one, and compare its accuracy and runtime with those in [Spark_MovieLens.ipynb](Spark_MovieLens.ipynb). **This notebook must be run in a Spark environment.**

In [1]:
from pyspark.mllib.recommendation import ALS
import math
import time

Let's load "ratings.csv" using ``sc.textFile``, drop the header line, and convert each remaining line to a (user, item, rating) tuple.

In [2]:
movie_rating = sc.textFile("ml-latest-small/ratings.csv")

In [3]:
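# The first line of the file is the CSV header
header = movie_rating.first()
rating_data = (movie_rating
               .filter(lambda x: x != header)       # drop the header line
               .map(lambda x: x.split(","))         # split each line into fields
               .map(lambda x: (x[0], x[1], x[2])))  # keep (user, item, rating)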

In [4]:
# check three rows
rating_data.take(3)

[(u'1', u'31', u'2.5'), (u'1', u'1029', u'3.0'), (u'1', u'1061', u'3.0')]
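
Note that the fields in these tuples are still strings. As a plain-Python sketch of what the `filter`/`map` chain does to each line (no Spark needed), the snippet below applies the same steps to a few lines in the `ratings.csv` layout and also casts the fields to numeric types. The timestamps are made-up placeholders, and `parse_rating` is a hypothetical helper name that is not part of the notebook.

```python
# Sample lines in the ratings.csv layout; timestamps are placeholder values.
lines = [
    "userId,movieId,rating,timestamp",
    "1,31,2.5,0",
    "1,1029,3.0,0",
    "1,1061,3.0,0",
]

def parse_rating(line):
    """Split one CSV line and keep (user, item, rating) as numeric types."""
    fields = line.split(",")
    return (int(fields[0]), int(fields[1]), float(fields[2]))

header = lines[0]
# Same logic as the RDD pipeline: drop the header, then parse each line.
parsed = [parse_rating(line) for line in lines if line != header]
print(parsed)
```

With Spark, a helper like this could be passed to `map` in place of the two lambdas. The notebook itself keeps the fields as strings and casts them later, when computing the RMSE.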

I will split the data into training/validation/testing sets using a 60/20/20 split.

In [5]:
train, validation, test = rating_data.randomSplit([6, 2, 2], seed = 0)
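
The weights `[6, 2, 2]` do not sum to 1; according to the PySpark documentation, `randomSplit` normalizes them, which gives fractions of 0.6/0.2/0.2. Since the assignment is random per record, the resulting sizes are only approximately 60/20/20. The plain-Python sketch below mimics that idea by drawing one uniform number per record and bucketing it by cumulative weight. It illustrates the principle under that assumption and is not Spark's actual implementation; `random_split` is a hypothetical helper.

```python
import random

def random_split(records, weights, seed=0):
    """Assign each record to one split by comparing a uniform draw
    against the cumulative normalized weights."""
    total = float(sum(weights))
    fractions = [w / total for w in weights]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for record in records:
        draw = rng.random()
        cumulative = 0.0
        for i, fraction in enumerate(fractions):
            cumulative += fraction
            # The last split also catches any floating-point remainder.
            if draw < cumulative or i == len(fractions) - 1:
                splits[i].append(record)
                break
    return fractions, splits

fractions, (train_part, valid_part, test_part) = random_split(range(10000), [6, 2, 2])
print(fractions)
print(len(train_part), len(valid_part), len(test_part))
```

Fixing the seed, as the notebook does with `seed = 0`, keeps the split reproducible across runs.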

Let's again use the same grid search to find the optimal hyperparameters of the model.

In [6]:
# Define a function to perform grid search and find the best ALS model
# based on the validation RMSE

def GridSearch(train, valid, num_iterations, reg_param, n_factors):
    min_rmse = float('inf')
    best_n = -1
    best_reg = 0
    best_model = None
    for n in n_factors:
        for reg in reg_param:
            # Fit ALS with the current rank and regularization
            model = ALS.train(train, rank = n, iterations = num_iterations, lambda_ = reg, seed = 0)
            # Predict the (user, item) pairs of the validation set and key them for the join
            predictions = model.predictAll(valid.map(lambda x: (x[0], x[1])))
            predictions = predictions.map(lambda x: ((x[0], x[1]), x[2]))
            # Pair each true rating with its prediction: ((user, item), (rating, prediction))
            rate_and_preds = valid.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(predictions)
            rmse = math.sqrt(rate_and_preds.map(lambda x: (x[1][0] - x[1][1])**2).mean())
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(n, reg, rmse))
            if rmse < min_rmse:
                min_rmse = rmse
                best_n = n
                best_reg = reg
                best_model = model
                
    # Recompute the training RMSE of the best model for comparison
    pred = best_model.predictAll(train.map(lambda x: (x[0], x[1])))
    pred = pred.map(lambda x: ((x[0], x[1]), x[2]))
    rate_and_preds = train.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(pred)
    train_rmse = math.sqrt(rate_and_preds.map(lambda x: (x[1][0] - x[1][1])**2).mean())
    print('\nThe best model has {} latent factors and regularization = {}:'.format(best_n, best_reg))
    print('training RMSE is {}; validation RMSE is {}'.format(train_rmse, min_rmse))
    return best_model
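
The RMSE computation inside `GridSearch` keys both the true ratings and the predictions by `(user, item)` and joins them. Because `join` is an inner join, any pair for which the model returns no prediction drops out of the average; this can happen, for instance, when a user or movie in the validation set never appears in the training set. A small plain-Python sketch of the same logic, using made-up ratings and a hypothetical `rmse_on_joined` helper:

```python
import math

def rmse_on_joined(actual, predicted):
    """RMSE over the (user, item) keys present in both dictionaries."""
    common = [key for key in actual if key in predicted]
    squared_errors = [(actual[key] - predicted[key]) ** 2 for key in common]
    return math.sqrt(sum(squared_errors) / len(squared_errors)), len(common)

# Made-up ratings keyed by (user, item).
actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
# No prediction for (2, 10): it is silently excluded, as in an inner join.
predicted = {(1, 10): 3.0, (1, 20): 3.0}

rmse, n_pairs = rmse_on_joined(actual, predicted)
print(rmse, n_pairs)
```

Comparing `n_pairs` with the size of the validation set shows how many ratings were left out of the reported RMSE.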

In [7]:
num_iterations = 10
ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

start_time = time.time()
final_model = GridSearch(train, validation, num_iterations, reg_params, ranks)
print('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

6 latent factors and regularization = 0.05: validation RMSE is 1.01785508876
6 latent factors and regularization = 0.1: validation RMSE is 0.95260048447
6 latent factors and regularization = 0.2: validation RMSE is 0.930242493613
6 latent factors and regularization = 0.4: validation RMSE is 0.992676760763
6 latent factors and regularization = 0.8: validation RMSE is 1.20267669575
8 latent factors and regularization = 0.05: validation RMSE is 1.02799666376
8 latent factors and regularization = 0.1: validation RMSE is 0.95051746407
8 latent factors and regularization = 0.2: validation RMSE is 0.928105681593
8 latent factors and regularization = 0.4: validation RMSE is 0.992166572865
8 latent factors and regularization = 0.8: validation RMSE is 1.2026779671
10 latent factors and regularization = 0.05: validation RMSE is 1.04031892715
10 latent factors and regularization = 0.1: validation RMSE is 0.954383495189
10 latent factors and regularization = 0.2: validation RMSE is 0.928274141014
...

Wow! The runtime is about 74 seconds, much faster than the DataFrame-based approach. Again, the model with 8 latent factors and lambda = 0.2 yields the best result. Let's run a second grid search around these values, with more iterations (15) for each model.
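
Before moving on, a quick sanity check on the timing: the first grid has 4 ranks times 5 regularization values, i.e. 20 models, so 74 seconds works out to roughly 3.7 seconds per model (the total also includes the final training-RMSE pass, so this is only a rough figure). The arithmetic, with the grid enumerated via `itertools.product`:

```python
import itertools

ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]
total_runtime = 74.0  # seconds, the approximate figure quoted above

# Every (rank, regularization) pair that the nested loops visit.
grid = list(itertools.product(ranks, reg_params))
print(len(grid))
print(total_runtime / len(grid))
```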

In [8]:
num_iterations = 15
ranks = [7, 8, 9]
reg_params = [0.1, 0.2, 0.3]

final_model = GridSearch(train, validation, num_iterations, reg_params, ranks)

7 latent factors and regularization = 0.1: validation RMSE is 0.953764517796
7 latent factors and regularization = 0.2: validation RMSE is 0.930742215865
7 latent factors and regularization = 0.3: validation RMSE is 0.953111214918
8 latent factors and regularization = 0.1: validation RMSE is 0.948017777334
8 latent factors and regularization = 0.2: validation RMSE is 0.928782688855
8 latent factors and regularization = 0.3: validation RMSE is 0.952880120979
9 latent factors and regularization = 0.1: validation RMSE is 0.952357433726
9 latent factors and regularization = 0.2: validation RMSE is 0.929219565168
9 latent factors and regularization = 0.3: validation RMSE is 0.952972698613

The best model has 8 latent factors and regularization = 0.2:
training RMSE is 0.685661645411; validation RMSE is 0.928782688855
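
The selection rule in `GridSearch` is simply an argmin over the validation RMSE. Replaying it on the nine values printed above (rounded here to six decimals) reproduces the choice:

```python
# Validation RMSEs from the second grid search, keyed by (rank, regularization).
validation_rmse = {
    (7, 0.1): 0.953765, (7, 0.2): 0.930742, (7, 0.3): 0.953111,
    (8, 0.1): 0.948018, (8, 0.2): 0.928783, (8, 0.3): 0.952880,
    (9, 0.1): 0.952357, (9, 0.2): 0.929220, (9, 0.3): 0.952973,
}

# Pick the key with the smallest validation RMSE.
best_params = min(validation_rmse, key=validation_rmse.get)
print(best_params, validation_rmse[best_params])
```

The margin over rank 9 at the same regularization is only about 0.0004, so the ordering among ranks may well change with a different seed or split.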


The model with 8 latent factors and lambda = 0.2 is still the best one. Finally, let's check the test error.

In [9]:
predictions = final_model.predictAll(test.map(lambda x: (x[0], x[1]))) 
predictions = predictions.map(lambda x: ((x[0], x[1]), x[2]))
rates_and_preds = test.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda x: (x[1][0] - x[1][1])**2).mean())
print('The testing RMSE is ' + str(error))

The testing RMSE is 0.921089888718
