In [1]:
import os
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

In [2]:
def parseRating(line):
  """Convert one row of the `rating` table into a keyed rating record.

  Args:
    line: a row that can be indexed by the column names 'timestamp',
      'userId', 'movieId' and 'rating'.

  Returns:
    A pair `(bucket, (userId, movieId, rating))`. `bucket` is the timestamp
    modulo 10; the ids are converted to int and the rating to float.
  """
  # int() promotes to an arbitrary-precision integer when needed on Python 2
  # and also exists on Python 3, unlike long().
  bucket = int(line['timestamp']) % 10
  return bucket, (int(line['userId']), int(line['movieId']), float(line['rating']))

def parseMovie(line):
  """Turn one row of the `movie` table into a `(movieId, movieTitle)` pair.

  Args:
    line: a row that can be indexed by 'movieId' and 'movieTitle'.

  Returns:
    `(movieId, movieTitle)` with the id converted to int and the title
    passed through unchanged.
  """
  movieId = int(line['movieId'])
  title = line['movieTitle']
  return movieId, title

def loadRatings(line):
  """Turn one rating row into a `(userId, movieId, rating)` triple.

  Args:
    line: a row that can be indexed by 'userId', 'movieId' and 'rating'.

  Returns:
    `(userId, movieId, rating)` with both ids converted to int and the
    rating converted to float.
  """
  userId = int(line['userId'])
  movieId = int(line['movieId'])
  rating = float(line['rating'])
  return userId, movieId, rating

def computeRmse(model, data, n):
  """Compute the root-mean-square error of `model` on `data`.

  Args:
    model: object whose `predictAll` accepts (user, movie) pairs and returns
      (user, movie, predictedRating) records.
    data: collection of (user, movie, rating) records.
    n: divisor used for the mean; it is supplied by the caller and is not
      recomputed from the number of joined pairs.

  Returns:
    sqrt(sum of squared (predicted - actual) differences / n).
  """
  userMoviePairs = data.map(lambda r: (r[0], r[1]))
  predictedByKey = model.predictAll(userMoviePairs).map(lambda p: ((p[0], p[1]), p[2]))
  actualByKey = data.map(lambda r: ((r[0], r[1]), r[2]))
  # After the join each value is a (predicted, actual) pair for one (user, movie) key.
  squaredErrors = predictedByKey.join(actualByKey).values() \
    .map(lambda pair: (pair[0] - pair[1]) ** 2)
  return sqrt(squaredErrors.reduce(add) / float(n))

In [3]:
# Load the three source tables through the notebook-provided sqlContext:
#   myRatings - (userId, movieId, rating) triples from `personalrating`
#   ratings   - (timestamp % 10, (userId, movieId, rating)) pairs from `rating`
#   movies    - dict mapping movieId -> movieTitle, collected on the driver
myRatings = (
  sqlContext.sql("select userId, movieId, rating from personalrating")
  .rdd
  .map(loadRatings)
)
ratings = sqlContext.sql("select * from rating").rdd.map(parseRating)
movies = dict(sqlContext.sql("select * from movie").rdd.map(parseMovie).collect())

# Summary counts: total ratings, distinct users and distinct movies.
numRatings = ratings.count()
numUsers = ratings.values().map(lambda triple: triple[0]).distinct().count()
numMovies = ratings.values().map(lambda triple: triple[1]).distinct().count()

