In [1]:
# Konfigurasi Spark
import os
import sys

# 1. Mengeset variabel yang menyimpan lokasi di mana Spark diinstal
spark_path = "C:/spark"

# 2. Menentukan environment variable SPARK_HOME
os.environ['SPARK_HOME'] = spark_path

# 3. Simpan lokasi winutils.exe sebagai environment variable HADOOP_HOME
os.environ['HADOOP_HOME'] = spark_path

# 4. Lokasi Python yang dijalankan --> punya Anaconda
#    Apabila Python yang diinstall hanya Anaconda, maka tidak perlu menjalankan baris ini.
os.environ['PYSPARK_PYTHON'] = sys.executable

# 5. Konfigurasi path library PySpark
sys.path.append(spark_path + "/bin")
sys.path.append(spark_path + "/python")
sys.path.append(spark_path + "/python/pyspark/")
sys.path.append(spark_path + "/python/lib")
sys.path.append(spark_path + "/python/lib/pyspark.zip")
sys.path.append(spark_path + "/python/lib/py4j-0.10.4-src.zip")

# 6. Import library Spark
#    Dua library yang WAJIB di-import adalah **SparkContext** dan **SparkConf**.
from pyspark import SparkContext
from pyspark import SparkConf

# Setting konfigurasi (opsional)
conf = SparkConf() \
      .setAppName("MovieLensALS") \
      .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)
#    Apabila berhasil, maka ketika sc di-print akan mengeluarkan nilai <pyspark.context.SparkContext object>
print sc

<pyspark.context.SparkContext object at 0x00000000043F79B0>


In [2]:
import sys
import itertools
from math import sqrt
from operator import add
from os.path import join, isfile, dirname

from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

In [3]:
def notHeader(row):
    return "movieId" not in row

In [8]:
def parseRating(line):
    fields = line.strip().split(",")
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

In [9]:
def parseRatingdua(line):
    fields = line.strip().split("::")
    return long(fields[3]) % 10, (int(fields[0]), int(fields[1]), float(fields[2]))

In [10]:
ratings = sc.textFile("data/ratings.csv").filter(notHeader).map(parseRating)

In [11]:
ratings.take(3)

[(4L, (1, 31, 2.5)), (9L, (1, 1029, 3.0)), (2L, (1, 1061, 3.0))]

In [12]:
def parseMovie(line):
      fields = line.split(",")
      return int(fields[0]), fields[1]

In [13]:
movies = dict(sc.textFile("data/movies.csv").filter(notHeader).map(parseMovie).collect())

In [14]:
numRatings = ratings.count()
numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
numMovies = ratings.values().map(lambda r: r[1]).distinct().count()

In [15]:
print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)

Got 100004 ratings from 671 users on 9066 movies.


In [16]:
def loadRatings(ratingsFile):
    if not isfile(ratingsFile):
        print "File %s does not exist." % ratingsFile
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [parseRatingdua(line)[1] for line in f])
    f.close()
    if not ratings:
        print "No ratings provided."
        sys.exit(1)
    else:
        return ratings

In [17]:
myRatings = loadRatings("data/personalRatings.txt")
myRatingsRDD = sc.parallelize(myRatings, 1)

In [18]:
numPartitions = 4

In [19]:
training = ratings.filter(lambda x: x[0] < 6) \
      .values() \
      .union(myRatingsRDD) \
      .repartition(numPartitions) \
      .cache()

In [20]:
validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
      .values() \
      .repartition(numPartitions) \
      .cache()

In [21]:
test = ratings.filter(lambda x: x[0] >= 8).values().cache()

In [22]:
numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

In [23]:
print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)

Training: 59876, validation: 20285, test: 19854


In [24]:
ranks = [8, 12]
lambdas = [0.1, 10.0]
numIters = [10, 20]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

In [25]:
def computeRmse(model, data, n):
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))

In [28]:
for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
        model = ALS.train(training, rank, numIter, lmbda)
        validationRmse = computeRmse(model, validation, numValidation)
        print "RMSE (validation) = %f for the model trained with " % validationRmse + \
              "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
        if (validationRmse < bestValidationRmse):
            bestModel = model
            bestValidationRmse = validationRmse
            bestRank = rank
            bestLambda = lmbda
            bestNumIter = numIter

RMSE (validation) = 0.927087 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.926348 for the model trained with rank = 8, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.621969 for the model trained with rank = 8, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.621969 for the model trained with rank = 8, lambda = 10.0, and numIter = 20.
RMSE (validation) = 0.925224 for the model trained with rank = 12, lambda = 0.1, and numIter = 10.
RMSE (validation) = 0.925143 for the model trained with rank = 12, lambda = 0.1, and numIter = 20.
RMSE (validation) = 3.621969 for the model trained with rank = 12, lambda = 10.0, and numIter = 10.
RMSE (validation) = 3.621969 for the model trained with rank = 12, lambda = 10.0, and numIter = 20.


In [29]:
testRmse = computeRmse(bestModel, test, numTest)

In [30]:
# evaluate the best model on the test set
print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
    + "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)

The best model was trained with rank = 12 and lambda = 0.1, and numIter = 20, and its RMSE on the test set is 0.935602.


In [31]:
myRatedMovieIds = set([x[1] for x in myRatings])
candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:50]

In [32]:
print "Movies recommended for you:"
for i in xrange(len(recommendations)):
    print ("%2d: %s" % (i + 1, movies[recommendations[i][1]])).encode('ascii', 'ignore')

Movies recommended for you:
 1: Land of Silence and Darkness (Land des Schweigens und der Dunkelheit) (1971)
 2: Cops (1922)
 3: "Goat
 4: "Play House
 5: My Best Friend (Mon meilleur ami) (2006)
 6: "Philadelphia Story
 7: "Vernon
 8: Play Time (a.k.a. Playtime) (1967)
 9: "Cameraman
10: "Dog's Life
11: "Last Laugh
12: Salesman (1969)
13: Zorn's Lemma (1970)
14: "Amelie (Fabuleux destin d'Amlie Poulain
15: Chinatown (1974)
16: Double Indemnity (1944)
17: Happiness (1998)
18: Harvey (1950)
19: Baraka (1992)
20: Three Colors: White (Trzy kolory: Bialy) (1994)
21: My Life in Pink (Ma vie en rose) (1997)
22: Paradise Lost: The Child Murders at Robin Hood Hills (1996)
23: Rivers and Tides (2001)
24: "Godfather
25: "Immigrant
26: It Happened One Night (1934)
27: One Day in September (1999)
28: In the Bedroom (2001)
29: "Mission
30: Microcosmos (Microcosmos: Le peuple de l'herbe) (1996)
31: Persuasion (1995)
32: 28 Up (1985)
33: "Usual Suspects
34: Babette's Feast (Babettes gstebud) (1987)
3

In [33]:
meanRating = training.union(validation).map(lambda x: x[2]).mean()
baselineRmse = sqrt(test.map(lambda x: (meanRating - x[2]) ** 2).reduce(add) / numTest)
improvement = (baselineRmse - testRmse) / baselineRmse * 100
print "The best model improves the baseline by %.2f" % (improvement) + "%."

The best model improves the baseline by 11.76%.
