
**This project uses item-based collaborative filtering to find the movies most similar to a given target movie.**


### Environment Setup and Reading Input Files

In [3]:
from pyspark import SparkConf, SparkContext
import os

# getOrCreate keeps this cell re-runnable: constructing SparkContext(...) while a
# context is already active raises an error, which the old commented-out
# `#sc.stop()` worked around by hand. Master 'local' runs Spark with one local
# worker thread; appName is 'pyspark' as before.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local').setAppName('pyspark'))
cwd = os.getcwd()  # data files below are located relative to the working directory

### Movie Count by Rating Value

In [2]:
# Each u.data line is tab-separated: user_id, item_id, rating, timestamp.
moviesrdd = sc.textFile('file://{}/ml-100k/u.data'.format(cwd))
moviesrdd.take(5)

[u'196\t242\t3\t881250949',
 u'186\t302\t3\t891717742',
 u'22\t377\t1\t878887116',
 u'244\t51\t2\t880606923',
 u'166\t346\t1\t886397596']

In [25]:
# Histogram of star ratings: {rating: number of ratings}. Field 2 of each
# tab-separated u.data line is the rating. countByValue counts per partition and
# merges on the driver with no shuffle. The previous sortByKey() added a shuffle
# whose ordering collectAsMap() then discarded.
ratings_counter = dict(moviesrdd
                       .map(lambda line: line.split('\t')[2])
                       .countByValue())
ratings_counter

{u'1': 6110, u'2': 11370, u'3': 27145, u'4': 34174, u'5': 21201}

## Most Popular Movie

In [3]:
# Re-read the ratings so this section can run on its own
# (tab-separated columns: user_id, item_id, rating, timestamp).
moviesrdd = sc.textFile('file://%s/ml-100k/u.data' % cwd)
moviesrdd.take(5)

[u'196\t242\t3\t881250949',
 u'186\t302\t3\t891717742',
 u'22\t377\t1\t878887116',
 u'244\t51\t2\t880606923',
 u'166\t346\t1\t886397596']

In [7]:
# Number of ratings per movie_id, 10 most-rated first.
# takeOrdered picks the 10 largest counts without sorting the whole RDD.
# The key lambda indexes the (movie_id, count) pair instead of using Python-2-only
# tuple-parameter unpacking, which Python 3 removed (PEP 3113).
MostPopularMovie = (moviesrdd
                    .map(lambda line: (line.split('\t')[1], 1))   # (movie_id, 1)
                    .reduceByKey(lambda a, b: a + b)               # ratings per movie_id
                    .takeOrdered(10, key=lambda kv: -kv[1])
                    )
MostPopularMovie
#movie_id, count

[(u'50', 583),
 (u'258', 509),
 (u'100', 508),
 (u'181', 507),
 (u'294', 485),
 (u'286', 481),
 (u'288', 478),
 (u'1', 452),
 (u'300', 431),
 (u'121', 429)]

**Map each movie_id to the actual movie name**

In [9]:
def loadMovieNames(path="ml-100k/u.item"):
    """Read the MovieLens item file into a {movie_id: title} dict.

    Each line is '|'-separated: field 0 is the integer movie id and field 1 is
    the title. The MovieLens 100k item file is Latin-1 (ISO-8859-1) encoded, so it
    is decoded as such and accented titles come back as proper text.

    path: item file to read; defaults to the relative path used before. The
    dataset's file name is lower-case ``u.item`` (the old ``u.ITEM`` spelling
    only opened on case-insensitive filesystems).

    Returns: dict mapping int movie id -> title string.
    """
    import io  # io.open accepts an encoding on both Python 2 and Python 3
    movieNames = {}
    with io.open(path, encoding="ISO-8859-1") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank / trailing lines
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames # {id:name}

# Broadcast the {movie_id: name} dict once so each executor gets a read-only copy,
# instead of the dict being shipped inside every task's closure.
nameDict = sc.broadcast(loadMovieNames())

# Top 10 most-rated movies by title. Index-based lambdas replace the
# Python-2-only tuple-parameter lambdas (removed in Python 3, PEP 3113).
# takeOrdered avoids sorting the whole RDD just to keep 10 rows.
result = (moviesrdd
          .map(lambda line: (int(line.split('\t')[1]), 1))   # (movie_id, 1)
          .reduceByKey(lambda a, b: a + b)                   # ratings per movie_id
          .map(lambda kv: (nameDict.value[kv[0]], kv[1]))     # id -> name via .value
          .takeOrdered(10, key=lambda kv: -kv[1])            # 10 largest counts
          )
result
#name, count

[('Star Wars (1977)', 583),
 ('Contact (1997)', 509),
 ('Fargo (1996)', 508),
 ('Return of the Jedi (1983)', 507),
 ('Liar Liar (1997)', 485),
 ('English Patient, The (1996)', 481),
 ('Scream (1996)', 478),
 ('Toy Story (1995)', 452),
 ('Air Force One (1997)', 431),
 ('Independence Day (ID4) (1996)', 429)]

