In [None]:
############################################################################################################
# EXERCISE 1 - MOVIE RECOMMENDATION SYSTEM
############################################################################################################

In [None]:
# The purpose of this exercise is to develop a movie recommendation system.

# Dataset used: a subset of 500,000 movie ratings from the MovieLens 10M stable benchmark rating dataset (http://grouplens.org/datasets/movielens/10m/).  
# The dataset is included in your VM (/data/cs100/lab4/small)

# We will follow these steps:
        # - 1: Data loading
        # - 2: Feature extraction
        # - 3: Model implementation: the naive approach
        # - 4: Model implementation: the collaborative filtering approach
        # - 5: Predict movies for yourself!
        
# Have a look at pySpark API: http://spark.apache.org/docs/latest/programming-guide.html#transformations

In [None]:
##################################################################################################################
# 1 - Data Loading
##################################################################################################################

In [None]:
import sys
import os

# Paths are relative to the notebook's working directory ('data' is not an absolute path).
inputPath = os.path.join('cs100', 'lab4', 'small')
baseDir = os.path.join('data')
dataDir = os.path.join(baseDir, inputPath)

ratingsFilename = os.path.join(dataDir, 'ratings.dat.gz')
moviesFilename = os.path.join(dataDir, 'movies.dat')

In [None]:
# Load each file as an RDD of raw text lines.
numPartitions = 2

# Only the gzip-compressed ratings file is repartitioned. Presumably this is because a .gz
# file cannot be split and would otherwise be read as a single partition -- TODO confirm.
rawRatings = (sc
              .textFile(ratingsFilename)
              .repartition(numPartitions))
rawMovies = sc.textFile(moviesFilename)
rawMovies = sc.textFile(moviesFilename)


In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Count the lines in each RDD, starting with the ratings.
rawRatings.<FILL IN> 

In [None]:
# Count the lines in the movies RDD.
rawMovies.<FILL IN> 

In [None]:
# Each line in the ratings dataset (ratings.dat.gz) is formatted as: UserID::MovieID::Rating::Timestamp
# Each line in the movies dataset (movies.dat) is formatted as: MovieID::Title::Genres

# TODO: Replace <FILL IN> with appropriate code
# Show the first (raw, still unparsed) line of each dataset, starting with the ratings.

rawRatings.<FILL IN>

In [None]:
# Show the first raw line of the movies dataset.
rawMovies.<FILL IN>

In [None]:
##################################################################################################################
# 2 - Feature Extraction
##################################################################################################################

In [None]:
# We read in each of the files and create an RDD consisting of parsed lines.

# Use the functions defined below to parse the lines and transform them as follows:
## In the ratings dataset: we create a tuple (UserID, MovieID, Rating) and drop the timestamp.
## In the movies dataset: we create a tuple (MovieID, Title) and drop the genres. 

def get_ratings_tuple(entry):
    """Parse one ratings line of the form UserID::MovieID::Rating::Timestamp.

    Args:
        entry: a raw line from the ratings file.
    Returns:
        tuple: (UserID as int, MovieID as int, Rating as float); the timestamp is dropped.
    """
    fields = entry.split('::')
    user_id = int(fields[0])
    movie_id = int(fields[1])
    rating = float(fields[2])
    return user_id, movie_id, rating


def get_movie_tuple(entry):
    """Parse one movies line of the form MovieID::Title::Genres.

    Args:
        entry: a raw line from the movies file.
    Returns:
        tuple: (MovieID as int, Title as str); the genres are dropped.
    """
    fields = entry.split('::')
    movie_id = int(fields[0])
    title = fields[1]
    return movie_id, title


# Parse every raw line once and cache the result, because both RDDs are reused by later cells.
ratingsRDD = rawRatings.map(get_ratings_tuple)
ratingsRDD.cache()
moviesRDD = rawMovies.map(get_movie_tuple)
moviesRDD.cache()

In [None]:
# Peek at the first two parsed entries of each RDD, starting with the movies: (MovieID, Title).
moviesRDD.take(num=2)

In [None]:
# First two parsed ratings: (UserID, MovieID, Rating).
ratingsRDD.take(num=2)

In [None]:
##################################################################################################################
# 3 - Model implementation: The Naive approach
##################################################################################################################

In [None]:
# Approach: recommend the movies with the highest average rating, considering only movies with at least 500 reviews.

