# Movie Recommender using MapReduce

In [1]:
import re

from pyspark.sql.types import StringType
from pyspark import SQLContext
# NOTE(review): `sc` is not defined in this file; presumably the SparkContext
# is provided by the pyspark shell / notebook kernel -- confirm.
sqlContext = SQLContext(sc)

# Interactive inputs describing the rating the user has just given.
# given_rating is read here but is not used by the visible code.
given_userid  = int(input('Enter userId: '))
given_movieid = int(input('Enter movieId: '))
given_rating  = float(input('Enter rating: '))

movies = sqlContext.read.load('movies.csv', format='com.databricks.spark.csv', header='true', inferSchema='true')
ratings = sqlContext.read.load('ratings.csv', format='com.databricks.spark.csv', header='true', inferSchema='true')

# Fetch only the matching row and fail with a clear message when the movieId
# does not exist, instead of an opaque IndexError from collect()[0].
_given_movie_row = movies.filter(movies.movieId == given_movieid).first()
if _given_movie_row is None:
    raise ValueError('Unknown movieId: {}'.format(given_movieid))

# Third column holds the '|'-separated genre list of the given movie.
given_genres = _given_movie_row[2].split('|')

def release_year(title):
    """Return the release year embedded in a movie title.

    The title is split on '(', ')' and '-'; the last piece that consists
    solely of decimal digits is taken as the year, so
    'Toy Story (1995)' yields 1995 and a range such as '(2006-2007)'
    yields 2007.

    Args:
        title: Movie title string.

    Returns:
        The year as an int.

    Raises:
        ValueError: If no piece of the title is purely numeric.
    """
    # Walk the pieces backwards so the first hit is the last numeric piece.
    # isdecimal() (rather than isdigit()) guarantees int() can parse the piece.
    for piece in reversed(re.split(r'[()-]', title)):
        if piece.isdecimal():
            return int(piece)
    raise ValueError('No release year found in title: {!r}'.format(title))

# release year of the given movie
title = movies.filter(movies.movieId == given_movieid).first()[1]
given_release_year = release_year(title)

# relevant movie year search range: 4 years before through 5 years after
year_range = list(range(given_release_year-4, given_release_year+6))
relevant_years = [str(x) for x in year_range]

# Keep movies whose title carries one of the relevant years as a year token,
# i.e. delimited by '(' / ')' / '-' (the same delimiters release_year splits
# on).  A plain substring test would also match years that are part of the
# title text itself, e.g. "Death Race 2000 (1975)".  Building the pattern from
# relevant_years keeps the filter in sync with the size of the year range.
_year_pattern = r'[(\-](?:' + '|'.join(relevant_years) + r')[)\-]'
y_movies = movies.where(movies.title.rlike(_year_pattern))

# Drop movies the user has already rated (left anti join on movieId).
seen_movies = ratings.filter(ratings.userId == given_userid).select('movieId')
yu_movies = y_movies.join(seen_movies, ["movieId"], "leftanti")

# Attach title and genres to every rating, then keep only rating/title/genre.
movies_n_ratings = ratings.join(movies, ratings.movieId == movies.movieId)
m_n_r = movies_n_ratings.drop('userId', 'movieId', 'timestamp')
m_n_r = m_n_r.selectExpr('rating as rating', 'title as title', 'genres as genre')

# Restrict the ratings to the candidate movies.
# NOTE(review): this joins on title rather than movieId; if two movieIds share
# a title their ratings would be combined/multiplied -- confirm this is acceptable.
list_of_movies = m_n_r.join(yu_movies, ["title"], "inner").drop('movieId','genres')

# Relevance between a candidate movie's genres and the given movie's genres
def genre_relevance(genre):
    """Score how closely a candidate's genres match the given movie's genres.

    Args:
        genre: Iterable of genre names for the candidate movie.

    Returns:
        The number of genres shared with ``given_genres`` minus the number of
        genres present in only one of the two sets, each divided by
        ``len(given_genres)``.
    """
    reference = set(given_genres)
    candidate = set(genre)
    total = len(given_genres)
    # Keep the two divisions separate: the score is used as a grouping and
    # sorting key, so the floating-point result must not change.
    shared_share = len(reference.intersection(candidate)) / total
    mismatch_share = len(reference.symmetric_difference(candidate)) / total
    return shared_share - mismatch_share

def _keyed_rating(row):
    """Map a (title, rating, genre) row to ((title, relevance), rating)."""
    relevance = genre_relevance(row[2].split('|'))
    return ((row[0], relevance), (row[1]))


def _flatten(pair):
    """Turn ((title, relevance), rating_sum) into a flat 3-tuple."""
    (movie_title, relevance), rating_sum = pair
    return (movie_title, relevance, rating_sum)


def _rank_key(entry):
    """Sort key: highest relevance first, then highest summed rating."""
    _, relevance, rating_sum = entry
    return (-relevance, -rating_sum)


# Map: one ((title, relevance), rating) pair per rating record.
movies_rdd = list_of_movies.rdd.map(_keyed_rating)
# Reduce: sum the ratings of each (title, relevance) key.
movies_rdd2 = movies_rdd.reduceByKey(lambda total, rating: total + rating)
flatMappedRDD = movies_rdd2.map(_flatten)
result_list = flatMappedRDD.sortBy(_rank_key)
# Ranked titles only; the five best are taken when printing.
top5 = result_list.map(lambda entry: entry[0])

print('-----------')
print('Since you watched ' + title + ', check also these relevant movies:')
print(*top5.take(5), sep='\n')

Enter userId: 7
Enter movieId: 1
Enter rating: 3
-----------
Since you watched Toy Story (1995), check also these relevant movies:
Toy Story 2 (1999)
Antz (1998)
Emperor's New Groove, The (2000)
Adventures of Rocky and Bullwinkle, The (2000)
Space Jam (1996)
