In [3]:
# Obtain the SparkContext used by every later cell: reuse the active one if it
# exists, otherwise create it.
sc = SparkContext.getOrCreate()

In [4]:
# Input data files, given as relative paths. Each line of these files is a
# '::'-delimited record (see ratings_tuple / movies_tuple).
ratingsFilename = 'ratings.dat'
moviesFilename = 'movies.dat'

In [5]:
# Number of partitions the ratings lines are redistributed into after loading.
partitions = 2
# Load both files as RDDs of raw text lines; only the ratings are repartitioned.
rawRatings = sc.textFile(ratingsFilename).repartition(partitions)
rawMovies = sc.textFile(moviesFilename)

def ratings_tuple(ratingsList):
    """Parse one '::'-delimited ratings line.

    Args:
        ratingsList: a single text line whose first three '::'-separated
            fields are the user ID, the movie ID and the rating.

    Returns:
        A (userID, movieID, rating) tuple of (int, int, float). Any fields
        after the third are ignored.
    """
    fields = ratingsList.split('::')
    userID = int(fields[0])
    movieID = int(fields[1])
    rating = float(fields[2])
    return userID, movieID, rating

def movies_tuple(moviesList):
    """Parse one '::'-delimited movies line.

    Args:
        moviesList: a single text line whose first two '::'-separated fields
            are the movie ID and the movie title.

    Returns:
        A (movieID, title) tuple; the ID is converted to int and the title is
        returned unchanged. Any fields after the second are ignored.
    """
    fields = moviesList.split('::')
    movieID = int(fields[0])
    title = fields[1]
    return movieID, title

# Parse the raw text lines into tuples. Both RDDs are marked for caching
# because later cells reuse them repeatedly.
ratingsRdd = rawRatings.map(ratings_tuple).cache()
moviesRdd = rawMovies.map(movies_tuple).cache()

# count() is an action, so these two calls trigger the actual load and parse.
ratingsRddCount = ratingsRdd.count()
moviesRddCount = moviesRdd.count()

# Report the sizes and show a small sample of each RDD as a sanity check.
print 'Total number of ratings are %s and number of movies are %s' % (ratingsRddCount, moviesRddCount)
print 'ratings: %s' % ratingsRdd.take(6)
print 'movies: %s' % moviesRdd.take(6)

Total number of ratings are 1559032 and number of movies are 3883
ratings: [(1, 1193, 5.0), (1, 361, 3.0), (1, 414, 3.0), (1, 3408, 4.0), (1, 2355, 5.0), (1, 1197, 3.0)]
movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)'), (4, u'Waiting to Exhale (1995)'), (5, u'Father of the Bride Part II (1995)'), (6, u'Heat (1995)')]


In [6]:
def countAndAverage(IDAndRatings):
    """Summarise the ratings attached to one ID.

    Args:
        IDAndRatings: a pair (ID, ratings) where `ratings` is a sized
            collection of numbers (it must support len() and sum()).

    Returns:
        (ID, (number of ratings, average rating)); the average is a float.

    Raises:
        ZeroDivisionError: if `ratings` is empty.
    """
    movieID = IDAndRatings[0]
    movieRatings = IDAndRatings[1]
    numRatings = len(movieRatings)
    # Divide by a float so integer ratings are not truncated under Python 2.
    meanRating = sum(movieRatings) / float(numRatings)
    return (movieID, (numRatings, meanRating))

In [7]:
movieIDsWithRatingsRDD = (ratingsRdd.map(lambda (userID, movieID, Ratings): (movieID, Ratings)).groupByKey().sortByKey())

print 'movieIDsWithRatingsRDD: %s' % movieIDsWithRatingsRDD.take(6)

movieIDsWithRatingsRDD: [(1, <pyspark.resultiterable.ResultIterable object at 0x0000000009725A20>), (2, <pyspark.resultiterable.ResultIterable object at 0x0000000009725BE0>), (3, <pyspark.resultiterable.ResultIterable object at 0x0000000009725C18>), (4, <pyspark.resultiterable.ResultIterable object at 0x0000000009725C50>), (5, <pyspark.resultiterable.ResultIterable object at 0x0000000009725C88>), (8, <pyspark.resultiterable.ResultIterable object at 0x0000000009725CC0>)]


In [8]:
# Replace each movie's collection of ratings with its (count, average),
# giving (movieID, (count, average)) records.
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(countAndAverage)
print 'movieIDsWithAvgRatingsRDD: %s' % movieIDsWithAvgRatingsRDD.take(6)

