# Movie Recommender System

In this project we build a recommendation system for a movie ratings dataset. GroupLens Research has collected rating data sets from the MovieLens [website](http://movielens.org) and made them available. The data sets were collected over various periods of time, depending on the size of the set. In this example we use both the small dataset, with around 100k ratings, and the full dataset, with 22M ratings and 580,000 tag applications applied to 33,000 movies by 240,000 users. We will show how [collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering) can be applied to the smaller dataset using Python and pandas. Then we will apply the same technique to the full 22M ratings dataset using Apache Spark's SparkSQL.

Collaborative filtering can be item-based or user-based. In the item-based form, each item (here, a movie) is represented as a vector of numbers (the ratings users gave it). The underlying assumption is that users who both liked one movie tend to agree on other movies as well. So all we need to do is calculate the correlation between movies based on their ratings. Movies that correlate strongly with a movie the user likes are candidates to recommend, provided the user has not seen them yet. The user can be asked to name some favorite movies, and we return the most correlated movies as recommendations. Most of the work is constructing the sparse user-by-movie matrix with ratings as values. The rest is as easy as applying a linear correlation measure such as Pearson, or a monotonic (rank-based) one such as Spearman. I had good results with both.
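
To make the idea concrete before touching the real data, here is a minimal sketch on a made-up ratings matrix. The movie names, users and ratings are hypothetical and only illustrate how a per-movie rating vector turns into a similarity ranking.

```python
import numpy as np
import pandas as pd

# Hypothetical toy data: rows are users, columns are movies, NaN = not rated.
toy = pd.DataFrame(
    {
        "Movie A": [5.0, 4.0, 1.0, 2.0, np.nan],
        "Movie B": [4.5, 4.0, 1.5, 2.5, 3.0],
        "Movie C": [1.0, 2.0, 5.0, 4.0, np.nan],
    },
    index=pd.Index([1, 2, 3, 4, 5], name="userId"),
)

# Pairwise correlation between movie columns; each pair uses only the
# users who rated both movies.
pearson = toy.corr(method="pearson")
spearman = toy.corr(method="spearman")

# Movies ranked by similarity to "Movie A", excluding the movie itself.
similar_to_a = pearson["Movie A"].drop("Movie A").sort_values(ascending=False)
print(similar_to_a)
```

In this toy matrix "Movie B" is rated much like "Movie A" and comes out on top, while "Movie C" is rated in the opposite way and ends up at the bottom.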

Let's load the data and take a look at the format:

In [1]:
import pandas as pd
import numpy as np

ratings = pd.read_csv(r'\DataSets\ml-latest-small\ratings.csv',
                      usecols=['userId', 'rating', 'movieId'])
movies = pd.read_csv(r'\DataSets\ml-latest-small\movies.csv',
                     usecols=['title', 'movieId'])


In [2]:
movies.head()

,movieId,title
0,1,Toy Story (1995)
1,2,Jumanji (1995)
2,3,Grumpier Old Men (1995)
3,4,Waiting to Exhale (1995)
4,5,Father of the Bride Part II (1995)


In [3]:
ratings.head()

,userId,movieId,rating
0,1,16,4.0
1,1,24,1.5
2,1,32,4.0
3,1,47,4.0
4,1,50,4.0


We can merge the two DataFrames and make a single DataFrame with all the columns we need:

In [5]:
movieratings = ratings.merge(movies)
del movieratings['movieId'], ratings, movies # to free up memory
movieratings.head()

,userId,rating,title
0,1,4.0,Casino (1995)
1,9,4.0,Casino (1995)
2,12,1.5,Casino (1995)
3,24,4.0,Casino (1995)
4,29,3.0,Casino (1995)


For collaborative filtering it is convenient to construct a user-movie matrix with the ratings as values. This can be done by pivoting the movie titles column with userId index:


In [6]:
pivot = pd.pivot_table(movieratings, index='userId', columns='title', values='rating')
pivot.head()

title,'71 (2014),'Hellboy': The Seeds of Creation (2004),'Round Midnight (1986),'Til There Was You (1997),"'burbs, The (1989)",'night Mother (1986),(500) Days of Summer (2009),*batteries not included (1987),...And Justice for All (1979),10 (1979),...,[REC] (2007),[REC]² (2009),[REC]³ 3 Génesis (2012),a/k/a Tommy Chong (2005),eXistenZ (1999),loudQUIETloud: A Film About the Pixies (2006),xXx (2002),xXx: State of the Union (2005),¡Three Amigos! (1986),À nous la liberté (Freedom for Us) (1931)
userId,,,,,,,,,,,,,,,,,,,,,
1,,,,,,,,,,,...,,,,,,,,,,
2,,,,,,,,,,,...,,,,,,,,,,
3,,,,,,,,,,,...,,,,,,,,,,
4,,,,,,,,,,,...,,,,,,,,,,
5,,,,,,,,,,,...,,,,,,,,,,


As you can see, this is a very sparse matrix. However, it is in a good format for analyzing the correlation between the movies' ratings.
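
How sparse is "very sparse"? One way to put a number on it is the fraction of empty cells in the pivoted matrix. The sketch below does this on a small made-up ratings table (hypothetical users, titles and ratings) so that it runs on its own; the same three lines would apply to the real pivot.

```python
import pandas as pd

# Hypothetical long-format ratings, same column names as in the notebook.
toy_ratings = pd.DataFrame({
    "userId": [1, 1, 2, 3, 3, 4],
    "title": ["Movie A", "Movie B", "Movie A", "Movie C", "Movie D", "Movie B"],
    "rating": [4.0, 3.5, 5.0, 2.0, 4.5, 3.0],
})

toy_pivot = pd.pivot_table(toy_ratings, index="userId",
                           columns="title", values="rating")

filled = toy_pivot.notna().sum().sum()   # cells that hold a rating
total = toy_pivot.size                   # users x movies
sparsity = 1 - filled / total            # fraction of empty cells
print(f"{sparsity:.1%} of the user-movie cells are empty")
```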

In [8]:
movie_corrs = pivot.corr()

With the pairwise correlations between the movies' ratings in hand, we can find the movies most related to any given movie. Since everybody is crazy about Star Wars, let's look for movies similar to a Star Wars movie:

In [9]:
StarWars_corr = movie_corrs['Star Wars: Episode V - The Empire Strikes Back (1980)'].dropna()
StarWars_corr.sort_values(ascending=False, inplace=True)
StarWars_corr.head()

title
New York Stories (1989)      1.0
'Til There Was You (1997)    1.0
Coffy (1973)                 1.0
Sum of Us, The (1994)        1.0
Another Earth (2011)         1.0
Name: Star Wars: Episode V - The Empire Strikes Back (1980), dtype: float64

It seems we were not very successful in finding similar movies! If you think about why this happened, you might realize that every correlation was treated as equally reliable, no matter how few users it was based on. We even included pairs where only a couple of people rated both Star Wars and a movie that nobody has ever heard of!

As Thrasymachus says, <I>"I declare justice is nothing but the advantage of the stronger"</I> 

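We should filter out all those weaklings, the correlations backed by only a few shared ratings, which skewed our results without adding any actual value to our analysis. So let's go back and calculate the correlations again, but this time keep only pairs of movies that at least 40 users have both rated (<I>min_periods=40</I>). Any pair with fewer ratings in common gets NaN instead of a correlation.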

In [10]:
movie_corrs = pivot.corr(min_periods=40)
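
The effect of min_periods is easy to see on a tiny made-up matrix. In the sketch below (hypothetical movies and ratings, with a threshold of 3 instead of 40 to fit the toy size), an obscure movie rated by only two users looks perfectly correlated with a popular one until the threshold is applied.

```python
import numpy as np
import pandas as pd

# Hypothetical toy matrix: six users, two popular movies, one obscure movie.
toy = pd.DataFrame({
    "Popular 1": [5, 4, 1, 2, 4, 3],
    "Popular 2": [4, 5, 2, 1, 5, 3],
    "Obscure":   [np.nan, np.nan, 1, 2, np.nan, np.nan],
})

loose = toy.corr()                 # any pair with enough data to compute
strict = toy.corr(min_periods=3)   # pair needs at least 3 users in common

print(loose.loc["Popular 1", "Obscure"])    # based on only two shared ratings
print(strict.loc["Popular 1", "Obscure"])   # masked as NaN
print(strict.loc["Popular 1", "Popular 2"]) # kept: six shared ratings
```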

Now let's find the movies similar to Star Wars again:

In [11]:
StarWars_corr = movie_corrs['Star Wars: Episode V - The Empire Strikes Back (1980)'].dropna()
StarWars_corr.sort_values(ascending=False, inplace=True)
StarWars_corr.head()

title
Star Wars: Episode V - The Empire Strikes Back (1980)    1.000000
Star Wars: Episode IV - A New Hope (1977)                0.725627
Star Wars: Episode VI - Return of the Jedi (1983)        0.642772
Animal House (1978)                                      0.620702
Die Hard (1988)                                          0.546658
Name: Star Wars: Episode V - The Empire Strikes Back (1980), dtype: float64

Now we have much better results. 
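
The introduction mentioned asking a user for a few favorite movies and returning the most correlated ones. The notebook does not show that step, so the following is only one possible sketch: a hypothetical helper, recommend, that sums each movie's correlation with the favorites and ranks by that score. Summing is an assumption made here for illustration, and the small correlation matrix is made up so the example runs on its own; in the notebook one would pass movie_corrs instead.

```python
import numpy as np
import pandas as pd


def recommend(corr_matrix, favorites, top_n=5):
    """Hypothetical helper: rank movies by summed correlation with the favorites."""
    # Sum correlations across the favorite columns; a movie with no valid
    # correlation to any favorite stays NaN (min_count=1) and is dropped.
    scores = corr_matrix[favorites].sum(axis=1, min_count=1)
    scores = scores.drop(labels=favorites, errors="ignore").dropna()
    return scores.sort_values(ascending=False).head(top_n)


# Made-up correlation matrix standing in for movie_corrs.
titles = ["A", "B", "C", "D"]
toy_corrs = pd.DataFrame(
    [[1.0, 0.8, 0.1, np.nan],
     [0.8, 1.0, 0.3, -0.2],
     [0.1, 0.3, 1.0, 0.6],
     [np.nan, -0.2, 0.6, 1.0]],
    index=titles, columns=titles,
)

result = recommend(toy_corrs, ["A", "B"])
print(result)
```

The favorites themselves are removed from the result so that the user is not recommended movies they already named.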

## Movie Recommender using Spark

To apply the same technique to a much larger dataset we are going to use Apache Spark to distribute the task across parallel processes. This can be executed on a Hadoop cluster, but for now we will just run it on our local machine. Using the full power of all cores, we can calculate the correlations between movie ratings efficiently. SparkSQL provides SQL-like functionality, but to load the data properly we are going to use some other packages. In particular, we will use Spark-CSV to load the data from our CSV files and return a DataFrame object, with syntax very much like what we used with pandas.

To use Spark-CSV we need to make sure the Spark Context is started with the --packages com.databricks:spark-csv_2.11:1.4.0 option. One way to do so is to add this line to spark-defaults.conf:

<strong><I>spark.jars.packages com.databricks:spark-csv_2.11:1.4.0</I></strong>

This will automatically download and include the package when loading Spark Context. 


In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

conf = SparkConf()\
        .setAppName("Movie_Recommender")\
        .setMaster("local[*]")\
        .set("spark.driver.memory", "10g")\
        .set("spark.driver.maxResultSize", "4g")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)


movies = sqlContext.read.format("com.databricks.spark.csv")\
                        .options(header="true", inferSchema="true")\
                        .load(r'\DataSets\ml-latest-small\movies.csv')

ratings = sqlContext.read.format("com.databricks.spark.csv")\
                        .options(header="true", inferSchema="true")\
                        .load(r'\DataSets\ml-latest-small\ratings.csv')
    
strTofloat = udf(lambda x: float(x), FloatType())
    
movies = movies[['movieId','title']]
ratings = ratings.select('userId', 'movieId', 'rating')


Note that <I>inferSchema</I> was set to true when reading the CSV so that Spark infers the right type for each column. Otherwise everything would have been read as strings. Later on we will apply .corr(), which only operates on numbers, so having everything as strings would cause problems. We could convert the columns to numbers afterwards, but that is not the right way to do it. So we just enable automatic schema inference.

In [2]:
movies.show(5)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
+-------+--------------------+
only showing top 5 rows



In [3]:
ratings.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     16|   4.0|
|     1|     24|   1.5|
|     1|     32|   4.0|
|     1|     47|   4.0|
|     1|     50|   4.0|
+------+-------+------+
only showing top 5 rows



In [4]:
movie_ratings = ratings.join(movies, on=['movieId'])
movie_ratings = movie_ratings[['userId', 'rating', 'title']]
movie_ratings.show(5)

+------+------+--------------------+
|userId|rating|               title|
+------+------+--------------------+
|    29|   3.0|Dangerous Minds (...|
|    55|   1.0|Dangerous Minds (...|
|    62|   1.0|Dangerous Minds (...|
|   104|   3.0|Dangerous Minds (...|
|   156|   4.0|Dangerous Minds (...|
+------+------+--------------------+
only showing top 5 rows



In [5]:
movie_ratings.cache()

DataFrame[userId: int, rating: double, title: string]

In [6]:
pivot_table = movie_ratings.groupBy('userId').pivot('title').sum('rating')

In [7]:
first = pivot_table.select(pivot_table.columns[:4])

In [8]:
first.show(5)

+------+----------+---------------------------------------+----------------------+
|userId|'71 (2014)|'Hellboy': The Seeds of Creation (2004)|'Round Midnight (1986)|
+------+----------+---------------------------------------+----------------------+
|   431|      null|                                   null|                  null|
|   631|      null|                                   null|                  null|
|    31|      null|                                   null|                  null|
|   231|      null|                                   null|                  null|
|   432|      null|                                   null|                  null|
+------+----------+---------------------------------------+----------------------+
only showing top 5 rows



In [9]:
rdd = pivot_table.rdd

In [11]:
from pyspark.mllib.stat import Statistics

In [14]:
schemas = pivot_table.schema