## Most Popular Marvel Hero 

In [21]:
def parseHeroName(line):
    # Assumes lines look like: <hero_id> "<hero name>" -- the id is the text before
    # the first double quote, the name is the text after it up to the next one.
    pieces = line.split('"')
    return (int(pieces[0]), pieces[1].encode("utf8"))

def heroCoOccurrences(line):
    # First id on a graph line is the hero; the count is how many ids follow it.
    heroIDs = line.split()
    return (int(heroIDs[0]), len(heroIDs) - 1)

names = sc.textFile('file://' + cwd + '/marvel-names.txt')
namesRdd = names.map(parseHeroName).collectAsMap()   # {hero_id: hero_name}; a dict on the driver
nameDict = sc.broadcast(namesRdd)

graph = sc.textFile('file://' + cwd + '/marvel-graph.txt')
graphRdd = graph.map(heroCoOccurrences)   # (hero_id, count_of_cooccurrence) per line

In [22]:
# Total co-occurrence count per hero. reduceByKey sums the per-line counts in case
# a hero id starts more than one graph line.
# takeOrdered keeps the 10 largest without sorting the whole RDD. Ids are then
# translated to names on the driver, so only those 10 ids need a dictionary entry.
# Index-based lambdas replace Python-2-only tuple-parameter lambdas (PEP 3113).
topHeroCounts = (graphRdd
                 .reduceByKey(lambda a, b: a + b)
                 .takeOrdered(10, key=lambda kv: -kv[1]))
MostPopularHero = [(nameDict.value[heroID], count) for heroID, count in topHeroCounts]
MostPopularHero

[('CAPTAIN AMERICA', 1933),
 ('SPIDER-MAN/PETER PAR', 1741),
 ('IRON MAN/TONY STARK ', 1528),
 ('THING/BENJAMIN J. GR', 1426),
 ('WOLVERINE/LOGAN ', 1394),
 ('MR. FANTASTIC/REED R', 1386),
 ('HUMAN TORCH/JOHNNY S', 1371),
 ('SCARLET WITCH/WANDA ', 1345),
 ('THOR/DR. DONALD BLAK', 1289),
 ('BEAST/HENRY &HANK& P', 1280)]

## Collaborative Filtering (Item-based)

In [5]:
# Restart Spark so this section uses every local core ("local[*]") for the
# expensive self-join below; the earlier context was created with master 'local'.
sc.stop()

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("MovieSimilarities")
        .setMaster("local[*]"))
sc = SparkContext(conf=conf)

data = sc.textFile("file://%s/ml-100k/u.data" % cwd)

In [6]:
def loadMovieNames(path="ml-100k/u.item"):
    """Read the MovieLens item file into a {movie_id: title} dict.

    Each line is '|'-separated: field 0 is the integer movie id and field 1 is
    the title. The MovieLens 100k item file is Latin-1 (ISO-8859-1) encoded, so it
    is decoded as such. The previous ``.decode('ascii', 'ignore')`` silently
    deleted accented letters from titles and fails on Python 3, where ``str`` has
    no ``decode``.

    path: item file to read; defaults to the relative path used before. The
    dataset's file name is lower-case ``u.item`` (the old ``u.ITEM`` spelling
    only opened on case-insensitive filesystems).

    Returns: dict mapping int movie id -> title string.
    """
    import io  # io.open accepts an encoding on both Python 2 and Python 3
    movieNames = {}
    with io.open(path, encoding="ISO-8859-1") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank / trailing lines
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# nameDict was last bound to the Marvel hero names, and that broadcast was made on
# the SparkContext stopped at the start of this section -- rebuild the movie names.
nameDict = sc.broadcast(
    loadMovieNames()
)

In [7]:
def parseRating(line):
    # u.data line (user_id, item_id, rating, timestamp) -> (userID, (movieID, rating))
    fields = line.split()
    return (int(fields[0]), (int(fields[1]), float(fields[2])))

ratings = data.map(parseRating)

# Self-join on userID: pairs every movie a user rated with every movie that same
# user rated (itself included). This is the most expensive step in the notebook.
joinedRatings = ratings.join(ratings)
# -> (userID, ((movieID, rating), (movieID, rating)))

In [8]:
def filterDuplicates(userRatings):
    """Keep each unordered movie pair once per user.

    userRatings is one element of ``ratings.join(ratings)``:
    ``(userID, ((movie1, rating1), (movie2, rating2)))``.
    Returns True only when movie1 < movie2. This drops self-pairs
    (movie1 == movie2) and the mirrored (movie2, movie1) duplicate.

    Takes a single tuple argument instead of Python 2 tuple-parameter unpacking
    (``def f((a, b))``), which is a SyntaxError on Python 3. It also no longer
    shadows the global ``ratings`` RDD.
    """
    ratingPair = userRatings[1]
    (movie1, rating1) = ratingPair[0]
    (movie2, rating2) = ratingPair[1]
    return movie1 < movie2

