# Movie Recommender
### This tutorial was adapted from here: https://www.codementor.io/@jadianes/building-a-recommender-with-apache-spark-python-example-app-part1-du1083qbw

### Download the ml-latest-small.zip file from https://grouplens.org/datasets/movielens/. We will use the smaller dataset since we are running on our own personal machines.

In [None]:
# Import the necessary libraries
import pandas as pd
import os
import math
from pyspark import SparkContext
sc = SparkContext()
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark.sql import Row

In [None]:
# Path to the ratings.csv file (fill this in before running the notebook)
ratings_file = ''

In [None]:
# Load the ratings.csv file as an RDD of text lines.
# The header line is still part of the RDD at this point.
ratings_raw_data = sc.textFile(ratings_file)

In [None]:
# Take the header (first line) of ratings_raw_data so it can be filtered out of
# the data rows. first() reports an empty file with a descriptive error instead
# of the bare IndexError that take(1)[0] would raise.
ratings_raw_data_header = ratings_raw_data.first()

In [None]:
# View the header line (displayed as the output of this notebook cell)
ratings_raw_data_header

In [None]:
# Tokenize each ratings line into (UserID, MovieID, Rating)
def _parse_rating_line(line):
    """Convert one comma-separated ratings line into (int, int, float)."""
    fields = line.split(",")
    return (int(fields[0]), int(fields[1]), float(fields[2]))


# Drop the header line, parse the remaining lines, and cache the result
# because the parsed ratings are reused by several later cells.
ratings_rows = ratings_raw_data.filter(lambda row: row != ratings_raw_data_header)
ratings_data = ratings_rows.map(_parse_rating_line).cache()

In [None]:
# Preview the first 3 parsed ratings as (UserID, MovieID, Rating) tuples
ratings_data.take(3)

In [None]:
# Same process as with the ratings, but applied to movies.
def _parse_movie_line(line):
    """Split one movies.csv line into (MovieID, Title, Genres) strings.

    The csv module is used instead of str.split(",") because titles that
    contain commas are quoted in the file, e.g.
    11,"American President, The (1995)",Comedy|Drama|Romance
    A plain split would cut such a title in two and shift the genres column.
    """
    # Imported locally so the function carries its own dependency when Spark
    # ships it to the workers.
    import csv

    tokens = next(csv.reader([line]))
    return (tokens[0], tokens[1], tokens[2])


# Enter path for the movies.csv file
movies_file = ('')
# Import the movies.csv file as an RDD of text lines
movies_raw_data = sc.textFile(movies_file)
# Take the header (first line) of movies_raw_data
movies_raw_data_header = movies_raw_data.first()
# Tokenize MovieID, Title, and Genres (header line excluded); all three stay strings
movies_data = movies_raw_data.filter(lambda line: line != movies_raw_data_header)\
    .map(_parse_movie_line).cache()
# Print the header
print(movies_raw_data_header)
# Take the first 3 lines of the movies_data
movies_data.take(3)

In [None]:
# Split ratings_data into 60% training, 20% validation, and 20% test.
# randomSplit normalizes the weights, so [6, 2, 2] means 60/20/20.
split_weights = [6, 2, 2]
training_RDD, validation_RDD, test_RDD = ratings_data.randomSplit(split_weights, seed=0)


def _user_movie_pair(rating):
    """Keep only (UserID, MovieID) from a (UserID, MovieID, Rating) tuple."""
    return (rating[0], rating[1])


# The model predicts from (UserID, MovieID) pairs only, so strip the true
# rating from the validation and test data.
validation_for_predict_RDD = validation_RDD.map(_user_movie_pair)
test_for_predict_RDD = test_RDD.map(_user_movie_pair)

In [None]:
# View the first 3 (UserID, MovieID) pairs of the validation data, then of the test data
for predict_input_RDD in (validation_for_predict_RDD, test_for_predict_RDD):
    print(predict_input_RDD.take(3))

In [None]:
# Build an alternating least squares (ALS) model for each candidate rank and
# keep the rank with the lowest RMSE on the validation data.
seed = 5
iterations = 10
regularization_parameter = 0.1
ranks = [2, 3, 4, 5, 10]
# One RMSE per entry of `ranks`, in the same order. It is built by appending so
# it always matches the length of `ranks` (no hard-coded size or manual index).
errors = []
# NOTE: `tolerance` and `best_iteration` are not referenced by any code visible
# in this notebook; they are kept so the existing names stay defined.
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

# Key the true validation ratings by (UserID, MovieID) once; this does not
# depend on the rank being evaluated.
validation_ratings_by_key = validation_RDD.map(lambda r: ((r[0], r[1]), r[2]))

