In [1]:
from itertools import islice
from datetime import datetime
import json
import pandas as pd
import numpy as np
from pprint import pprint

# Compact array display: 3 decimal places, no scientific notation for small values.
np.set_printoptions(suppress=True, precision=3)

In [2]:
# Load the movie and rating tables from the local ml-20m directory.
# movies is keyed by movieId; ratings by the (movieId, userId) pair.
files = "./ml-20m/"
movies = pd.read_csv("{}movies.csv".format(files), index_col="movieId")
ratings = pd.read_csv("{}ratings.csv".format(files), index_col=["movieId", "userId"])

In [3]:
# Split the pipe-delimited genre string into a list of genre names.
# Only split while the column still holds strings: applying .str.split a second
# time to the already-split lists would replace those values with NaN, so this
# guard makes the cell safe to re-run.
if movies["genres"].map(lambda g: isinstance(g, str)).any():
    movies["genres"] = movies["genres"].str.split("|")

In [4]:
# Individual ratings as a Series indexed by (movieId, userId).
R = ratings["rating"]
# Mean rating per movie. Series.mean(level=...) was removed in pandas 2.0;
# groupby(level=...) is the equivalent that works on old and new pandas.
# Assignment aligns on movieId, so movies without any ratings get NaN.
movies["rating"] = R.groupby(level="movieId").mean()

In [5]:
# Convert Unix epoch seconds to datetimes.
# pd.to_datetime is vectorised (a per-row datetime.fromtimestamp apply is slow on
# large tables) and produces naive UTC timestamps, so the result no longer
# depends on the local timezone of the machine running the notebook.
# The dtype guard makes re-running the cell a no-op instead of an error.
if not pd.api.types.is_datetime64_any_dtype(ratings["timestamp"]):
    ratings["timestamp"] = pd.to_datetime(ratings["timestamp"], unit="s")

In [6]:
# Worked example: mean-centred cosine similarity between movies 318 and 858.
# (.ix was removed in pandas 1.0; these are label lookups, so .loc is the replacement.)
m1, m2 = movies.loc[318], movies.loc[858]
# Each movie's ratings (indexed by userId), shifted by that movie's mean rating.
r1 = R.loc[m1.name] - m1.rating
r2 = R.loc[m2.name] - m2.rating
# The numerator only sums over users who rated both movies (the product is NaN
# elsewhere and skipped); each norm sums over all of that movie's raters.
similarity = (r1 * r2).sum(skipna=True) / np.sqrt((r1**2).sum() * (r2**2).sum())
similarity

0.099149146678717062

In [7]:
def cosine(r1, r2, clip_negative=True):
    """Cosine similarity between two index-aligned rating Series.

    Labels present in only one Series are effectively treated as zero: the dot
    product only covers shared labels (the elementwise product is NaN elsewhere
    and is skipped), while each norm uses all of that Series' values.

    Parameters
    ----------
    r1, r2 : pandas.Series
        Ratings keyed by the same kind of label (in this notebook, userId).
    clip_negative : bool, default True
        True keeps the original behaviour: a non-positive dot product yields 0.0,
        so negative similarities are clipped to zero. False returns the signed
        cosine in [-1, 1]. Useful for mean-centred inputs, where negative values
        carry information.

    Returns
    -------
    float
        0.0 when the dot product is zero (e.g. no overlapping labels), which
        also avoids a 0/0 division.
    """
    dot = (r1 * r2).sum(skipna=True)
    if dot == 0 or (clip_negative and dot < 0):
        return 0.0
    # A non-zero dot product implies both norms are non-zero, so this is finite.
    return dot / np.sqrt((r1**2).sum() * (r2**2).sum())

# Raw (uncentred) cosine similarity of the same two movies; .loc replaces the removed .ix.
cosine(R.loc[m1.name], R.loc[m2.name])

0.50176855139158272

In [8]:
def sim_peason(m1, m2, r1, r2):
    """Mean-centred cosine similarity between two movies.

    Each rating Series is shifted by its movie's ``rating`` attribute (in this
    notebook, the movie's mean rating) and the centred Series are passed to
    ``cosine``. The result therefore follows ``cosine``'s treatment of
    non-overlapping labels and non-positive dot products. The function name
    keeps its original spelling so existing callers keep working.
    """
    centred_1 = r1 - m1.rating
    centred_2 = r2 - m2.rating
    return cosine(centred_1, centred_2)

# Should reproduce the hand-computed `similarity` above; .loc replaces the removed .ix.
sim_peason(m1, m2, R.loc[m1.name], R.loc[m2.name])

0.099149146678717062

In [29]:
# Ids of the movies up to and including label 9 (.loc label slices are inclusive).
movies.loc[:9].index

Int64Index([1, 2, 3, 4, 5, 6, 7, 8, 9], dtype='int64', name='movieId')

In [32]:
def calculate_sim(movieId, target_ids, ratings_by_movie=None):
    """Cosine similarity between one movie and each of several other movies.

    Parameters
    ----------
    movieId : int
        Movie whose ratings are compared against every target.
    target_ids : iterable of int
        Movie ids to compare against; the output keeps this order.
    ratings_by_movie : pandas.Series, optional
        Ratings indexed by (movieId, userId). Defaults to the notebook-global
        ``R``, matching the original behaviour.

    Returns
    -------
    numpy.ndarray
        One similarity per target id; an empty float array when there are none.
    """
    if ratings_by_movie is None:
        ratings_by_movie = R
    target_ids = list(target_ids)
    if not target_ids:
        # Nothing to compare against (e.g. the first movie): skip the lookup.
        return np.array([])
    # Fetch the source movie's ratings once rather than once per target.
    # NOTE(review): .loc raises KeyError for an id with no rows in
    # ratings_by_movie -- confirm every id passed here has ratings.
    source_ratings = ratings_by_movie.loc[movieId]
    return np.array([
        cosine(source_ratings, ratings_by_movie.loc[m]) for m in target_ids
    ])

# Similarities of movie 10 to the movies labelled up to 9; .loc replaces the removed .ix.
calculate_sim(10, movies.loc[:9].index)

array([ 0.382,  0.427,  0.203,  0.124,  0.205,  0.377,  0.212,  0.109,
        0.167])

In [38]:
def iterate_movies(movies):
    """Yield each movie id paired with the ids of all movies before it.

    Pairs follow index order: the k-th id is paired with the first k ids of
    ``movies.index`` (an empty Index for the first movie). Row k therefore
    always has exactly k entries, which the triangular similarity matrix relies on.

    Slicing is positional. Unlike the former label slice ``.ix[:i-1]`` (and
    ``.ix`` itself, removed in pandas 1.0), it does not assume integer labels or
    a sorted index.
    """
    ids = movies.index
    for position, movie_id in enumerate(ids):
        yield movie_id, ids[:position]

# Peek at the first 10 (movieId, preceding ids) pairs.
[pair for pair in islice(iterate_movies(movies), 10)]

[(1, Int64Index([], dtype='int64', name='movieId')),
 (2, Int64Index([1], dtype='int64', name='movieId')),
 (3, Int64Index([1, 2], dtype='int64', name='movieId')),
 (4, Int64Index([1, 2, 3], dtype='int64', name='movieId')),
 (5, Int64Index([1, 2, 3, 4], dtype='int64', name='movieId')),
 (6, Int64Index([1, 2, 3, 4, 5], dtype='int64', name='movieId')),
 (7, Int64Index([1, 2, 3, 4, 5, 6], dtype='int64', name='movieId')),
 (8, Int64Index([1, 2, 3, 4, 5, 6, 7], dtype='int64', name='movieId')),
 (9, Int64Index([1, 2, 3, 4, 5, 6, 7, 8], dtype='int64', name='movieId')),
 (10, Int64Index([1, 2, 3, 4, 5, 6, 7, 8, 9], dtype='int64', name='movieId'))]

In [43]:
# Distribute the first 1000 (movieId, preceding ids) pairs over 8 partitions and
# lazily map each pair to its array of similarities.
rdd = (
    sc.parallelize(islice(iterate_movies(movies), 1000), 8)
    .map(lambda pair: calculate_sim(*pair))
)

In [None]:
# Run the Spark job and bring the per-movie similarity arrays back to the driver as a list.
similarities = list(rdd.collect())

In [32]:
# Assemble the per-movie similarity rows into one dense, symmetric matrix.
# Row k of `similarities` holds movie k's similarities to the movies before it,
# so the rows fill the strict lower triangle. The matrix needs one row per
# collected array: len(similarities). The last array is one element shorter
# than that, so sizing by it would overflow on the final row.
max_len = len(similarities)
result = np.zeros((max_len, max_len))
for i, s in enumerate(similarities):
    result[i, :s.size] = s
sim_matrix = result + result.T
# Diagonal = each movie's similarity to itself. Set it to 1.0 explicitly: adding
# the transpose leaves it at 0 for strict-lower rows (or 2 if rows include self).
np.fill_diagonal(sim_matrix, 1.0)
sim_matrix

array([[ 1.        ,  0.31501184,  0.17379116,  0.16582331,  0.21226605,
         0.11603759,  0.19424208,  0.19642781,  0.08660063,  0.22551062,
         0.20889393, -0.00774983,  0.23755015,  0.1103709 ,  0.14167647,
         0.08702847,  0.18640497,  0.00199259,  0.14468261,  0.08724871],
       [ 0.31501184,  1.        ,  0.35424204,  0.30434227,  0.42471435,
         0.11666153,  0.34585337,  0.40833376,  0.38390436,  0.29971012,
         0.28986993,  0.29343352,  0.36508444,  0.12953916,  0.44822586,
         0.13384564,  0.17119933,  0.1845611 ,  0.3075455 ,  0.40804445],
       [ 0.17379116,  0.35424204,  1.        ,  0.27102355,  0.47317664,
         0.15431559,  0.30378027,  0.47393198,  0.30958607,  0.27860627,
         0.2925893 ,  0.30834273,  0.36490975,  0.08996912,  0.3289103 ,
         0.14294103,  0.09119123,  0.18655605,  0.29257432,  0.39560433],
       [ 0.16582331,  0.30434227,  0.27102355,  1.        ,  0.34759068,
         0.1616584 ,  0.29106905,  0.49372458,  

In [12]:
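# Kept for reference only (not executed): an earlier version collected into `sims = rdd.collect()`.
# A commented-out alternative folded the results into one JSON-lines string instead; note json.dumps
# cannot serialize NumPy arrays directly, so `x` would need `x.tolist()` first:
# rdd.aggregate("", lambda acc, x: "{}\n{}".format(acc, json.dumps(x)), lambda x, y: "{}\n{}".format(x, y))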