In [1]:
import findspark

# Must run before the pyspark imports in the next cell; presumably it locates
# the local Spark installation and makes `pyspark` importable — confirm for
# your environment.
findspark.init()

In [2]:
from pyspark.mllib.recommendation import ALS
from pyspark import SparkContext
import math
import time

In [3]:
# Reuse the active SparkContext if one exists (e.g. after re-running this cell)
# instead of failing on a second construction; no custom SparkConf is supplied.
sc = SparkContext.getOrCreate(conf=None)

In [4]:
# Location of the MovieLens ratings file, relative to the notebook's working directory.
RATINGS_CSV = "ml-latest-small/ratings.csv"

# Raw text lines of the CSV (header line included); parsed in a later cell.
movie_rating = sc.textFile(RATINGS_CSV)

In [5]:
# The first line of the file is treated as the CSV header so it can be filtered out.
header = movie_rating.take(num=1)[0]

In [6]:
# Parse the ratings CSV into (user, movie, rating) records.
# - The header line is dropped by comparing against `header`.
# - Only the first three comma-separated fields are kept. ALS.train builds
#   Rating(user, product, rating) from each record, so any extra column
#   (MovieLens ratings.csv presumably also carries a timestamp — confirm)
#   would break training.
# - Values stay strings, matching the stored preview below; later cells cast
#   them with int()/float() where needed.
rating_data = (
    movie_rating
    .filter(lambda line: line != header)
    .map(lambda line: line.split(",")[:3])
)

In [7]:
# Peek at the first few parsed records to sanity-check the parsing.
rating_data.take(num=3)

[['1', '31', '2.5'], ['1', '1029', '3'], ['1', '1061', '3']]

## Splitting the data into training, validation and test sets

In [8]:
# 60% / 20% / 20% split (randomSplit normalizes the weights); the fixed seed
# makes the split reproducible across reruns.
train, validation, test = rating_data.randomSplit(weights=[6, 2, 2], seed=0)

In [9]:
# Single baseline ALS fit (rank 10, 10 iterations) on the training split.
# seed=0 matches the grid search below so reruns are reproducible.
# NOTE(review): the resulting `als` model is not used by any later cell shown
# here; consider removing this cell if nothing else depends on it.
rank = 10
numIterations = 10
als = ALS.train(train, rank, numIterations, seed=0)


### Hyperparameter tuning with grid search

In [16]:
# RMSE-driven grid search over ALS hyperparameters (rank x regularization).


def compute_rmse(model, data):
    """Root-mean-squared error of ``model`` on an RDD of rating records.

    Parameters
    ----------
    model : trained model exposing ``predictAll`` (e.g. an ALS model)
    data : RDD whose records are indexable as ``(user, item, rating)``;
        values may be strings, they are cast with ``int``/``float`` here.

    Returns
    -------
    float
        RMSE over the (user, item) pairs the model returned a prediction for.
        Pairs without a prediction are dropped by the join and do not count.

    Raises
    ------
    ValueError
        If no pair in ``data`` received a prediction, so the RMSE is undefined.
    """
    user_items = data.map(lambda rec: (rec[0], rec[1]))
    predicted = model.predictAll(user_items).map(lambda p: ((p[0], p[1]), p[2]))
    actual = data.map(lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2])))
    squared_errors = actual.join(predicted).map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
    # stats() yields both count and mean from a single Spark action.
    summary = squared_errors.stats()
    if summary.count() == 0:
        raise ValueError("no (user, item) pair in the data received a prediction; RMSE is undefined")
    return math.sqrt(summary.mean())


def GridSearch(train, valid, num_iteration, reg_param, n_factors):
    """Train one ALS model per (rank, regularization) pair and keep the best.

    Every candidate is fit on ``train`` with ``seed=0`` and scored by RMSE on
    ``valid``. Progress lines and a summary (including the winner's training
    RMSE) are printed.

    Parameters
    ----------
    train, valid : RDD of ``(user, item, rating)`` records
    num_iteration : int, ALS iterations per candidate
    reg_param : iterable of regularization strengths (passed as ``lambda_``)
    n_factors : iterable of ranks (number of latent factors)

    Returns
    -------
    The trained model with the lowest validation RMSE (first one on ties).

    Raises
    ------
    ValueError
        If the grid is empty or no candidate reached a finite validation RMSE.
    """
    min_rmse = float('inf')
    best_n = -1
    best_reg = 0
    best_model = None
    for n in n_factors:
        for reg in reg_param:
            model = ALS.train(train, rank=n, iterations=num_iteration, lambda_=reg, seed=0)
            rmse = compute_rmse(model, valid)
            print("{} latent factors and regularization = {}: validation RMSE is {}".format(n, reg, rmse))
            # Strict "<" keeps the earliest candidate on ties; NaN/inf never win.
            if rmse < min_rmse:
                min_rmse = rmse
                best_n = n
                best_reg = reg
                best_model = model

    if best_model is None:
        raise ValueError(
            "grid search selected no model: reg_param and n_factors must be "
            "non-empty and at least one candidate must have a finite RMSE"
        )

    train_rmse = compute_rmse(best_model, train)
    print("\n the best model has {} latent factors and regularization ={}:".format(best_n, best_reg))
    print("training RMSE is {}, validation RMSE is {} ".format(train_rmse, min_rmse))
    return best_model
    
    





