In [1]:
#run with this MASTER=local[4] IPYTHON_OPTS="notebook " ./bin/pyspark

#load spark context
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

import sys
from os import remove, removedirs
from os.path import join, isfile, dirname
from time import time
import numpy as np 
import itertools
from math import sqrt
from operator import add

In [2]:
#construct the ratings for the demo user
topMovies = """1,Toy Story (1995)
780,Independence Day (a.k.a. ID4) (1996)
590,Dances with Wolves (1990)
1210,Star Wars: Episode VI - Return of the Jedi (1983)
648,Mission: Impossible (1996)
344,Ace Ventura: Pet Detective (1994)
165,Die Hard: With a Vengeance (1995)
153,Batman Forever (1995)
597,Pretty Woman (1990)
1580,Men in Black (1997)
231,Dumb & Dumber (1994)"""
__file__=""
parentDir = dirname(dirname(__file__))
ratingsFile = join(parentDir, "personalRatings.txt")

if isfile(ratingsFile):
    r = raw_input("Looks like you've already rated the movies. Overwrite ratings (y/N)? ")
    if r and r[0].lower() == "y":
        remove(ratingsFile)
    else:
        sys.exit()

prompt = "Please rate the following movie (1-5 (best), or 0 if not seen): "
print prompt

now = int(time())
n = 0

f = open(ratingsFile, 'w')
for line in topMovies.split("\n"):
    ls = line.strip().split(",")
    valid = False
    while not valid:
        rStr = raw_input(ls[1] + ": ")
        r = int(rStr) if rStr.isdigit() else -1
        if r < 0 or r > 5:
            print prompt
        else:
            valid = True
            if r > 0:
                f.write("0::%s::%d::%d\n" % (ls[0], r, now))
                n += 1
f.close()

if n == 0:
    print "No rating provided!"

Looks like you've already rated the movies. Overwrite ratings (y/N)? N


SystemExit: 

To exit: use 'exit', 'quit', or Ctrl-D.


In [3]:
#function definition 
def getMovies(line):
    """
    Read movie record format movieId::movieTitle .
    """
    record = line.strip().split("::")
    return int(record[0]), record[1]

def getRates(line):
    """
    Read Ratings format userId::movieId::rating::timestamp .
    """
    record = line.strip().split("::")
    return long(record[3]) % 10, (int(record[0]), int(record[1]), float(record[2]))

def loadRatings(ratingsFile):
    """
    Load personal ratings from file.
    """
    if not isfile(ratingsFile):
        print "File %s does not exist." % ratingsFile
        sys.exit(1)
    f = open(ratingsFile, 'r')
    ratings = filter(lambda r: r[2] > 0, [getRates(line)[1] for line in f])
    f.close()
    if not ratings:
        print "No ratings provided."
        sys.exit(1)
    else:
        return ratings

def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
      .join(data.map(lambda x: ((x[0], x[1]), x[2]))) \
      .values()
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))



In [6]:
#loading data
if __name__ == "__main__":
    sys.argv=["../bin/spark-submit", "ml-1m/", "personalRatings.txt"]

    # load personal ratings
    myRatings = loadRatings(sys.argv[2])
    myRatingsRDD = sc.parallelize(myRatings, 20)

    
    # load ratings and movie titles

    movieLensHomeDir = sys.argv[1]

    # ratings is an RDD of (last digit of timestamp, (userId, movieId, rating))
    ratings = sc.textFile(join(movieLensHomeDir, "ratings.dat")).map(getRates)

    # movies is an RDD of (movieId, movieTitle)
    movies = dict(sc.textFile(join(movieLensHomeDir, "movies.dat")).map(getMovies).collect())

    # your code here
    numRatings = ratings.count()
    numUsers = ratings.values().map(lambda r: r[0]).distinct().count()
    numMovies = ratings.values().map(lambda r: r[1]).distinct().count()

    print "Got %d ratings from %d users on %d movies." % (numRatings, numUsers, numMovies)
    
   

Got 1000209 ratings from 6040 users on 3706 movies.


In [7]:
# split ratings into train (60%), validation (20%), and test (20%) 

numPartitions = 4
training = ratings.filter(lambda x: x[0] < 6)\
.values() \
.union(myRatingsRDD) \
.repartition(numPartitions) \
.cache()

validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
.values() \
.repartition(numPartitions) \
.cache()

test = ratings.filter(lambda x: x[0] >= 8).values().cache()

numTraining = training.count()
numValidation = validation.count()
numTest = test.count()

print "Training: %d, validation: %d, test: %d" % (numTraining, numValidation, numTest)

Training: 602248, validation: 198919, test: 199049


In [None]:

# train models and evaluate them on the validation set

