# **Movie Recommendations**
![ML Logo](http://images.huffingtonpost.com/2016-06-23-1466705986-1144339-netflix31200x630c.jpg)

En este notebook intentaremos construir un sistema de recomendaciones simples en base a califiaciones de usuarios a peliculas.

In [1]:
# Get the data
numPartitions = 2
rawRatings = sc.textFile('data/movies/ratings.dat').repartition(numPartitions)
rawMovies = sc.textFile('data/movies/movies.dat')

def get_ratings_tuple(entry):
    # input UserID::MovieID::Rating::Timestamp
    # returns  tuple: (UserID, MovieID, Rating)
    items = entry.split('::')
    return int(items[0]), int(items[1]), float(items[2])

def get_movie_tuple(entry):
    # in    entry (str): a line in the movies dataset in the form of MovieID::Title::Genres
    #Returns:  tuple: (MovieID, Title)
    items = entry.split('::')
    return int(items[0]), items[1]


ratingsRDD = rawRatings.map(get_ratings_tuple).cache()
moviesRDD = rawMovies.map(get_movie_tuple).cache()

ratingsCount = ratingsRDD.count()
moviesCount = moviesRDD.count()

print 'There are %s ratings and %s movies in the datasets' % (ratingsCount, moviesCount)
print 'Ratings: %s' % ratingsRDD.take(3)
print 'Movies: %s' % moviesRDD.take(3)

There are 487650 ratings and 3883 movies in the datasets
Ratings: [(1, 1193, 5.0), (1, 914, 3.0), (1, 2355, 5.0)]
Movies: [(1, u'Toy Story (1995)'), (2, u'Jumanji (1995)'), (3, u'Grumpier Old Men (1995)')]


## Movies with Highest Average Ratings

In [3]:
# First, implement a helper function `getCountsAndAverages` using only Python
def getCountsAndAverages(IDandRatingsTuple):
    #input  IDandRatingsTuple: a single tuple of (MovieID, (Rating1, Rating2, Rating3, ...))
    # returns a tuple of (MovieID, (number of ratings, averageRating))
    total = sum(IDandRatingsTuple[1])
    count = len(IDandRatingsTuple[1])
    return ((IDandRatingsTuple[0],(count,float(total)/count)))

In [4]:
# From ratingsRDD with tuples of (UserID, MovieID, Rating) create an RDD with tuples of
# the (MovieID, iterable of Ratings for that MovieID)
movieIDsWithRatingsRDD = (ratingsRDD
                          .map(lambda x:(x[1],x[2]))
                          .groupByKey())

# Using `movieIDsWithRatingsRDD`, compute the number of ratings and average rating for each movie to
# yield tuples of the form (MovieID, (number of ratings, average rating))
movieIDsWithAvgRatingsRDD = movieIDsWithRatingsRDD.map(lambda x:getCountsAndAverages(x))

# To `movieIDsWithAvgRatingsRDD`, apply RDD transformations that use `moviesRDD` to get the movie
# names for `movieIDsWithAvgRatingsRDD`, yielding tuples of the form
# (average rating, movie name, number of ratings)
movieNameWithAvgRatingsRDD = (moviesRDD
                              .join(movieIDsWithAvgRatingsRDD)
                              .map(lambda x:(x[1][1][1],x[1][0],x[1][1][0])))

In [9]:
def sortFunction(tuple):
    # input tuple: (rating, MovieName)
    #  sortString: the value to sort with, 'rating MovieName'
    key = unicode('%.3f' % tuple[0])
    value = tuple[1]
    return (key + ' ' + value)


# Movies with highest Average Ratings and more than 500 reviews
# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to limit the results to movies with
# ratings from more than 500 people. We then use the `sortFunction()` helper function to sort by the
# average rating to get the movies in order of their rating (highest rating first)
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .filter(lambda x:x[2]>500)
                                    .sortBy(sortFunction, False))

for mv in movieLimitedAndSortedByRatingRDD.take(20):
    print mv