movieIDsWithAvgRatingsRDD: [(1, (2077, 4.146846413095811)), (2, (701, 3.20114122681883)), (3, (1876, 3.544776119402985)), (4, (340, 2.764705882352941)), (5, (1184, 3.4070945945945947)), (8, (1, 5.0))]


In [9]:
# Join movie titles with the rating statistics on movie ID, then flatten each
# joined record (movieId, (movieName, (count, average))) into
# (average, movieName, count). The movie ID is dropped.
movieNameWithAvgRatingsRDD = (moviesRdd.join(movieIDsWithAvgRatingsRDD)
                                       .map(lambda (movieId, (movieName, (count, average ))): ( average, movieName, count)))
print 'movieNameWithAvgRatingsRDD: %s' % movieNameWithAvgRatingsRDD.take(6)

movieNameWithAvgRatingsRDD: [(2.764705882352941, u'Waiting to Exhale (1995)', 340), (5.0, u'Tom and Huck (1995)', 1), (2.37888198757764, u'Dracula: Dead and Loving It (1995)', 161), (3.7932551319648096, u'Casino (1995)', 682), (2.5375, u'Money Train (1995)', 160), (3.1794871794871793, u'Powder (1995)', 624)]


In [10]:
def sortFunction(input):
    """Build the string sort key for one (rating, title, ...) record.

    Args:
        input: a sequence whose element 0 is a numeric rating and whose
            element 1 is the title string.

    Returns:
        '<rating formatted to 3 decimals> <title>', so records order by
        rating first and by title among equal formatted ratings.
    """
    ratingText = unicode('%.3f' % input[0])
    movieTitle = input[1]
    sortKey = ratingText + ' ' + movieTitle
    return sortKey

In [11]:
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD.filter(lambda s: s > 600).sortBy(sortFunction, False))
print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(30)