for rank in ranks:
    model = ALS.train(training_RDD, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    # Predicted ratings keyed the same way, so they can be joined with the true ones
    predictions = model.predictAll(validation_for_predict_RDD).map(lambda r: ((r[0], r[1]), r[2]))
    # Each joined value is (true rating, predicted rating)
    rates_and_preds = validation_ratings_by_key.join(predictions)
    # Root mean squared error between true and predicted ratings
    error = math.sqrt(rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean())
    errors.append(error)
    print("For rank %s the RMSE is %s" % (rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank

print("The best model was trained with rank %s" % best_rank)

In [None]:
# Evaluate the best rank on the held-out test data
def _key_by_user_movie(rating):
    """Turn (UserID, MovieID, Rating) into ((UserID, MovieID), Rating)."""
    return ((rating[0], rating[1]), rating[2])


model = ALS.train(training_RDD, best_rank, iterations=iterations,
                  lambda_=regularization_parameter, seed=seed)
predictions = model.predictAll(test_for_predict_RDD).map(_key_by_user_movie)
# Each joined value is (true rating, predicted rating)
rates_and_preds = test_RDD.map(_key_by_user_movie).join(predictions)
squared_errors = rates_and_preds.map(lambda pair: (pair[1][0] - pair[1][1])**2)
error = math.sqrt(squared_errors.mean())

print('For testing data the RMSE is %s' % (error))

## Now we have a model that can predict movie ratings based on other users' movie preferences. Let's apply the model to your personal movie ratings to recommend movies you would like.

In [None]:
# View a table of the movies.
# Raise the pandas display limit so the whole table is shown rather than truncated.
pd.options.display.max_rows = 10000
# Bring all parsed movie rows to the driver and wrap them in a DataFrame
# (columns are numbered 0, 1, 2 in the order of the parsed tuple).
movie_rows = movies_data.collect()
movie_list = pd.DataFrame(movie_rows)
movie_list

In [None]:
# Search for a specific movie: put part of the title in search_term.
# The match is case-insensitive and is applied to column 1 of movie_list.
search_term = ''
title_matches = movie_list[1].str.contains(search_term, case=False)
movie_list[title_matches]

In [None]:
# Add your own movie ratings to the list below

# Your UserID
# NOTE(review): assumes 0 is not already used as a UserID in ratings.csv - confirm
new_user_ID = 0

# Each entry has the form (userID, movieID, rating), with userID equal to new_user_ID.
# For example:
# new_user_ratings = [(0,1732,5),(0,50,4)]
new_user_ratings = []
# Distribute the ratings as an RDD so they can be combined with the other Spark data
new_user_ratings_RDD = sc.parallelize(new_user_ratings)

In [None]:
# Append your ratings to the existing ratings data so a model can be trained on both
complete_data_with_new_ratings_RDD = ratings_data.union(new_user_ratings_RDD)

In [None]:
# Train a new ALS model on the combined data (existing ratings plus yours),
# using best_rank together with the same seed, iteration count and
# regularization parameter.
new_ratings_model = ALS.train(complete_data_with_new_ratings_RDD, best_rank, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)

In [None]:
# Get your movie recommendations from the new model.
# IDs of the movies you already rated, as a set of ints. A set supports
# repeated membership tests; a Python 3 `map` object is exhausted after one pass.
new_user_ratings_ids = {int(x[1]) for x in new_user_ratings}
# movies_data holds MovieIDs as strings while the rated IDs are integers, so
# convert to int before comparing. Comparing a string with ints never matches,
# which would leave the movies you already rated in the candidate list.
new_user_unrated_movies_RDD = (
    movies_data
    .map(lambda x: int(x[0]))
    .filter(lambda movie_id: movie_id not in new_user_ratings_ids)
    .map(lambda movie_id: (new_user_ID, movie_id))
)
# Predict your rating for every movie you have not rated yet
new_user_recommendations_RDD = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

In [None]:
# Build (MovieID, Title) pairs; MovieID is converted to int so the pairs can be
# joined by key with other RDDs keyed by integer MovieID
complete_movies_titles = movies_data.map(lambda x: (int(x[0]),x[1]))

In [None]:
# Average all users' ratings for each movie.
def get_counts_and_averages(ID_and_ratings_tuple):
    """Return (ID, (number of ratings, average rating)) for one movie.

    ID_and_ratings_tuple is a pair whose first item is the movie ID and whose
    second item is a sized collection of that movie's ratings. An empty
    collection raises ZeroDivisionError.
    """
    movie_id = ID_and_ratings_tuple[0]
    ratings = ID_and_ratings_tuple[1]
    nratings = len(ratings)
    total = float(sum(ratings))
    return movie_id, (nratings, total / nratings)

# Key every rating by MovieID and group all ratings of the same movie together
ratings_keyed_by_movie = ratings_data.map(lambda rating: (rating[1], rating[2]))
movie_ID_with_ratings_RDD = ratings_keyed_by_movie.groupByKey()
# Reduce each group to (MovieID, (number of ratings, average rating))
movie_ID_with_avg_ratings_RDD = movie_ID_with_ratings_RDD.map(get_counts_and_averages)
# Keep only (MovieID, number of ratings)
movie_rating_counts_RDD = movie_ID_with_avg_ratings_RDD.map(
    lambda movie_stats: (movie_stats[0], movie_stats[1][0])
)

In [None]:
# Combine your predicted ratings with the movie titles and the number of ratings per movie.
def _to_title_rating_count(joined_record):
    """Flatten (MovieID, ((predicted rating, title), count)) to (title, predicted rating, count)."""
    _movie_id, ((predicted_rating, title), rating_count) = joined_record
    return (title, predicted_rating, rating_count)


# (MovieID, predicted rating) for every movie the model scored for you
new_user_recommendations_rating_RDD = new_user_recommendations_RDD.map(
    lambda rec: (rec.product, rec.rating)
)
# Join by MovieID: first with the titles, then with the rating counts
recommendations_joined_RDD = (
    new_user_recommendations_rating_RDD
    .join(complete_movies_titles)
    .join(movie_rating_counts_RDD)
)
new_user_recommendations_rating_title_and_count_RDD = \
    recommendations_joined_RDD.map(_to_title_rating_count)

In [None]:
# Show your top 25 recommendations among movies with at least 100 ratings.
# Each record is (title, predicted rating, number of ratings).
popular_recommendations_RDD = new_user_recommendations_rating_title_and_count_RDD.filter(
    lambda rec: rec[2] >= 100
)
# takeOrdered sorts ascending, so negate the predicted rating to get the highest first
top_movies = popular_recommendations_RDD.takeOrdered(25, key=lambda rec: -rec[1])

top_movies_text = '\n'.join(str(movie) for movie in top_movies)
print('TOP recommended movies (with more than 100 reviews):\n%s' % top_movies_text)