(4.5349264705882355, u'Shawshank Redemption, The (1994)', 1088)
(4.515798462852263, u"Schindler's List (1993)", 1171)
(4.512893982808023, u'Godfather, The (1972)', 1047)
(4.510460251046025, u'Raiders of the Lost Ark (1981)', 1195)
(4.505415162454874, u'Usual Suspects, The (1995)', 831)
(4.457256461232604, u'Rear Window (1954)', 503)
(4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)', 651)
(4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)', 1447)
(4.4, u'Sixth Sense, The (1999)', 1110)
(4.394285714285714, u'North by Northwest (1959)', 700)
(4.379506641366224, u'Citizen Kane (1941)', 527)
(4.375, u'Casablanca (1942)', 776)
(4.363975155279503, u'Godfather: Part II, The (1974)', 805)
(4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)", 811)
(4.358173076923077, u'Silence of the Lambs, The (1991)', 1248)
(4.335826477187734, u'Saving Private Ryan (1998)', 1337)
(4.326241134751773, u'Chinatown (1974)', 564)
(4.32538330494037

## **Part 2: Collaborative Filtering**
![collaborative filtering](https://courses.edx.org/c4x/BerkeleyX/CS100.1x/asset/Collaborative_filtering.gif)
[mllib]: https://spark.apache.org/mllib/
[collab]: https://en.wikipedia.org/?title=Collaborative_filtering
[collab2]: http://recommender-systems.org/collaborative-filtering/

In [10]:
# Split Data in Train, Validation and Test Sets
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

Training: 292716, validation: 96902, test: 98032

[(1, 914, 3.0), (1, 2355, 5.0), (1, 595, 5.0)]
[(1, 1287, 5.0), (1, 594, 4.0), (1, 1270, 5.0)]
[(1, 1193, 5.0), (1, 2398, 4.0), (1, 1035, 5.0)]


In [11]:
# RMSE as our metric error
import math

def computeError(predictedRDD, actualRDD):
    # input  predictedRDD: (UserID, MovieID, Rating)
    #       actualRDD:  (UserID, MovieID, Rating)
    # Returns:  RSME (float): computed RSME value

    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda x:((x[0],x[1]),x[2]))

    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda x:((x[0],x[1]),x[2]))

    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformtions - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD
                        .join(actualReformattedRDD)
                        .map(lambda x:abs(x[1][0]-x[1][1])**2)
                        )
    # Compute the total squared error 
    totalError = squaredErrorsRDD.reduce(lambda x,y:x+y)

    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()
    # Using the total squared error and the number of entries, compute the RSME
    return math.sqrt((float(totalError)/numRatings))


# sc.parallelize turns a Python list into a Spark RDD.
testPredicted = sc.parallelize([
    (1, 1, 5),
    (1, 2, 3),
    (1, 3, 4),
    (2, 1, 3),
    (2, 2, 2),
    (2, 3, 4)])
testActual = sc.parallelize([
     (1, 2, 3),
     (1, 3, 5),
     (2, 1, 5),
     (2, 2, 1)])
testPredicted2 = sc.parallelize([
     (2, 2, 5),
     (1, 2, 5)])
testError = computeError(testPredicted, testActual)
print 'Error for test dataset (should be 1.22474487139): %s' % testError

testError2 = computeError(testPredicted2, testActual)
print 'Error for test dataset2 (should be 3.16227766017): %s' % testError2

testError3 = computeError(testActual, testActual)
print 'Error for testActual dataset (should be 0.0): %s' % testError3

Error for test dataset (should be 1.22474487139): 1.22474487139
Error for test dataset2 (should be 3.16227766017): 3.16227766017
Error for testActual dataset (should be 0.0): 0.0


## ALS for Matrix Factorization

In [14]:
# A Baseline!
trainingAvgRating = float(trainingRDD.map(lambda x:x[2]).reduce(lambda x,y:x+y)) / trainingRDD.count()
print 'The average rating for movies in the training set is %s' % trainingAvgRating

testForAvgRDD = testRDD.map(lambda x:(x[0],x[1],trainingAvgRating))
testAvgRMSE = computeError(testRDD, testForAvgRDD)
print 'Baseline RMSE on the test set is %s' % testAvgRMSE

The average rating for movies in the training set is 3.57409571052
Baseline RMSE on the test set is 1.12036693569


In [12]:
from pyspark.mllib.recommendation import ALS


validationForPredictRDD = validationRDD.map(lambda x:(x[0],x[1]))

seed = 42 # What else? Are you kidding me?
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

print 'The best model was trained with rank %s' % bestRank

For rank 4 the RMSE is 0.892734779484
For rank 8 the RMSE is 0.890121292255
For rank 12 the RMSE is 0.890216118367
The best model was trained with rank 8


