In [35]:
import sys
import os

baseDir         = os.path.join('movielens')
ratingsFilename = os.path.join(baseDir, 'ratings.dat.gz')
moviesFilename  = os.path.join(baseDir, 'movies.dat')

numPartitions = 2
rawRatings = sc.textFile(ratingsFilename).repartition(numPartitions)
rawMovies  = sc.textFile(moviesFilename)

In [36]:
def get_ratings_tuple(entry):
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    items = entry.split('::')
    return int(items[0]), items[1]

ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

In [37]:
print 'Ratings: %s' % ratingsRDD.take(2)
print 'Movies: %s' % moviesRDD.take(2)

Ratings: [(1, 1193, 5.0), (1, 914, 3.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)')]


In [38]:
movieIDsWithRatingsRDD = (ratingsRDD
                          .map(lambda (user_id,movie_id,rating): (movie_id,[rating]))
                          .reduceByKey(lambda a,b: a+b))

def getCountsAndAverages(RatingsTuple):
    total = 0.0
    for rating in RatingsTuple[1]:
        total += rating
    return (RatingsTuple[0],(len(RatingsTuple[1]),total/len(RatingsTuple[1])))

movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(getCountsAndAverages)

movieNameWithAvgRatingsRDD = (moviesRDD
                .join(movieIDsWithAvgRatingsRDD)
                .map(lambda (movieid,(name,(ratings, average))): (average, name, ratings)))

In [39]:
print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)

movieNameWithAvgRatingsRDD: [(3.6818181818181817, u'Happiest Millionaire, The (1967)', 22), (3.0468227424749164, u'Grumpier Old Men (1995)', 299), (2.882978723404255, u'Hocus Pocus (1993)', 94)]



In [40]:
def sortFunction(tuple):
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)

movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda (average, name, ratings): ratings > 500)
                                    .sortBy(sortFunction, False))

In [41]:
print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20)

Movies with highest ratings: [(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088), (4.515798462852263, u"Schindler's List (1993)", 1171), (4.512893982808023, u'Godfather, The (1972)', 1047), (4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195), (4.505415162454874, u'Usual Suspects, The (1995)', 831), (4.457256461232604, u'Rear Window (1954)', 503), (4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651), (4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447), (4.4, u'Sixth Sense, The (1999)', 1110), (4.394285714285714, u'North by Northwest (1959)', 700), (4.379506641366224, u'Citizen Kane (1941)', 527), (4.375, u'Casablanca (1942)', 776), (4.363975155279503, u'Godfather: Part II, The (1974)', 805), (4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811), (4.358173076923077, u'Silence of the Lambs, The (1991)', 1248), (4.335826477187734, u'Saving Private Ryan (1998)', 1337), (4.32624113475177

In [42]:
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())

Training: 292716, validation: 96902, test: 98032



In [43]:
import math

def computeError(predictedRDD, actualRDD):
    predictedReformattedRDD = (predictedRDD
            .map(lambda (UserID, MovieID, Rating):((UserID, MovieID), Rating)) )
                               
    actualReformattedRDD = (actualRDD
            .map(lambda (UserID, MovieID, Rating):((UserID, MovieID), Rating)) )
    
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)
                        .map(lambda (k,(a,b)): math.pow((a-b),2)))

    totalError = squaredErrorsRDD.reduce(lambda a,b: a+b)
    numRatings = squaredErrorsRDD.count()

    return math.sqrt(float(totalError)/numRatings)

In [44]:
from pyspark.mllib.recommendation import ALS

validationForPredictRDD = validationRDD.map(lambda (UserID, MovieID, Rating): (UserID, MovieID))

ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=5L, iterations=5, lambda_=0.1)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

print 'The best model was trained with rank %s' % bestRank

For rank 4 the RMSE is 0.892734779484
For rank 8 the RMSE is 0.890121292255
For rank 12 the RMSE is 0.890216118367
The best model was trained with rank 8


In [45]:
myModel = ALS.train(trainingRDD, 8, seed=5L, iterations=5, lambda_=0.1)

