In [2]:
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkConf, SparkContext

In [10]:
def parse_load_sample_ratings(sampleRatingsFile):
    """
    Parses a rating record in format user,highlight,rating.
    """
    if not isfile(sampleRatingsFile):
        print "File %s does not exist." % sampleRatingsFile
        sys.exit(1)
    data = sc.textFile(sampleRatingsFile)
    sample_user_highlights = data.map(lambda l: l.split(',')).map(lambda l: (int(l[0]), int(l[1])))
    all_users = sample_user_highlights.map(lambda x: x[0]).distinct().zipWithIndex()
    all_highlights = sample_user_highlights.map(lambda x: x[1]).distinct().zipWithIndex()
    indexed_data = sample_user_highlights.leftOuterJoin(all_users) \
            .map(lambda x: x[1]) \
            .leftOuterJoin(all_highlights) \
            .map(lambda x: x[1])
    sample_ratings = indexed_data.groupBy(lambda x: x) \
        .map(lambda x: Rating(x[0][0], x[0][1], 1))
    if not sample_ratings:
        print "No sample ratings provided."
        sys.exit(1)
    else:
        return sample_ratings

In [4]:
 def parse_load_my_ratings(myRatingsFile):
    """
    Parses a rating record in format user,highlight,rating.
    """
    if not isfile(myRatingsFile):
        print "File %s does not exist." % myRatingsFile
        sys.exit(1)
    data = sc.textFile(myRatingsFile)    
    my_ratings = data.map(lambda l: l.split(',')).map(lambda l: (int(l[0]), int(l[1])))
    if not my_ratings:
        print "No my ratings provided."
        sys.exit(1)
    else:
        return my_ratings

In [5]:
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
    .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
  

In [11]:
sample_ratings_rdd = parse_load_sample_ratings("user_highlights.csv")
my_ratings_rdd = parse_load_my_ratings("my_ratings")
count_sample_ratings = sample_ratings_rdd.count()
count_sample_users = sample_ratings_rdd.map(lambda r: r[0]).distinct().count()
count_sample_highlights = sample_ratings_rdd.map(lambda r: r[1]).distinct().count()

In [12]:
# split ratings into train (60%), validation (20%), and test (20%) based on the 
# last digit of the timestamp, add myRatings to train, and cache them
# training, validation, test are all RDDs of (userId, movieId, rating)
numPartitions = 4
(training_data, validation_data, test_data) = sample_ratings_rdd.randomSplit([0.6, 0.2, 0.2])
training_ratings = training_data\
    .repartition(numPartitions)\
    .cache()

training_users = training_ratings.map(lambda x: x.user).distinct().map(lambda x: (x, 1))
training_products = training_ratings.map(lambda x: x.product).distinct().map(lambda x: (x, 1))
    
validation_ratings = validation_data.repartition(numPartitions) \
    .map(lambda x: (x.user, x))\
    .join(training_users)\
    .map(lambda x: x[1][0])\
    .map(lambda x: (x.product, x))\
    .join(training_products)\
    .map(lambda x: x[1][0])\
    .cache()
test_ratings = test_data.repartition(numPartitions)\
    .map(lambda x: (x.user, x))\
    .join(training_users)\
    .map(lambda x: x[1][0])\
    .map(lambda x: (x.product, x))\
    .join(training_products)\
    .map(lambda x: x[1][0])\
    .cache()
training_count = training_data.count()
validation_count = validation_data.count()
test_count = test_data.count()
print "Training: %d, validation: %d, test: %d" % (training_count, validation_count, test_count)

Training: 13721, validation: 4732, test: 4610


In [13]:
# train models and evaluate them on the validation set
ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        model = ALS.trainImplicit(training_ratings, rank, numIter, alpha=0.01)
        validationRmse = computeRmse(model, validation_ratings, validation_count)
        print "RMSE (validation) = %f for the model trained with " % validationRmse + \
              "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        if (validationRmse < bestValidationRmse):
            bestModel = model
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter
testRmse = computeRmse(bestModel, test_ratings, test_count)

RMSE (validation) = 0.236121 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.236115 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 0.236083 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 0.236125 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.236058 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.235568 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 0.235737 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 0.235373 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.


In [14]:
# evaluate the best model on the test set
print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
      + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)

The best model was trained with rank = 12 and lambda = 10.0, and numIter = 20, and its RMSE on the test set is 0.234164.


In [25]:
# compare the best model with a naive baseline that always returns the mean rating
meanRating = training_ratings.union(validation_ratings).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test_ratings.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / test_count)
#improvement = (baselineRmse - testRmse) / baselineRmse * 100
#print "The best model improves the baseline by %.2f" % (improvement) + "%."
print meanRating 
print baselineRmse
#print improvement

1.0
0.0


In [19]:
def make_personalized_recommendations(user_number):
    # make personalized recommendations
    data = sc.textFile("user_highlights.csv")
    sample_user_highlights = data.map(lambda l: l.split(',')).map(lambda l: (int(l[0]), int(l[1])))
    all_users = sample_user_highlights.map(lambda x: x[0]).distinct().zipWithIndex()
    all_highlights = sample_user_highlights.map(lambda x: x[1]).distinct().zipWithIndex()
    indexed_data = sample_user_highlights.leftOuterJoin(all_users) \
    
    user_list = all_users.collect()
    user_dictionary =(dict((x, y) for x, y in user_list))
    user_index = user_dictionary[user_number]
    highlight_list = all_highlights.collect()
    highlight_dictionary =(dict((y, x) for x, y in highlight_list))
    recommendations = bestModel.call("recommendProducts", user_index, 10)
    print "Highlights recommended for you:"
    for i in xrange(len(recommendations)):
        print ("%2d: %s" % (i + 1, highlight_dictionary[recommendations[i][1]])).encode('ascii', 'ignore')

In [20]:
make_personalized_recommendations(2981892)

Highlights recommended for you:
 1: 208057095
 2: 217712375
 3: 213477376
 4: 118520378
 5: 198813381
 6: 179148376
 7: 98022386
 8: 105772376
 9: 217113552
10: 133578378
