# MapReduce Recommendation Engine

In this lesson, we'll build a cosine similarity recommendation engine using the MovieLens 100K dataset.

### Lesson goals:
* Build the MapReduce cosine similarity recommendation engine from smaller components
* Show how the data is transformed by each component
* Produce recommendations

### For demonstration purposes, download the data sources and save them locally.

The original source can be found here:

* https://grouplens.org/datasets/movielens/100k/
* Data: https://files.grouplens.org/datasets/movielens/ml-100k/u.data
* Labels: https://files.grouplens.org/datasets/movielens/ml-100k/u.item

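The next cell downloads both files and prints the first five rows of each to show the data structure we're working with: `u.data` is tab-separated and `u.item` is pipe-separated.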

In [1]:
import requests

data_source = 'https://files.grouplens.org/datasets/movielens/ml-100k/u.data'
labels_source = 'https://files.grouplens.org/datasets/movielens/ml-100k/u.item'

def download_source_file(url, file_name, limit=None):
    """Download a text file and save it locally; `limit` keeps only that fraction of its lines."""
    response = requests.get(url)
    response.raise_for_status()  # fail loudly instead of saving an error page
    lines = response.text.splitlines(keepends=True)
    if limit:
        lines = lines[:int(len(lines) * limit)]  # keep a fraction of whole rows, never a partial row
    with open(file_name, 'w', encoding='utf-8') as outfile:
        outfile.writelines(lines)

def open_data(filename):
    # read the file back with the same encoding it was written with
    with open(filename, encoding='utf-8') as f:
        return f.readlines()

download_source_file(data_source, 'data1.txt')
download_source_file(labels_source, 'labels1.txt')

data_txt = open_data('data1.txt')
print('Movie Ratings Data')
print('UserID movieID rating timestamp \n')
for row in data_txt[:5]:
    print(row)

print('\n'*5)

labels_txt = open_data('labels1.txt')
print('movieID movieTitle releaseDate videoReleaseDate IMDbURL genreFlags\n')
for label in labels_txt[:5]:
    print(label)

Movie Ratings Data
UserID movieID rating timestamp 

196	242	3	881250949

186	302	3	891717742

22	377	1	878887116

244	51	2	880606923

166	346	1	886397596







movieID movieTitle releaseDate videoReleaseDate IMDbURL genreFlags

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0

2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0

3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0

4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0

5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
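
As a quick check on the two formats printed above, here is a minimal sketch that parses one row of each file. The helpers `parse_rating` and `parse_movie` are hypothetical, and the sketch assumes the layout visible in the rows shown: tab-separated `userID movieID rating timestamp` in `u.data`, and pipe-separated `movieID | title | release date | video release date | IMDb URL | genre flags...` in `u.item`.

```python
def parse_rating(line):
    """Hypothetical helper: split one u.data row (tab-separated) into typed fields."""
    userID, movieID, rating, timestamp = line.rstrip('\n').split('\t')
    return int(userID), int(movieID), float(rating), int(timestamp)


def parse_movie(line):
    """Hypothetical helper: split one u.item row (pipe-separated).

    fields[2:5] (release date, video release date, IMDb URL) are skipped here;
    everything after them is treated as 0/1 genre flags.
    """
    fields = line.rstrip('\n').split('|')
    genre_flags = [int(flag) for flag in fields[5:]]
    return int(fields[0]), fields[1], genre_flags


# rows copied from the output above
rating_row = '196\t242\t3\t881250949\n'
movie_row = ('1|Toy Story (1995)|01-Jan-1995||'
             'http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)'
             '|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0\n')

print(parse_rating(rating_row))  # (196, 242, 3.0, 881250949)
movieID, title, genre_flags = parse_movie(movie_row)
print(movieID, title, len(genre_flags))  # 1 Toy Story (1995) 19
```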



# Step 1: Parse the dataset with MapReduce
We create key-value pairs that link every user to the movies they rated and the ratings they gave them.

The goals of Step 1:
* read/parse our text document data
* map the data into key value pairs of keys: userID, values:(movieID, rating)
* reduce this mapping into keys: userID, values: `[(movieID1,rating1),(movieID2,rating2),....]`
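
The map, shuffle, and reduce phases of Step 1 can be imitated in plain Python to see the data change shape. This is a sketch, not how mrjob runs internally: `run_local` is a hypothetical helper that groups values by key with a dictionary, the rows come from the sample printed above, and the last row is a made-up extra rating so that one user has two movies.

```python
from collections import defaultdict


def step1_mapper(line):
    # same parsing as rec_step1.mapper: userID -> (movieID, rating)
    userID, movieID, rating, _timestamp = line.split('\t')
    yield userID, (movieID, float(rating))


def step1_reducer(user_id, itemRatings):
    # collect every (movieID, rating) pair for this user into one list
    yield user_id, list(itemRatings)


def run_local(lines, mapper, reducer):
    """Hypothetical helper: map every line, group values by key (the shuffle), then reduce."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)
    results = []
    for key in sorted(groups):  # keys are strings, so '186' < '196' < '22'
        results.extend(reducer(key, groups[key]))
    return results


sample = [
    '196\t242\t3\t881250949',
    '186\t302\t3\t891717742',
    '22\t377\t1\t878887116',
    '196\t51\t4\t881250950',  # hypothetical extra rating so user 196 has two movies
]
for user_id, ratings in run_local(sample, step1_mapper, step1_reducer):
    print(user_id, ratings)
```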

The following cell writes the job to `rec_step1.py` so that we can run it as if we were in a terminal.

In [2]:
%%file rec_step1.py
from mrjob.job import MRJob

class rec_step1(MRJob):
    def mapper(self, _, line):
        # each u.data line is: userID \t movieID \t rating \t timestamp
        try:
            (userID, movieID, rating, timestamp) = line.split('\t')
        except ValueError:
            # skip malformed lines instead of emitting a bogus None user
            return
        yield userID, (movieID, float(rating))

    def reducer(self, user_id, itemRatings):
        # for every user_id key, build a list of ratings in the form [(movieID1,rating1),(movieID2,rating2),...]
        ratings = [(movieID, rating) for movieID, rating in itemRatings]
        yield user_id, ratings


if __name__ == '__main__':
    rec_step1.run()

Overwriting rec_step1.py


Now run `rec_step1.py` on the `data1.txt` file we saved locally, as if from a terminal, and save the output to `step1output.txt`.

In [3]:
%%time
!python rec_step1.py data1.txt > step1output.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step1.teaching.20230201.010956.958721
Running step 1 of 1...
job output is in /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step1.teaching.20230201.010956.958721/output
Streaming final output from /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step1.teaching.20230201.010956.958721/output...
Removing temp directory /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step1.teaching.20230201.010956.958721...
CPU times: user 21.5 ms, sys: 22.9 ms, total: 44.4 ms
Wall time: 2.03 s


Now let's look at the first few lines of `step1output.txt`, with a little text to describe their meaning:

In [4]:
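import json

with open('step1output.txt', 'r') as f:
    lines = f.readlines()[:5]

for line in lines:
    user_id, ratings = line.split('\t')
    user_id = int(json.loads(user_id))  # the key is a JSON string such as "844"
    ratings = json.loads(ratings)[:5]   # the value is a JSON list of [movieID, rating] pairs
    print('user_id: ', user_id, '   Ratings: ', ratings, '....')
    print('\n')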

user_id:  844    Ratings:  [['22', 4.0], ['403', 3.0], ['195', 3.0], ['90', 3.0], ['423', 3.0]] ....


user_id:  845    Ratings:  [['311', 4.0], ['903', 4.0], ['877', 2.0], ['750', 3.0], ['1592', 3.0]] ....


user_id:  846    Ratings:  [['1074', 3.0], ['94', 4.0], ['627', 4.0], ['57', 2.0], ['377', 2.0]] ....


user_id:  847    Ratings:  [['290', 4.0], ['185', 2.0], ['826', 3.0], ['288', 4.0], ['405', 3.0]] ....


user_id:  848    Ratings:  [['164', 5.0], ['294', 5.0], ['1126', 5.0], ['481', 3.0], ['69', 2.0]] ....
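
Each line of the Step 1 output is a key and a value, each encoded as JSON and separated by a tab, which is why the `(movieID, rating)` tuples come back as lists such as `['22', 4.0]`. The sketch below imitates that format with Python's `json` module; `encode_record` and `decode_record` are hypothetical helpers, and the compact separators are an assumption (the exact spacing mrjob writes may differ).

```python
import json


def encode_record(key, value):
    """Hypothetical helper: write key and value as compact JSON joined by a tab."""
    def dump(obj):
        return json.dumps(obj, separators=(',', ':'))
    return dump(key) + '\t' + dump(value)


def decode_record(line):
    """Hypothetical helper: read one output line back into Python objects."""
    key, value = line.rstrip('\n').split('\t', 1)
    return json.loads(key), json.loads(value)


# user 844's first two ratings from the output above, built from tuples as in rec_step1
line = encode_record('844', [('22', 4.0), ('403', 3.0)])
print(line)
print(decode_record(line))  # ('844', [['22', 4.0], ['403', 3.0]])
```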




# Step 2: Compute the similarity of every movie pair

Step 2 has several components and takes the output of Step 1, `step1output.txt`, as its input.

* Map each input line, `(userID, [(movieID1,rating1),(movieID2,rating2),....])`, to key-value pairs of
`keys: (movieID1,movieID2), values: (rating1,rating2)`. Do this by taking every combination of two movies rated by the same user and emitting each pair in both orders.

* For each movie-pair key, reduce the values by computing the
    * number of users who rated both movies
    * cosine similarity between the two movies' ratings:
        * Each movie has a vector of ratings from the users who rated both movies:
            * `movie1: (user1rating, user2rating, user3rating,...)`
            * `movie2: (user1rating, user2rating, user3rating,...)`
        * The cosine similarity is the dot product of these two vectors divided by the product of their lengths.
    * The output should be:
        * `keys: (movie1,movie2), values: (cosineSimilarity, numberOfTimesUsersWatchedBoth)`
        * Keep only pairs with `cosineSimilarity > 0.95` and `numberOfTimesUsersWatchedBoth > 10`
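
To make the reducer's formula concrete, `cos(x, y) = sum(x_i * y_i) / (sqrt(sum(x_i^2)) * sqrt(sum(y_i^2)))`, here is a small sketch that mirrors the `cosine_similarity` method of `rec_step2.py` as a plain function. The rating pairs are hypothetical; each pair is `(rating of movie 1, rating of movie 2)` from one user.

```python
from math import sqrt


def cosine_similarity(ratingPairs):
    """Cosine similarity of two rating vectors given as (x_i, y_i) pairs, plus the pair count."""
    sum_xx = sum_yy = sum_xy = 0.0
    numPairs = 0
    for x, y in ratingPairs:
        sum_xx += x * x
        sum_yy += y * y
        sum_xy += x * y
        numPairs += 1
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    return (sum_xy / denominator if denominator else 0.0), numPairs


# hypothetical ratings from three users who rated both movies
close = cosine_similarity([(4, 5), (5, 5), (3, 2)])
# hypothetical: both users gave movie 1 one star and movie 2 five stars
scaled = cosine_similarity([(1, 5), (1, 5)])
print(close)   # about (0.9815, 3)
print(scaled)  # about (1.0, 2)
```

The second call shows that cosine similarity compares the direction of the rating vectors, not their level: because star ratings are always positive, a high score means the two movies' ratings rise and fall together across users, not that they received the same ratings.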

In [5]:
%%file rec_step2.py
import json
from itertools import combinations
from math import sqrt
from mrjob.job import MRJob

class rec_step2(MRJob):
    MIN_PAIRS = 10     # keep pairs rated by more than this many users
    MIN_SCORE = 0.95   # keep pairs whose cosine similarity exceeds this value

    def mapper(self, _, line):
        # each input line is a Step 1 record: JSON userID \t JSON list of [movieID, rating]
        user_id, itemRatings = line.split('\t')
        for (movieID1, rating1), (movieID2, rating2) in combinations(json.loads(itemRatings), 2):
            # emit both orderings so every movie appears as the first element of some key
            yield (movieID1, movieID2), (rating1, rating2)
            yield (movieID2, movieID1), (rating2, rating1)

    def cosine_similarity(self, ratingPairs):
        # treat the paired ratings as vectors x and y and compute x.y / (|x| |y|)
        numPairs = 0
        sum_xx = sum_yy = sum_xy = 0
        for ratingX, ratingY in ratingPairs:
            sum_xx += ratingX * ratingX
            sum_yy += ratingY * ratingY
            sum_xy += ratingX * ratingY
            numPairs += 1

        denominator = sqrt(sum_xx) * sqrt(sum_yy)
        score = sum_xy / float(denominator) if denominator else 0
        return (score, numPairs)

    def reducer(self, moviePair, ratingPairs):
        score, numPairs = self.cosine_similarity(ratingPairs)
        if numPairs > self.MIN_PAIRS and score > self.MIN_SCORE:
            yield moviePair, (score, numPairs)


if __name__ == '__main__':
    rec_step2.run()

Overwriting rec_step2.py
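
To see what the Step 2 mapper emits for a single user, the sketch below applies the same `combinations` logic to user 844's first three ratings from the Step 1 output. `step2_mapper_pairs` is a hypothetical stand-alone copy of the mapper's loop, not part of mrjob.

```python
from itertools import combinations


def step2_mapper_pairs(itemRatings):
    """Hypothetical copy of rec_step2's mapper loop: emit each movie pair in both orders."""
    for (movieID1, rating1), (movieID2, rating2) in combinations(itemRatings, 2):
        yield (movieID1, movieID2), (rating1, rating2)
        yield (movieID2, movieID1), (rating2, rating1)


# the first three ratings of user 844 from the Step 1 output
user_844 = [('22', 4.0), ('403', 3.0), ('195', 3.0)]
pairs = list(step2_mapper_pairs(user_844))
for key, value in pairs:
    print(key, value)
```

A user with `n` ratings yields `n * (n - 1)` records, so the mapper's output grows quadratically with the number of ratings per user.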


Now run `rec_step2.py` on `step1output.txt` and save the output to `step2output.txt`.
* Print the first 10 lines saved to `step2output.txt`

In [6]:
%%time
!python rec_step2.py step1output.txt > step2output.txt
for line in open('step2output.txt','r').readlines()[:10]:
    print(line)

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step2.teaching.20230201.012146.874491
Running step 1 of 1...
job output is in /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step2.teaching.20230201.012146.874491/output
Streaming final output from /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step2.teaching.20230201.012146.874491/output...
Removing temp directory /var/folders/hj/xyjyq1ds11n1ycjbp53_4svw0000gp/T/rec_step2.teaching.20230201.012146.874491...
["740","132"]	[0.96477212667565,17]

["740","133"]	[0.9549950604770451,19]

["740","134"]	[0.9704274486322576,17]

["740","143"]	[0.9650419442914718,19]

["740","154"]	[0.9598987017177196,15]

["740","155"]	[0.9548872180451126,13]

["740","162"]	[0.9610400702474754,11]

["740","164"]	[0.952830641518021,17]

["740","178"]	[0.9536746412526173,11]

["740","185"]	[0.9538407392261975,19]



# Step 3: Sort the similarities, load the movie names, and output the results

* Use `mapper_init` to load `labels1.txt` once per mapper, so the output shows the actual movie names instead of movie IDs

* Map the output of Step 2 to `keys: (movieName1,cosineScore), values: (movieName2, number_users_who_watched_both)`. Because the score is part of the key, the shuffle-and-sort phase groups each movie's matches together and orders them by score.

* Reduce the output of the mapper such that for every `movie1` we get a `(movie2, cosineScore, number_users_who_watched_both)`
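
The Step 3 mapping and sort can be imitated locally. This sketch uses the first three Step 2 records shown above; the names in `movieNames` are hypothetical placeholders for what `mapper_init` would read from `labels1.txt`, and Python's `sorted` stands in for the framework's shuffle-and-sort (it compares the numeric score, whereas the real sort compares the serialized keys).

```python
import json

# hypothetical placeholder names; the real job reads them from labels1.txt
movieNames = {740: 'Movie 740', 132: 'Movie 132', 133: 'Movie 133', 134: 'Movie 134'}

step2_lines = [
    '["740","132"]\t[0.96477212667565,17]',
    '["740","133"]\t[0.9549950604770451,19]',
    '["740","134"]\t[0.9704274486322576,17]',
]


def step3_mapper(line):
    # same logic as rec_step3.mapper: key (name1, score), value (name2, count)
    moviePair, scores = line.split('\t')
    score, n = json.loads(scores)
    movie1, movie2 = json.loads(moviePair)
    yield (movieNames[int(movie1)], score), (movieNames[int(movie2)], n)


# stand-in for the shuffle: sort the mapper output by key (movie name, then score)
mapped = sorted(kv for line in step2_lines for kv in step3_mapper(line))
results = [(movie1, (movie2, score, n)) for (movie1, score), (movie2, n) in mapped]
for record in results:
    print(record)
```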

In [None]:
%%file rec_step3.py
import json
import os
from mrjob.job import MRJob

class rec_step3(MRJob):
    # change this to the directory that contains labels1.txt on your machine
    LABELS_DIR = '/Users/teaching/Documents/github/teaching/SaintPeters/DS610/week9'
    LABELS_FILE = 'labels1.txt'

    def mapper_init(self):
        # load the movieID -> movie name lookup once per mapper task
        self.movieNames = {}
        filepath = os.path.join(self.LABELS_DIR, self.LABELS_FILE)
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                fields = line.split('|')
                self.movieNames[int(fields[0])] = fields[1]

    def mapper(self, _, line):
        # each input line is a Step 2 record: JSON [movieID1, movieID2] \t JSON [score, n]
        moviePair, scores = line.split('\t')
        score, n = json.loads(scores)
        movie1, movie2 = json.loads(moviePair)
        # putting the score in the key lets the sort phase order each movie's matches by score
        yield (self.movieNames[int(movie1)], score), (self.movieNames[int(movie2)], n)

    def reducer(self, movieScore, similarN):
        movie1, score = movieScore
        for movie2, n in similarN:
            yield movie1, (movie2, score, n)


if __name__ == '__main__':
    rec_step3.run()

Run `rec_step3.py` on the `step2output.txt` file from above, as if from a terminal, and save the output to `step3output.txt`.

In [None]:
!python rec_step3.py step2output.txt > step3output.txt

Print the first 10 lines of `step3output.txt`

In [None]:
for line in open('step3output.txt','r').readlines()[:10]:
    print(line)
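
To produce recommendations for a single movie from `step3output.txt`, something like the following sketch could filter and rank the records. `top_recommendations` is a hypothetical helper, the sample lines are made up in the `"movie1"<tab>["movie2",score,count]` shape that `rec_step3.py` emits, and breaking ties by the co-rating count is a design choice, not part of the original job.

```python
import json


def top_recommendations(lines, movie, n=10):
    """Hypothetical helper: the n most similar (movie2, score, count) tuples for `movie`."""
    matches = []
    for line in lines:
        key, value = line.rstrip('\n').split('\t')
        if json.loads(key) == movie:
            movie2, score, count = json.loads(value)
            matches.append((movie2, score, count))
    # highest cosine score first; ties broken by the number of users who rated both
    matches.sort(key=lambda m: (m[1], m[2]), reverse=True)
    return matches[:n]


# hypothetical lines in the Step 3 output format
sample_lines = [
    '"Movie A"\t["Movie B",0.96,17]\n',
    '"Movie A"\t["Movie C",0.99,12]\n',
    '"Movie D"\t["Movie A",0.97,30]\n',
]
print(top_recommendations(sample_lines, 'Movie A', n=2))
```

On the real file this could be called as `with open('step3output.txt') as f: print(top_recommendations(f, 'Toy Story (1995)'))`; the list is empty when a movie had no pairs above the Step 2 thresholds.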

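Finally, generate a `requirements.txt` for this lesson's directory with `pipreqs` (`--force` overwrites an existing file).

In [None]:
!pipreqs --force /Users/teaching/Documents/github/teaching/SaintPeters/DS610/week9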