In [None]:
# Import required dependencies
# use SQLContext to read the JSON review files as DataFrames
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
import json

# TODO: point these at the real training and test files. Both are placeholders and are
# currently identical, which would make the "test" RMSE below a training-set score.
TRAINING_DATA_PATH = '../Datasets/Amazon/path/to/your/data.json'
TEST_DATA_PATH = '../Datasets/Amazon/path/to/your/data.json'
if TRAINING_DATA_PATH == TEST_DATA_PATH:
    print('WARNING: training and test data paths are identical; '
          'the test RMSE will be measured on the training data.')

# Load the training dataset and keep only the columns ALS needs: user ID (reviewerID),
# product ID (asin) and rating (overall). Store the rows as an RDD; it is cached because
# it is reused several times below (count, sum, union, ID mapping).
trainingRawData = sqlContext.read.json(TRAINING_DATA_PATH)
trainingData = trainingRawData.select(trainingRawData['reviewerID'],
                                      trainingRawData['asin'],
                                      trainingRawData['overall']).rdd.cache()

# Global average rating (mu) of the training data.
# Ratings are converted to float BEFORE dividing so an integer-typed `overall` column
# cannot trigger Python 2 floor division.
trainingCount = trainingData.count()
if trainingCount == 0:
    raise ValueError('training dataset %s is empty' % TRAINING_DATA_PATH)
overallRatingSum = trainingData.map(lambda l: float(l[2])).sum()
mu = overallRatingSum / trainingCount

# Load the test dataset and extract the same three fields (cached: reused for the
# ID mapping and for building test_RDD)
testRawData = sqlContext.read.json(TEST_DATA_PATH)
testData = testRawData.select(testRawData['reviewerID'],
                              testRawData['asin'],
                              testRawData['overall']).rdd.cache()

# reviewerID and asin are strings, but MLlib ALS requires integer user/product IDs, so
# build dictionaries that map each original string ID to a unique integer.

# Union of the training and test rows, so every ID seen in either set gets a mapping
allData = trainingData.union(testData)

# Identify unique reviewer and product IDs
orgReviewerIDs = allData.map(lambda l: l[0]).distinct()
orgProductIDs = allData.map(lambda l: l[1]).distinct()

# Pair every distinct ID with a unique integer and collect the mapping to the driver
ReviewerIDsMapping = dict(orgReviewerIDs.zipWithUniqueId().collect())
ProductIDsMapping = dict(orgProductIDs.zipWithUniqueId().collect())


def _to_integer_ids(row):
    """Map a (reviewerID, asin, overall) row to (user int ID, product int ID, rating)."""
    return (ReviewerIDsMapping[row[0]], ProductIDsMapping[row[1]], row[2])


# Hold out 20% of the training rows for validation. randomSplit yields DISJOINT subsets,
# so validation rows are not also used to fit ALS and the biases. (Sampling with
# sample() and keeping the full set for training leaked the validation rows into training.)
# Both splits keep the original ratings; validation is cached for repeated scoring.
training_RDD, validation_RDD = trainingData.map(_to_integer_ids).randomSplit([0.8, 0.2], seed=23)
validation_RDD = validation_RDD.cache()
test_RDD = trainingData.map(_to_integer_ids) if False else testData.map(_to_integer_ids)
test_RDD = test_RDD.cache()

# Bias terms are per-key mean ratings over training_RDD, whose rows are
# (user int ID, product int ID, rating). Means are computed from (sum, count) pairs
# rather than by concatenating per-key rating lists: this avoids copying lists on every
# merge, and float division avoids Python 2 floor division on integer ratings.

# User bias: average rating given by each user across all of their training ratings
userRatingData = (training_RDD
                  .map(lambda l: (l[0], (float(l[2]), 1)))
                  .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                  .mapValues(lambda total_count: total_count[0] / total_count[1]))
bu = userRatingData.collectAsMap()

# Product bias: average rating received by each product across all training ratings
productRatingData = (training_RDD
                     .map(lambda l: (l[1], (float(l[2]), 1)))
                     .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
                     .mapValues(lambda total_count: total_count[0] / total_count[1]))
bi = productRatingData.collectAsMap()

# Remove the baseline mu + (bu - mu) + (bi - mu) from each rating, so ALS is trained on
# the residual. Cached because ALS iterates over it.
training_RDD = training_RDD.map(lambda l: (l[0], l[1], l[2] - mu - (bu[l[0]] - mu) - (bi[l[1]] - mu))).cache()

# (user, product) pairs to score with the trained model
validation_RDD_for_predict = validation_RDD.map(lambda row: (row[0], row[1]))
test_RDD_for_predict = test_RDD.map(lambda row: (row[0], row[1]))

# Run ALS on the bias-removed (residual) ratings, pick the rank with the lowest
# validation RMSE, then report that model's RMSE on the test data.
from pyspark.mllib.recommendation import ALS
import math

seed = 5  # plain int: the Python 2 long literal `5L` is a SyntaxError on Python 3
iterations = 10
regularization_parameter = 0.1
ranks = [10, 20]
errors = []  # validation RMSE for each entry of `ranks`, in the same order
tolerance = 0.02  # NOTE(review): not used in this cell


def _predict_with_bias(als_model, pairs_RDD):
    """Predict ratings for (user, product) pairs, adding the baseline back.

    The ALS model predicts the residual; the global mean and the user/product biases
    (mu, bu, bi) that were subtracted before training are added back here.
    Returns an RDD of ((user, product), predicted rating) for the pairs returned by
    predictAll.
    """
    return als_model.predictAll(pairs_RDD).map(
        lambda r: ((r[0], r[1]), r[2] + mu + (bu[r[0]] - mu) + (bi[r[1]] - mu)))


def _join_with_actuals(labelled_RDD, predictions):
    """Join (user, product, rating) rows with predictions on the (user, product) key.

    Returns an RDD of ((user, product), (actual rating, predicted rating)).
    """
    return (labelled_RDD
            .map(lambda r: ((int(r[0]), int(r[1])), float(r[2])))
            .join(predictions))


def _rmse(rates_and_preds):
    """Return the RMSE over an RDD of (key, (actual, predicted)), or NaN if it is empty.

    An empty join is reported as NaN, so it can never be selected as the best score.
    """
    squared_error_stats = rates_and_preds.map(lambda r: (r[1][0] - r[1][1]) ** 2).stats()
    if squared_error_stats.count() == 0:
        return float('nan')
    return math.sqrt(squared_error_stats.mean())


# training phase: fit one model per rank and score it on the validation set
min_error = float('inf')
best_rank = -1
best_iteration = -1  # NOTE(review): never updated in this cell
best_model = None
for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = _predict_with_bias(model, validation_RDD_for_predict)
    rates_and_preds = _join_with_actuals(validation_RDD, predictions)
    error = _rmse(rates_and_preds)
    errors.append(error)
    print('For rank %s the RMSE is %s' % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank
        best_model = model

if best_model is None:
    raise ValueError('no rank produced a validation RMSE; is the validation set empty?')

print('The best model was trained with rank %s' % best_rank)

# testing phase: reuse the best model rather than retraining it with the same data,
# rank, seed and hyper-parameters
model = best_model
predictions = _predict_with_bias(model, test_RDD_for_predict)
rates_and_preds = _join_with_actuals(test_RDD, predictions)
error = _rmse(rates_and_preds)

print('For testing data the RMSE is %s' % (error))

rates_and_preds.take(3)