## Training

In [101]:
from math import sqrt
import time
from pyspark.sql import SparkSession
import numpy as np

In [102]:
spark = SparkSession \
        .builder \
        .master('local[*]') \
        .appName("MovieSimilarities") \
        .getOrCreate()

In [103]:
# Load training data
train = spark.sparkContext.textFile("ml-100k/ub.base")

In [104]:
# userID => movieID, rating
train = train.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))
train.take(5)

[(1, (1, 5.0)), (1, (2, 3.0)), (1, (3, 4.0)), (1, (4, 3.0)), (1, (5, 3.0))]

In [105]:
def filterDuplicates(userRatings):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

# userID => ((movieID, rating), (movieID, rating))
joinedRatings = train.join(train)
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)
uniqueJoinedRatings.take(5)

[(4, ((50, 5.0), (260, 4.0))),
 (4, ((50, 5.0), (264, 3.0))),
 (4, ((50, 5.0), (288, 4.0))),
 (4, ((50, 5.0), (294, 5.0))),
 (4, ((50, 5.0), (301, 5.0)))]

In [106]:
def makePairs(userRatings):
    ratings = userRatings[1]
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return (movie1, movie2), (rating1, rating2)

# (movie1, movie2) => (rating1, rating2)
moviePairs = uniqueJoinedRatings.map(makePairs)
moviePairs.take(5)

[((50, 260), (5.0, 4.0)),
 ((50, 264), (5.0, 3.0)),
 ((50, 288), (5.0, 4.0)),
 ((50, 294), (5.0, 5.0)),
 ((50, 301), (5.0, 5.0))]

In [107]:
# (movie1, movie2) = > (rating1, rating2), (rating1, rating2) ...
moviePairRatings = moviePairs.groupByKey()

def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if denominator:
        score = (numerator / (float(denominator)))

    return score, numPairs

# Calculate similariry each pairwirse
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity)
moviePairSimilarities.take(5)

[((50, 260), (0.8877236082822483, 75)),
 ((50, 264), (0.9482881919215416, 57)),
 ((50, 288), (0.9394779267044785, 223)),
 ((50, 356), (0.9463932391575332, 85)),
 ((50, 360), (0.9525793444156803, 4))]

## Evaluate

In [108]:
# Load testing data
test = spark.sparkContext.textFile('ml-100k/ub.test')

# userX => (movieY, ratingXY)
test = test.map(lambda l: l.split()).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))
n_test = test.count()
n_test

9430

In [109]:
# userX => ((movieY, ratingXY), (movieA, ratingXA)) - A is movieID rated by userX
evaluate = test.join(train)
evaluate.take(5)

[(4, ((11, 4.0), (50, 5.0))),
 (4, ((11, 4.0), (260, 4.0))),
 (4, ((11, 4.0), (264, 3.0))),
 (4, ((11, 4.0), (288, 4.0))),
 (4, ((11, 4.0), (294, 5.0)))]

In [110]:
# (movieY, movieA) => (i, userX, ratingXY, ratingXA) - i is index of movieY in key
def map1(row):
    userX, ((movieY, ratingXY), (movieA, ratingXA))= row
    if movieY < movieA:
        return (movieY, movieA), (0, userX, ratingXY, ratingXA)
    return (movieA, movieY), (1, userX, ratingXY, ratingXA)

evaluate = evaluate.map(map1)
evaluate.take(5)

[((11, 50), (0, 4, 4.0, 5.0)),
 ((11, 260), (0, 4, 4.0, 4.0)),
 ((11, 264), (0, 4, 4.0, 3.0)),
 ((11, 288), (0, 4, 4.0, 4.0)),
 ((11, 294), (0, 4, 4.0, 5.0))]

In [111]:
# (movieY, movieA) => ((i, userX, ratingXY, ratingXA), (simYA, numPairsYA))
evaluate = evaluate.join(moviePairSimilarities)

# (userX, movieY, ratingXY) => [(ratingXA, simYA)]
def map2(row):
    (movieY, movieA), ((i, userX, ratingXY, ratingXA), (simYA, numPairsYA)) = row
    if i == 0:
        return (userX, movieY, ratingXY), [(ratingXA, simYA)]
    return (userX, movieA, ratingXY), [(ratingXA, simYA)]

evaluate = evaluate.map(map2)
evaluate.take(5)

[((4, 11, 4.0), [(5.0, 0.9352046336456875)]),
 ((329, 303, 4.0), [(3.0, 0.9352046336456875)]),
 ((747, 303, 5.0), [(5.0, 0.9352046336456875)]),
 ((4, 210, 3.0), [(3.0, 0.9555336474456124)]),
 ((416, 210, 5.0), [(3.0, 0.9555336474456124)])]

In [112]:
# (userX, movieY, ratingXY) => [(ratingXA, simYA), (ratingXB, simYB), ...] - A,B is movieID rated by userX
evaluate = evaluate.reduceByKey(lambda a, b: a + b)

# Take top k movies greatest similarity
k = 30
evaluate = evaluate.mapValues(lambda l: sorted(l, key=lambda x: x[1], reverse=True)[:k])
evaluate.first()

((4, 11, 4.0),
 [(5.0, 1.0),
  (5.0, 0.9733262629940814),
  (4.0, 0.9622310914738059),
  (4.0, 0.957818989314913),
  (3.0, 0.9496642341028995),
  (5.0, 0.9471077725614722),
  (5.0, 0.9352046336456875),
  (5.0, 0.9343415753188752),
  (3.0, 0.9316832141110515),
  (5.0, 0.9157349380502997),
  (2.0, 0.9068877544453096),
  (4.0, 0.8845187867317229),
  (5.0, 0.8508495020317434),
  (5.0, 0.8221921916437785)])

In [113]:
# Predict rating and calculating RMSE
def predict(row):
    rating_ori = row[0][2]
    val = np.array(row[1])
    ratings = val[:,0]
    sims = val[:,1]
    rating_pred = np.dot(ratings, sims) / (np.abs(sims).sum() + 1e-8)
    return (rating_pred - rating_ori)**2

evaluate = evaluate.map(predict).reduce(lambda a, b: a + b)

# Calculate RMSE
RMSE = np.sqrt(evaluate/n_test)
RMSE

1.0433581643686343