In [13]:
# Try this on the test set
myModel = ALS.train(trainingRDD, bestRank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
testForPredictingRDD = testRDD.map(lambda x:(x[0],x[1]))
predictedTestRDD = myModel.predictAll(testForPredictingRDD)

testRMSE = computeError(testRDD, predictedTestRDD)

print 'The model had a RMSE on the test set of %s' % testRMSE

The model had a RMSE on the test set of 0.891048561304


## Create our Own Recommendations

In [15]:
# We start with the most popular movies
movieLimitedAndSortedByRatingRDD12 = (moviesRDD
                              .join(movieIDsWithAvgRatingsRDD)
                              .filter(lambda x:x[1][1][0]>500)
                              .map(lambda x:(0,x[0],x[1][1][1],x[1][0]))
                              .sortBy(lambda x:x[2],False))
print 'Most rated movies:'
#print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD12.take(50):
    print ratingsTuple


Most rated movies:
(0, 318, 4.5349264705882355, u'Shawshank Redemption, The (1994)')
(0, 527, 4.515798462852263, u"Schindler's List (1993)")
(0, 858, 4.512893982808023, u'Godfather, The (1972)')
(0, 1198, 4.510460251046025, u'Raiders of the Lost Ark (1981)')
(0, 50, 4.505415162454874, u'Usual Suspects, The (1995)')
(0, 904, 4.457256461232604, u'Rear Window (1954)')
(0, 750, 4.45468509984639, u'Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)')
(0, 260, 4.43953006219765, u'Star Wars: Episode IV - A New Hope (1977)')
(0, 2762, 4.4, u'Sixth Sense, The (1999)')
(0, 908, 4.394285714285714, u'North by Northwest (1959)')
(0, 923, 4.379506641366224, u'Citizen Kane (1941)')
(0, 912, 4.375, u'Casablanca (1942)')
(0, 1221, 4.363975155279503, u'Godfather: Part II, The (1974)')
(0, 1193, 4.358816276202219, u"One Flew Over the Cuckoo's Nest (1975)")
(0, 593, 4.358173076923077, u'Silence of the Lambs, The (1991)')
(0, 2028, 4.335826477187734, u'Saving Private Ryan (1998)')


In [22]:
# We insert our rankings (the user 0 is never used in the dataset)
myUserID = 0

# Note that the movie IDs are the *last* number on each line. A common error was to use the number of ratings as the movie ID.
myRatedMovies = [
    (0, 318,5), #Shawshank Redemption
    (0, 527, 3.8), # Schindler's List
    (0, 858, 3), # Godfather, The
    (0, 1198, 3.5), # Raiders of the Lost Ark
    (0, 50, 4), # Usual Suspects, The
    (0, 904, 2), # Rear Window (1954)
    (0, 750, 2), # Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1963)
    (0, 260, 5), # Star Wars: Episode IV 
    (0, 2762, 1), #Sixth Sense
    (0, 908, 2), # North by Northwest (1959)
    (0, 923, 2), # Citizen Kane (1941)
    (0, 912, 1.5), # Casablanca (1942)
    (0, 1221, 3.2), #Godfather: Part II,
    (0, 1193, 3), # One Flew Over the Cuckoo's Nest
    (0, 593, 5), # Silence of the Lambs, 
    (0, 2028, 3), # Saving Private Ryan
    (0, 1252, 2), # Chinatown 
    (0, 2324, 2), #Life Is Beautiful (La Vita \ufffd bella)
    (0, 1136, 1), #Monty Python and the Holy Grail
    (0, 2571, 5), #Matrix
    (0, 1196, 5), #Star Wars: Episode V 
    (0, 1278, 1), #Young Frankenstein
    (0, 1219, 2), #Psycho
    (0, 296, 5), #Pulp Fiction
    (0, 608, 5), #Fargo
    (0, 1213, 3.44),#GoodFellas
    (0, 2858, 2.2),#American Beauty
    (0, 919, 1),#Wizard of Oz
    (0, 1197, 1),#Princess Bride, The
    (0, 1247, 1),#Graduate, The 
    (0, 2692, 2.5),#Run Lola Run 
    (0, 1225, 1),#Amadeus 
    (0, 3114, 3),#Toy Story 2
    (0, 1288, 2),#This Is Spinal Tap
    (0, 3897, 2),#Almost Famous 
    (0, 2804, 2),#Christmas Story
    (0, 1242, 2),#Glory (1989)
    (0, 1208, 3),#Apocalypse Now
    (0, 1617, 2),#L.A. Confidential
    (0, 541, 4.2),#Blade Runner
    (0, 1358, 3),#Sling Blade
    (0, 110, 1),#Braveheart (1995)
    (0, 1304, 2),#Butch Cassidy and the Sundance Kid 
    (0, 1704, 2),#Good Will Hunting 
    (0, 111, 4),#Taxi Driver 
    (0, 1240, 4.5),#Terminator
    (0, 1089,3),#Reservoir Dogs
    (0, 1387, 4),#Jaws
    (0, 1214, 4.6),#Alien
     (0, 1, 3) #Toy Story
     # The format of each line is (myUserID, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
     #   (myUserID, 260, 5),
    ]
myRatingsRDD = sc.parallelize(myRatedMovies)
print 'My movie ratings: %s' % myRatingsRDD.take(10)

My movie ratings: [(0, 318, 5), (0, 527, 3.8), (0, 858, 3), (0, 1198, 3.5), (0, 50, 4), (0, 904, 2), (0, 750, 2), (0, 260, 5), (0, 2762, 1), (0, 908, 2)]


In [17]:
# Add this to the training set
trainingWithMyRatingsRDD = trainingRDD.union(myRatingsRDD)

print ('The training dataset now has %s more entries than the original training dataset' %
       (trainingWithMyRatingsRDD.count() - trainingRDD.count()))
assert (trainingWithMyRatingsRDD.count() - trainingRDD.count()) == myRatingsRDD.count()

The training dataset now has 50 more entries than the original training dataset


In [18]:
# Train ALS with the new training set
myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank,seed=seed, iterations=iterations, lambda_=regularizationParameter)