# Drop self-pairs and mirrored duplicates so each movie pair appears once per user.
uniqueJoinedRatings = (joinedRatings
                       .filter(filterDuplicates))

In [9]:
def makePairs(userRatings):
    """Re-key a joined rating by movie pair.

    userRatings: ``(userID, ((movie1, rating1), (movie2, rating2)))``.
    Returns ``((movie1, movie2), (rating1, rating2))``; the user id is dropped.

    Takes a single tuple argument instead of Python 2 tuple-parameter unpacking
    (``def f((a, b))``), which is a SyntaxError on Python 3. It also no longer
    shadows the global ``ratings`` RDD.
    """
    ratingPair = userRatings[1]
    (movie1, rating1) = ratingPair[0]
    (movie2, rating2) = ratingPair[1]
    return ((movie1, movie2), (rating1, rating2))

# Re-key every joined rating by its movie pair:
# (movie1, movie2) -> (rating1, rating2)
moviePairs = (uniqueJoinedRatings
              .map(makePairs))

In [10]:
# Gather every (rating1, rating2) given to the same movie pair by different users:
# (movie1, movie2) -> (rating1, rating2), (rating1, rating2), ...
moviePairRatings = (moviePairs
                    .groupByKey())

In [12]:
from math import sqrt
def computeCosineSimilarity(ratingPairs):
    """Cosine similarity between the two rating columns of one movie pair.

    ratingPairs: iterable of (ratingX, ratingY) tuples, one per user who rated
    both movies.

    Returns (score, numPairs):
    - score is sum(x*y) / (sqrt(sum(x*x)) * sqrt(sum(y*y))), or 0 when that
      denominator is zero.
    - numPairs is how many rating pairs were seen (printed later as "strength").
    """
    pairs = list(ratingPairs)   # materialized because it is traversed three times
    dot = sum(x * y for x, y in pairs)
    normX = sqrt(sum(x * x for x, _ in pairs))
    normY = sqrt(sum(y * y for _, y in pairs))
    denominator = normX * normY
    if not denominator:
        return (0, len(pairs))
    return (dot / float(denominator), len(pairs))
# Score every movie pair: (movie1, movie2) -> (score, numPairs). Cached because this
# RDD is written out below and queried again for the per-movie lookup further down.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).cache()
# sortByKey() returns a NEW RDD (RDDs are immutable). The old bare
# `moviePairSimilarities.sortByKey()` statement discarded it, so the saved output
# was never sorted. Chain it into the save instead.
# NOTE: saveAsTextFile raises if "movie-sims" already exists; remove it before re-running.
moviePairSimilarities.sortByKey().saveAsTextFile("movie-sims")

### Extract similar movies for one particular movie

In [13]:
# Target movie and the two bars a candidate pair must clear (strictly greater) below.
movieID = 50   # "Star Wars (1977)"
scoreThreshold = 0.97          # minimum cosine similarity score (sim[0])
coOccurenceThreshold = 50      # minimum number of users who rated both movies (sim[1])

In [16]:
#Filter on the above threshold
def isGoodMatch(pairAndSim):
    """True when the pair involves movieID and clears both thresholds.

    pairAndSim: ((movie1, movie2), (score, numPairs)) from moviePairSimilarities.
    Takes one tuple argument; the Python-2-only ``lambda((pair, sim)):`` form is a
    SyntaxError on Python 3 (PEP 3113).
    """
    pair, sim = pairAndSim
    return ((pair[0] == movieID or pair[1] == movieID)
            and sim[0] > scoreThreshold and sim[1] > coOccurenceThreshold)

filteredResults = moviePairSimilarities.filter(isGoodMatch)

# Best 10 by (score, strength), highest first. takeOrdered avoids fully sorting the
# filtered RDD just to keep 10 rows.
results = filteredResults.takeOrdered(
    10, key=lambda pairAndSim: (-pairAndSim[1][0], -pairAndSim[1][1]))

print("Top 10 similar movies for " + nameDict.value[movieID])
for pair, sim in results:
    # Display only the other movie in the pair, not the target itself.
    similarMovieID = pair[1] if pair[0] == movieID else pair[0]
    print(nameDict.value[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))

Top 10 similar movies for Star Wars (1977)
Empire Strikes Back, The (1980)	score: 0.989552207839	strength: 345
Return of the Jedi (1983)	score: 0.985723086125	strength: 480
Raiders of the Lost Ark (1981)	score: 0.981760098873	strength: 380
20,000 Leagues Under the Sea (1954)	score: 0.97893856055	strength: 68
12 Angry Men (1957)	score: 0.977657612045	strength: 109
Close Shave, A (1995)	score: 0.977594829105	strength: 92
African Queen, The (1951)	score: 0.976469222267	strength: 138
Sting, The (1973)	score: 0.975151293774	strength: 204
Wrong Trousers, The (1993)	score: 0.974868135546	strength: 103
Wallace & Gromit: The Best of Aardman Animation (1996)	score: 0.97418161283	strength: 58