In [None]:
# 1- From ratingsRDD, create an RDD of tuples pairing each MovieID with all of its ratings
## map:   (UserID, MovieID, Rating) --> (MovieID, Rating)
## group: (MovieID, Rating1), (MovieID, Rating2), ... --> (MovieID, [Rating1, Rating2, ...])
# groupByKey() + mapValues(list) builds each movie's list once. Concatenating one-element
# lists in reduceByKey would instead re-copy the growing list at every merge, which is
# quadratic in the number of ratings per movie.
movieIDsWithRatingsRDD = (ratingsRDD
                          .map(lambda entry: (entry[1], entry[2]))
                          .groupByKey()
                          .mapValues(list))

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# 2- Create a function that calculates the average rating and returns a tuple (MovieID, (number of ratings, averageRating))
## IN: (MovieID, [Rating1, Rating2])--> OUT: (MovieID, (number of ratings, averageRating))

def getCountsAndAverages (RatingsTuple):
    """Turn one (MovieID, [Rating1, Rating2, ...]) entry into (MovieID, (number of ratings, average rating))."""
    <FILL IN>
    return <FILL IN>


# Apply getCountsAndAverages to every (MovieID, [ratings]) entry of movieIDsWithRatingsRDD.
movieIDsWithAverageRatingsRDD = movieIDsWithRatingsRDD.<FILL IN>

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# 3- Use `moviesRDD` to get the movie names for `movieIDsWithAverageRatingsRDD` and create tuples of the form:
# (average rating, movie name, number of ratings)
# moviesRDD entries are (MovieID, Title) and movieIDsWithAverageRatingsRDD entries are
# (MovieID, (number of ratings, average rating)), so the two can be matched on MovieID.

movieNameWithAvgRatingsRDD = (moviesRDD
                              .<FILL IN>)

# Print the first 3 lines of the RDD:
print 'movieNameWithAvgRatingsRDD: %s\n' % movieNameWithAvgRatingsRDD.take(3)

In [None]:
# Apply an RDD transformation to `movieNameWithAvgRatingsRDD` to keep only movies with
# at least 500 reviews, then take the first 20 recommendations.
# Use sortFunction to sort the results.

def sortFunction(tuple):
    """Build the string sort key "<average rating with 3 decimals> <movie name>".

    Args:
        tuple: an entry (average rating, movie name, ...) of movieNameWithAvgRatingsRDD.
    Returns:
        unicode: the rating formatted with '%.3f', a space, then the movie name.
        The comparison is textual. It orders by rating only while all ratings have the
        same number of integer digits, which holds for ratings between 1 and 5.
        Ties are broken by movie name.
    """
    ratingText = unicode('%.3f' % tuple[0])
    return ratingText + ' ' + tuple[1]

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Keep only the movies with enough reviews, then sort by sortFunction's key;
# the second sortBy argument (ascending=False) makes the order descending.
movieLimitedAndSortedByRatingRDD = (movieNameWithAvgRatingsRDD
                                    .<FILL IN>
                                    .sortBy (sortFunction,False)
                                    )
print 'Movies with highest ratings: %s' % movieLimitedAndSortedByRatingRDD.take(20)

In [None]:
##################################################################################################################
# 4 - Model implementation: The Collaborative Filtering
##################################################################################################################

In [None]:
# Spark also exposes some higher level functionality; in particular, Machine Learning using a component of Spark called MLlib. 
# In this part, you will learn how to use MLlib to make personalized movie recommendations using the movie data we have been analyzing.
# We are going to use a technique called collaborative filtering.

In [None]:
# 4.1 - Before using machine learning, we need to break up the ratingsRDD dataset into 3 parts:

    #### A training set (RDD), used to train the model
    #### A validation set (RDD), used to choose the best model
    #### A test set (RDD), used to estimate how well the chosen model performs on unseen data
    
    # To randomly split the dataset into multiple groups, we can use the pySpark randomSplit() transformation. 
    # randomSplit() takes a list of weights and a seed and returns one RDD per weight.


In [None]:
trainingRDD, validationRDD, testRDD = ratingsRDD.randomSplit([6, 2, 2], seed=0L)

print 'Training: %s, validation: %s, test: %s\n' % (trainingRDD.count(),
                                                    validationRDD.count(),
                                                    testRDD.count())
print trainingRDD.take(3)
print validationRDD.take(3)
print testRDD.take(3)

In [None]:
# After splitting the dataset, your training set has about 293,000 entries and the validation and test sets 
# each have about 97,000 entries 