In [19]:
# Check RMSE
predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
print predictedTestMyRatingsRDD.take(2)
testRMSEMyRatings = computeError(testRDD, predictedTestMyRatingsRDD)
print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings

[Rating(user=1377, product=384, rating=2.2394562275265586), Rating(user=2909, product=384, rating=3.510252226522553)]
The model had a RMSE on the test set of 0.891827923332


In [20]:
# Create the rdd of unrated movies (things we haven't seen)
myIds = [x[1] for x in myRatedMovies]
myUnratedMoviesRDD = (moviesRDD
                      .filter(lambda x:x[0] not in myIds))

In [21]:
# and now get our recommendations!
# Transform movieIDsWithAvgRatingsRDD from part (1b), which has the form (MovieID, (number of ratings, average rating))
# into and RDD of the form (MovieID, number of ratings)
movieCountsRDD = movieIDsWithAvgRatingsRDD.map(lambda x:(x[0],x[1][0]))
                                               
# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating)
predictedRDD = predictedRatingsRDD.map(lambda x:(x[0],x[2]))

# Use RDD transformations with predictedRDD and movieCountsRDD to yield an RDD with tuples of the form 
# (Movie ID, (Predicted Rating, number of ratings))
predictedWithCountsRDD  = (predictedRDD.join(movieCountsRDD))

# Use RDD transformations with PredictedWithCountsRDD and moviesRDD to yield an RDD with tuples of the form 
# (Predicted Rating, Movie Name, number of ratings), for movies with more than 75 ratings
ratingsWithNamesRDD = (predictedWithCountsRDD.join(moviesRDD)
                                             .map(lambda x:(x[1][0][0],x[1][1],x[1][0][1]))
                                             .filter(lambda x:x[2]>75))

predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(20, key=lambda x: -x[0])
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        '\n'.join(map(str, predictedHighestRatedMovies)))

My highest rated movies as predicted (for movies with more than 75 reviews):
(5.283147682824893, u'Halloween II (1981)', 85)
(5.264062498059267, u"Breakfast at Tiffany's (1961)", 326)
(5.245178209712927, u'Halloween II (1981)', 85)
(5.239504672800167, u"It's a Wonderful Life (1946)", 343)
(5.236705087447951, u"Hang 'em High (1967)", 92)
(5.207028191989294, u"Sullivan's Travels (1942)", 90)
(5.19270399167659, u'Tender Mercies (1983)', 139)
(5.18658665643199, u'Fletch Lives (1989)', 103)
(5.1860813356767235, u'Angels in the Outfield (1994)', 114)
(5.165727618718441, u'Age of Innocence, The (1993)', 153)
(5.164065770502644, u'Wolf (1994)', 125)
(5.160447794939038, u'G.I. Jane (1997)', 270)
(5.15830744748144, u'Nightmare on Elm Street 4: The Dream Master, A (1988)', 141)
(5.149591755039808, u'Elephant Man, The (1980)', 219)
(5.148758990616763, u'Species II (1998)', 102)
(5.147659529959279, u'Shining, The (1980)', 484)
(5.132414935285567, u'Highlander (1986)', 327)
(5.125585688066486, u"It'