# A Movie Recommendation Service
### Source: https://www.codementor.io/spark/tutorial/building-a-recommender-with-apache-spark-python-example-app-part1

#### Create a SparkContext configured for local mode

In [2]:
import pyspark
sc = pyspark.SparkContext('local[*]')

#### File download
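Small: 100,000 ratings and 2,488 tag applications applied to 8,570 movies by 706 users. Last updated 4/2015 (the copy downloaded below is a newer release: it contains 100,836 ratings and 9,742 movies, as the outputs further down show).   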
Full: 21,000,000 ratings and 470,000 tag applications applied to 27,000 movies by 230,000 users. Last updated 4/2015.

In [3]:
small_dataset_url = 'http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'

#### Download location(s)

In [4]:
import os
datasets_path = os.path.join('/home/jovyan', 'work')
small_dataset_path = os.path.join(datasets_path, 'ml-latest-small.zip')

#### Getting file(s)

In [5]:
import urllib.request
small_f = urllib.request.urlretrieve(small_dataset_url, small_dataset_path)

#### Extracting file(s)

In [6]:
import zipfile

with zipfile.ZipFile(small_dataset_path, "r") as z:
    z.extractall(datasets_path)

## Loading and parsing datasets
Now we are ready to read in each of the files and create an RDD consisting of parsed lines. 

Each line in the ratings dataset (ratings.csv) is formatted as: 
+ userId,movieId,rating,timestamp 

Each line in the movies (movies.csv) dataset is formatted as:
+ movieId,title,genres 

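The files are simple comma-separated text, so we can parse their lines with Python's `split()` once they are loaded into RDDs. One caveat: in movies.csv, titles that contain a comma (for example "Producers, The") are wrapped in double quotes, and a plain `split(",")` cuts them at the comma. This is why some titles in the outputs below appear truncated (e.g. `'"Producers'`). Parsing the movies and ratings files yields two RDDs: 
+ For each line in the ratings dataset, we create a tuple of (UserID, MovieID, Rating). We drop the timestamp because we do not need it for this recommender.
+ For each line in the movies dataset, we create a tuple of (MovieID, Title, Genres), and from it a (MovieID, Title) RDD for title lookups. The genres are not used by this recommender.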
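A minimal sketch of quote-aware parsing, using Python's standard `csv` module on a single line. The example row (its movieId and genre) is hypothetical; it only reproduces the shape of a quoted title containing a comma. A function like `parse_movie_line` could be passed to `.map()` in place of the `split(",")` step, though the notebook below keeps the original parsing so that its recorded outputs stay as they were.

```python
import csv


def parse_movie_line(line):
    """Parse one movies.csv line into (movieId, title, genres), honoring quotes."""
    movie_id, title, genres = next(csv.reader([line]))
    return int(movie_id), title, genres


def naive_parse(line):
    """The split-based parsing used in this notebook, for comparison."""
    tokens = line.split(",")
    return int(tokens[0]), tokens[1], tokens[2]


# Hypothetical example row: a quoted title that contains a comma
row = '99999,"Producers, The (1968)",Comedy'
print(naive_parse(row))       # (99999, '"Producers', ' The (1968)"')
print(parse_movie_line(row))  # (99999, 'Producers, The (1968)', 'Comedy')
```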

#### ratings.csv

In [7]:
complete_ratings_file = os.path.join(datasets_path, 'ml-latest-small', 'ratings.csv')
complete_ratings_raw_data = sc.textFile(complete_ratings_file)
complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]
# Parse
complete_ratings_data = complete_ratings_raw_data.filter(lambda line: line!=complete_ratings_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),int(tokens[1]),float(tokens[2]))).cache()

print ('There are {} ratings in the complete dataset'.format(complete_ratings_data.count()))
complete_ratings_data.take(3)

There are 100836 ratings in the complete dataset


[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]

#### movies.csv

In [8]:
# Load the small dataset file
complete_movies_file = os.path.join(datasets_path, 'ml-latest-small', 'movies.csv')
complete_movies_raw_data = sc.textFile(complete_movies_file)
complete_movies_raw_data_header = complete_movies_raw_data.take(1)[0]

# Parse
complete_movies_data = complete_movies_raw_data.filter(lambda line: line!=complete_movies_raw_data_header)\
    .map(lambda line: line.split(",")).map(lambda tokens: (int(tokens[0]),tokens[1],tokens[2])).cache()

complete_movies_titles = complete_movies_data.map(lambda x: (int(x[0]),x[1]))
print ('There are {} movies in the complete dataset'.format(complete_movies_titles.count()))
complete_movies_data.take(3)

There are 9742 movies in the complete dataset


[(1, 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy'),
 (2, 'Jumanji (1995)', 'Adventure|Children|Fantasy'),
 (3, 'Grumpier Old Men (1995)', 'Comedy|Romance')]

## Collaborative Filtering
In collaborative filtering, we make predictions (filtering) about a user's interests by collecting preference or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B's opinion on a different issue x than the opinion on x of a randomly chosen user. 

First, people rate different items (such as videos, images, or games). Then the system predicts a user's rating for an item they have not rated yet. These predictions are built from the existing ratings of other users whose ratings are similar to those of the active user.
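To make the "user A agrees with user B" assumption concrete, here is a minimal sketch of user-based neighborhood filtering on hypothetical toy data: the prediction is the user's mean rating plus the Pearson-similarity-weighted deviations of other users' ratings. This is only an illustration of the idea; it is not what MLlib does (MLlib uses the ALS matrix factorization described next).

```python
import math

# Hypothetical toy data: user -> {movie: rating}
ratings = {
    "A": {"m1": 5, "m2": 4, "m3": 1},
    "B": {"m1": 5, "m2": 5, "m3": 1, "m4": 4},
    "C": {"m1": 1, "m2": 2, "m3": 5, "m4": 1},
}


def mean(values):
    values = list(values)
    return sum(values) / len(values)


def pearson(u, v):
    """Pearson correlation between two users over their co-rated movies."""
    common = sorted(set(u) & set(v))
    if len(common) < 2:
        return 0.0
    mu = mean(u[m] for m in common)
    mv = mean(v[m] for m in common)
    du = [u[m] - mu for m in common]
    dv = [v[m] - mv for m in common]
    den = math.sqrt(sum(x * x for x in du) * sum(y * y for y in dv))
    return sum(x * y for x, y in zip(du, dv)) / den if den else 0.0


def predict(user, movie):
    """User's mean plus similarity-weighted deviations of the other users."""
    num = den = 0.0
    for other, theirs in ratings.items():
        if other == user or movie not in theirs:
            continue
        sim = pearson(ratings[user], theirs)
        num += sim * (theirs[movie] - mean(theirs.values()))
        den += abs(sim)
    base = mean(ratings[user].values())
    return base + num / den if den else base


print(round(predict("A", "m4"), 2))  # 4.09
```

A agrees with B (who liked m4) and disagrees with C (who disliked it), so both neighbors push A's predicted rating for m4 above A's own average.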

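Spark's MLlib machine learning library provides a collaborative filtering implementation based on Alternating Least Squares (ALS). The MLlib implementation has the following parameters:

+ numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
+ rank is the number of latent factors in the model.
+ iterations is the number of iterations to run.
+ lambda specifies the regularization parameter in ALS.
+ implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
+ alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In PySpark's `pyspark.mllib.recommendation.ALS`, these map onto the arguments of `ALS.train(ratings, rank, iterations, lambda_, blocks, ...)` (note the trailing underscore in `lambda_`, since `lambda` is a Python keyword). Instead of an `implicitPrefs` flag, the implicit variant is a separate method, `ALS.trainImplicit(...)`, which also accepts `alpha`.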
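To show what `rank`, `iterations`, and `lambda` control, here is a minimal single-machine sketch of explicit-feedback ALS in NumPy on hypothetical toy ratings. It alternates between fixing the item factors and solving a small ridge regression per user, then fixing the user factors and solving one per item. Assumption: it uses a plain, unscaled `lambda_`, whereas the MLlib documentation notes that MLlib scales the regularization term by the number of ratings of each user or product; it is not MLlib's distributed, block-partitioned implementation.

```python
import numpy as np


def als(ratings, n_users, n_items, rank=2, iterations=10, lambda_=0.1, seed=0):
    """Explicit-feedback ALS on (user, item, rating) triples with 0-based ids."""
    rng = np.random.default_rng(seed)
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    by_user = [[] for _ in range(n_users)]
    by_item = [[] for _ in range(n_items)]
    for u, i, r in ratings:
        by_user[u].append((i, r))
        by_item[i].append((u, r))
    reg = lambda_ * np.eye(rank)

    def solve(fixed, observed):
        # Ridge regression: minimize ||fixed[idx] @ x - r||^2 + lambda_ * ||x||^2
        idx = [j for j, _ in observed]
        r = np.array([val for _, val in observed], dtype=float)
        A = fixed[idx]
        return np.linalg.solve(A.T @ A + reg, A.T @ r)

    for _ in range(iterations):
        for u, observed in enumerate(by_user):  # fix V, update each user's factors
            if observed:
                U[u] = solve(V, observed)
        for i, observed in enumerate(by_item):  # fix U, update each item's factors
            if observed:
                V[i] = solve(U, observed)
    return U, V


def rmse(ratings, U, V):
    return float(np.sqrt(np.mean([(r - U[u] @ V[i]) ** 2 for u, i, r in ratings])))


# Hypothetical toy ratings: (user, movie, rating)
toy = [(0, 0, 5), (0, 1, 4), (0, 2, 1), (1, 0, 5), (1, 1, 5), (1, 3, 4),
       (2, 0, 1), (2, 2, 5), (2, 3, 1), (3, 1, 4), (3, 3, 4)]
U0, V0 = als(toy, 4, 4, iterations=0)   # random initial factors only
U, V = als(toy, 4, 4, iterations=10)
print(rmse(toy, U0, V0), rmse(toy, U, V))
```

Each half-step solves its least-squares subproblems exactly, which is why alternating between them steadily reduces the regularized error on the training ratings.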

#### Selecting ALS parameters using the small dataset
To determine the best ALS parameters, we use the small dataset. First, we split it into training, validation, and test sets.
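`randomSplit([6, 2, 2], seed=0)` normalizes the weights to 0.6/0.2/0.2 and assigns rows randomly, so the three RDDs have approximately, not exactly, those proportions, and a fixed seed makes the split reproducible. The sketch below illustrates that idea in plain Python; `random_split` is a hypothetical helper written in the same spirit, not Spark's per-partition sampling implementation.

```python
import random


def random_split(items, weights, seed=0):
    """Assign each item to one bucket with probability proportional to its weight."""
    total = float(sum(weights))
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)  # cumulative fractions, e.g. 0.6, 0.8, 1.0
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        x = rng.random()
        for k, b in enumerate(bounds):
            if x < b:
                splits[k].append(item)
                break
        else:
            splits[-1].append(item)  # guard against floating-point rounding
    return splits


train, val, test = random_split(range(10000), [6, 2, 2], seed=0)
print(len(train), len(val), len(test))  # roughly 6000 / 2000 / 2000
```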

In [9]:
# The original tutorial passes seed=0L, a Python 2 long literal;
# in Python 3 the same seed is written simply as 0
training_RDD, validation_RDD, test_RDD = complete_ratings_data.randomSplit([6, 2, 2], seed=0)
validation_for_predict_RDD = validation_RDD.map(lambda x: (x[0], x[1]))
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

#### Training phase

In [10]:
from pyspark.mllib.recommendation import ALS
import math

seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = []  # validation RMSE for each rank, in the order of ranks

min_error = float('inf')
best_rank = -1

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # Predict the validation (user, movie) pairs -> ((user, movie), predicted rating)
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # Join actual and predicted ratings on (user, movie), then compute the RMSE
    rates_and_preds = validation_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print ('For rank {} the RMSE is {}'.format(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print ('The best model was trained with rank {}'.format(best_rank))

For rank 4 the RMSE is 0.908078105265682
For rank 8 the RMSE is 0.916462973348527
For rank 12 the RMSE is 0.917665030756129
The best model was trained with rank 4
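The evaluation in the loop keys both the actual and the predicted ratings by `(user, movie)`, joins them, and averages the squared differences. Because `.join()` is an inner join, any pair the model cannot score (for example a user or movie that never appears in `training_RDD`, for which `predictAll` returns nothing) is silently left out of the RMSE. A minimal plain-Python sketch of that logic, with hypothetical values and a hypothetical helper name:

```python
import math


def rmse_on_common_keys(actual, predicted):
    """RMSE over (user, movie) keys present in both dicts, like an inner join."""
    common = actual.keys() & predicted.keys()
    if not common:
        raise ValueError("no overlapping (user, movie) pairs")
    squared = [(actual[k] - predicted[k]) ** 2 for k in common]
    return math.sqrt(sum(squared) / len(squared))


# Hypothetical values: (user, movie) -> rating
actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
predicted = {(1, 10): 3.5, (1, 20): 3.5}  # no prediction for (2, 10)
print(rmse_on_common_keys(actual, predicted))  # 0.5
```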


## Using the complete dataset to build the final model
Due to the limited resources of the virtual machine, we keep using the small dataset instead of the complete one.

First, we split it into training and test sets.

In [11]:
training_RDD, test_RDD = complete_ratings_data.randomSplit([7, 3], seed=0)

complete_model = ALS.train(training_RDD, best_rank, seed=seed,
                           iterations=iterations, lambda_=regularization_parameter)

Now we evaluate the model on the test set.

In [12]:
test_for_predict_RDD = test_RDD.map(lambda x: (x[0], x[1]))

predictions = complete_model.predictAll(test_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
rates_and_preds = test_RDD.map(lambda r: ((int(r[0]), int(r[1])), float(r[2]))).join(predictions)
error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())

print ('For testing data the RMSE is {}'.format(error))

For testing data the RMSE is 0.8949959237223808


## How to make recommendations
Although we aim to build an online movie recommender, now that our model is trained we can try it out by generating some movie recommendations. This will help us write the recommendation engine later, when building the web service, and shows how to use the model in other settings.

With collaborative filtering, getting recommendations for a new user is not as simple as predicting new entries with a previously trained model. Instead, we need to retrain the model including the new user's preferences, so that they can be compared with those of the other users in the dataset. That is, the recommender needs to be retrained every time we get new user ratings (although a single model can, of course, serve many users). This makes the process expensive, and it is one of the reasons why scalability is a problem (and Spark a solution!). Once the model is trained, we can reuse it to obtain top recommendations for a given user or an individual rating for a particular movie. These operations are less costly than training the model itself.

We also want to recommend only movies with a certain minimum number of ratings. For that, we need to count the number of ratings per movie.
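The `groupByKey` plus `get_counts_and_averages` step computes, for each movie, how many ratings it has and their average. A minimal plain-Python sketch of the same result, keeping a running (count, sum) per movie; the first two sample rows come from the `take(3)` output above and the third is hypothetical. In Spark, the same running-pair idea could be written with `reduceByKey`, e.g. `complete_ratings_data.map(lambda x: (x[1], (1, x[2]))).reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))`, which combines values before the shuffle instead of collecting every rating of a movie in one place.

```python
from collections import defaultdict


def counts_and_averages(ratings):
    """Map movieId -> (number of ratings, average rating) from (user, movie, rating) triples."""
    totals = defaultdict(lambda: [0, 0.0])  # movieId -> [count, sum of ratings]
    for _, movie_id, rating in ratings:
        totals[movie_id][0] += 1
        totals[movie_id][1] += rating
    return {m: (n, s / n) for m, (n, s) in totals.items()}


sample = [(1, 1, 4.0), (1, 3, 4.0), (2, 1, 3.0)]
print(counts_and_averages(sample))  # {1: (2, 3.5), 3: (1, 4.0)}
```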

In [13]:
def get_counts_and_averages(ID_and_ratings_tuple):
    """Return (movieID, (number of ratings, average rating)) for one grouped movie."""
    movie_id, ratings = ID_and_ratings_tuple
    nratings = len(ratings)
    return movie_id, (nratings, sum(ratings) / nratings)

movie_ID_with_ratings_RDD = (complete_ratings_data.map(lambda x: (x[1], x[2])).groupByKey())
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(lambda x: (x[0], x[1][0]))

### Adding new user ratings
Now we need to rate some movies for the new user. We put the ratings in a new RDD and use user ID 0, which is not assigned in the MovieLens dataset. Check the dataset's movies file for the ID-to-title mapping (so you know which movies you are actually rating).

In [35]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)

# ###################################################
# Keep the userID, but Replace movieID, rating, title
# ###################################################

# Find 10 movies you have watched in the past
# Put your OWN ratings

new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5), # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]

# # Scenario 1: 10 movies you have watched with ratings 3 or lower.
# new_user_ratings = [
#      (0,184053,2), # Battle Planet (2008)
#      (0,121231,3), # It Follows (2014)
#      (0,137857,3), # The Jungle Book (2016)
#      (0,160571,3), # Lights Out (2016)
#      (0,31424,2), # Alone in the Dark (2005)
#      (0,26764,2), # Captain America (1990)
#      (0,31698,2), # Son of the Mask (2005)
#      (0,63239,3), # Cinderella (1997)
#      (0,54736,2), # Kingdom, The (2007)
#      (0,80363,3) # Resident Evil: Afterlife (2010)
#     ]

# # Scenario 2: 10 movies you have watched with ratings 3 or higher.
# new_user_ratings = [
#      (0,74458,4), # Shutter Island (2010)
#      (0,377,4), # Speed (1994)
#      (0,480,5), # Jurassic Park (1993)
#      (0,588,5), # Aladdin (1992)
#      (0,1370,4), # Die Hard 2 (1990)
#      (0,2273,4), # Rush Hour (1998)
#      (0,2985,5), # RoboCop (1987)
#      (0,89745,4), # Avengers, The (2012)
#      (0,100498,4), # Good Day to Die Hard, A (2013)
#      (0,104913,5) # Rush (2013)
#     ]

# # Scenario 3: 10 movies you have watched with ratings balanced/mixed with lower and higher ratings.
# new_user_ratings = [
#      (0,6872,2), # House of the Dead, The (2003)
#      (0,356,4), # Forrest Gump (1994)
#      (0,72998,5), # Avatar (2009)
#      (0,184053,2), # Battle Planet (2008)
#      (0,73017,5), # Sherlock Holmes (2009)
#      (0,170827,4), # The Mummy (2017)
#      (0,137857,3), # The Jungle Book (2016)
#      (0,63992,4), # Twilight (2008)
#      (0,121231,3), # It Follows (2014)
#      (0,7842,5) # Dune (2000)
#     ]

new_user_ratings_RDD = sc.parallelize(new_user_ratings)
print ('New user ratings: {}'.format(new_user_ratings_RDD.take(10)))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


Now we add them to the data we will use to train our recommender model. We use Spark's union() transformation for this.

In [36]:
complete_data_with_new_ratings_RDD = complete_ratings_data.union(new_user_ratings_RDD)

And finally we train the ALS model using all the parameters we selected before (when using the small dataset).

In [37]:
from time import time

t0 = time()
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed,
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print ('New model trained in {} seconds'.format(round(tt,3)))

New model trained in 2.091 seconds


## Getting top recommendations
Let's now get some recommendations! For that, we build an RDD with all the movies the new user hasn't rated yet, and then use it together with the model to predict ratings.

In [38]:
# In Python 3, map() returns a one-shot iterator, so collect the rated movie IDs
# into a set instead; a set also makes the membership test below O(1)
new_user_ratings_ids = {x[1] for x in new_user_ratings}
# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_RDD = (complete_movies_data.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
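The original tutorial was written for Python 2, where `map()` returned a list. In Python 3 it returns a lazy, one-shot iterator, and every `in` / `not in` test consumes it while searching, so a filter that checks membership against it gives wrong answers once the iterator is exhausted. A minimal sketch of the pitfall using two of the new user's ratings:

```python
new_user_ratings = [(0, 260, 4), (0, 1, 3)]

ids_iter = map(lambda x: x[1], new_user_ratings)  # Python 3: a one-shot iterator
first = 1 in ids_iter    # True, but the search consumes the iterator
second = 1 in ids_iter   # False: nothing is left to search

ids_set = {x[1] for x in new_user_ratings}        # reusable, O(1) membership
print(first, second, 1 in ids_set, 1 in ids_set)  # True False True True
```

Wrapping the `map` in `list(...)` also works; a set is simply the natural container for repeated membership tests.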

Our recommendations are ready. Now we can print out the 25 movies with the highest predicted ratings. To do so, we join them with the movies RDD to get the titles, and with the ratings-count RDD so that we can keep only movies with a minimum number of ratings. First, we do the join and look at what the result looks like.

In [39]:
# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_RDD.join(complete_movies_titles).join(movie_rating_counts_RDD)
new_user_recommendations_rating_title_and_count_RDD.take(3)

[(81132, ((2.955288581807401, 'Rubber (2010)'), 2)),
 (60408,
  ((2.636138164903529,
    "Welcome to the Sticks (Bienvenue chez les Ch'tis) (2008)"),
   2)),
 (204, ((0.7783204206395666, 'Under Siege 2: Dark Territory (1995)'), 30))]

So we need to flatten this a bit to get (Title, Rating, Ratings Count) tuples.

In [40]:
new_user_recommendations_rating_title_and_count_RDD = \
    new_user_recommendations_rating_title_and_count_RDD.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
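The index chain `r[1][0][1], r[1][0][0], r[1][1]` unpacks the nested join result `(movieId, ((predicted rating, title), ratings count))`. Python 3 no longer allows tuple unpacking in a lambda's parameter list, so a small named function is one readable alternative that could be passed to `.map()` instead. A sketch, using the first record of the join output above:

```python
def flatten(record):
    """(movieId, ((predicted rating, title), ratings count)) -> (title, rating, count)."""
    movie_id, ((rating, title), count) = record
    return title, rating, count


sample = (81132, ((2.955288581807401, 'Rubber (2010)'), 2))
print(flatten(sample))  # ('Rubber (2010)', 2.955288581807401, 2)
```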

Finally, we get the highest-rated recommendations for the new user, filtering out movies with fewer than 25 ratings.

In [41]:
top_movies = new_user_recommendations_rating_title_and_count_RDD.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with at least 25 ratings):\n{}'.format('\n'.join(map(str, top_movies))))

TOP recommended movies (with at least 25 ratings):
('"Producers', 4.413715167197228, 33)
('Citizen Kane (1941)', 4.330399918478984, 69)
('12 Angry Men (1957)', 4.286009440418237, 57)
('Birdman: Or (The Unexpected Virtue of Ignorance) (2014)', 4.246380665024124, 26)
('Strangers on a Train (1951)', 4.227688009723412, 25)
('"Philadelphia Story', 4.22428406396692, 29)
('"Boot', 4.223205521286304, 40)
('Apocalypse Now (1979)', 4.208841648856371, 107)
('"Bridge on the River Kwai', 4.191475180447089, 45)
('Dr. Strangelove or: How I Learned to Stop Worrying and Love the Bomb (1964)', 4.177194898317937, 97)
('Chinatown (1974)', 4.148454095199036, 59)
('Raging Bull (1980)', 4.102537660743968, 40)
('"Maltese Falcon', 4.087109344558157, 44)
('Seven Samurai (Shichinin no samurai) (1954)', 4.084639969466412, 48)
('"Godfather: Part II', 4.073786810835896, 129)
('"African Queen', 4.062454498618735, 34)
('Lawrence of Arabia (1962)', 4.055775606343244, 45)
('Rear Window (1954)', 4.023139250841821, 84)
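`takeOrdered(25, key=lambda x: -x[1])` returns the 25 rows with the smallest keys, i.e. the highest predicted ratings. Note that the filter `r[2] >= 25` keeps movies with exactly 25 ratings, such as *Strangers on a Train*. A minimal local equivalent using `heapq.nlargest`; `top_k` is a hypothetical helper and the rows are taken from the outputs above:

```python
import heapq


def top_k(rows, k=25, min_count=25):
    """Highest predicted ratings among (title, rating, count) rows with count >= min_count."""
    eligible = (r for r in rows if r[2] >= min_count)
    return heapq.nlargest(k, eligible, key=lambda r: r[1])


rows = [('Citizen Kane (1941)', 4.330399918478984, 69),
        ('Rubber (2010)', 2.955288581807401, 2),
        ('Strangers on a Train (1951)', 4.227688009723412, 25),
        ('Rear Window (1954)', 4.023139250841821, 84)]
print([title for title, _, _ in top_k(rows, k=2)])
# ['Citizen Kane (1941)', 'Strangers on a Train (1951)']
```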


## Getting individual ratings
Another useful use case is getting the predicted rating of a particular movie for a given user. The process is similar to the previous retrieval of top recommendations but, instead of passing `predictAll` every single movie the user hasn't rated yet, we pass it a single entry with the movie we want to predict the rating for.

In [20]:
my_movie = sc.parallelize([(0, 500)]) # Quiz Show (1994)
# individual_movie_rating_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)
individual_movie_rating_RDD = new_ratings_model.predictAll(my_movie)
individual_movie_rating_RDD.take(1)

[Rating(user=0, product=500, rating=2.231213731088749)]

### Which scenario works best with collaborative filtering? Why does this happen?

In this case, Scenario 2 (S2) is likely to yield the best results with collaborative filtering, because the user gives positive ratings to all the movies in S2. Collaborative filtering relies on identifying users with shared preferences in order to make recommendations. When a user rates movies highly (3 or higher in this case), it signals that they enjoyed those movies, so collaborative filtering can use this positive feedback to find other users who also enjoyed them and recommend similar ones.

In S1, on the other hand, the user rated every movie 3 or lower, which suggests little satisfaction or enjoyment. Collaborative filtering may be less effective here, because finding users who share a dislike of the same movies does not necessarily lead to valuable recommendations. It is harder to identify useful preference patterns among users who rated movies poorly, which makes it harder for collaborative filtering to generate relevant suggestions.

S3 presents a mixed scenario with movies having both lower and higher ratings. While collaborative filtering can still provide recommendations based on users who have similar preferences for some movies, the varying ratings create a more complex situation. The mixed ratings make it difficult to determine a clear pattern of user preference, as the user's tastes seem to be more nuanced and diverse. Collaborative filtering might struggle to capture the user's specific preferences and offer accurate recommendations in this scenario.

### How would you design your movie streaming service (user interfaces with interactions) to get customer input that improves your recommendation engine for movie searching and selection? (as if you were running a service like Netflix).

To design a movie streaming service that gathers customer input and provides a more personalized, engaging, and accurate recommendation experience, ultimately improving movie searching and selection, I would focus on intuitive, interactive user interfaces that incorporate the following:

__1. Customizable Watchlists:__ Allowing users to create and customize their own watchlists by adding movies they are interested in or want to watch in the future. This helps the recommendation engine understand individual preferences and tailor recommendations accordingly.

__2. Behavior-based Insights:__ Analyzing behavioral signals such as viewing duration, rewatches, or skipped parts of a movie. These signals reveal user engagement and preferences even without explicit ratings or feedback.

__3. Context-aware Recommendations:__ Incorporating factors such as time of day, day of the week, or user location to offer more relevant movie recommendations. For example, suggesting feel-good movies on a rainy Sunday or action-packed blockbusters on a Friday night.

__4. Smart Notification System:__ Implementing a smart notification system that provides personalized movie recommendations based on user preferences, recent activity, or upcoming releases. These notifications can be delivered via email, mobile push notifications, or within the streaming service itself.

__5. Integration of Social Media:__ Enabling users to connect their social media accounts to the streaming service and leverage their social graph for recommendations. This can involve analyzing movie-related interactions, such as likes, comments, or shares, within a user's social network to generate relevant suggestions.

__6. Integration of External Data:__ Incorporating external data sources, such as IMDb or Rotten Tomatoes, as well as user-generated reviews, to enhance the recommendation engine. This integration of external data provides a more comprehensive understanding of movie preferences and enhances the accuracy of the recommendations.
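The behavior-based signals in point 2 are implicit feedback, which is what the implicit ALS variant and its `alpha` parameter described earlier are meant for. In the formulation that the Spark documentation cites (Hu, Koren, and Volinsky, "Collaborative Filtering for Implicit Feedback Datasets"), each observed strength r becomes a binary preference p (1 if r > 0, else 0) with confidence c = 1 + alpha * r. A minimal sketch of turning watch events into such strengths; the event format, the 0.5 threshold, and the summing rule are all hypothetical design choices, not part of any Spark API:

```python
from collections import defaultdict

# Hypothetical watch events: (userId, movieId, fraction of the movie watched)
events = [(7, 260, 1.0), (7, 260, 1.0), (7, 1, 0.2), (8, 858, 0.9)]


def implicit_strength(events, min_fraction=0.5):
    """Sum watched fractions per (user, movie); sessions below min_fraction count as 0."""
    strength = defaultdict(float)
    for user, movie, fraction in events:
        strength[(user, movie)] += fraction if fraction >= min_fraction else 0.0
    return dict(strength)


def preference_and_confidence(r, alpha=1.0):
    """Implicit-feedback ALS: preference p = 1 if r > 0 else 0, confidence c = 1 + alpha * r."""
    return (1 if r > 0 else 0), 1.0 + alpha * r


for key, r in sorted(implicit_strength(events).items()):
    print(key, r, preference_and_confidence(r))
```

A rewatched movie ends up with higher confidence than a single viewing, while an abandoned session contributes no positive preference.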


## Highest-rated recommendations for the new user (movies with at least 25 ratings)

### Scenario 1: Ratings 3 or lower.
new_user_ratings = [      
     (0,184053,2), # Battle Planet (2008)       
     (0,121231,3), # It Follows (2014)       
     (0,137857,3), # The Jungle Book (2016)     
     (0,160571,3), # Lights Out (2016)    
     (0,31424,2), # Alone in the Dark (2005)    
     (0,26764,2), # Captain America (1990)     
     (0,31698,2), # Son of the Mask (2005)     
     (0,63239,3), # Cinderella (1997)     
     (0,54736,2), # Kingdom, The (2007)     
     (0,80363,3) # Resident Evil: Afterlife (2010)    
    ]
    
__TOP recommended movies (with at least 25 ratings):__    
('"Philadelphia Story', 3.850701867294048, 29)    
('"Great Escape', 3.8348713561114205, 43)    
('Strangers on a Train (1951)', 3.815493575994728, 25)   
('Wallace & Gromit: The Best of Aardman Animation (1996)', 3.7840759682946583, 27)    
('North by Northwest (1959)', 3.7796561504979183, 57)    
('12 Angry Men (1957)', 3.744007120841002, 57)    
('"Bridge on the River Kwai', 3.715152622023407, 45)    
("Schindler's List (1993)", 3.711461379616278, 220)    
('My Fair Lady (1964)', 3.692902756460889, 35)   
('Casablanca (1942)', 3.6863994950228127, 100)    

### Scenario 2: Ratings 3 or higher.
new_user_ratings = [     
     (0,74458,4), # Shutter Island (2010)   
     (0,377,4), # Speed (1994)   
     (0,480,5), # Jurassic Park (1993)   
     (0,588,5), # Aladdin (1992)   
     (0,1370,4), # Die Hard 2 (1990)   
     (0,2273,4), # Rush Hour (1998)   
     (0,2985,5), # RoboCop (1987)   
     (0,89745,4), # Avengers, The (2012)   
     (0,100498,4), # Good Day to Die Hard, A (2013)   
     (0,104913,5) # Rush (2013)    
    ]  

__TOP recommended movies (with at least 25 ratings):__   
('Wallace & Gromit: The Best of Aardman Animation (1996)', 5.696815116013485, 27)    
('"Shawshank Redemption', 5.449061598666422, 317)   
('Star Wars: Episode IV - A New Hope (1977)', 5.447083750979968, 251)   
('"Philadelphia Story', 5.419370814607872, 29)   
("Schindler's List (1993)", 5.414948453801893, 220)   
('Star Wars: Episode V - The Empire Strikes Back (1980)', 5.413575222293608, 211)   
('Patton (1970)', 5.406907859479367, 33)    
('Amadeus (1984)', 5.396351700058654, 76)   
('"Great Escape', 5.343802719149474, 43)   
('In the Name of the Father (1993)', 5.332485525626174, 25)   
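ALS predictions are dot products of latent factor vectors and are not bounded by the rating scale, which is why every Scenario 2 score above exceeds the 5-star maximum. If predicted ratings are shown to users, one option is to clamp them to the MovieLens scale of 0.5 to 5.0 stars. A minimal sketch (`clip_rating` is a hypothetical helper); note that clamping creates ties at the top, so ranking should still use the raw scores:

```python
def clip_rating(prediction, low=0.5, high=5.0):
    """Clamp a raw ALS prediction to the MovieLens star scale."""
    return min(max(prediction, low), high)


scenario_2 = [('Wallace & Gromit: The Best of Aardman Animation (1996)', 5.696815116013485, 27),
              ('In the Name of the Father (1993)', 5.332485525626174, 25)]
print([(title, clip_rating(score)) for title, score, _ in scenario_2])
```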

### Scenario 3: Ratings balanced/mixed with lower and higher ratings.
new_user_ratings = [    
     (0,6872,2), # House of the Dead, The (2003)   
     (0,356,4), # Forrest Gump (1994)   
     (0,72998,5), # Avatar (2009)   
     (0,184053,2), # Battle Planet (2008)   
     (0,73017,5), # Sherlock Holmes (2009)   
     (0,170827,4), # The Mummy (2017)   
     (0,137857,3), # The Jungle Book (2016)   
     (0,63992,4), # Twilight (2008)   
     (0,121231,3), # It Follows (2014)   
     (0,7842,5) # Dune (2000)  
    ]   

__TOP recommended movies (with at least 25 ratings):__   
('Rogue One: A Star Wars Story (2016)', 4.783242753450173, 27)  
('"Girl with the Dragon Tattoo', 4.769455986638891, 25)   
('Into the Wild (2007)', 4.742420649714853, 41)   
('Remember the Titans (2000)', 4.731893662059123, 41)  
('Cinema Paradiso (Nuovo cinema Paradiso) (1989)', 4.711145647382537, 34) 
('Intouchables (2011)', 4.703097785583718, 37)  
('Juno (2007)', 4.697014975616241, 65)  
('Serenity (2005)', 4.695402034838794, 50)  
('"Patriot', 4.67426324735465, 68)  
('Shooter (2007)', 4.663796388753088, 25)  