print("Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies))

In [4]:
numPartitions = 4

# Split on the key produced by parseRating (timestamp % 10):
#   keys 0-5 -> training (plus the personal ratings), 6-7 -> validation, 8-9 -> test.
training = (
  ratings.filter(lambda keyed: keyed[0] < 6)
  .values()
  .union(myRatings)
  .repartition(numPartitions)
  .cache()
)

validation = (
  ratings.filter(lambda keyed: 6 <= keyed[0] < 8)
  .values()
  .repartition(numPartitions)
  .cache()
)

test = ratings.filter(lambda keyed: keyed[0] >= 8).values().cache()

# Counting materialises the cached splits and gives the divisors used for RMSE.
numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

print("Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest))

In [5]:
# Hyper-parameter grid: every (rank, lambda, numIter) combination is trained
# on `training` and scored on `validation`.
ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]

# Fixed seed for the ALS factor initialisation, so that re-running the
# notebook selects the same model and reports the same RMSE values.
alsSeed = 42

bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda, seed=alsSeed)
    validationRmse = computeRmse(model, validation, numValidation)
    print(("RMSE (validation) = %f for the model trained with " % validationRmse)
          + ("rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)))
    # Keep the combination with the lowest validation error seen so far.
    if validationRmse < bestValidationRmse:
        bestModel, bestValidationRmse = model, validationRmse
        bestRank, bestLambda, bestNumIter = rank, lmbda, numIter

# evaluate the best model on the test set
testRmse = computeRmse(bestModel, test, numTest)

print(("The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda))
      + ("and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)))

In [6]:
# Naive baseline: predict the mean rating of training + validation for every
# test record, then report how much the best ALS model improves on its RMSE.
meanRating = training.union(validation).map(lambda triple: triple[2]).mean()
baselineSquaredError = test.map(lambda triple: (meanRating - triple[2]) ** 2).reduce(add)
baselineRmse = sqrt(baselineSquaredError / numTest)
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print(("The best model improves the baseline by %.2f" % improvement) + "%.")

In [7]:
from pyspark.sql import Row

# Candidate movies are all known movie ids that do not appear in the personal ratings.
myRatingsArr = myRatings.collect()
myRatedMovieIds = set(rated[1] for rated in myRatingsArr)
candidates = sc.parallelize([movieId for movieId in movies if movieId not in myRatedMovieIds])

# Predictions are requested for user id 0 -- presumably the id used in
# `personalrating`; verify against that table.
predictions = bestModel.predictAll(candidates.map(lambda movieId: (0, movieId))).collect()

# Keep the 50 candidates with the highest predicted rating.
recommendations = sorted(predictions, key=lambda pred: pred[2], reverse=True)[:50]

rec_disp = []
print("Movies recommended for you:")
for position, recommended in enumerate(recommendations, 1):
  title = movies[recommended[1]]
  # Non-ASCII characters are dropped from the printed line only; the Row keeps the full title.
  print(("%2d: %s" % (position, title)).encode('ascii', 'ignore'))
  rec_disp.append(Row(key=title, value=recommended[2]))

In [8]:
# Show the ten highest-scored recommendations as (title, predicted rating) rows.
# Slicing instead of indexing 0..9 keeps this cell working when fewer than
# ten recommendations are available.
rec = [
  Row(key=movies[topRecommendation[1]], value=topRecommendation[2])
  for topRecommendation in recommendations[:10]
]
display(rec)

In [9]:
# Collect the userId of every row in `rating` to the driver, one 1-tuple per rating.
rat = (
  sqlContext.sql("select userId from rating")
  .rdd
  .map(tuple)
  .collect()
)

In [10]:
# `user_top` is not used by this cell; it is kept only in case other cells rely on it.
user_top = []

# Number of ratings per user: userId -> count of rows in `rat`.
# The dict has to be created here: without it the loop fails with a NameError
# on a fresh kernel, because no other cell defines `user_rat`.
user_rat = {}
for row in rat:
  user_rat[row[0]] = user_rat.get(row[0], 0) + 1

In [11]:
from operator import itemgetter
from pyspark.sql import Row
# Build Row(Id=userId, Count=numberOfRatings) records ordered by ascending count.
# NOTE: the name `list` shadows the builtin; it is kept because the next cell
# displays this variable under that name.
list = [
  Row(Id=userId, Count=ratingCount)
  for userId, ratingCount in sorted(user_rat.items(), key=itemgetter(1))
]

In [12]:
# Render the per-user rating counts; `list` here is the notebook variable
# assigned in the previous cell, not the Python builtin.
display(list)

In [13]:
# Publish the Row(key=title, value=predicted rating) records collected in
# `rec_disp` as the temp table "recommendations", then display it via SQL.
recRows = sc.parallelize(rec_disp)
recTable = sqlContext.createDataFrame(recRows)
recTable.registerTempTable("recommendations")

recQuery = sqlContext.sql("select * from recommendations")
display(recQuery)