In [1]:
import glob
import os
import sys

# Locate the Spark installation. An already-exported SPARK_HOME wins; otherwise fall
# back to the original hard-coded location so existing setups keep working.
spark_path = os.environ.get('SPARK_HOME', "E:/spark")
os.environ['SPARK_HOME'] = spark_path
os.environ['HADOOP_HOME'] = spark_path
os.environ['PYSPARK_PYTHON'] = sys.executable

# Directories / archives that must be importable before `import pyspark` works.
spark_sys_paths = [
    spark_path + "/bin",
    spark_path + "/python",
    spark_path + "/python/pyspark/",
    spark_path + "/python/lib",
    spark_path + "/python/lib/pyspark.zip",
]
# Pick up whichever py4j version ships with this Spark build; keep the original
# hard-coded archive name as a fallback when nothing matches.
spark_sys_paths += (sorted(glob.glob(spark_path + "/python/lib/py4j-*-src.zip"))
                    or [spark_path + "/python/lib/py4j-0.10.4-src.zip"])
for spark_sys_path in spark_sys_paths:
    # Guard against duplicate entries when this cell is re-run.
    if spark_sys_path not in sys.path:
        sys.path.append(spark_sys_path)

# These imports must come after the sys.path changes above.
from pyspark import SparkContext
from pyspark import SparkConf

# getOrCreate() reuses a running context, so re-running this cell does not fail
# with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(SparkConf().setMaster("local").setAppName("test"))

print(sc)

<pyspark.context.SparkContext object at 0x0000000005F03AC8>


In [2]:
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