In [None]:
# 4.2 Root Mean Square Error (RMSE):

# In the next part, you will generate a few different models, and will need a way to decide which model is best. 
# We will use the Root Mean Square Error (RMSE) or Root Mean Square Deviation (RMSD) to compute the error of each model. 
## RMSE is a frequently used measure of the differences between values (sample and population values) predicted by a model 
## or an estimator and the values actually observed. 
## RMSD represents the sample standard deviation of the differences between predicted values and observed values. 

In [None]:
# TODO: Replace <FILL IN> with appropriate code
import math

def computeError(predictedRDD, actualRDD):
    """ Compute the root mean squared error between predicted and actual
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RMSE (float): computed RMSE value, i.e. sqrt(total squared error / number of matched entries).
        Only (UserID, MovieID) pairs present in both RDDs contribute. Each difference is squared,
        so swapping the arguments yields the same value, but callers should pass predictions first.
    Note:
        NOTE(review): RDD.reduce() fails on an empty RDD, so this raises if no (UserID, MovieID)
        pair appears in both inputs.
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = (predictedRDD.<FILL IN>) 

    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = (actualRDD.<FILL IN>) 

    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    # The map expects each matched entry as (key, (predicted, actual)).
    squaredErrorsRDD = (predictedReformattedRDD
                        .<FILL IN>
                        .map (lambda (k,(a,b)):math.pow((a-b),2)))

    # Compute the total squared error - do not use collect()
    # NOTE(review): squaredErrorsRDD feeds two actions (this reduce and the count below); it is
    # not cached, so the matching step above is computed twice.
    totalError = squaredErrorsRDD.reduce (lambda a,b: a+b)

    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()

    # Using the total squared error and the number of entries, compute the RMSE
    return <FILL IN>

In [None]:
# 4.3 : Using the Alternating Least Squares (ALS.train())

In [None]:
# TODO: Replace <FILL IN> with appropriate code
from pyspark.mllib.recommendation import ALS

validationForPredictRDD = validationRDD.<FILL IN>

seed = 5L
iterations = 5
regularizationParameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.03

minError = float('inf')
bestRank = -1
bestIteration = -1
for rank in ranks:
    model = ALS.train(trainingRDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularizationParameter)
    predictedRatingsRDD = model.predictAll(validationForPredictRDD)
    error = computeError(predictedRatingsRDD, validationRDD)
    errors[err] = error
    err += 1
    print 'For rank %s the RMSE is %s' % (rank, error)
    if error < minError:
        minError = error
        bestRank = rank

print 'The best model was trained with rank %s' % bestRank

In [None]:
# 4.4: Testing your model:
# So far, we used the trainingRDD and validationRDD datasets to select the best model. 
# Since we used these two datasets to determine what model is best, we cannot use them 
# to test how good the model is - otherwise we would be very vulnerable to overfitting. 
# To decide how good our model is, we need to use the testRDD dataset.

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# train a model, using the trainingRDD, bestRank from previous part

myModel = <FILL IN>
testForPredictingRDD = testRDD.<FILL IN>
predictedTestRDD = myModel.<FILL IN>

testRMSE = computeError(testRDD, predictedTestRDD)

print 'The model had a RMSE on the test set of %s' % testRMSE

In [None]:
##################################################################################################################
# 5 - TEST THE MODEL: PREDICT MOVIES FOR A USER
##################################################################################################################

In [None]:
# 5.1: Choose your movies:
# To help you provide ratings for yourself, we have included the following code to list the names and movie IDs 
# of the 50 highest-rated movies from movieLimitedAndSortedByRatingRDD which we created in part 1 the lab.
print 'Most rated movies:'
print '(average rating, movie name, number of reviews)'
for ratingsTuple in movieLimitedAndSortedByRatingRDD.take(10):
    print ratingsTuple



In [None]:
# The user ID 0 is unassigned, so we will use it for your ratings. 
# We set the variable myUserID to 0 for you. Next, create a new RDD myRatingsRDD with your ratings for 
# at least 10 movie ratings. 
# Each entry should be formatted as (myUserID, movieID, rating) (i.e., each entry should be formatted in the same way as trainingRDD). 
# As in the original dataset, ratings should be between 1 and 5 (inclusive). If you have not seen at least 10 of these movies, 
# you can increase the parameter passed to take() in the above cell until there are 10 movies that you have seen 
# (or you can also guess what your rating would be for movies you have not seen).

In [None]:
# TODO: Replace <FILL IN> with appropriate code

# 5.2 - Create your dataSet:
myUserID = 0

# The listing above shows (average rating, movie name, number of reviews) and contains no movie IDs.
# Look up each movie's ID in moviesRDD, whose entries are (MovieID, Title).
# A common error is to use the number of reviews as the movie ID.
myRatedMovies = [
     <FILL IN>
    ]
     # The format of each line is (myUserID, movie ID, your rating)
     # For example, to give the movie "Star Wars: Episode IV - A New Hope (1977)" a five rating, you would add the following line:
     #   (myUserID, 260, 5),
     # (260 should be that movie's MovieID in MovieLens -- confirm by filtering moviesRDD on the title.)


myRatingsRDD = sc.parallelize(myRatedMovies)
print 'My movie ratings: %s' % myRatingsRDD.take(10)

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# 5.3 - Add Your Movies to Training Dataset
# Spark's union() transformation combines two RDDs; use union() to create a new training dataset that includes your ratings and the data in the original training dataset.
trainingWithMyRatingsRDD = myRatingsRDD.<FILL IN>

# union() keeps every entry of both RDDs, so the difference should equal the number of your ratings.
print ('The training dataset now has %s more entries than the original training dataset' %
       (trainingWithMyRatingsRDD.count() - trainingRDD.count()))

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# 5.4- Train a Model with Your Ratings and the parameters you used previously for the best model 
# (i.e. the same seed, iterations and regularizationParameter as in the rank search of 4.3).

myRatingsModel = ALS.train(trainingWithMyRatingsRDD, bestRank, <FILL IN>)

In [None]:
# 5.5- Check RMSE for the New Model with Your Ratings

predictedTestMyRatingsRDD = myRatingsModel.predictAll(testForPredictingRDD)
testRMSEMyRatings = computeError(testRDD, predictedTestMyRatingsRDD)
print 'The model had a RMSE on the test set of %s' % testRMSEMyRatings


In [None]:
# 5.6 - Predict your rating: We predict your rating for films you did not see

In [None]:
# TODO: Replace <FILL IN> with appropriate code

# Get an RDD with only the movies I did not rate Then run predictAll()
MyUnratedMoviesRDD = (moviesRDD
                      .<FILL IN>)
                     
predictedRDD = myRatingsModel.<FILL IN>

In [None]:
# 5.7 - Display the predicted rating: 

In [None]:
def _toMovieRatingCount(movieStats):
    """Map (MovieID, (number of ratings, average rating)) to (MovieID, number of ratings)."""
    movie_id, (ratings, average) = movieStats
    return movie_id, ratings


movieCountsRDD = movieIDsWithAverageRatingsRDD.map(_toMovieRatingCount)

In [None]:
# Transform predictedRatingsRDD into an RDD with entries that are pairs of the form (Movie ID, Predicted Rating)
def _toMovieAndPrediction(prediction):
    """Drop the user from a (UserID, MovieID, predicted rating) entry."""
    uid, movie_id, rating = prediction
    return movie_id, rating


predictedRDD = predictedRatingsRDD.map(_toMovieAndPrediction)

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Use RDD transformations with predictedRDD and movieCountsRDD to yield an RDD with tuples of the form (Movie ID, (Predicted Rating, number of ratings))
# Both RDDs are keyed by Movie ID, so their entries can be matched on that key.
predictedWithCountsRDD = ((predictedRDD.<FILL IN>))

In [None]:
# TODO: Replace <FILL IN> with appropriate code
# Use RDD transformations with predictedWithCountsRDD and moviesRDD to yield an RDD with tuples of the form (Predicted Rating, Movie Name, number of ratings), for movies with more than 75 ratings
# moviesRDD entries are (MovieID, Title), so they can be matched with predictedWithCountsRDD on MovieID.
ratingsWithNamesRDD = (predictedWithCountsRDD
                        .<FILL IN>)

In [None]:
# Show the 10 entries with the highest predicted rating; each entry is
# (predicted rating, movie name, number of ratings).
numTopPredictions = 10
predictedHighestRatedMovies = ratingsWithNamesRDD.takeOrdered(numTopPredictions,
                                                              key=lambda entry: -entry[0])
predictionLines = '\n'.join(str(movie) for movie in predictedHighestRatedMovies)
print ('My highest rated movies as predicted (for movies with more than 75 reviews):\n%s' %
        predictionLines)