In [1]:
from math import sqrt
from pyspark import SparkConf, SparkContext

In [2]:
def loadMovieNames(path='ml-100k/u.item', encoding='ascii'):
    """Build a lookup table from movie ID to movie title.

    Reads a pipe-delimited item file, taking the first field of each line as
    the integer movie ID and the second field as the title.

    Args:
        path: Location of the item file. Defaults to ``ml-100k/u.item``
            relative to the notebook's working directory.
        encoding: Text encoding used to open the file. Undecodable bytes are
            dropped (``errors='ignore'``), so with the default ``'ascii'``
            any non-ASCII characters in titles are silently removed.
            NOTE(review): the MovieLens 100k item file is commonly read as
            ``'ISO-8859-1'``; pass that to keep accented titles intact.

    Returns:
        dict mapping int movie ID -> str title.
    """
    movieNames = {}
    with open(path, encoding=encoding, errors='ignore') as f:
        for line in f:
            # A blank line (e.g. a trailing newline) would otherwise make
            # int(fields[0]) raise ValueError.
            if not line.strip():
                continue
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames
            

In [3]:
# Python 3 removed tuple unpacking in function parameters (PEP 3113),
# so we explicitly extract the ratings inside the function body instead.
def makepairs(userRatings):
    """Re-key a self-joined user record by its movie pair.

    ``userRatings`` is expected to look like
    ``(userID, ((movie1, rating1), (movie2, rating2)))``, i.e. one element of
    ``ratings.join(ratings)``. Returns ``((movie1, movie2), (rating1, rating2))``,
    presumably so ratings can be grouped per movie pair downstream.
    """
    joined = userRatings[1]
    movieA, scoreA = joined[0]
    movieB, scoreB = joined[1]
    moviePair = (movieA, movieB)
    ratingPair = (scoreA, scoreB)
    return moviePair, ratingPair

def filterDuplicate(userRatings):
    """Keep each unordered movie pair from a self-join exactly once.

    ``userRatings`` is expected to look like
    ``(userID, ((movie1, rating1), (movie2, rating2)))``. The ratings
    themselves are ignored; only the movie IDs are compared.

    Returns:
        True when ``movie1 < movie2``, False otherwise.
    """
    ratings = userRatings[1]
    movie1, _ = ratings[0]
    movie2, _ = ratings[1]
    # Strict "<" drops both self-pairs (a movie joined with itself) and the
    # mirrored (movie2, movie1) copy that a self-join also produces.
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    """Cosine similarity between two rating vectors given as paired entries.

    Args:
        ratingPairs: An iterable of ``(ratingX, ratingY)`` pairs. It is
            traversed exactly once, so a one-shot iterator is fine.

    Returns:
        ``(score, numPairs)``. ``score`` is
        ``sum(x*y) / (sqrt(sum(x*x)) * sqrt(sum(y*y)))``, or the int ``0``
        when the denominator is zero (including empty input). ``numPairs`` is
        how many pairs were consumed.
    """
    xx = yy = xy = 0
    count = 0
    for x, y in ratingPairs:
        xx, yy, xy = xx + x * x, yy + y * y, xy + x * y
        count += 1

    norm = sqrt(xx) * sqrt(yy)
    if not norm:
        # Avoid division by zero: an all-zero (or empty) vector has no direction.
        return (0, count)
    return (xy / float(norm), count)

In [4]:
# Run Spark locally with as many worker threads as logical cores ("local[*]").
# getOrCreate() returns the already-active SparkContext instead of raising
# when this cell is re-run (only one SparkContext may be active at a time).
# NOTE: if a context already exists, it is reused and `conf` is not re-applied.
conf = SparkConf().setMaster("local[*]").setAppName("movieSimilarity")
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
# Load the raw ratings file; each line is split on whitespace and fields 0-2
# are used as user ID, movie ID and rating respectively.
data = sc.textFile('ml-100k/u.data')
# Map ratings to key / value pairs: user ID => (movie ID, rating)
ratings = data.map(lambda line: line.split()).map(
    lambda fields: (int(fields[0]), (int(fields[1]), float(fields[2])))
)

# Preview a few records. take() fetches only the rows it needs, whereas
# collect()[:5] would pull the entire RDD back to the driver first.
ratings.take(5)
# Emit every movie rated together by the same user.
# Self-join to find every combination.
#joinedRatings = ratings.join(ratings)
#


[(196, (242, 3.0)),
 (186, (302, 3.0)),
 (22, (377, 1.0)),
 (244, (51, 2.0)),
 (166, (346, 1.0))]

In [None]:
#joinedRatings.collect()[:5]

In [7]:
# Shut down the SparkContext `sc` created earlier in this notebook; it cannot
# be used for further jobs after this call.
sc.stop()