In [161]:
import findspark, re
from difflib import SequenceMatcher
findspark.init()
import pyspark, sys
from pyspark import SparkConf, SparkContext
from math import sqrt

def loadMovieNames(path="ml-100k/u.item"):
    """Build a lookup of movie ID -> movie title from a '|'-delimited item file.

    Each non-blank line is split on '|'; the first field is parsed as the
    integer movie ID and the second field is stored as the title.

    Args:
        path: Location of the item file. Defaults to the lower-case
            "ml-100k/u.item" (the same file movieGenre() opens), so the
            lookup also works on case-sensitive filesystems.

    Returns:
        dict mapping int movie ID to its title string.
    """
    movieNames = {}
    # errors='ignore' silently drops any byte that is not ASCII instead of
    # raising a decode error, so accented characters vanish from titles.
    with open(path, encoding='ascii', errors='ignore') as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank or trailing empty lines
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

def movieGenre(path="ml-100k/u.item"):
    """Read the per-movie genre flag columns from a '|'-delimited item file.

    Each non-blank line is split on '|'; the first field is parsed as the
    integer movie ID and every field from the sixth onward is kept as that
    movie's list of genre flags.

    Args:
        path: Location of the item file. Defaults to "ml-100k/u.item".

    Returns:
        dict mapping int movie ID to a list of flag strings. The line
        terminator is stripped first, so the last flag carries no trailing
        newline.
    """
    genre = {}
    with open(path, encoding='ascii', errors='ignore') as f:
        for line in f:
            line = line.rstrip('\r\n')
            if not line.strip():
                continue  # tolerate blank or trailing empty lines
            fields = line.split('|')
            genre[int(fields[0])] = fields[5:]
    return genre

#Python 3 doesn't let you pass around unpacked tuples,
#so we explicitly extract the ratings now.
def makePairs( userRatings ):
    """Re-key one joined record by its movie pair.

    Takes (key, ((movie1, rating1), (movie2, rating2))) and returns
    ((movie1, movie2), (rating1, rating2)); the leading key is discarded.
    """
    joined = userRatings[1]
    left, right = joined[0], joined[1]
    movie_a, rating_a = left
    movie_b, rating_b = right
    movie_key = (movie_a, movie_b)
    rating_value = (rating_a, rating_b)
    return (movie_key, rating_value)

def filterDuplicates( userRatings ):
    """Return True only when the first movie ID is strictly below the second.

    Takes (key, ((movie1, rating1), (movie2, rating2))). Keeping only records
    with movie1 < movie2 discards self-pairs and the mirrored copy of each
    pair.
    """
    joined = userRatings[1]
    movie_a, _ = joined[0]
    movie_b, _ = joined[1]
    return movie_a < movie_b

def computeCosineSimilarity(movR):
    """Score one movie pair on rating similarity and on genre overlap.

    Args:
        movR: ((movid1, movid2), ratingPairs), where ratingPairs is an
            iterable of (ratingX, ratingY) tuples for the two movies.

    Returns:
        ((movid1, movid2), (score, numPairs, genreRatio)) where
          - score is the cosine similarity of the two rating vectors
            (0 when the denominator is zero),
          - numPairs is the number of rating pairs consumed,
          - genreRatio is SequenceMatcher(...).ratio() over the two movies'
            entries in the module-level `movieIdGenre` dictionary.

    Raises:
        KeyError: if either movie ID is missing from `movieIdGenre`.
    """
    (movid1, movid2) = movR[0]
    ratingPairs = movR[1]

    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if denominator:
        score = numerator / float(denominator)

    # The file imports SequenceMatcher by name (`from difflib import
    # SequenceMatcher`); the module name `difflib` itself is never bound, so
    # the class must be referenced directly to avoid a NameError.
    sm = SequenceMatcher(None, movieIdGenre[movid1], movieIdGenre[movid2])

    return ((movid1, movid2), (score, numPairs, sm.ratio()))


conf = SparkConf().setMaster("local[*]").setAppName("MovieSimilarities")
# getOrCreate reuses an already-running context, so re-executing this cell
# does not fail just because a SparkContext is still alive in the kernel.
sc = SparkContext.getOrCreate(conf = conf)

print("\nLoading movie names...")
nameDict = loadMovieNames()

# NOTE(review): this is an absolute Windows path, whereas the item file is
# opened relative to the working directory — confirm both point at the same
# ml-100k folder before changing either.
data = sc.textFile("C:\\ml-100k\\u.data")

# movieIdGenre: movie ID => list of positions whose genre flag is set,
# built from movieGenre(). Positions are 0-based so that position i lines up
# with key str(i) of the `genre` lookup table in this notebook (assuming the
# first flag column is "unknown", per the MovieLens 100K README — TODO confirm).
# SequenceMatcher ratios only depend on which positions match, so the shift
# from 1-based to 0-based does not change any genre similarity.
movieIdGenre = {}
for movieId, flags in movieGenre().items():
    movieIdGenre[movieId] = [idx for idx, flag in enumerate(flags) if int(flag) > 0]

