In [1]:
import findspark
findspark.init()

In [2]:
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt
folder = '/home/cloudera/Desktop/git/big_data_management1/Big data management in class Codes/recommendation_engine'
import os

In [3]:
def loadMovieNames():
    movieNames = {}
    with open(os.path.join(folder,"itemfile.txt")) as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames

In [4]:
def makePairs((user, ratings)):
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

In [5]:
def filterDuplicates( (userID, ratings) ):
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

In [6]:
def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)

In [7]:
rating_pairs = [(5,4.5),(1,0)]
computeCosineSimilarity(rating_pairs)

(0.9805806756909202, 2)

In [8]:
conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
sc = SparkContext(conf = conf)

In [9]:
print "\nLoading movie names..."
nameDict = loadMovieNames()
nameDict


Loading movie names...


{1: u'Toy Story (1995)',
 2: u'GoldenEye (1995)',
 3: u'Four Rooms (1995)',
 4: u'Get Shorty (1995)',
 5: u'Copycat (1995)',
 6: u'Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)',
 7: u'Twelve Monkeys (1995)',
 8: u'Babe (1995)',
 9: u'Dead Man Walking (1995)',
 10: u'Richard III (1995)',
 11: u'Seven (Se7en) (1995)',
 12: u'Usual Suspects, The (1995)',
 13: u'Mighty Aphrodite (1995)',
 14: u'Postino, Il (1994)',
 15: u"Mr. Holland's Opus (1995)",
 16: u'French Twist (Gazon maudit) (1995)',
 17: u'From Dusk Till Dawn (1996)',
 18: u'White Balloon, The (1995)',
 19: u"Antonia's Line (1995)",
 20: u'Angels and Insects (1995)',
 21: u'Muppet Treasure Island (1996)',
 22: u'Braveheart (1995)',
 23: u'Taxi Driver (1976)',
 24: u'Rumble in the Bronx (1995)',
 25: u'Birdcage, The (1996)',
 26: u'Brothers McMullen, The (1995)',
 27: u'Bad Boys (1995)',
 28: u'Apollo 13 (1995)',
 29: u'Batman Forever (1995)',
 30: u'Belle de jour (1967)',
 31: u'Crimson Tide (1995)',
 32: u'Crumb (1994)',

In [10]:
data = sc.textFile("file://{}/ratings2.dat".format(folder))

In [11]:
data.collect()

[u'1::1193::5::978300760',
 u'1::661::3::978302109',
 u'1::914::3::978301968',
 u'1::3408::4::978300275',
 u'1::2355::5::978824291',
 u'1::1197::3::978302268',
 u'1::1287::5::978302039',
 u'1::2804::5::978300719',
 u'1::594::4::978302268',
 u'1::919::4::978301368',
 u'1::595::5::978824268',
 u'1::938::4::978301752',
 u'1::2398::4::978302281',
 u'1::2918::4::978302124',
 u'1::1035::5::978301753',
 u'1::2791::4::978302188',
 u'1::2687::3::978824268',
 u'1::2018::4::978301777',
 u'1::3105::5::978301713',
 u'1::2797::4::978302039',
 u'1::2321::3::978302205',
 u'1::720::3::978300760',
 u'1::1270::5::978300055',
 u'1::527::5::978824195',
 u'1::2340::3::978300103',
 u'1::48::5::978824351',
 u'1::1097::4::978301953',
 u'1::1721::4::978300055',
 u'1::1545::4::978824139',
 u'1::745::3::978824268',
 u'1::2294::4::978824291',
 u'1::3186::4::978300019',
 u'1::1566::4::978824330',
 u'1::588::4::978824268',
 u'1::1907::4::978824330',
 u'1::783::4::978824291',
 u'1::1836::5::978300172',
 u'1::1022::5:

In [12]:
ratings = data.map(lambda l: l.split('::')).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

In [13]:
ratings.collect()

[(1, (1193, 5.0)),
 (1, (661, 3.0)),
 (1, (914, 3.0)),
 (1, (3408, 4.0)),
 (1, (2355, 5.0)),
 (1, (1197, 3.0)),
 (1, (1287, 5.0)),
 (1, (2804, 5.0)),
 (1, (594, 4.0)),
 (1, (919, 4.0)),
 (1, (595, 5.0)),
 (1, (938, 4.0)),
 (1, (2398, 4.0)),
 (1, (2918, 4.0)),
 (1, (1035, 5.0)),
 (1, (2791, 4.0)),
 (1, (2687, 3.0)),
 (1, (2018, 4.0)),
 (1, (3105, 5.0)),
 (1, (2797, 4.0)),
 (1, (2321, 3.0)),
 (1, (720, 3.0)),
 (1, (1270, 5.0)),
 (1, (527, 5.0)),
 (1, (2340, 3.0)),
 (1, (48, 5.0)),
 (1, (1097, 4.0)),
 (1, (1721, 4.0)),
 (1, (1545, 4.0)),
 (1, (745, 3.0)),
 (1, (2294, 4.0)),
 (1, (3186, 4.0)),
 (1, (1566, 4.0)),
 (1, (588, 4.0)),
 (1, (1907, 4.0)),
 (1, (783, 4.0)),
 (1, (1836, 5.0)),
 (1, (1022, 5.0)),
 (1, (2762, 4.0)),
 (1, (150, 5.0)),
 (1, (1, 5.0)),
 (1, (1961, 5.0)),
 (1, (1962, 4.0)),
 (1, (2692, 4.0)),
 (1, (260, 4.0)),
 (1, (1028, 5.0)),
 (1, (1029, 5.0)),
 (1, (1207, 4.0)),
 (1, (2028, 5.0)),
 (1, (531, 4.0)),
 (1, (3114, 4.0)),
 (1, (608, 4.0)),
 (1, (1246, 4.0)),
 (2, (1357, 5

In [14]:
joinedRatings = ratings.join(ratings)

In [15]:
joinedRatings.take(10)

[(2, ((1357, 5.0), (1357, 5.0))),
 (2, ((1357, 5.0), (3068, 4.0))),
 (2, ((1357, 5.0), (1537, 4.0))),
 (2, ((1357, 5.0), (647, 3.0))),
 (2, ((1357, 5.0), (2194, 4.0))),
 (2, ((1357, 5.0), (648, 4.0))),
 (2, ((1357, 5.0), (2268, 5.0))),
 (2, ((1357, 5.0), (2628, 3.0))),
 (2, ((1357, 5.0), (1103, 3.0))),
 (2, ((1357, 5.0), (2916, 3.0)))]

In [16]:
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

In [17]:
uniqueJoinedRatings.take(10)

[(2, ((1357, 5.0), (3068, 4.0))),
 (2, ((1357, 5.0), (1537, 4.0))),
 (2, ((1357, 5.0), (2194, 4.0))),
 (2, ((1357, 5.0), (2268, 5.0))),
 (2, ((1357, 5.0), (2628, 3.0))),
 (2, ((1357, 5.0), (2916, 3.0))),
 (2, ((1357, 5.0), (3468, 5.0))),
 (2, ((1357, 5.0), (1792, 3.0))),
 (2, ((1357, 5.0), (1687, 3.0))),
 (2, ((1357, 5.0), (3578, 5.0)))]

In [18]:
moviePairs = uniqueJoinedRatings.map(makePairs)
moviePairs.take(10)

[((1357, 3068), (5.0, 4.0)),
 ((1357, 1537), (5.0, 4.0)),
 ((1357, 2194), (5.0, 4.0)),
 ((1357, 2268), (5.0, 5.0)),
 ((1357, 2628), (5.0, 3.0)),
 ((1357, 2916), (5.0, 3.0)),
 ((1357, 3468), (5.0, 5.0)),
 ((1357, 1792), (5.0, 3.0)),
 ((1357, 1687), (5.0, 3.0)),
 ((1357, 3578), (5.0, 5.0))]

In [19]:
moviePairRatings = moviePairs.groupByKey()

In [20]:
[(key, list(values)) for (key, values) in moviePairRatings.take(10)]

[((2942, 3500), [(3.0, 2.0), (3.0, 2.0), (3.0, 4.0)]),
 ((76, 2796), [(1.0, 1.0)]),
 ((898, 1148), [(4.0, 5.0), (3.0, 5.0), (5.0, 5.0), (5.0, 5.0), (5.0, 4.0)]),
 ((556, 2688), [(5.0, 1.0), (4.0, 3.0)]),
 ((1771, 2719), [(2.0, 1.0)]),
 ((494, 2432), [(3.0, 3.0)]),
 ((663, 3457), [(3.0, 3.0), (4.0, 2.0), (3.0, 5.0), (3.0, 4.0)]),
 ((1952, 2042), [(4.0, 4.0)]),
 ((307, 3075), [(5.0, 5.0)]),
 ((1260, 1432), [(4.0, 3.0)])]

In [21]:
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()

In [22]:
moviePairSimilarities.take(10)

[((1204, 3822), (0.9701425001453319, 2)),
 ((1990, 2436), (1.0, 1)),
 ((953, 3827), (0.9618890614488318, 7)),
 ((555, 1963), (0.994067082686925, 4)),
 ((1605, 2699), (0.9159601797280588, 5)),
 ((2248, 3512), (0.9564087485519139, 10)),
 ((1345, 3853), (1.0, 1)),
 ((1799, 1841), (1.0, 1)),
 ((152, 2028), (1.0, 1)),
 ((3608, 3830), (1.0, 1))]

In [None]:
if (len(sys.argv) > 1):

    scoreThreshold = 0.10
    coOccurenceThreshold = 2

    movieID = int(sys.argv[1])

    filteredResults = moviePairSimilarities.filter(lambda((pair,sim)): \
        (pair[0] == movieID or pair[1] == movieID) \
        and sim[0] > scoreThreshold and sim[1] > coOccurenceThreshold)

    results = filteredResults.map(lambda((pair,sim)): (sim, pair)).sortByKey(ascending = False).take(10)

    print "Top 10 similar movies for " + nameDict[movieID]
    for result in results:
        (sim, pair) = result
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        print nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1])

In [23]:
scoreThreshold = 0.10
coOccurenceThreshold = 2

movieID = int('1')

filteredResults = moviePairSimilarities.filter(lambda((pair,sim)): \
    (pair[0] == movieID or pair[1] == movieID) \
    and sim[0] > scoreThreshold and sim[1] > coOccurenceThreshold)

results = filteredResults.map(lambda((pair,sim)): (sim, pair)).sortByKey(ascending = False).take(10)

print "Top 10 similar movies for " + nameDict[movieID]
for result in results:
    (sim, pair) = result
    similarMovieID = pair[0]
    if (similarMovieID == movieID):
        similarMovieID = pair[1]
    print nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1])

Top 10 similar movies for Toy Story (1995)
Omen, The (1976)	score: 1.0	strength: 4


KeyError: 3046

In [26]:
scoreThreshold = 0.10
coOccurenceThreshold = 2

movieID = int('25')

filteredResults = moviePairSimilarities.filter(lambda((pair,sim)): \
    (pair[0] == movieID or pair[1] == movieID) \
    and sim[0] > scoreThreshold and sim[1] > coOccurenceThreshold)

results = filteredResults.map(lambda((pair,sim)): (sim, pair)).sortByKey(ascending = False).take(10)

print "Top 10 similar movies for " + nameDict[movieID]
for result in results:
    (sim, pair) = result
    similarMovieID = pair[0]
    if (similarMovieID == movieID):
        similarMovieID = pair[1]
    try:
        print nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1])
    except KeyError:
        pass

Top 10 similar movies for Birdcage, The (1996)
Funny Face (1957)	score: 1.0	strength: 4
Blue Sky (1994)	score: 1.0	strength: 4
Heaven's Prisoners (1996)	score: 1.0	strength: 4


In [28]:
results

[((1.0000000000000002, 3), (25, 3308)),
 ((1.0000000000000002, 3), (25, 2212)),
 ((1.0, 6), (25, 3507)),
 ((1.0, 5), (25, 2313)),
 ((1.0, 5), (25, 3000)),
 ((1.0, 4), (25, 965)),
 ((1.0, 4), (25, 2969)),
 ((1.0, 4), (25, 1211)),
 ((1.0, 4), (25, 978)),
 ((1.0, 4), (25, 2738))]