# Second Exercise: Cosine similarity for movie comparison

In this exercise you have to implement, in a Python notebook using the Spark framework:

1. The distributed (map/reduce) algorithm of slide "3.7" (in notebook "8-Item-to-Items-globalfiltering-recommenders-py3-sshow.ipynb") for computing the cosine similarity of a set of products with negative and positive ratings. The input is an RDD (or a Spark DataFrame, which is also distributed) of ratings in this format:

     (userID,movieID,rating)

2. The computation of the cosine similarity (with the previous algorithm) of all pairs of movies in the files provided with this exercise:
  filtered50movies.csv filtered100movies.csv  filtered150movies.csv   filtered200movies.csv

Each file contains ratings for a different set of movies, and the movies in a smaller file
are always a subset of those in every larger file. The files come in different sizes
in case your computer runs into memory issues, so use the biggest file you can. While testing your code you can of course use the smallest file, or even a subset of filtered50movies.csv.

3. Show on the screen the "top 10" most similar pairs, using the
movie names you can find in the file movies.

All the steps should be implemented with map/reduce operations on Spark RDDs/DataFrames, except the last step, where you look up the names of the movies in the top-ten pairs.

Present your notebook with plenty of comments in all your functions.

NOTE: The movie ratings are taken from the smallest dataset available at:
https://grouplens.org/datasets/movielens/
The ratings have been re-scaled from the range [0,5] to the range [-3,2.5].

In [158]:
import pyspark
import os
import psutil
from itertools import combinations
import math

In [None]:

# make sure pyspark tells workers to use python3, not python2, if both are installed
os.environ["PYSPARK_PYTHON"] = "python3"
spark_home = os.environ.get('SPARK_HOME', None)
print(spark_home)
sc = pyspark.SparkContext('local[*]')
sc

First, I add one cell per input file. I do not load all of them, because I want to keep as much memory free as possible and run the biggest computation my PC can handle. **Select and run only the cell of the dataset that you want to execute the code with!**

In [192]:
movies = sc.textFile("filtered50movies.csv")

In [174]:
movies = sc.textFile("filtered100movies.csv")

In [180]:
movies = sc.textFile("filtered150movies.csv")

In [207]:
movies = sc.textFile("filtered200movies.csv")

In [204]:
# Movie names file: kept in its own variable so it does not overwrite the ratings loaded above
movieNames = sc.textFile("movies.csv")

In [208]:
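# Split each CSV line into its fields and collect to the driver so the header row can be dropped
movies = movies.map(lambda x: x.split(',')).collect()
# Redistribute the ratings (without the header) as an RDD of [userID, movieID, rating]
moviesRDD = sc.parallelize(movies[1:])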

In [209]:
moviesRDD.take(5)

[['1', '1', '1.5'],
 ['5', '1', '1.5'],
 ['7', '1', '2.0'],
 ['15', '1', '-0.5'],
 ['17', '1', '2.0']]

Now let's build the starting data structure, which contains the pairs of ratings given by the same user:
$$  (u,p_1,r_1),(u,p_2,r_2) $$
To do this we group the ratings by user, create all possible combinations of two ratings for each user, and flatten the result into a single list.

In [210]:
# Group ratings by user, then emit every 2-combination of each user's ratings
moviePairsByUserRDD = moviesRDD \
    .groupBy(lambda x: x[0]) \
    .flatMap(lambda x: combinations(list(x[1]), 2))

In [211]:
moviePairsByUserRDD.take(5)



[(['44', '1', '0.5'], ['44', '3', '0.5']),
 (['44', '1', '0.5'], ['44', '6', '0.5']),
 (['44', '1', '0.5'], ['44', '260', '2.5']),
 (['44', '1', '0.5'], ['44', '648', '0.5']),
 (['44', '1', '0.5'], ['44', '661', '0.5'])]
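One caveat with this step: Spark does not guarantee the order of the elements inside each group produced by `groupBy`, so the same two movies could in principle be emitted as `(p1, p2)` for one user and as `(p2, p1)` for another, which would split their sums across two keys. A minimal sketch of one way to avoid this, assuming movie IDs are integer-like strings as in the sample above, is to sort each user's ratings by movie ID before building the combinations. The helper name `ordered_pairs` is hypothetical and the input rows are illustrative.

```python
from itertools import combinations

def ordered_pairs(user_ratings):
    """Return all rating pairs for one user, smaller movie ID first.

    user_ratings: iterable of [userID, movieID, rating] lists of strings,
    i.e. the values of one group produced by grouping on the user ID.
    """
    # Sort by numeric movie ID so every pair has a canonical (p1 < p2) order
    ratings = sorted(user_ratings, key=lambda r: int(r[1]))
    return list(combinations(ratings, 2))

# Illustrative input: one user's ratings arriving in arbitrary order
sample = [['44', '260', '2.5'], ['44', '1', '0.5'], ['44', '3', '0.5']]
pairs = ordered_pairs(sample)
for pair in pairs:
    print(pair)
```

Because the helper is plain Python, it could be applied after the grouping with something like `.flatMap(lambda x: ordered_pairs(x[1]))`.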

Now we can apply the transformation:
$$  (u,p_1,r_1),(u,p_2,r_2) \rightarrow ((p_1,p_2),(r_1 r_2,r_1^2,r_2^2) ) $$

In [212]:
# Map each pair of ratings to ((p1, p2), (r1*r2, r1^2, r2^2))
movieRatingComputations = moviePairsByUserRDD \
    .map(lambda x: ((x[0][1], x[1][1]),
                    (float(x[0][2]) * float(x[1][2]), float(x[0][2])**2, float(x[1][2])**2)))
movieRatingComputations.take(5)

[(('1', '3'), (0.25, 0.25, 0.25)),
 (('1', '6'), (0.25, 0.25, 0.25)),
 (('1', '260'), (1.25, 0.25, 6.25)),
 (('1', '648'), (0.25, 0.25, 0.25)),
 (('1', '661'), (0.25, 0.25, 0.25))]

Now we can apply the reduction:
$$ ((p_1,p_2),(pra_{1,2},ra_1^2,ra_2^2) ) + ((p_1,p_2),(prb_{1,2},rb_1^2,rb_2^2) ) 
   \rightarrow \\  ((p_1,p_2),( pra_{1,2}+prb_{1,2}, ra_1^2+rb_1^2,
   ra_2^2+rb_2^2) ) $$
We use the reduceByKey operation for this.

In [213]:
movieRatingComputationsReduced = movieRatingComputations.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
movieRatingComputationsReduced.take(5)

[(('1', '736'), (98.0, 225.25, 132.5)),
 (('3', '260'), (30.5, 48.75, 134.75)),
 (('6', '260'), (194.25, 186.25, 296.0)),
 (('648', '1552'), (34.5, 90.0, 72.5)),
 (('733', '1552'), (31.5, 69.25, 55.5))]
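To make the reduction concrete, here is a small plain-Python sketch of what the per-key summation does, without Spark. The records are illustrative values, not taken from the dataset, and `add_triples` is a hypothetical name for the same function that is passed to `reduceByKey` above.

```python
from collections import defaultdict
from functools import reduce

def add_triples(x, y):
    """Component-wise sum of two (r1*r2, r1^2, r2^2) triples."""
    return (x[0] + y[0], x[1] + y[1], x[2] + y[2])

# Illustrative per-user contributions, keyed by movie pair
records = [
    (('1', '3'), (0.25, 0.25, 0.25)),  # a user who rated both movies 0.5
    (('1', '3'), (3.0, 2.25, 4.0)),    # a user who rated them 1.5 and 2.0
    (('1', '6'), (0.25, 0.25, 0.25)),
]

# Group the triples by key, then fold each group with add_triples
grouped = defaultdict(list)
for key, triple in records:
    grouped[key].append(triple)
reduced = {key: reduce(add_triples, triples) for key, triples in grouped.items()}
print(reduced)
```

The function is associative and commutative, which is what lets Spark combine partial sums in any order across partitions.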

Finally, we compute the cosine similarity as:
    
$$ ((p_1,p_2),(\sum_u r_1 r_2,\sum_u r_1^2,\sum_u r_2^2) ) \rightarrow 
\frac{\sum_u r_1 r_2}{\sqrt{\sum_u r_1^2} \sqrt{\sum_u r_2^2}}  $$ 

This is done with a simple map operation.


In [214]:
cosine_similarity = movieRatingComputationsReduced \
    .map(lambda x: (x[0], x[1][0]/(math.sqrt(x[1][1])*math.sqrt(x[1][2]))))
cosine_similarity.take(5)

[(('1', '736'), 0.5672646709965458),
 (('3', '260'), 0.37631206489056224),
 (('6', '260'), 0.8273076329051288),
 (('648', '1552'), 0.42709927780721924),
 (('733', '1552'), 0.5081058245382075)]
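The formula can be checked by hand against one of the results above. The sketch below is plain Python; the function name `cosine_from_sums` is hypothetical, and the guard for a zero denominator is an added assumption (it only matters if every co-rating of one movie were exactly 0). It also shows that a pair co-rated by a single user with non-zero ratings always scores exactly 1.0 or -1.0, which is worth keeping in mind when reading the ranking.

```python
import math

def cosine_from_sums(sum_r1r2, sum_r1sq, sum_r2sq):
    """Cosine similarity from the three per-pair sums.

    Returns None when a norm is zero, since the similarity is undefined there.
    """
    denom = math.sqrt(sum_r1sq) * math.sqrt(sum_r2sq)
    if denom == 0.0:
        return None
    return sum_r1r2 / denom

# Sums for the pair ('1', '736') taken from the reduced output above
sim = cosine_from_sums(98.0, 225.25, 132.5)
print(sim)

# A single co-rating user (both ratings 0.5) gives a similarity of exactly 1.0
single = cosine_from_sums(0.25, 0.25, 0.25)
print(single)
```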

Now we sort the results by similarity and show the 10 most similar pairs of movies.

In [215]:
cosine_similarity.sortBy(lambda x: x[1], ascending=False).take(10)

[(('151', '441'), 1.0),
 (('362', '2616'), 0.9922778767136677),
 (('2944', '3034'), 0.9855258295520649),
 (('362', '2949'), 0.9832820049844603),
 (('362', '2478'), 0.9785497849867492),
 (('151', '2580'), 0.9782797401561579),
 (('1805', '3034'), 0.977897823397447),
 (('151', '2470'), 0.9734503756241593),
 (('2018', '2944'), 0.9689627902499088),
 (('362', '2580'), 0.9666666666666666)]

**There we go! Our top 10!**
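The exercise also asks for the movie names, while the list above shows only IDs. Below is a minimal sketch of the lookup in plain Python, assuming the names file has a header `movieId,title,genres` as in the MovieLens distribution. Titles can contain commas inside quotes, so the sketch uses the `csv` module rather than `split(',')`. The file contents and titles here are placeholders, not real data, and the variable names are hypothetical.

```python
import csv
import io

# Placeholder stand-in for the contents of the names file (assumed layout)
sample_csv = io.StringIO(
    'movieId,title,genres\n'
    '151,"Example Title, The (1995)",Drama\n'
    '441,Another Example (1993),Comedy\n'
)

# Build a movieId -> title dictionary, letting csv handle quoted commas
titles = {row['movieId']: row['title'] for row in csv.DictReader(sample_csv)}

# Result of the final take(10), shortened to one pair for illustration
top_pairs = [(('151', '441'), 1.0)]

# Replace each ID by its title, falling back to the ID if it is missing
named = [(titles.get(p1, p1), titles.get(p2, p2), sim)
         for (p1, p2), sim in top_pairs]
for first, second, sim in named:
    print(f"{first} <-> {second}: {sim:.4f}")
```

With the real file, the `io.StringIO` object would be replaced by an opened file handle, and `top_pairs` by the list returned from the sorted RDD.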