In [None]:
The Movie Recommender works as follows:

1. For each viewer, find the movies they liked (rating of 3.0 or higher).

2. For each pair of movies (m1, m2), find the number of common viewers and the average rating of m2 among those common viewers.

3. Regroup the pairs by the first movie:
   m1 ----> list of (movie, commonusers, avgrating)
       3.1) Sort each list by number of commonusers (descending), then by avg rating (descending)
       
4. For a movie m1: pick the movies from the sorted list whose genres match m1's genres, which stand in for the user's preferred genre.
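
The four steps above can be sketched in plain Python, without Spark, on a tiny made-up dataset. This is only an illustration under stated assumptions: the `ratings` and `genres` values and the `recommend` helper are hypothetical, and the sketch records each pair under both orderings, whereas the cells below use `itertools.combinations` and store each pair once.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical toy data: (userId, movieId, rating)
ratings = [
    (1, "A", 4.0), (1, "B", 5.0), (1, "C", 2.0),
    (2, "A", 3.5), (2, "B", 3.0),
    (3, "A", 4.5), (3, "C", 4.0),
]
# Hypothetical genre sets per movie
genres = {"A": {"Drama"}, "B": {"Drama"}, "C": {"Comedy"}}


def recommend(movie, ratings, genres, min_rating=3.0, top_n=5):
    # Step 1: movies each viewer liked
    liked = defaultdict(list)
    for user, movie_id, rating in ratings:
        if rating >= min_rating:
            liked[user].append((movie_id, rating))

    # Step 2: for each pair (m1, m2), collect m2's ratings from common viewers
    pair_ratings = defaultdict(list)
    for movies in liked.values():
        for (m1, r1), (m2, r2) in combinations(movies, 2):
            pair_ratings[(m1, m2)].append(r2)
            pair_ratings[(m2, m1)].append(r1)  # assumption: keep both orderings

    # Step 3: m1 -> [(m2, commonusers, avgrating)], sorted descending on both
    candidates = [
        (m2, len(rs), sum(rs) / len(rs))
        for (m1, m2), rs in pair_ratings.items()
        if m1 == movie
    ]
    candidates.sort(key=lambda c: (c[1], c[2]), reverse=True)

    # Step 4: keep only candidates with the same genre set as the given movie
    return [c for c in candidates if genres[c[0]] == genres[movie]][:top_n]


result = recommend("A", ratings, genres)
print(result)
```

Movie "C" is liked alongside "A" by one viewer but is dropped in step 4 because its genre set differs.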




In [1]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import itertools


In [3]:
PATH_TO_DATA = "./data/ml-latest-small"

# 1. load ratings from ratings.csv
ratings = spark.read.csv(PATH_TO_DATA + "/ratings.csv", header=True, inferSchema=True)
# ratings is a DataFrame. Since Spark 2.x it must be converted explicitly with .rdd before using RDD operations.

# Cache the DataFrame in memory, as it is reused several times
ratings.cache()  # TODO: decide whether caching is worthwhile here

print("Number of ratings: %i" % ratings.count())
print("Sample of ratings:")
#ratings
ratings.take(3)

Number of ratings: 100004
Sample of ratings:


[Row(userId=1, movieId=31, rating=2.5, timestamp=1260759144),
 Row(userId=1, movieId=1029, rating=3.0, timestamp=1260759179),
 Row(userId=1, movieId=1061, rating=3.0, timestamp=1260759182)]

In [4]:
# Load the movie metadata from CSV. It is used at the end to map movieIds to movie names and genres.
raw_movies = spark.read.csv(PATH_TO_DATA + "/movies.csv", header=True, inferSchema=True)
#print("Raw movie data:")
#raw_movies.show(5, truncate=False)
# raw_movies is a data frame

# Convert the DataFrame to a pair RDD: movieId ===> (title, [list of genres])
raw_movies_rdd = raw_movies.rdd.map(lambda x: ((x["movieId"]),(x["title"],x["genres"].split("|"))))
raw_movies_rdd.take(5)
#raw_movies_rdd.lookup(57)
#raw_movies_rdd.collect()



[(1,
  ('Toy Story (1995)',
   ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy'])),
 (2, ('Jumanji (1995)', ['Adventure', 'Children', 'Fantasy'])),
 (3, ('Grumpier Old Men (1995)', ['Comedy', 'Romance'])),
 (4, ('Waiting to Exhale (1995)', ['Comedy', 'Drama', 'Romance'])),
 (5, ('Father of the Bride Part II (1995)', ['Comedy']))]

In [5]:
#MAPREDUCE 1 - Map1 converts the ratings data into (key,value) ===> (userId, (movieId,rating))
#Keep a rating only if it is 3.0 or higher (the threshold given in the question)

ratings_tuples_rdd = ratings.rdd.map(lambda x: (x["userId"], (x["movieId"],x["rating"])) )
ratings_tuples_rdd_filtered = ratings_tuples_rdd.filter(lambda x: x[1][1]>=3.0)

#ratings_tuples_rdd.lookup(536)
#ratings_tuples_rdd_filtered.take(1)
#ratings_tuples_rdd_filtered.lookup(1)
#ratings_tuples_rdd_filtered.collect()


In [6]:
#MAPREDUCE 1 - Reduce1 groups the output of the above map by key and creates a consolidated RDD:
# userId ===> [list of tuples of (movieId,rating)]

user_to_movie_ratings_tuples = ratings_tuples_rdd_filtered.groupByKey().mapValues(list)
user_to_movie_ratings_tuples.take(1)
#user_to_movie_ratings_tuples.collect()


[(1,
  [(1029, 3.0),
   (1061, 3.0),
   (1172, 4.0),
   (1339, 3.5),
   (1953, 4.0),
   (2105, 4.0),
   (2150, 3.0),
   (3671, 3.0)])]
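
As a rough illustration of what Map1, the filter, and Reduce1 do together, here is a Spark-free sketch using a dictionary in place of `groupByKey().mapValues(list)`. The first three rows are the sample ratings shown earlier; the fourth row and the `group_liked` helper are hypothetical.

```python
from collections import defaultdict

# (userId, movieId, rating); the last row is made up for illustration
rows = [
    (1, 31, 2.5),
    (1, 1029, 3.0),
    (1, 1061, 3.0),
    (2, 10, 4.0),
]


def group_liked(rows, threshold=3.0):
    """Group (movieId, rating) by userId, keeping ratings >= threshold."""
    grouped = defaultdict(list)
    for user_id, movie_id, rating in rows:
        if rating >= threshold:  # same test as the filter in the cell above
            grouped[user_id].append((movie_id, rating))
    return dict(grouped)


grouped = group_liked(rows)
print(grouped)
```

Movie 31 is missing from user 1's list because its rating of 2.5 falls below the threshold, which matches the Spark output above.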

In [36]:
#MAPREDUCE 2 - Map2 converts: (userId, [list of tuples of (movieId,rating)]) ===>
#key (movie1,movie2) ; value (rating1,rating2)
#The userId is dropped here: it was only needed to find which movies were liked by the same user.

#For each user, build every 2-movie combination of that user's liked movies.
#user_movie_pairs holds one list of combinations per user: [ [(row1comb1),(row1comb2)] , [(row2comb1),(row2comb2)] ]
user_movie_pairs = user_to_movie_ratings_tuples.map(lambda x: list(itertools.combinations(x[1], 2)))

#Flatten to one record per combination: ((movie1,rating1),(movie2,rating2))
movie_pairs = user_movie_pairs.flatMap(lambda pairs: pairs)

#per_user_movie_to_rating_tuples is (M1,M2) ===> (R1,R2), one record per user who liked both movies
per_user_movie_to_rating_tuples = movie_pairs.map(lambda m:((m[0][0],m[1][0]),(m[0][1],m[1][1])))
per_user_movie_to_rating_tuples.take(2)
          

[((1029, 1061), (3.0, 3.0)), ((1029, 1172), (3.0, 4.0))]
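
One property of `itertools.combinations` is worth seeing on a small input: it emits each unordered pair once, so a pair appears only under the key of whichever movie comes first in a user's list. The sketch below contrasts it with `itertools.permutations`, which emits both orderings. The `to_pairs` helper is hypothetical; the input is the start of user 1's list from the earlier output.

```python
from itertools import combinations, permutations

# First three liked movies of user 1: (movieId, rating)
liked = [(1029, 3.0), (1061, 3.0), (1172, 4.0)]


def to_pairs(liked, ordered=False):
    """Return ((m1, m2), (r1, r2)) records for one user's liked movies."""
    pair_fn = permutations if ordered else combinations
    return [((a[0], b[0]), (a[1], b[1])) for a, b in pair_fn(liked, 2)]


one_direction = to_pairs(liked)
both_directions = to_pairs(liked, ordered=True)

print(len(one_direction), len(both_directions))
print(((1061, 1029), (3.0, 3.0)) in one_direction)
print(((1061, 1029), (3.0, 3.0)) in both_directions)
```

Whether both orderings are wanted depends on whether a lookup for movie 1061 should also see movie 1029; that is a design choice this sketch does not settle.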

In [37]:
#MAPREDUCE 2 - Reduce2 collects the output of the above map by key (M1,M2) and creates a consolidated list of ratings.
#(M1,M2) ===> [(Ra1,Ra2),(Rb1,Rb2),(Rc1,Rc2)....]

movie_to_ratings_tuples_list = per_user_movie_to_rating_tuples.groupByKey().mapValues(list)
movie_to_ratings_tuples_list.take(1)

[((1721, 1884),
  [(4.5, 4.0),
   (3.0, 4.5),
   (4.0, 4.5),
   (3.0, 3.5),
   (4.0, 5.0),
   (4.0, 4.0),
   (4.0, 3.0),
   (3.5, 3.5),
   (4.0, 4.5),
   (3.0, 3.0)])]
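
Each grouped record like the one above is then reduced to a common-viewer count and an average of the second movie's ratings. A minimal Spark-free sketch of that reduction, using the rating list printed above (the `summarize_pair` helper is hypothetical):

```python
# Rating tuples (rating of movie 1721, rating of movie 1884) from the output above
pair = (1721, 1884)
rating_tuples = [
    (4.5, 4.0), (3.0, 4.5), (4.0, 4.5), (3.0, 3.5), (4.0, 5.0),
    (4.0, 4.0), (4.0, 3.0), (3.5, 3.5), (4.0, 4.5), (3.0, 3.0),
]


def summarize_pair(pair, rating_tuples):
    """Return (movie1, (movie2, common viewers, avg rating of movie2))."""
    movie1, movie2 = pair
    common_viewers = len(rating_tuples)
    # Only the second rating in each tuple belongs to movie2
    avg_rating = sum(r2 for _, r2 in rating_tuples) / common_viewers
    return movie1, (movie2, common_viewers, avg_rating)


summary = summarize_pair(pair, rating_tuples)
print(summary)
```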

In [40]:
#MAPREDUCE 3 - Map3 converts (movie1,movie2) ===> [list of tuples of ratings] ---- TO ----
#(movie1) ===> (movie2, no. of common viewers of m1 and m2, avg rating of movie2 across the common viewers)
movie_to_avg_ratings = movie_to_ratings_tuples_list.map(lambda x: ((x[0][0]),(x[0][1],len(x[1]),(sum(val[1] for val in x[1]))/len(x[1]))))
movie_to_avg_ratings.take(5)
#movie_to_avg_ratings.lookup(57)

#MAPREDUCE 3 - Reduce3 collects the output of the above map and groups it by key to create a list
#(movie1) ===> [(movie2,commonviewersm1m2,avgrating),(movie3,commonviewersm1m3,avgrating) ......]
movie_to_avg_ratings_by_key = movie_to_avg_ratings.groupByKey().mapValues(list)
movie_to_avg_ratings_by_key.take(3)



[(539,
  [(2053, 3, 3.3333333333333335),
   (4621, 5, 3.4),
   (575, 5, 3.8),
   (2221, 1, 4.0),
   (8865, 3, 3.8333333333333335),
   (4543, 1, 3.0),
   (127728, 1, 5.0),
   (1097, 44, 4.0),
   (3498, 6, 4.0),
   (56921, 1, 3.5),
   (2751, 2, 3.5),
   (1177, 6, 4.0),
   (3251, 1, 4.0),
   (2916, 26, 3.673076923076923),
   (8183, 2, 3.75),
   (2123, 3, 3.6666666666666665),
   (4969, 3, 3.8333333333333335),
   (4221, 2, 3.5),
   (34359, 1, 3.5),
   (4573, 1, 4.0),
   (55451, 2, 4.75),
   (66665, 1, 4.0),
   (6104, 1, 4.0),
   (3340, 1, 3.5),
   (41571, 1, 4.0),
   (26199, 1, 4.0),
   (2429, 4, 3.75),
   (1744, 2, 3.5),
   (7981, 1, 4.0),
   (27812, 1, 4.5),
   (71404, 1, 4.0),
   (4403, 1, 3.0),
   (8761, 1, 3.5),
   (1389, 1, 3.0),
   (3914, 1, 5.0),
   (1523, 1, 4.0),
   (4306, 30, 4.233333333333333),
   (6377, 28, 3.982142857142857),
   (7387, 2, 3.25),
   (6484, 1, 3.5),
   (1919, 2, 4.0),
   (7831, 1, 3.0),
   (851, 6, 3.1666666666666665),
   (2316, 6, 3.5),
   (50245, 1, 4.5),
   (

In [69]:
#Option1: Order the RDD by number of common users first (descending) & then by descending order of avg ratings
movie_to_avg_ratings_by_key_order1 = movie_to_avg_ratings_by_key.map(lambda a: ((a[0]),(sorted(a[1],key=lambda s:(s[1],s[2]),reverse=True))))
#movie_to_avg_ratings_by_key_order1.take(1)
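
To see what the tuple sort key in Option1 does, here is a small Spark-free sketch on four entries taken from the output for movie 539 above. With `reverse=True`, both fields of the key are sorted in descending order, so the average rating only breaks ties between equal viewer counts.

```python
# (movieId, common viewers, avg rating), taken from the output for movie 539
entries = [
    (2221, 1, 4.0),
    (127728, 1, 5.0),
    (1097, 44, 4.0),
    (4306, 30, 4.233333333333333),
]

# Sort by common viewers first, then by avg rating, both descending
ordered = sorted(entries, key=lambda s: (s[1], s[2]), reverse=True)
ordered_ids = [movie_id for movie_id, _, _ in ordered]
print(ordered_ids)
```

Movie 127728 has the highest average but only one common viewer, so it still ranks below the two widely shared movies.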

In [46]:
#Option2 : Order the RDD by aggregate of common users * avgratings
#movie_to_avg_ratings_by_key_order2 = movie_to_avg_ratings_by_key.map(lambda a: ((a[0]),(sorted(a[1],key=lambda s:(s[1]*s[2]),reverse=True))))
#movie_to_avg_ratings_by_key_order2.take(3)

In [90]:
def getRecommendedMovies(givenMovieId,givenUserId,avgRating):
    #Print up to 5 recommended movies for givenMovieId.
    #givenUserId and avgRating are currently unused; they are kept so that existing calls still work.
    givenmovie_title, givenmovie_genres = raw_movies_rdd.lookup(givenMovieId)[0]
    print("You have watched movie : ",givenMovieId, " ~#~ " ,givenmovie_title,"   from genres: ",givenmovie_genres)
    
    #Candidates are already sorted by common viewers, then by avg rating (both descending)
    data_recommended_movies_ids = (movie_to_avg_ratings_by_key_order1.lookup(givenMovieId))[0]
    
    print("\nRecommended movies for this movie are: \n")
    i = 0
    
    for data in data_recommended_movies_ids:
        #A single lookup per candidate returns both its title and its genres
        recommend_movie_title, recommend_movie_genres = (raw_movies_rdd.lookup(data[0]))[0]
        #Recommend only movies whose set of genres is identical to the given movie's
        if set(givenmovie_genres) == set(recommend_movie_genres):
            print(data[0]," ~#~ ",recommend_movie_title," =====> ",recommend_movie_genres)
            i=i+1
        if (i==5):
            break
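
The genre check in the function above requires the two genre sets to be identical. The sketch below contrasts that with a looser test that accepts any shared genre; the looser test is an alternative shown for comparison only, not something this notebook uses, and both helper names are hypothetical. The genre lists are taken from the outputs in this notebook.

```python
goldeneye = ['Action', 'Adventure', 'Thriller']   # movie 10
cliffhanger = ['Action', 'Adventure', 'Thriller']  # movie 434
jumanji = ['Adventure', 'Children', 'Fantasy']     # movie 2


def same_genres(a, b):
    """Strict match: both movies have exactly the same set of genres."""
    return set(a) == set(b)


def shares_genre(a, b):
    """Looser alternative: the movies have at least one genre in common."""
    return bool(set(a) & set(b))


strict_results = (same_genres(goldeneye, cliffhanger), same_genres(goldeneye, jumanji))
loose_results = (shares_genre(goldeneye, cliffhanger), shares_genre(goldeneye, jumanji))
print(strict_results, loose_results)
```

The strict test explains why every recommendation printed below carries exactly the same genre list as the movie that was watched.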
    

In [91]:
### ------- TESTING THE CODE ------------takes about 5-6secs to display the result ###
getRecommendedMovies(10,1,3.0)

You have watched movie :  10  ~#~  GoldenEye (1995)    from genres:  ['Action', 'Adventure', 'Thriller']

Recommended movies for this movie are: 

434  ~#~  Cliffhanger (1993)  =====>  ['Action', 'Adventure', 'Thriller']
733  ~#~  Rock, The (1996)  =====>  ['Action', 'Adventure', 'Thriller']
95  ~#~  Broken Arrow (1996)  =====>  ['Action', 'Adventure', 'Thriller']
1610  ~#~  Hunt for Red October, The (1990)  =====>  ['Action', 'Adventure', 'Thriller']
1722  ~#~  Tomorrow Never Dies (1997)  =====>  ['Action', 'Adventure', 'Thriller']


In [92]:
### ------- TESTING THE CODE ------------takes about 5-6secs to display the result ###
getRecommendedMovies(57,1,3.0)

You have watched movie :  57  ~#~  Home for the Holidays (1995)    from genres:  ['Drama']

Recommended movies for this movie are: 

261  ~#~  Little Women (1994)  =====>  ['Drama']
337  ~#~  What's Eating Gilbert Grape (1993)  =====>  ['Drama']
1207  ~#~  To Kill a Mockingbird (1962)  =====>  ['Drama']
1193  ~#~  One Flew Over the Cuckoo's Nest (1975)  =====>  ['Drama']
300  ~#~  Quiz Show (1994)  =====>  ['Drama']


In [93]:
### ------- TESTING THE CODE ------------takes about 5-6secs to display the result ###
getRecommendedMovies(1,1,3.0)

You have watched movie :  1  ~#~  Toy Story (1995)    from genres:  ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']

Recommended movies for this movie are: 

3114  ~#~  Toy Story 2 (1999)  =====>  ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']
4886  ~#~  Monsters, Inc. (2001)  =====>  ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']
2294  ~#~  Antz (1998)  =====>  ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']
4016  ~#~  Emperor's New Groove, The (2000)  =====>  ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']
53121  ~#~  Shrek the Third (2007)  =====>  ['Adventure', 'Animation', 'Children', 'Comedy', 'Fantasy']