testForPredictingRDD = testRDD.map(lambda (UserID, MovieID, Rating): (UserID, MovieID))

predictedTestRDD = myModel.predictAll(testForPredictingRDD)

testRMSE = computeError(testRDD, predictedTestRDD)

print 'The model had a RMSE on the test set of %s' % testRMSE

The model had a RMSE on the test set of 0.891048561304


In [46]:
myRatedMovies = [                                   # Rating
    (0, 845,5.0),  # Blade Runner (1982)            - 5.0/5
    (0, 789,4.5),  # Good Will Hunting (1997)       - 4.5/5
    (0, 983,4.8),  # Christmas Story, A (1983)      - 4.8/5
    (0, 551,2.0),  # Taxi Driver (1976)             - 2.0/5
    (0,1039,2.0),  # Pulp Fiction (1994)            - 2.0/5
    (0, 651,5.0),  # Dr. Strangelove (1963)         - 5.0/5
    (0,1195,4.0),  # Raiders of the Lost Ark (1981) - 4.0/5
    (0,1110,5.0),  # Sixth Sense, The (1999)        - 4.5/5
    (0,1250,4.5),  # Matrix, The (1999)             - 4.5/5
    (0,1083,4.0)   # Princess Bride, The (1987)     - 4.0/5
    ]
myRatingsRDD = sc.parallelize(myRatedMovies)

In [47]:
trainingWithMyRatingsRDD = myRatingsRDD.union(trainingRDD)
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, 8, seed=5L, iterations=5, lambda_=0.1)
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
testRMSEMyRatings = computeError(testRDD, predictedTestMyRatingsRDD)

print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings

The model had a RMSE on the test set of 0.892023318284


In [48]:
myUnratedMoviesRDD = (moviesRDD
                      .map(lambda (movieID, name): movieID)
                      .filter(lambda movieID: movieID not in [ mine[1] for mine in myRatedMovies] )
                      .map(lambda movieID: (0, movieID)))

predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)
print predictedRatingsRDD.take(1)

[Rating(user=0, product=1084, rating=3.1777819449083786)]


In [49]:
movieCountsRDD = (movieIDsWithAvgRatingsRDD
                  .map(lambda (MovieID, (ratings, average)): (MovieID, ratings)) )

predictedRDD = predictedRatingsRDD.map(lambda (uid, movie_id, rating): (movie_id, rating))

predictedWithCountsRDD = (predictedRDD.join(movieCountsRDD))

ratingsWithNamesRDD = (predictedWithCountsRDD
                       .join(moviesRDD)
                       .map(lambda (movieID, ((pred, ratings), name)): (pred, name, ratings) )
                       .filter(lambda (pred, name, ratings): ratings > 75))

predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda x: -x[0])

print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        '\n'.join(map(str, predictedHighestRatedMovies)))

My highest rated movies as predicted (for movies with more than 75 reviews):
(4.823536053603062, u'Once Upon a Time in the West (1969)', 82)
(4.743456934724456, u'Texas Chainsaw Massacre, The (1974)', 111)
(4.452221024980805, u'Evil Dead II (Dead By Dawn) (1987)', 305)
(4.387531237859994, u'Duck Soup (1933)', 279)
(4.373821653377477, u'Citizen Kane (1941)', 527)
(4.344480264132989, u'Cabin Boy (1994)', 95)
(4.332264360095111, u'Shaft (1971)', 85)
(4.217371529794628, u'Night of the Living Dead (1968)', 352)
(4.181318251399025, u'Yojimbo (1961)', 110)
(4.1717902728073835, u'Naked Gun: From the Files of Police Squad!, The (1988)', 435)
(4.0736012757455295, u'Plan 9 from Outer Space (1958)', 105)
(4.039873448373331, u'Double Indemnity (1944)', 274)
(4.031981033189572, u'Kingpin (1996)', 396)
(4.025774584559731, u'Bride of Frankenstein (1935)', 91)
(4.003670933783985, u'Nosferatu (Nosferatu, eine Symphonie des Grauens) (1922)', 115)
(3.9937035986464333, u'Tales from the Crypt Presents: Bord