# Movie Recommender in PySpark

The objective of this notebook is to replicate, in PySpark, Nick Pentreath's post: http://mlnick.github.io/blog/2013/04/01/movie-recommendations-and-more-with-spark/

We will use the same data source, MovieLens (https://grouplens.org/datasets/movielens/), but with one of the smaller data files available.

First we read the data and select the columns userId, movieId and rating:

In [4]:
ratings = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/ratings.csv")

In [5]:
ratings.printSchema()

In [6]:
ratings = ratings.select('userId','movieId','rating')

We will need the number of ratings for each movie, so we group by 'movieId' and count:

In [8]:
numRatersPerMovie = ratings.groupby('movieId').count()
numRatersPerMovie.show()

Now we join the per-movie rating count back onto each rating, using movieId as the key:

In [10]:
ratingsWithSize = ratings.join(numRatersPerMovie, ['movieId'] ,how='left')
ratingsWithSize = ratingsWithSize.selectExpr("movieId as movie", "userId as user", "rating as rating" ,"count as numRaters")
ratingsWithSize.show()
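
As a rough illustration of the two steps above (count per movie, then left join), here is a plain-Python sketch on a hypothetical toy dataset. The names `toy_ratings`, `num_raters` and `ratings_with_size` are made up for this example and are not part of the notebook.

```python
from collections import Counter

# Hypothetical toy ratings: (userId, movieId, rating)
toy_ratings = [(1, 10, 4.0), (1, 20, 3.0), (2, 10, 5.0), (3, 10, 2.0), (3, 30, 4.5)]

# Rough equivalent of groupby('movieId').count(): one counter entry per movie
num_raters = Counter(movie for _, movie, _ in toy_ratings)

# Rough equivalent of the left join: attach the movie's count to every rating,
# reordered as (movie, user, rating, numRaters) like the selectExpr above
ratings_with_size = [
    (movie, user, rating, num_raters[movie]) for user, movie, rating in toy_ratings
]

for row in ratings_with_size:
    print(row)
```

Every rating of movie 10 carries the same count, which is what lets the later aggregation recover it with `F.max`.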

Then we copy the dataframe ratingsWithSize and rename its columns. We will join this copy with the original on the user column to get every pair of movies that the same user has rated:

In [12]:
ratings2 = ratingsWithSize.selectExpr("movie as movie2", "user as user2", "rating as rating2" ,"numRaters as numRaters2")
ratings2.show()

Now we can make the join on user:

In [14]:
ratingPairs=ratingsWithSize.join(ratings2,ratingsWithSize.user==ratings2.user2)
ratingPairs.show(200)
ratingPairs.count()

The next step is to get rid of duplicate movie pairs. The self-join produces each pair twice, as (A, B) and (B, A), and also pairs every movie with itself. We use the filter transformation to keep only the rows where movie is less than movie2:

In [16]:
ratingPairs = ratingPairs.filter("movie < movie2")
ratingPairs.show()
ratingPairs.count()
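
To see why the `movie < movie2` filter is enough, here is a small plain-Python sketch of the self-join on a hypothetical toy dataset (`toy`, `joined` and `pairs` are illustrative names, not notebook variables).

```python
from itertools import product

# Hypothetical toy data: (movie, user, rating)
toy = [(10, 1, 4.0), (20, 1, 3.0), (30, 1, 5.0), (10, 2, 2.0), (20, 2, 1.0)]

# Self-join on user: every combination of two rows that share a user,
# including self-pairs and both orderings of each pair
joined = [(a, b) for a, b in product(toy, toy) if a[1] == b[1]]

# Keep only movie < movie2: drops self-pairs and mirrored duplicates
pairs = [(a, b) for a, b in joined if a[0] < b[0]]

print(len(joined))  # 3*3 + 2*2 = 13
print(len(pairs))   # 3 pairs for user 1 + 1 pair for user 2 = 4
```

A user with n ratings contributes n*n rows to the join but only n*(n-1)/2 rows after the filter, so the filter also cuts the data volume by more than half.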

Now we will create new features:

- dotProduct = rating * rating2
- ratingSq = rating ** 2
- rating2Sq = rating2 ** 2

In [18]:
ratingPairs = ratingPairs.withColumn('dotProduct', ratingPairs.rating * ratingPairs.rating2)
ratingPairs.show()

In [19]:
ratingPairs = ratingPairs.withColumn('ratingSq', ratingPairs.rating**2)
ratingPairs = ratingPairs.withColumn('rating2Sq', ratingPairs.rating2**2)
ratingPairs.show()

In [20]:
ratingPairs.count()

In [21]:
size = ratingPairs.groupby(['movie','movie2']).count()
size.show()

In [22]:
ratingPairs = ratingPairs.alias('a').join(size.alias('b'), (ratingPairs.movie == size.movie) & (ratingPairs.movie2 == size.movie2)).select('a.movie','a.user','a.rating','a.numRaters','a.movie2','a.user2','a.rating2','a.numRaters2','a.dotProduct','a.ratingSq','a.rating2Sq','b.count')
ratingPairs.show()
ratingPairs.count()

At this stage, we have to aggregate by each movie pair (movie,movie2):

In [24]:
from pyspark.sql import functions as F

In [25]:
vectorCalcs = ratingPairs.groupby(['movie','movie2']).agg(F.sum('dotProduct'), F.sum('rating'), F.sum('rating2'), F.sum('ratingSq'), F.sum('rating2Sq'), F.max('numRaters'), F.max('numRaters2'),F.max('count'))
vectorCalcs.show()
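
The aggregation reduces each movie pair to a handful of sums, which is all the similarity functions need. A minimal plain-Python sketch of the same idea, on hypothetical co-ratings (`toy_pairs` and `stats` are illustrative names):

```python
from collections import defaultdict

# Hypothetical co-ratings: (movie, movie2, rating, rating2), one row per user
toy_pairs = [
    (10, 20, 4.0, 3.0),
    (10, 20, 2.0, 1.0),
    (10, 20, 5.0, 5.0),
    (10, 30, 4.0, 5.0),
]

# Per movie pair: [size, dotProduct, ratingSum, rating2Sum, ratingSq sum, rating2Sq sum]
stats = defaultdict(lambda: [0, 0.0, 0.0, 0.0, 0.0, 0.0])
for movie, movie2, r1, r2 in toy_pairs:
    s = stats[(movie, movie2)]
    s[0] += 1          # number of users who rated both movies
    s[1] += r1 * r2    # running dot product
    s[2] += r1         # running sum of the first movie's ratings
    s[3] += r2         # running sum of the second movie's ratings
    s[4] += r1 * r1    # running sum of squares, first movie
    s[5] += r2 * r2    # running sum of squares, second movie

print(stats[(10, 20)])  # [3, 39.0, 11.0, 9.0, 45.0, 35.0]
```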

We define the correlation function:

In [27]:
from numpy import sqrt

In [28]:
def correlation(size,dotProduct,ratingSum,rating2Sum, ratingNormSq, rating2NormSq):
  numerator = size * dotProduct - ratingSum * rating2Sum
  denominator = sqrt(size * ratingNormSq - ratingSum * ratingSum) * sqrt(size * rating2NormSq - rating2Sum * rating2Sum)*1.0

  return (numerator*1.0/denominator)
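
The formula above is the Pearson correlation rewritten in terms of sums, so it can be computed from the aggregated columns without revisiting individual ratings. The sketch below checks this on hypothetical ratings by comparing it with a direct computation. `correlation_from_sums` and `pearson_direct` are names chosen for this example, and it uses `math.sqrt` rather than NumPy to stay self-contained.

```python
from math import sqrt, isclose

def correlation_from_sums(size, dot, s1, s2, sq1, sq2):
    # Same arithmetic as the notebook's correlation(), on precomputed sums
    numerator = size * dot - s1 * s2
    denominator = sqrt(size * sq1 - s1 * s1) * sqrt(size * sq2 - s2 * s2)
    return numerator / denominator

def pearson_direct(xs, ys):
    # Textbook Pearson correlation from deviations around the means
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    vx = sum((x - mx) ** 2 for x in xs)
    vy = sum((y - my) ** 2 for y in ys)
    return cov / sqrt(vx * vy)

# Hypothetical ratings of two movies by the same three users
xs = [4.0, 2.0, 5.0]
ys = [3.0, 1.0, 5.0]

from_sums = correlation_from_sums(
    len(xs),
    sum(x * y for x, y in zip(xs, ys)),
    sum(xs),
    sum(ys),
    sum(x * x for x in xs),
    sum(y * y for y in ys),
)
direct = pearson_direct(xs, ys)
print(from_sums, direct)
```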

In [29]:
PRIOR_COUNT = 10
PRIOR_CORRELATION = 0

def regularizedCorrelation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq, virtualCount, priorCorrelation):
  unregularizedCorrelation = correlation(size, dotProduct, ratingSum, rating2Sum, ratingNormSq, rating2NormSq)
  w = size*1.0 / (size + virtualCount)

  return (w * unregularizedCorrelation + (1 - w) * priorCorrelation)
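
The weight `w` pulls the correlation toward the prior when few users rated both movies. A small sketch of that effect, using a hypothetical helper `regularize` that takes an already computed correlation and the same defaults as `PRIOR_COUNT` and `PRIOR_CORRELATION`:

```python
def regularize(raw_correlation, size, virtual_count=10, prior=0.0):
    # w approaches 1 as size grows, and 0 as size shrinks
    w = size / (size + virtual_count)
    return w * raw_correlation + (1 - w) * prior

few_raters = regularize(0.9, size=2)     # w = 2/12, result is about 0.15
many_raters = regularize(0.9, size=200)  # w = 200/210, result is about 0.857
print(few_raters, many_raters)
```

With only two co-raters a raw correlation of 0.9 is mostly discounted, while with 200 co-raters it is kept almost unchanged.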

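We define the cosine similarity function. Note that ratingNorm and rating2Norm are the sums of squared ratings; the square roots are taken inside the function: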

In [31]:
def cosineSimilarity(dotProduct, ratingNorm, rating2Norm):
  return dotProduct / (sqrt(ratingNorm) * sqrt(rating2Norm))
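
As a quick sanity check of the formula, the sketch below evaluates it on hypothetical rating vectors. `cosine_from_sums` is a stand-in name that mirrors `cosineSimilarity` but uses `math.sqrt` so the example runs without NumPy.

```python
from math import sqrt

def cosine_from_sums(dot, sq1, sq2):
    # dot: sum of rating * rating2; sq1, sq2: sums of squared ratings
    return dot / (sqrt(sq1) * sqrt(sq2))

def sums(xs, ys):
    # Build the three aggregates the function expects from raw vectors
    dot = sum(x * y for x, y in zip(xs, ys))
    return dot, sum(x * x for x in xs), sum(y * y for y in ys)

# Proportional ratings point in the same direction: similarity close to 1
proportional = cosine_from_sums(*sums([1.0, 2.0], [2.0, 4.0]))

# Hypothetical ratings of two movies by the same three users
mixed = cosine_from_sums(*sums([4.0, 2.0, 5.0], [3.0, 1.0, 5.0]))
print(proportional, mixed)
```

Because ratings are never negative, cosine similarity on raw ratings stays between 0 and 1, unlike the correlation, which can be negative.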

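We apply both functions to each row of vectorCalcs. The positional indices follow its column order: x[0] and x[1] are movie and movie2, x[2] is sum(dotProduct), x[3] and x[4] are the rating sums, x[5] and x[6] are the sums of squares, and x[9] is max(count), the number of users who rated both movies: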

In [33]:
similarity=vectorCalcs.rdd.map(lambda x: (x[0],x[1],regularizedCorrelation(x[9],x[2],x[3],x[4],x[5],x[6],PRIOR_COUNT, PRIOR_CORRELATION),cosineSimilarity(x[2],x[5],x[6])))
similarity.take(5)

We save the results so that we can restart from this point later:

In [35]:
similarity.saveAsTextFile("/FileStore/tables/similarity3")

To restart from here, load the saved results:

In [37]:
from pyspark import SparkConf, SparkContext

similarity = sc.textFile("/FileStore/tables/similarity3")
similarity.take(5)

The saved file holds one text line per tuple, so we parse each line back into typed fields and convert the RDD to a dataframe:

In [39]:
results = similarity.map(lambda x: (int(x.replace('(','').replace(')','').split(',')[0]),int(x.replace('(','').replace(')','').split(',')[1]),float(x.replace('(','').replace(')','').split(',')[2]),float(x.replace('(','').replace(')','').split(',')[3]))).toDF(['movie','movie2','Reg_correlation','Cos_similarity'])
results.show()
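
The parsing lambda above repeats the same strip-and-split work for every field. A minimal sketch of the same logic as a named function, assuming each saved line looks like `(1, 2, 0.5, 0.75)`; `parse_line` is a hypothetical helper, not part of the notebook:

```python
from math import isnan

def parse_line(line):
    # Drop the surrounding parentheses, then split into the four fields
    fields = line.strip().strip('()').split(',')
    # int() and float() tolerate the leading space left after each comma
    return (int(fields[0]), int(fields[1]), float(fields[2]), float(fields[3]))

print(parse_line('(1, 2, 0.5, 0.75)'))  # (1, 2, 0.5, 0.75)

# float() also accepts the text 'nan', so NaN correlations survive the round trip
nan_row = parse_line('(1036, 1980, nan, 0.9)')
print(isnan(nan_row[2]))  # True
```

Under that assumption, `similarity.map(parse_line)` could replace the lambda before the call to `toDF`.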

Now we read the movies file to get the movie names:

In [41]:
movies = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/movies.csv")

In [42]:
movies.show()

We attach the movie titles by joining with movies, first for movie and then for movie2:

In [44]:
results_with_name=results.join(movies.selectExpr("movieId as movie", "title as name"), ['movie'] ,how='left')
results_with_name.show()

In [45]:
results_with_names=results_with_name.join(movies.selectExpr("movieId as movie2", "title as name2"), ['movie2'],how='left')
results_with_names.show()

Finally, we can check some results:

Check the most similar movies to Die Hard. We drop rows with a NaN Reg_correlation; these come from movie pairs that were rated together by only one person:

In [48]:
results_with_names.where(results_with_names.name=='Die Hard (1988)').sort('Reg_correlation', ascending=False).na.drop().show()

Check the most similar movies to Star Wars Episode IV:

In [50]:
results_with_names.where(results_with_names.name=='Star Wars: Episode IV - A New Hope (1977)').sort('Reg_correlation', ascending=False).na.drop().show()

Top ten most dissimilar to Star Wars:

In [52]:
results_with_names.where(results_with_names.name=='Star Wars: Episode IV - A New Hope (1977)').sort('Reg_correlation', ascending=True).na.drop().show(10)

Example of a NaN Reg_correlation value:

In [54]:
from pyspark.sql.functions import col

In [55]:
vectorCalcs.filter((col("movie")==1036) & (col("movie2")==1980)).show()
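
To illustrate why a pair rated by a single person gives NaN: with one co-rater, both variance terms in the denominator are zero, so the correlation is 0/0. The sketch below reproduces this in plain Python with hypothetical ratings. Note that `math.sqrt` arithmetic raises `ZeroDivisionError` on division by zero, whereas the notebook uses NumPy's `sqrt`, which is presumably why it gets NaN instead of an error. `safe_correlation` is an illustrative name and returns NaN explicitly.

```python
from math import sqrt, nan, isnan

def safe_correlation(size, dot, s1, s2, sq1, sq2):
    # Scaled variance terms; both are zero when size == 1
    var1 = size * sq1 - s1 * s1
    var2 = size * sq2 - s2 * s2
    if var1 <= 0 or var2 <= 0:
        # Correlation is undefined here: return NaN instead of dividing by zero
        return nan
    return (size * dot - s1 * s2) / (sqrt(var1) * sqrt(var2))

# Hypothetical pair rated by a single user, with ratings 4.0 and 3.0
r1, r2 = 4.0, 3.0
single_rater = safe_correlation(1, r1 * r2, r1, r2, r1 * r1, r2 * r2)
print(isnan(single_rater))  # True
```

The same undefined case arises when every co-rater gave one of the two movies an identical rating, since that variance term is then zero as well.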