# Movie Recommender using Map-Reduce

## 1. Movie Recommender based on genre 

**Step-1: Read data and data processing**

Read data from ratings.csv and movies.csv, then create the corresponding RDDs.

From the files, each line in the ratings dataset (ratings.csv) is formatted as: userId,movieId,rating,timestamp.
Each line in the movies (movies.csv) dataset is formatted as: movieId,title,genres.

To load the data into RDDs, we can use Python's split() to parse each line and build two RDDs:
For each line in the ratings dataset, we create a tuple of (MovieID, (UserID, Rating)). The timestamp column is dropped because this recommender does not need it.
For each line in the movies dataset, we create a tuple of (MovieID, (Title, Genres)).
To simplify processing, we also filter out the header line of each file when loading the data into RDDs.

In [2]:
rating = sc.textFile("ml-latest-small/ratings.csv")
ratingHeader = rating.take(1)[0]
#print(ratingHeader)
ratingRDD = rating.filter(lambda line: line!=ratingHeader).map(lambda line: line.split(","))\
.map(lambda line: (int(line[1]),(int(line[0]),float(line[2])))).cache()
#ratingRDD.take(10)

In [90]:
movie = sc.textFile("ml-latest-small/movies.csv")
movieHeader = movie.take(1)[0]
#print(movieHeader)
movieRDD = movie.filter(lambda line: line!=movieHeader).map(lambda line: line.split(","))\
.map(lambda line: (int(line[0]),(line[1],line[2].split("|")))).cache()
#movieRDD.take(15)
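
One caveat with `split(",")`: a title that itself contains a comma is quoted in the CSV, so a plain split cuts it in two (the truncated title in the final output of Section 2 is an instance of this). A minimal sketch of a quote-aware parser using Python's standard `csv` module follows. The helper name `parse_movie_line` and the sample line are illustrative assumptions, not part of the original notebook.

```python
import csv

def parse_movie_line(line):
    """Parse one movies.csv line into (movieId, (title, genres))."""
    # csv.reader honours double-quoted fields, so a comma inside a quoted
    # title does not start a new column.
    movie_id, title, genres = next(csv.reader([line]))
    return int(movie_id), (title, genres.split("|"))

# Hypothetical sample line in the movieId,title,genres layout.
sample = '11,"American President, The (1995)",Comedy|Drama|Romance'
print(sample.split(","))         # the naive split yields four fields, not three
print(parse_movie_line(sample))  # the title stays in one piece
```

If adopted, the helper could stand in for the two `map` steps above, for example `movie.filter(lambda line: line != movieHeader).map(parse_movie_line)`. The outputs shown in this notebook were produced with the plain split.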

Then join ratingRDD and movieRDD to create a new RDD in which each element is a tuple of (MovieID, UserID, Rating, Title, Genres).

In [69]:
newRDD = ratingRDD.join(movieRDD).map(lambda x:(x[0],x[1][0][0],x[1][0][1],x[1][1][0],x[1][1][1]))
newRDD.take(10)

[(1172, 1, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 23, 5.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 38, 4.5, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 56, 2.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 94, 3.5, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 102, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 119, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 130, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 133, 5.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 148, 5.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama'])]
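
To make the nested indexing in the `map` lambda easier to follow, here is a plain-Python sketch of what an inner join on the key produces and how the result is flattened. The function `inner_join` and the toy records are illustrative only. This emulates the shape of the result, not Spark's distributed execution.

```python
from collections import defaultdict

def inner_join(left, right):
    """Emulate a pair-RDD inner join: (k, v) joined with (k, w) gives (k, (v, w))."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Keys missing on the right side produce no output, as in an inner join.
    return [(key, (v, w)) for key, v in left for w in right_by_key.get(key, [])]

# Toy records in the same shapes as ratingRDD and movieRDD.
ratings = [(1172, (1, 4.0)), (1172, (23, 5.0)), (31, (1, 2.5))]
movies = [(1172, ("Cinema Paradiso (Nuovo cinema Paradiso) (1989)", ["Drama"]))]

joined = inner_join(ratings, movies)
# x[1][0] is the (UserID, Rating) pair, x[1][1] is the (Title, Genres) pair.
flat = [(x[0], x[1][0][0], x[1][0][1], x[1][1][0], x[1][1][1]) for x in joined]
print(flat)
```

The rating for movie 31 is dropped here only because the toy `movies` list has no entry for it.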

**Step-2: Use the generated RDDs to make the recommendation, and supply inputs to test the method**

To make recommendations based on genre, we first find the genres of the given movie (movieId) and the other users who liked it. Once these users are known, we collect the other movies they have rated and keep only those that share at least one genre with the given movie. Finally, we aggregate the ratings of these movies and return the five with the highest aggregate rating.
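
The steps above can be sketched in plain Python on a toy dataset before turning to the RDD version. The function name `recommend_by_genre`, its parameters, and the toy records are assumptions made for illustration. The logic mirrors the description, with a summed rating as the aggregate.

```python
from collections import defaultdict

def recommend_by_genre(records, movie_id, user_id, like_threshold=3.0, top_n=5):
    """records: iterable of (movieId, userId, rating, title, genres) tuples."""
    records = list(records)
    # Genres of the given movie (empty set if the movie has no records).
    given_genres = next((set(r[4]) for r in records if r[0] == movie_id), set())
    # Other users who rated the given movie above the threshold.
    fans = {r[1] for r in records
            if r[0] == movie_id and r[1] != user_id and r[2] > like_threshold}
    # Sum the ratings those users gave to other movies sharing a genre.
    totals = defaultdict(float)
    for mid, uid, rating, title, genres in records:
        if mid != movie_id and uid in fans and given_genres & set(genres):
            totals[title] += rating
    return sorted(totals.items(), key=lambda kv: -kv[1])[:top_n]

toy = [
    (1, 10, 4.0, "A", ["Drama"]),
    (1, 11, 5.0, "A", ["Drama"]),
    (1, 12, 2.0, "A", ["Drama"]),
    (2, 11, 4.5, "B", ["Drama", "War"]),
    (2, 12, 5.0, "B", ["Drama", "War"]),
    (3, 11, 3.0, "C", ["Comedy"]),
    (4, 11, 2.0, "D", ["Drama"]),
]
print(recommend_by_genre(toy, movie_id=1, user_id=10))
# [('B', 4.5), ('D', 2.0)]
```

Only user 11 counts as having liked movie 1, so user 12's rating of "B" is ignored and the comedy "C" is filtered out by genre.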

Set the constant inputs as follows:

In [70]:
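givenMovieID = 31   # movie to base the recommendation on
givenUserID = 1     # user receiving the recommendation
avgRating = 3.0     # ratings above this value count as "liked"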

Find the genre of the given movie.

In [71]:
genre = movieRDD.filter(lambda x:x[0]==givenMovieID).collect()
givenGenre = genre[0][1][1]
givenGenre 

['Drama']

Find the other users who also liked the given movie. We assume that a user liked a movie if they rated it higher than avgRating.

In [72]:
watch = ratingRDD.filter(lambda x:x[0]==givenMovieID and x[1][0]!=givenUserID and x[1][1] > avgRating).map(lambda x:x[1][0])
otheruser = watch.collect()
print(otheruser)

[31, 32, 73, 110, 111, 165, 242, 325, 341, 485, 487, 496, 511, 516, 525, 607, 641]
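
The collected list of user IDs is used for membership checks in the final filter, where every record is tested against it. A small sketch, assuming the list is converted to a set first so that each check is a hash lookup instead of a linear scan. The variable names and the toy records are illustrative.

```python
other_users = [31, 32, 73, 110, 111]  # first few IDs from the output above
other_user_set = set(other_users)     # set membership is O(1) on average

# Toy (movieId, userId, rating) records.
records = [(1172, 31, 4.0), (1172, 999, 5.0), (50, 73, 3.5)]
kept = [r for r in records if r[1] in other_user_set]
print(kept)
# [(1172, 31, 4.0), (50, 73, 3.5)]
```

On a cluster, the set could also be shared with executors through a broadcast variable (`sc.broadcast`), though for a list this small the plain closure used in the notebook is likely adequate.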


Make the recommendation. Build an RDD of the other movies that these users have rated and that share at least one genre with the given movie. Map it to a new RDD of (Title, Rating) tuples, then sum the ratings for each title with reduceByKey. Finally, use takeOrdered to return the five titles with the largest totals as (Title, Aggregate Rating) tuples. Because the aggregate is a sum, movies rated by many of these users are favored.

In [73]:
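# Ratings by the other users for different movies that share a genre with the given movie
# (otheruser already excludes givenUserID, so no separate check is needed)
mvRDD = newRDD.filter(lambda x: x[0] != givenMovieID and x[1] in otheruser
                      and len(set(givenGenre) & set(x[4])) > 0).cache()
# Sum the ratings per title and take the five largest totals
topMovies = mvRDD.map(lambda x: (x[3], x[2])).reduceByKey(lambda x, y: x + y).takeOrdered(5, key=lambda x: -x[1])
print(topMovies)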

[('Forrest Gump (1994)', 52.0), ('Pulp Fiction (1994)', 51.5), ('Braveheart (1995)', 46.0), ('Dances with Wolves (1990)', 43.0), ('Apollo 13 (1995)', 42.5)]
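
The totals above are sums, so a movie's score grows with the number of users who rated it. One alternative, offered here as an assumption and not as the notebook's method, is to rank by mean rating using (sum, count) accumulators. The plain-Python sketch below emulates that aggregation. The helper `mean_by_key` and the toy pairs are illustrative.

```python
from collections import defaultdict
from functools import reduce

def combine(a, b):
    """Associative combiner for (sum, count) accumulators."""
    return (a[0] + b[0], a[1] + b[1])

def mean_by_key(pairs):
    """Average the values per key from (key, value) pairs."""
    grouped = defaultdict(list)
    for key, rating in pairs:
        grouped[key].append((rating, 1))
    means = {}
    for key, accumulators in grouped.items():
        total, count = reduce(combine, accumulators)
        means[key] = total / count
    return means

pairs = [("B", 4.0), ("B", 5.0), ("D", 3.0)]  # toy (title, rating) pairs
means = mean_by_key(pairs)
print(sorted(means.items(), key=lambda kv: -kv[1]))
# [('B', 4.5), ('D', 3.0)]
```

In PySpark the same idea could be written as `.mapValues(lambda r: (r, 1)).reduceByKey(combine).mapValues(lambda t: t[0] / t[1])`. A mean has its own weakness: a title with a single high rating can outrank well-established ones, so a minimum rating count may be worth adding.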


## 2. Movie Recommender based on tags

**Step-1: Read data and data processing**

Read data from ratings.csv, movies.csv, and tags.csv, then create the corresponding RDDs.

From the files, each line in the ratings dataset (ratings.csv) is formatted as: userId,movieId,rating,timestamp.
Each line in the movies (movies.csv) dataset is formatted as: movieId,title,genres.
Each line in the tags (tags.csv) dataset is formatted as: userId,movieId,tag,timestamp.

To load the data into RDDs, we can use Python's split() to parse each line and build three RDDs:
For each line in the ratings dataset, we create a tuple of (MovieID, (UserID, Rating)). The timestamp column is dropped because this recommender does not need it.
For each line in the movies dataset, we create a tuple of (MovieID, (Title, Genres)).
For each line in the tags dataset, we create a tuple of (MovieID, Tag). The timestamp and userId columns are dropped because this recommender does not need them.
To simplify processing, we also filter out the header line of each file when loading the data into RDDs.

In [76]:
rating = sc.textFile("ml-latest-small/ratings.csv")
ratingHeader = rating.take(1)[0]
#print(ratingHeader)
ratingRDD = rating.filter(lambda line: line!=ratingHeader).map(lambda line: line.split(","))\
.map(lambda line: (int(line[1]),(int(line[0]),float(line[2])))).cache()
#ratingRDD.take(10)

In [77]:
movie = sc.textFile("ml-latest-small/movies.csv")
movieHeader = movie.take(1)[0]
#print(movieHeader)
movieRDD = movie.filter(lambda line: line!=movieHeader).map(lambda line: line.split(","))\
.map(lambda line: (int(line[0]),(line[1],line[2].split("|")))).cache()
#movieRDD.take(15)

In [88]:
tag = sc.textFile("ml-latest-small/tags.csv")
tagHeader = tag.take(1)[0]
#print(tagHeader)
tagRDD = tag.filter(lambda line: line!=tagHeader).map(lambda line: line.split(","))\
.map(lambda line: (int(line[1]),[line[2]])).cache()
#tagRDD.take(10)

Then join ratingRDD and movieRDD to create a new RDD in which each element is a tuple of (MovieID, UserID, Rating, Title, Genres).

In [79]:
newRDD = ratingRDD.join(movieRDD).map(lambda x:(x[0],x[1][0][0],x[1][0][1],x[1][1][0],x[1][1][1]))
newRDD.take(10)

[(1172, 1, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 23, 5.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 38, 4.5, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 56, 2.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 94, 3.5, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 102, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 119, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 130, 4.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 133, 5.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama']),
 (1172, 148, 5.0, 'Cinema Paradiso (Nuovo cinema Paradiso) (1989)', ['Drama'])]

**Step-2: Use the generated RDDs to make the recommendation, and supply inputs to test the method**

To make recommendations based on tags, we first find the tag of the given movie (movieId) and the other users who liked it. Once these users are known, we collect the other movies they have rated and keep only those that carry the same tag as the given movie. Finally, we aggregate the ratings of these movies and return up to five with the highest aggregate rating.

Set the constant inputs as follows:

In [80]:
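givenMovieID = 1199  # movie to base the recommendation on
givenUserID = 77     # user receiving the recommendation
avgRating = 3.0      # ratings above this value count as "liked"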

Find the tag of the given movie. The code below uses only the first tag record returned for the movie.

In [83]:
tagcontent = tagRDD.filter(lambda x:x[0]==givenMovieID).collect()
#print(tagcontent)
givenTag = tagcontent[0][1]
print(givenTag)

['Trilogy of the Imagination']
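
The cell above keeps only the first tag record (`tagcontent[0][1]`). If a movie has several tag rows, one possible extension is to merge them all into a single set before matching. The sketch below does this in plain Python. The helper `tags_for_movie`, the movie IDs, and the tags are hypothetical toy values.

```python
def tags_for_movie(tag_records, movie_id):
    """tag_records: iterable of (movieId, [tag]) pairs, shaped like tagRDD."""
    tags = set()
    for mid, tag_list in tag_records:
        if mid == movie_id:
            tags.update(tag_list)
    return tags

# Toy tag records: movie 7 has two tag rows.
toy_tags = [(7, ["dystopia"]), (7, ["satire"]), (8, ["satire"]), (9, ["heist"])]
given = tags_for_movie(toy_tags, 7)
# Movies sharing at least one tag with movie 7, excluding movie 7 itself.
similar = sorted({mid for mid, t in toy_tags if mid != 7 and given & set(t)})
print(similar)
# [8]
```

With only the first record, movie 7 would be matched on "dystopia" alone and movie 8 would be missed.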


Find the movies that share a tag with the given movie. The given movie itself appears in this list and is excluded later in the final filter.

In [84]:
othermovie = tagRDD.filter(lambda x:len(list(set(givenTag).intersection(set(x[1]))))>0).map(lambda x:x[0]).collect()
print(othermovie)

[1199, 2968, 4467]


Find other users who have also liked the given movie.

In [85]:
otheruser = ratingRDD.filter(lambda x:x[0]==givenMovieID and x[1][0]!=givenUserID and x[1][1] > avgRating).map(lambda x:x[1][0]).collect()
print(otheruser)

[17, 19, 21, 23, 45, 73, 81, 103, 105, 118, 119, 134, 147, 167, 177, 183, 185, 198, 214, 236, 270, 274, 285, 297, 304, 337, 346, 363, 378, 388, 423, 430, 433, 450, 468, 472, 502, 509, 514, 519, 537, 544, 547, 564, 577, 580, 596, 597, 642, 664, 667]


Make the recommendation. Build an RDD of the other movies that carry the same tag as the given movie and that the other users have rated. Map it to (Title, Rating) tuples, then sum the ratings for each title with reduceByKey. Finally, use takeOrdered to return up to five titles with the largest totals as (Title, Aggregate Rating) tuples.

In [87]:
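# Ratings by the other users for different movies that share the tag, summed per title
mvRDD = newRDD.filter(lambda x: x[0] != givenMovieID and x[1] in otheruser and x[0] in othermovie)\
.map(lambda x: (x[3], x[2])).reduceByKey(lambda x, y: x + y)
# Take up to five titles with the largest rating totals
topMovies = mvRDD.takeOrdered(5, key=lambda x: -x[1])
print(topMovies)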

[('Time Bandits (1981)', 42.5), ('"Adventures of Baron Munchausen', 23.5)]