In [3]:
def parseRating(line):
    """
    Parses a rating record in MovieLens format userId::movieId::rating::timestamp .

    Returns a pair ``(key, (userId, movieId, rating))`` where ``key`` is the last
    decimal digit of the timestamp (``timestamp % 10``), ``userId`` and ``movieId``
    are ints and ``rating`` is a float.

    Raises IndexError if the line has fewer than four ``::``-separated fields and
    ValueError if a field is not numeric.
    """
    fields = line.strip().split("::")
    # int() is used instead of the Python-2-only long(): int handles arbitrarily
    # large values on both Python 2 and Python 3 and yields the same remainder.
    return int(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

In [4]:
def parseMovie(line):
    """
    Split one MovieLens movie record (``movieId::movieTitle``, optionally followed
    by further ``::``-separated fields) into an ``(int movieId, title)`` pair.

    Surrounding whitespace, including the trailing newline, is stripped first.
    """
    parts = line.strip().split("::")
    movieId = int(parts[0])
    title = parts[1]
    return movieId, title

In [5]:
def loadRatings(ratingsFile):
    """
    Load ratings from file.

    Each line is parsed with parseRating(); the timestamp-derived key is discarded
    and only the ``(userId, movieId, rating)`` tuples whose rating is greater than 0
    are kept.

    Returns a non-empty list of such tuples. If the file does not exist, or no
    rating greater than 0 is found, a message is printed and the process exits
    with status 1 (SystemExit).
    """
    if not isfile(ratingsFile):
        print("File %s does not exist." % ratingsFile)
        sys.exit(1)
    # `with` guarantees the handle is closed even when a line fails to parse.
    with open(ratingsFile, 'r') as f:
        parsed = [parseRating(line)[1] for line in f]
    # Build a real list rather than calling filter(): where filter() is lazy its
    # result is always truthy, which would silently disable the check below.
    ratings = [r for r in parsed if r[2] > 0]
    if not ratings:
        print("No ratings provided.")
        sys.exit(1)
    return ratings


In [6]:
def computeRmse(model, data, n):
    """
    Root Mean Squared Error of ``model`` on ``data``.

    ``data`` holds (user, product, rating) records. The model is asked to predict
    every (user, product) pair in ``data``; predictions and actual ratings are then
    joined on that pair, and the squared differences are summed and divided by ``n``
    before the square root is taken.
    """
    userProductPairs = data.map(lambda record: (record[0], record[1]))
    predicted = model.predictAll(userProductPairs)
    predictedByPair = predicted.map(lambda p: ((p[0], p[1]), p[2]))
    actualByPair = data.map(lambda record: ((record[0], record[1]), record[2]))
    # After the join each value is a (predictedRating, actualRating) pair.
    paired = predictedByPair.join(actualByPair).values()
    squaredErrors = paired.map(lambda pair: (pair[0] - pair[1]) ** 2)
    return sqrt(squaredErrors.reduce(add) / float(n))


In [7]:
if __name__ == "__main__":
    # The usage message below describes a spark-submit invocation taking two
    # arguments, but the visible cells use hard-coded file names and never read
    # sys.argv. Inside an IPython/Jupyter kernel sys.argv belongs to the kernel
    # launcher, so its length says nothing about this analysis; enforce the
    # check only when the code is run outside a kernel.
    try:
        get_ipython  # name exists only when running under IPython/Jupyter
        runningInKernel = True
    except NameError:
        runningInKernel = False
    if not runningInKernel and len(sys.argv) != 3:
        print("Usage: E:/spark/bin/spark-submit --driver-memory 2g " + "MovieLensALS.py movieLensDataDir personalRatings.txt")
        sys.exit(1)

In [8]:
# Load the personal ratings as a list of (userId, movieId, rating) tuples.
# loadRatings() drops the timestamp-derived key and any entry whose rating is not > 0,
# and exits the process if the file is missing or yields no ratings.
myRatings = loadRatings("personalRatings.txt")

In [9]:
# Distribute the personal ratings list as an RDD with a single partition.
myRatingsRDD = sc.parallelize(myRatings, numSlices=1)

In [10]:
# Sanity check: show the first five personal ratings as (userId, movieId, rating) tuples.
myRatingsRDD.take(5)

[(0, 1, 5.0), (0, 780, 4.0), (0, 590, 1.0), (0, 1210, 5.0), (0, 648, 4.0)]

In [11]:
# Load the full ratings file but, unlike loadRatings(), keep the (timestamp % 10) key
# that parseRating() returns. The training/validation/test split further down filters
# on x[0] as that key and then calls .values() to recover the rating tuple; with the
# un-keyed (userId, movieId, rating) tuples from loadRatings() it would instead filter
# on the user id and keep only the movie id.
with open("ratings_clean.csv", "r") as ratingsFile:
    keyedRatings = (parseRating(line) for line in ratingsFile)
    # Same "rating > 0" filter that loadRatings() applies.
    ratings = [keyed for keyed in keyedRatings if keyed[1][2] > 0]

In [12]:
# Turn the in-memory ratings list into an RDD with a single partition.
ratingsRDD = sc.parallelize(ratings, numSlices=1)

In [13]:
# Show the first two personal ratings.
# NOTE(review): this displays myRatingsRDD again right after ratingsRDD was created;
# presumably ratingsRDD.take(2) was intended - confirm.
myRatingsRDD.take(2)

[(0, 1, 5.0), (0, 780, 4.0)]

In [14]:
# Build a movieId -> title lookup on the driver from the "::"-separated movies file.
movies = dict(
    sc.textFile("movies.csv")
      .map(parseMovie)
      .collect()
)

In [15]:
# Distribute `movies` as a single-partition RDD.
# NOTE(review): `movies` is a dict, so this presumably distributes only its keys
# (the movie ids), not the titles - confirm that is intended. moviesRDD is not
# used by any of the visible cells.
moviesRDD = sc.parallelize(movies, numSlices=1)

In [16]:
numPartitions = 4

# Training set: records whose key (element 0) is below 6, reduced to their value,
# plus the personal ratings. Expects ratingsRDD to hold (key, ratingTuple) pairs;
# parseRating() produces such pairs with key = timestamp % 10.
training = (
    ratingsRDD
    .filter(lambda keyed: keyed[0] < 6)
    .values()
    .union(myRatingsRDD)
    .repartition(numPartitions)
    .cache()
)

In [17]:
# Validation set: records whose key (element 0) is 6 or 7, reduced to their value.
validation = (
    ratingsRDD
    .filter(lambda keyed: 6 <= keyed[0] < 8)
    .values()
    .repartition(numPartitions)
    .cache()
)

In [18]:
# Test set: records whose key (element 0) is 8 or above, reduced to their value.
test = (
    ratingsRDD
    .filter(lambda keyed: keyed[0] >= 8)
    .values()
    .cache()
)

In [19]:
# Hyper-parameter grid explored by the model-selection loop.
ranks, lambdas, numIters = [8, 12], [0.1, 10.0], [10, 20]

# Best-so-far trackers; the sentinels are replaced by the first model that is evaluated.
bestModel, bestValidationRmse = None, float("inf")
bestRank, bestLambda, bestNumIter = 0, -1.0, -1

In [None]:
# Number of validation records: the denominator computeRmse() uses for the mean.
numValidation = validation.count()

# Train one ALS model per (rank, lambda, numIter) combination and remember the one
# with the lowest RMSE on the validation set.
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda)
    validationRmse = computeRmse(model, validation, numValidation)
    print(("RMSE (validation) = %f for the model trained with " % validationRmse) +
          ("rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)))
    if validationRmse < bestValidationRmse:
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter

In [None]:
# Number of test records: the denominator computeRmse() uses for the mean.
numTest = test.count()

# RMSE of the selected model on the held-out test set.
testRmse = computeRmse(bestModel, test, numTest)

In [None]:
# Report the selected hyper-parameters and the model's RMSE on the test set.
# The statements sit at column 0: indenting them under the comment makes the
# cell fail with "IndentationError: unexpected indent".
print(("The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda))
      + ("and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)))

In [None]:
# compare the best model with a naive baseline that always returns the mean rating
# (statements sit at column 0; indented code directly under the comment raises
# "IndentationError: unexpected indent")

# Size of the test set, computed locally so this cell is self-contained.
numTest = test.count()
meanRating = training.union(validation).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
# Relative RMSE reduction over the baseline, in percent.
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print("The best model improves the baseline by %.2f" % (improvement) + "%.")

In [None]:
# make personalized recommendations
# (statements sit at column 0; indented code directly under the comment raises
# "IndentationError: unexpected indent")

# Movies already rated in the personal ratings are excluded from the candidates.
myRatedMovieIds = {x[1] for x in myRatings}
candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
# Predict a rating for every candidate movie for user id 0.
# NOTE(review): 0 matches the user id in the displayed personal ratings sample - confirm.
predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
# Keep the 50 candidates with the highest predicted rating.
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]

print("Movies recommended for you:")
for position, recommendation in enumerate(recommendations, 1):
    title = movies[recommendation[1]]
    # Drop non-ASCII characters, then decode back to text so the line prints the
    # same way whether print receives a byte string or a text string.
    print(("%2d: %s" % (position, title)).encode('ascii', 'ignore').decode('ascii'))
