In [1]:
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

In [2]:
def loadMovieNames():
    movieNames = {}
    with open("ml-1m/movies.dat", encoding='latin-1') as f:
        for line in f:
            fields = line.split('::')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

In [3]:
def makePairs(x):
    (movie1, rating1) = x[1][0]
    (movie2, rating2) = x[1][1]
    return ((movie1, movie2), (rating1, rating2))

In [4]:
def filterDuplicates(x):
    (movie1, rating1) = x[1][0]
    (movie2, rating2) = x[1][1]
    return movie1 < movie2

In [5]:
def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

In [7]:
# Change to use cluster config
conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
sc = SparkContext(conf = conf)

In [8]:
print("\nLoading movie names...")
nameDict = loadMovieNames()


Loading movie names...


In [9]:
nameDict[4]

'Waiting to Exhale (1995)'

In [10]:
data = sc.textFile("ml-1m/ratings.dat")

In [11]:
data.top(5)

['9::994::4::978226328',
 '9::920::3::978225401',
 '9::912::4::978224879',
 '9::861::2::978226665',
 '9::838::3::978226495']

In [12]:
type(data)

pyspark.rdd.RDD

In [15]:
# Map ratings to key / value pairs: user ID => movie ID, rating
ratings = data.map(lambda l: l.split('::')).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

In [16]:
type(ratings.top(50))

list

In [None]:
# Emit every movie rated together by the same user.
# Self-join to find every combination.
ratingsPartitioned = ratings.partitionBy(100)
joinedRatings = ratingsPartitioned.join(ratingsPartitioned)

In [None]:
# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))
joinedRatings.top(5)

In [None]:
# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

In [None]:
# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs)
# We now have (movie1, movie2) => (rating1, rating2)

In [None]:
type(moviePairs)

In [None]:
moviePairs.top(5)

In [None]:
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

In [None]:
# We now have (movie1, movie2) = > (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

In [None]:
moviePairSimilarities.top(5)

In [None]:
# Save the results if desired
#moviePairSimilarities.sortByKey()
#moviePairSimilarities.saveAsTextFile("movie-sims")

In [None]:
sys.argv

In [None]:
defaultId = 50
# Extract similarities for the movie we care about that are "good".
if (len(sys.argv) > 1):
    try:
        movieID = int(sys.argv[1])
    except:
        movieID = defaultId    
else: 
    movieID = defaultId

In [None]:
scoreThreshold = 0.97
coOccurenceThreshold = 50


# Filter for movies with this sim that are "good" as defined by
# our quality thresholds above
filteredResults = moviePairSimilarities.filter(lambda x: \
        (x[0][0] == movieID or x[0][1] == movieID) \
        and x[1][0] > scoreThreshold and x[1][1] > coOccurenceThreshold)

In [None]:
# Sort by quality score.
results = filteredResults.map(lambda x: (x[1],x[0])).sortByKey(ascending = False).take(10)

print("Top 10 similar movies for " + nameDict[movieID])
for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        print(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))