In [17]:
# Coarse grid: 4 ranks x 5 regularization strengths = 20 ALS fits, 10 iterations each.
num_iterations = 10
ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

# perf_counter is monotonic, so the measured runtime cannot be skewed by wall-clock adjustments.
start_time = time.perf_counter()
final_model = GridSearch(train, validation, num_iterations, reg_params, ranks)
print("Total Runtime: {:.2f} seconds".format(time.perf_counter() - start_time))

6 latest factors and regularization = 0.05: validation RMSE is 1.0141539026898878
6 latest factors and regularization = 0.1: validation RMSE is 0.9450772741033104
6 latest factors and regularization = 0.2: validation RMSE is 0.9203991361125834
6 latest factors and regularization = 0.4: validation RMSE is 0.9852541026748688
6 latest factors and regularization = 0.8: validation RMSE is 1.2000721165162
8 latest factors and regularization = 0.05: validation RMSE is 1.0246771901319585
8 latest factors and regularization = 0.1: validation RMSE is 0.9422120199717055
8 latest factors and regularization = 0.2: validation RMSE is 0.9170344047486733
8 latest factors and regularization = 0.4: validation RMSE is 0.9845811895009963
8 latest factors and regularization = 0.8: validation RMSE is 1.200070747425822
10 latest factors and regularization = 0.05: validation RMSE is 1.0399681062346595
10 latest factors and regularization = 0.1: validation RMSE is 0.9460051885286055
10 latest factors and regul

In [18]:
# Finer grid around the best values among the coarse search's printed results
# (rank ~8, regularization ~0.2), now with 15 ALS iterations per fit.
num_iterations = 15
ranks = [7, 8, 9]
reg_params = [0.1, 0.2, 0.3]

# perf_counter is monotonic, so the measured runtime cannot be skewed by wall-clock adjustments.
start_time = time.perf_counter()
final_model = GridSearch(train, validation, num_iterations, reg_params, ranks)
print("Total Runtime: {:.2f} seconds".format(time.perf_counter() - start_time))

7 latest factors and regularization = 0.1: validation RMSE is 0.94669873280125
7 latest factors and regularization = 0.2: validation RMSE is 0.9206103372383931
7 latest factors and regularization = 0.3: validation RMSE is 0.944117625029372
8 latest factors and regularization = 0.1: validation RMSE is 0.9427505864714627
8 latest factors and regularization = 0.2: validation RMSE is 0.9180383380705001
8 latest factors and regularization = 0.3: validation RMSE is 0.9433937077037492
9 latest factors and regularization = 0.1: validation RMSE is 0.9406410163852678
9 latest factors and regularization = 0.2: validation RMSE is 0.9183827511526049
9 latest factors and regularization = 0.3: validation RMSE is 0.943426615985445

 the best model has 8 latent factors and regularization =0.2:
training RMSE is 0.6914707673568733, validation RMSE is 0.9180383380705001 
Total Runtime: 316.90 seconds


## RMSE on test data

In [20]:
# Score the selected model on the held-out test split, which the grid search never saw.
test_user_items = test.map(lambda rec: (rec[0], rec[1]))
predictions = final_model.predictAll(test_user_items).map(lambda p: ((p[0], p[1]), p[2]))
actual_ratings = test.map(lambda rec: ((int(rec[0]), int(rec[1])), float(rec[2])))
rates_and_preds = actual_ratings.join(predictions)
squared_errors = rates_and_preds.map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
error = math.sqrt(squared_errors.mean())
print("The testing RMSE is {}".format(error))

The testing RMSE is 0.9236071516191982