Movies with highest ratings: [(5.0, u'Ulysses (Ulisse) (1954)', 1), (5.0, u'Tom and Huck (1995)', 1), (5.0, u'Song of Freedom (1936)', 1), (5.0, u'Smashing Time (1967)', 2), (5.0, u'One Little Indian (1973)', 1), (5.0, u'Lured (1947)', 1), (5.0, u'GoldenEye (1995)', 1), (5.0, u'Follow the Bitch (1998)', 1), (5.0, u'Bittersweet Motel (2000)', 1), (5.0, u'Baby, The (1973)', 1), (4.8, u'I Am Cuba (Soy Cuba/Ya Kuba) (1964)', 5), (4.666666666666667, u'Apple, The (Sib) (1998)', 9), (4.608695652173913, u'Sanjuro (1962)', 69), (4.560509554140127, u'Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)', 628), (4.529308065226973, u'Shawshank Redemption, The (1994)', 2269), (4.507936507936508, u'Wrong Trousers, The (1993)', 882), (4.5, u'Skipped Parts (2000)', 2), (4.5, u'Primal Fear (1996)', 2), (4.5, u'Inheritors, The (Die Siebtelbauern) (1998)', 2), (4.5, u'Dry Cleaning (Nettoyage \ufffd sec) (1997)', 2), (4.5, u'Callej\ufffdn de los milagros, El (1995)', 2), (4.5, u'Bells, The 

In [12]:
# Randomly split the ratings into training / validation / test sets with
# weights 8:1:1. A fixed seed is passed to randomSplit.
trainingRDD, validationRDD, testRDD = ratingsRdd.randomSplit([8, 1, 1], seed=0L)

# Report the size of each split and show a few records from each.
print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(6)
print validationRDD.take(6)
print testRDD.take(6)

Training: 1246925, validation: 155854, test: 156253

[(1, 361, 3.0), (1, 414, 3.0), (1, 3408, 4.0), (1, 2355, 5.0), (1, 1197, 3.0), (1, 2321, 3.0)]
[(1, 594, 4.0), (1, 1270, 5.0), (1, 1961, 5.0), (1, 2692, 4.0), (1, 2028, 5.0), (2, 2916, 3.0)]
[(1, 1193, 5.0), (1, 1287, 5.0), (1, 2804, 5.0), (1, 419, 4.0), (1, 1545, 4.0), (1, 345, 3.0)]


In [13]:
import math

def computeError(predictedRDD, actualRDD):
    """Compute the root-mean-square error between predicted and actual ratings.

    Both RDDs hold (userId, movieId, rating) records. Records are matched on
    the (userId, movieId) pair with a join, so only pairs present in both
    RDDs contribute to the error.

    Args:
        predictedRDD: RDD of (userId, movieId, predicted rating) records.
        actualRDD: RDD of (userId, movieId, actual rating) records.

    Returns:
        The RMSE over the matched pairs, as a float.

    Raises:
        ValueError: if the two RDDs share no (userId, movieId) pair.
    """
    def keyByUserAndMovie(record):
        # (userId, movieId, rating) -> ((userId, movieId), rating)
        return ((record[0], record[1]), record[2])

    def squaredError(joined):
        # joined is ((userId, movieId), (predValue, actValue))
        predValue, actValue = joined[1]
        return (predValue - actValue) ** 2

    squaredErrorsRDD = (predictedRDD.map(keyByUserAndMovie)
                        .join(actualRDD.map(keyByUserAndMovie))
                        .map(squaredError))

    # Accumulate the sum and the count in a single pass: calling reduce() and
    # then count() would evaluate the un-cached join twice.
    totalError, numRatings = squaredErrorsRDD.aggregate(
        (0.0, 0),
        lambda acc, sqErr: (acc[0] + sqErr, acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]))

    if numRatings == 0:
        raise ValueError('computeError: no (userId, movieId) pairs are shared by '
                         'the predicted and actual RDDs')
    return math.sqrt(totalError / numRatings)

In [14]:
from pyspark.mllib.recommendation import ALS

validationForPredictRDD = validationRDD.map(lambda (userId, movieId, rating): (userId, movieId))

seed = 5L
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.03

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

print 'The best model was trained with rank %s' % bestRank

For rank 4 the RMSE is 0.885539086616
For rank 8 the RMSE is 0.880820311222
For rank 12 the RMSE is 0.8879282902
The best model was trained with rank 8


In [16]:
myModel = ALS.train(trainingRDD, bestRank, seed = seed, iterations = iterations, lambda_ = regularizationParameter)
testForPredictingRDD = testRDD.map(lambda (userId, movieId, rating): (userId, movieId))
predictedTestRDD = myModel.predictAll(testForPredictingRDD)

testRMSE = computeError(testRDD, predictedTestRDD)

print 'The model had a RMSE on the test set of %s' % testRMSE

The model had a RMSE on the test set of 0.876139049138


In [17]:
trainingAvgRating = trainingRDD.map(lambda (userid, movieid, rating): rating).reduce(lambda a,b: (a+b)) / trainingRDD.count()
print 'The average rating for movies in the training set is %s' % trainingAvgRating

testForAvgRDD = testRDD.map(lambda (userid, movieid, rating): (userid, movieid, trainingAvgRating))
testAvgRMSE = computeError(testRDD, testForAvgRDD)
print 'The RMSE on the average set is %s' % testAvgRMSE

The average rating for movies in the training set is 3.64529622872
The RMSE on the average set is 1.02043866699


In [18]:
# Print the first 60 records of the rating-sorted RDD, one
# (average rating, movie name, number of reviews) tuple per line.
print 'Most rated movies:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(60):
    print ratingsTuple

Most rated movies:
(average rating, movie name, number of reviews)
(5.0, u'Ulysses (Ulisse) (1954)', 1)
(5.0, u'Tom and Huck (1995)', 1)
(5.0, u'Song of Freedom (1936)', 1)
(5.0, u'Smashing Time (1967)', 2)
(5.0, u'One Little Indian (1973)', 1)
(5.0, u'Lured (1947)', 1)
(5.0, u'GoldenEye (1995)', 1)
(5.0, u'Follow the Bitch (1998)', 1)
(5.0, u'Bittersweet Motel (2000)', 1)
(5.0, u'Baby, The (1973)', 1)
(4.8, u'I Am Cuba (Soy Cuba/Ya Kuba) (1964)', 5)
(4.666666666666667, u'Apple, The (Sib) (1998)', 9)
(4.608695652173913, u'Sanjuro (1962)', 69)
(4.560509554140127, u'Seven Samurai (The Magnificent Seven) (Shichinin no samurai) (1954)', 628)
(4.529308065226973, u'Shawshank Redemption, The (1994)', 2269)
(4.507936507936508, u'Wrong Trousers, The (1993)', 882)
(4.5, u'Skipped Parts (2000)', 2)
(4.5, u'Primal Fear (1996)', 2)
(4.5, u'Inheritors, The (Die Siebtelbauern) (1998)', 2)
(4.5, u'Dry Cleaning (Nettoyage \ufffd sec) (1997)', 2)
(4.5, u'Callej\ufffdn de los milagros, El (1995)', 2)
(4.

In [19]:
# Hand-entered ratings as (userID, movieID, rating) tuples, all under user
# ID 0 -- presumably an ID that no user in the ratings file has; TODO confirm.
myRatedMovies = [(0, 240, 5), (0, 360, 4), (0, 216, 5), (0, 24, 3), (0, 120, 2), (0, 150, 3)]
myRatingsRDD = sc.parallelize(myRatedMovies)
# take(15) asks for more records than the list holds, so everything is shown.
print 'My movie ratings: %s' % myRatingsRDD.take(15)

My movie ratings: [(0, 240, 5), (0, 360, 4), (0, 216, 5), (0, 24, 3), (0, 120, 2), (0, 150, 3)]


In [20]:
# Append the hand-entered ratings to the training set.
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)

# Sanity check: the difference in counts should equal the number of ratings added.
print ('The training dataset now has %s more entries than the original training dataset' %
       (trainingWithMyRatingsRDD.count() - trainingRDD.count()))

The training dataset now has 6 more entries than the original training dataset


In [21]:
# Train a new ALS model on the augmented training set, reusing the selected
# rank and the same seed, iteration count and regularization parameter.
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, 
                           seed = seed, iterations = iterations, lambda_ = regularizationParameter)