# Parse every line of the ratings file into: user ID => (movie ID, rating)
ratings = (
    data.map(lambda line: line.split())
        .map(lambda cols: (int(cols[0]), (int(cols[1]), float(cols[2]))))
)

# Self-join on user ID so every two movies rated by the same user are
# emitted together:
#   userID => ((movieID, rating), (movieID, rating))
joinedRatings = ratings.join(ratings)

# Drop duplicate pairs (see filterDuplicates).
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Re-key by the movie pair:
#   (movie1, movie2) => (rating1, rating2)
moviePairs = uniqueJoinedRatings.map(makePairs)

# Gather all rating pairs belonging to each movie pair:
#   (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
moviePairRatings = moviePairs.groupByKey()

# Compute rating and genre similarities for every pair; the result is cached.
moviePairSimilarities = moviePairRatings.map(computeCosineSimilarity).cache()

# Optionally persist the results:
#moviePairSimilarities.sortByKey()
#moviePairSimilarities.saveAsTextFile("movie-sims")

# Pick out the "good" matches for the one movie we care about.
# (The movie ID could instead come from the command line:
#  if len(sys.argv) > 1: movieID = int(sys.argv[1]))

scoreThreshold = 0.97
coOccurenceThreshold = 50
genreThreshold = 0.60
movieID = 50

# Keep pairs that involve movieID and exceed all three thresholds.
FR = moviePairSimilarities.filter(
    lambda pairSim: (
        movieID in (pairSim[0][0], pairSim[0][1])
        and pairSim[1][0] > scoreThreshold
        and pairSim[1][1] > coOccurenceThreshold
        and pairSim[1][2] > genreThreshold
    )
)

# Flip to (similarity tuple, pair) so sortByKey orders by the similarity
# tuple, highest first, then keep the first ten.
simKeyed = FR.map(lambda pairSim: (pairSim[1], pairSim[0]))
results = simKeyed.sortByKey(ascending = False).take(10)

print("Top 10 similar movies for " + nameDict[movieID])
for sim, pair in results:
    # Show whichever member of the pair is not the movie we queried for.
    similarMovieID = pair[1] if pair[0] == movieID else pair[0]
    print(
        nameDict[similarMovieID]
        + "\tscore: " + str(sim[0])
        + "\tstrength: " + str(sim[1])
        + "\tGenre Similarity: " + str(sim[2])
    )



Loading movie names...
Top 10 similar movies for Star Wars (1977)
Empire Strikes Back, The (1980)	score: 0.9895522078385338	strength: 345	Genre Similarity: 0.9090909090909091
Return of the Jedi (1983)	score: 0.9857230861253026	strength: 480	Genre Similarity: 1.0
African Queen, The (1951)	score: 0.9764692222674887	strength: 138	Genre Similarity: 0.8888888888888888
Princess Bride, The (1987)	score: 0.9713874963443219	strength: 284	Genre Similarity: 0.6666666666666666


In [7]:
# Lookup table: genre index (as a string) => genre name.
genre = {
    "0": "unknown",
    "1": "Action",
    "2": "Adventure",
    "3": "Animation",
    "4": "Children's",
    "5": "Comedy",
    "6": "Crime",
    "7": "Documentary",
    "8": "Drama",
    "9": "Fantasy",
    "10": "Film-Noir",
    "11": "Horror",
    "12": "Musical",
    "13": "Mystery",
    "14": "Romance",
    "15": "Sci-Fi",
    "16": "Thriller",
    "17": "War",
    "18": "Western",
}

In [24]:
# Example of character similarity calculations using SequenceMatcher:

from difflib import SequenceMatcher

def similar(a, b):
    """Return the SequenceMatcher similarity ratio of sequences a and b."""
    matcher = SequenceMatcher(isjunk=None, a=a, b=b)
    return matcher.ratio()
similar("ActionAdventureComedy", "war")

0.08333333333333333

In [141]:
# cosine similarity for dictionary example
def counter_cosine_similarity(c1, c2):
    """Cosine similarity between two sparse vectors stored as mappings.

    Args:
        c1, c2: mappings (e.g. dict or collections.Counter) from term to a
            numeric weight; a term missing from a mapping counts as 0.

    Returns:
        dot(c1, c2) / (|c1| * |c2|), or 0.0 when either vector has zero
        magnitude (including empty mappings), mirroring the zero-denominator
        guard in computeCosineSimilarity.
    """
    # Local import: the notebook's visible import cell binds only `sqrt`
    # (`from math import sqrt`), not the `math` module itself.
    import math

    terms = set(c1).union(c2)
    dotprod = sum(c1.get(k, 0) * c2.get(k, 0) for k in terms)
    magA = math.sqrt(sum(c1.get(k, 0) ** 2 for k in terms))
    magB = math.sqrt(sum(c2.get(k, 0) ** 2 for k in terms))

    denominator = magA * magB
    if not denominator:
        # Avoid ZeroDivisionError for empty or all-zero vectors.
        return 0.0
    return dotprod / denominator

#c1 = Counter(movieIdGenre[2])
#c2 = Counter(movieIdGenre[5])
#counter_cosine_similarity(c1,c2 )



In [160]:
# Stop the SparkContext bound to `sc`.
sc.stop()