ranks = [8, 12, 16, 20, 40]
lambdas = [0.03, 0.1, 1.0, 10.0]
numIters = [10, 30]
bestModel = None
bestValidationRmse = float("inf")
bestRank = 0
bestLambda = -1.0
bestNumIter = -1

for rank, lmbda, numIter in itertools.product(ranks, lambdas, numIters):
    model = ALS.train(training, rank, numIter, lmbda, blocks=-1)
    validationRmse = computeRmse(model, validation, numValidation)
    print "RMSE (validation) = %f for the model trained with " % validationRmse + \
    "rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)
    """
    if (numIter==10):
        if (rank==8):
            heatData10[0].append(validationRmse)
        elif (rank==12):
            heatData10[1].append(validationRmse)
        elif(rank==16):
            heatData10[2].append(validationRmse)
        elif(rank==20):
            heatData10[3].append(validationRmse)
        else:
            heatData10[4].append(validationRmse) 
                
    if (numIter==30):
        if (rank==8):
            heatData30[0].append(validationRmse)
        elif(rank==12):
            heatData30[1].append(validationRmse)
        elif(rank==16):
            heatData30[2].append(validationRmse)
        elif(rank==20):
            heatData30[3].append(validationRmse) 
        else:
            heatData30[4].append(validationRmse) 
    """
    if (validationRmse < bestValidationRmse):
        bestModel = model
        bestValidationRmse = validationRmse
        bestRank = rank
        bestLambda = lmbda
        bestNumIter = numIter
        


RMSE (validation) = 0.888052 for the model trained with rank = 8, lambda = 0.0, and numIter = 10.
RMSE (validation) = 0.883556 for the model trained with rank = 8, lambda = 0.0, and numIter = 30.
RMSE (validation) = 0.877167 for the model trained with rank = 8, lambda = 0.1, and numIter = 10.

In [None]:
testRmse = computeRmse(bestModel, test, numTest)        

In [21]:
# evaluate the best model on the test set
print "The best model was trained with rank = %d and lambda = %.1f, " % (bestRank, bestLambda) \
+ "and numIter = %d, and its RMSE on the test set is %f." % (bestNumIter, testRmse)

The best model was trained with rank = 20 and lambda = 0.1, and numIter = 30, and its RMSE on the test set is 0.866613.


In [8]:
#this is not needed , just for test and Demo

rank = 20
lmbda = 0.1
numIter = 30
model = ALS.train(training, rank, numIter, lmbda)
validationRmse = computeRmse(model, validation, numValidation)
print "RMSE (Test) = %f for the model trained with " % validationRmse + \
"rank = %d, lambda = %.1f, and numIter = %d." % (rank, lmbda, numIter)


"""
import numpy as np
np.array(heatData30)
"""

RMSE (Test) = 0.868517 for the model trained with rank = 20, lambda = 0.1, and numIter = 30.


'\nimport numpy as np\nnp.array(heatData30)\n'

In [9]:
# make personalized recommendations
bestModel=model
myRatedMovieIds = set([x[1] for x in myRatings])
#candidates = sc.parallelize([m for m in movies if m not in myRatedMovieIds])
myRatedMovieIds = set([x[1] for x in myRatings])
predictions = bestModel.predictAll(candidates.map(lambda x: (0, x))).collect()
recommendations = sorted(predictions, key=lambda x: x[2], reverse=True)[:10]

print "Movies recommended for you:"
for i in xrange(len(recommendations)):
    print ("%2d: %s" % (i + 1, movies[recommendations[i][1]])).encode('ascii', 'ignore')


Movies recommended for you:
 1: Bandits (1997)
 2: For All Mankind (1989)
 3: Sanjuro (1962)
 4: Bewegte Mann, Der (1994)
 5: Man of the Century (1999)
 6: Star Wars: Episode IV - A New Hope (1977)
 7: Dj Vu (1997)
 8: Godfather, The (1972)
 9: Shawshank Redemption, The (1994)
10: Life Is Beautiful (La Vita  bella) (1997)


In [16]:
recommendations

[Rating(user=0, product=2562, rating=4.7454723635454545),
 Rating(user=0, product=3338, rating=4.705075877972453),
 Rating(user=0, product=2905, rating=4.7025825730989155),
 Rating(user=0, product=811, rating=4.693953282221936),
 Rating(user=0, product=2999, rating=4.627185182089478),
 Rating(user=0, product=260, rating=4.598928836030872),
 Rating(user=0, product=2175, rating=4.584876269517677),
 Rating(user=0, product=858, rating=4.58264484932971),
 Rating(user=0, product=318, rating=4.5606836508174),
 Rating(user=0, product=2324, rating=4.549865970502)]

In [13]:
# clean up
sc.stop()