In [23]:
# Score the model trained with the extra ratings on the same test pairs
# (testForPredictingRDD) and compare against the actual test ratings.
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
testRMSEMyRatings = computeError(predictedTestMyRatingsRDD, testRDD)
print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings

The model had a RMSE on the test set of 0.883916564327


In [26]:
myUnratedMoviesRDD = (moviesRdd.filter(lambda (movieID,title): movieID not in [int(i[1]) for i in myRatedMovies])
                      .map(lambda(movieID,title): (0,movieID)))
predictedRatingsRDD = myRatingsModel.predictAll(myUnratedMoviesRDD)

In [31]:
# (movieID, number of ratings), taken from (movieID, (count, average)) records.
movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda entry: (entry[0], entry[1][0]))
# (movieID, predicted rating), taken from (userID, movieID, rating) predictions.
predictedRDD = predictedRatingsRDD.map(lambda prediction: (prediction[1], prediction[2]))
predictedWithCountsRDD = predictedRDD.join(movieCountsRDD)

# Attach the titles, flatten (movieID, ((rating, numRatings), title)) into
# (rating, title, numRatings), and keep only movies with more than 100 ratings.
ratingsWithNamesRDD = (
    predictedWithCountsRDD
    .join(moviesRdd)
    .map(lambda joined: (joined[1][0][0], joined[1][1], joined[1][0][1]))
    .filter(lambda entry: entry[2] > 100)
)

# Negating the rating makes takeOrdered return the 20 highest predictions.
predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda entry: -entry[0])
print ('My highest rated movies as predicted (for movies with more than 100 reviews):\n%s' %
       '\n'.join(str(movie) for movie in predictedHighestRatedMovies))


My highest rated movies as predicted (for movies with more than 100 reviews):
(4.783271111574745, u'Patriot, The (2000)', 1240)
(4.417099993376237, u'Waterboy, The (1998)', 420)
(4.412723449333932, u'Armageddon (1998)', 1110)
(4.359276946681395, u'Tommy Boy (1995)', 1770)
(4.354184944895293, u'Austin Powers: The Spy Who Shagged Me (1999)', 1434)
(4.352959978380735, u'Blue Streak (1999)', 128)
(4.344718114634946, u'Braveheart (1995)', 2443)
(4.321704100003138, u'Bloodsport (1988)', 222)
(4.310727688462989, u'Deuce Bigalow: Male Gigolo (1999)', 308)
(4.298950357271258, u'Coyote Ugly (2000)', 201)
(4.283158578887151, u'American Pie (1999)', 1389)
(4.277288961952914, u'Nutty Professor II: The Klumps (2000)', 248)
(4.270989344289623, u'Big Green, The (1995)', 723)
(4.266875224862038, u'Con Air (1997)', 954)
(4.263110396994394, u'Christmas Vacation (1989)', 515)
(4.23199424939802, u'Road Trip (2000)', 419)
(4.2204721265853244, u'Rocky IV (1985)', 375)
(4.197721907873872, u'Dirty Work (1998)'