## Import

In [1]:
import itertools
import math

import pyspark
from pyspark import SparkConf, SparkContext

## MAPPER

## average_rate

算出每個 movie 所有 rating 的平方和再開根號，也就是該 movie rating 向量的長度 $\|r_x\|$；兩個 movie 的長度相乘就是 cosine similarity 的分母（函式雖然叫 average_rate，但算的不是平均值）

In [2]:
def average_rate(line):
    """Return ``(movie_id, norm)`` for one movie's grouped ratings.

    Despite its name this does not average anything: ``norm`` is the Euclidean
    (L2) length of the movie's rating vector, i.e. sqrt(sum(rating * rating)).
    It is one factor of the cosine-similarity denominator.

    Args:
        line: ``(movie_id, [(user_id, rating), ...])``; ``movie_id`` must be
            convertible with ``int`` and every ``rating`` with ``float``.

    Returns:
        ``(int(movie_id), norm)`` where ``norm`` is a float (0.0 for an empty list).
    """
    squared_ratings = (float(entry[1]) * float(entry[1]) for entry in line[1])
    norm = math.sqrt(sum(squared_ratings))
    return (int(line[0]), norm)

## vector_multiplication 

這裡對每位 user 評過的 movie 兩兩配對並把 rating 相乘；把同一對 movie 在所有 user 上的乘積加總後，就得到兩個 movie rating 向量的內積 $r_x \cdot r_y$，也就是 cosine similarity 的分子

In [3]:
def vector_multiplication(line):
    """Emit one partial dot-product term for every pair of movies rated by a user.

    Args:
        line: ``(user_id, [(movie_id, rating), ...])``; each ``rating`` must be
            convertible with ``float``.

    Returns:
        A list of ``((movie_a, movie_b), rating_a * rating_b)`` tuples with
        ``movie_a <= movie_b``. Summing these per key over all users yields the
        dot product of the two movies' rating vectors.
    """
    # groupByKey does not guarantee the order of the grouped values. Without a
    # canonical order the same movie pair could be keyed (a, b) for one user and
    # (b, a) for another, and reduceByKey would split its dot product across two
    # keys. Sorting by movie id makes every pair key canonical.
    ratings = sorted(line[1], key=lambda item: item[0])
    return [
        ((movie_a, movie_b), float(rating_a) * float(rating_b))
        for (movie_a, rating_a), (movie_b, rating_b) in itertools.combinations(ratings, 2)
    ]

## cosine_similarity 

這裡算出 cosine similarity，也就是 $\frac{r_x \cdot r_y}{\|r_x\|\,\|r_y\|}$

In [4]:
def cosine_similarity(line, norms=None):
    """Turn a ``((movie_a, movie_b), dot_product)`` pair into its cosine similarity.

    Args:
        line: ``((movie_a, movie_b), dot_product)``, one element of the reduced
            numerator RDD.
        norms: mapping ``movie_id -> L2 norm of that movie's rating vector``.
            Defaults to the module-level ``denominator`` dict, so
            ``rdd.map(cosine_similarity)`` keeps working unchanged; passing it
            explicitly removes the hidden dependency on that global.

    Returns:
        ``((movie_a, movie_b), dot_product / (norms[movie_a] * norms[movie_b]))``.

    Raises:
        KeyError: if either movie id is missing from ``norms``.
        ZeroDivisionError: if either norm is 0.
    """
    if norms is None:
        norms = denominator
    pair, dot_product = line[0], line[1]
    return (pair, dot_product / (norms[pair[0]] * norms[pair[1]]))

## Reducer 

這裡是 reduceByKey 需要的加法：把同一對 movie 在所有 user 上的乘積加總成內積

In [5]:
def Reducer(x, y):
    """Combine two partial values with ``+``.

    Used with ``reduceByKey`` to add up the per-user products of a movie pair.
    """
    combined = x + y
    return combined

## 讀取資料

lines 將 movie 作為 key，然後 ( user , rating ) 作為 value，以便算出 cosine similarity 的分母
lines2 將 user 作為 key，然後 ( movie, rating ) 作為 value，以便後面以 user 作為 index 做出向量相乘

In [6]:
conf = (
    SparkConf()
    .set("spark.default.parallelism", 4)
    .setAppName("FinalProject")
    .set('spark.executor.memory', '4G')
    .set('spark.driver.memory', '45G')
    .set('spark.driver.maxResultSize', '10G')
)
sc = SparkContext(conf=conf)

# Each input line is "user<TAB>movie<TAB>rating". Read and split the file only
# once and cache it, since both groupings below are built from the same records
# (previously data1.txt was read and parsed twice).
ratings_raw = sc.textFile("data1.txt").map(lambda line: line.split('\t')).cache()

# movie -> [(user, rating), ...] with string ids; feeds the per-movie norms.
lines = ratings_raw.map(lambda fields: (fields[1], (fields[0], fields[2]))) \
    .groupByKey().mapValues(list)
# user -> [(movie, rating), ...] with int ids; feeds the pairwise products.
lines2 = ratings_raw.map(lambda fields: (int(fields[0]), (int(fields[1]), fields[2]))) \
    .groupByKey().mapValues(list)
sorted(lines.collect())

[('1',
  [('1', '4'),
   ('5', '4'),
   ('7', '4.5'),
   ('15', '2.5'),
   ('17', '4.5'),
   ('18', '3.5'),
   ('19', '4')]),
 ('10', [('6', '3'), ('8', '2'), ('11', '3'), ('19', '2')]),
 ('100', [('6', '3')]),
 ('1004', [('6', '4')]),
 ('1006', [('6', '4')]),
 ('1009', [('1', '3')]),
 ('101', [('1', '5')]),
 ('1010', [('20', '3')]),
 ('1012', [('20', '4')]),
 ('1013', [('20', '4')]),
 ('1015', [('20', '3.5')]),
 ('1016', [('19', '3')]),
 ('1018', [('20', '3.5')]),
 ('101864', [('15', '4')]),
 ('102', [('6', '1'), ('19', '1')]),
 ('1020', [('18', '3.5'), ('19', '3')]),
 ('1021', [('19', '2'), ('20', '2')]),
 ('102125', [('18', '3.5')]),
 ('1022', [('20', '4.5')]),
 ('1023', [('1', '5')]),
 ('1024', [('1', '5')]),
 ('102445', [('18', '4')]),
 ('1025', [('1', '5'), ('4', '4'), ('20', '5')]),
 ('1028', [('10', '0.5'), ('19', '2'), ('20', '4.5')]),
 ('1029', [('1', '5'), ('20', '4.5')]),
 ('102903', [('18', '4')]),
 ('1030', [('1', '3'), ('20', '3')]),
 ('1031', [('1', '5'), ('19', '2'), (

In [7]:
# Show each user's (movie, rating) list, ordered by user id.
users_ratings = lines2.collect()
sorted(users_ratings)

[(1,
  [(1, '4'),
   (3, '4'),
   (6, '4'),
   (47, '5'),
   (50, '5'),
   (70, '3'),
   (101, '5'),
   (110, '4'),
   (151, '5'),
   (157, '5'),
   (163, '5'),
   (216, '5'),
   (223, '3'),
   (231, '5'),
   (235, '4'),
   (260, '5'),
   (296, '3'),
   (316, '3'),
   (333, '5'),
   (349, '4'),
   (356, '4'),
   (362, '5'),
   (367, '4'),
   (423, '3'),
   (441, '4'),
   (457, '5'),
   (480, '4'),
   (500, '3'),
   (527, '5'),
   (543, '4'),
   (552, '4'),
   (553, '5'),
   (590, '4'),
   (592, '4'),
   (593, '4'),
   (596, '5'),
   (608, '5'),
   (648, '3'),
   (661, '5'),
   (673, '3'),
   (733, '4'),
   (736, '3'),
   (780, '3'),
   (804, '4'),
   (919, '5'),
   (923, '5'),
   (940, '5'),
   (943, '4'),
   (954, '5'),
   (1009, '3'),
   (1023, '5'),
   (1024, '5'),
   (1025, '5'),
   (1029, '5'),
   (1030, '3'),
   (1031, '5'),
   (1032, '5'),
   (1042, '4'),
   (1049, '5'),
   (1060, '4'),
   (1073, '5'),
   (1080, '5'),
   (1089, '5'),
   (1090, '4'),
   (1092, '5'),
   (1097, '5'

對每個 movie 算出所有 user rating 的平方和再開根號（即該 movie rating 向量的長度）

In [8]:
# Compute every movie's rating-vector norm and bring the result to the driver
# once; collecting the RDD a second time just to display it would launch a
# redundant Spark job over the same data.
average_rates = lines.map(average_rate)
average_rates_collect = sorted(average_rates.collect())
average_rates_collect

[(1, 10.344080432788601),
 (2, 7.681145747868608),
 (3, 7.0710678118654755),
 (4, 4.242640687119285),
 (5, 5.0),
 (6, 8.54400374531753),
 (7, 5.385164807134504),
 (8, 3.1622776601683795),
 (10, 5.0990195135927845),
 (11, 5.656854249492381),
 (12, 1.0),
 (13, 5.830951894845301),
 (15, 4.47213595499958),
 (16, 6.020797289396148),
 (17, 4.0),
 (19, 3.0),
 (21, 6.708203932499369),
 (22, 5.0),
 (24, 4.0),
 (25, 5.0),
 (26, 4.0),
 (27, 3.0),
 (31, 3.0413812651491097),
 (32, 8.774964387392123),
 (34, 9.759610647971568),
 (36, 8.54400374531753),
 (39, 6.557438524302),
 (41, 5.0),
 (43, 4.0),
 (44, 5.123475382979799),
 (45, 4.242640687119285),
 (46, 4.0),
 (47, 13.444329659748751),
 (48, 5.0990195135927845),
 (50, 12.186057606953941),
 (52, 3.0),
 (54, 4.47213595499958),
 (58, 6.557438524302),
 (60, 4.0),
 (61, 4.0),
 (62, 4.0),
 (64, 2.0),
 (65, 3.605551275463989),
 (66, 3.0),
 (70, 5.024937810560445),
 (76, 4.0),
 (79, 3.0),
 (86, 5.0),
 (87, 3.605551275463989),
 (88, 2.0),
 (89, 4.0),
 (92, 

這裡把每個 movie 的 rating 向量長度存進 denominator 字典，之後計算 cosine similarity 時方便取用

In [9]:
# Lookup table movie_id -> norm of its rating vector, built from the
# (movie_id, norm) pairs collected above.
denominator = dict(average_rates_collect)

這裡實作向量內積（cosine similarity 的分子）

In [10]:
# Pairwise dot products of movie rating vectors. This is the most expensive
# stage (quadratic in the number of movies per user) and it is reused by the
# similarity cell below, so cache it instead of recomputing it for every action.
numerator = lines2.flatMap(vector_multiplication).reduceByKey(Reducer).cache()
sorted(numerator.collect())

[((1, 2), 22.5),
 ((1, 3), 28.0),
 ((1, 6), 30.0),
 ((1, 7), 8.0),
 ((1, 10), 8.0),
 ((1, 12), 4.0),
 ((1, 13), 12.0),
 ((1, 15), 8.0),
 ((1, 16), 15.75),
 ((1, 19), 8.0),
 ((1, 21), 16.0),
 ((1, 32), 30.0),
 ((1, 34), 40.75),
 ((1, 36), 30.0),
 ((1, 39), 12.0),
 ((1, 44), 30.25),
 ((1, 47), 74.5),
 ((1, 48), 4.0),
 ((1, 50), 94.0),
 ((1, 54), 8.0),
 ((1, 58), 33.5),
 ((1, 64), 8.0),
 ((1, 65), 8.0),
 ((1, 70), 32.25),
 ((1, 87), 8.0),
 ((1, 92), 8.0),
 ((1, 101), 20.0),
 ((1, 102), 4.0),
 ((1, 104), 16.75),
 ((1, 107), 8.0),
 ((1, 110), 68.0),
 ((1, 111), 32.0),
 ((1, 118), 12.0),
 ((1, 135), 12.0),
 ((1, 141), 4.0),
 ((1, 145), 12.25),
 ((1, 150), 48.0),
 ((1, 151), 20.0),
 ((1, 153), 20.0),
 ((1, 157), 20.0),
 ((1, 158), 14.5),
 ((1, 161), 8.0),
 ((1, 163), 34.0),
 ((1, 165), 44.0),
 ((1, 170), 12.0),
 ((1, 172), 2.5),
 ((1, 173), 4.0),
 ((1, 180), 14.0),
 ((1, 185), 8.0),
 ((1, 186), 8.0),
 ((1, 196), 8.0),
 ((1, 208), 18.5),
 ((1, 216), 28.0),
 ((1, 223), 38.0),
 ((1, 231), 36.75)

這裏算出 movie 兩兩的 cosine similarity 之後印出來

In [11]:
# Cosine similarity for every movie pair. Collect once and display the
# collected list; a second collect() would rerun the whole job.
similarity = numerator.map(cosine_similarity)
similarity_collect = sorted(similarity.collect())
similarity_collect

[((1, 2), 0.2831813341071302),
 ((1, 3), 0.3828081191338114),
 ((1, 6), 0.3394438431427919),
 ((1, 7), 0.14361476740914422),
 ((1, 10), 0.15167409914295432),
 ((1, 12), 0.3866945956182654),
 ((1, 13), 0.19895272809236134),
 ((1, 15), 0.17293508046684675),
 ((1, 16), 0.2528917512184884),
 ((1, 19), 0.2577963970788436),
 ((1, 21), 0.230580107289129),
 ((1, 32), 0.3305095427286308),
 ((1, 34), 0.40364839694500027),
 ((1, 36), 0.3394438431427919),
 ((1, 39), 0.1769111189613905),
 ((1, 44), 0.5707801171599115),
 ((1, 47), 0.5357044215415935),
 ((1, 48), 0.07583704957147716),
 ((1, 50), 0.7457147578100715),
 ((1, 54), 0.17293508046684675),
 ((1, 58), 0.4938768737672151),
 ((1, 64), 0.3866945956182654),
 ((1, 65), 0.21449956806868745),
 ((1, 70), 0.6204505000280862),
 ((1, 87), 0.21449956806868745),
 ((1, 92), 0.17293508046684675),
 ((1, 101), 0.3866945956182654),
 ((1, 102), 0.27343437080986527),
 ((1, 104), 0.3160518004108601),
 ((1, 107), 0.19185424613837138),
 ((1, 110), 0.525484430517055

In [12]:
# Write the sorted similarity list to disk; the context manager closes the file
# even if write() raises, which the bare open()/close() pair did not.
with open('Outputfile.txt', 'w', encoding='utf-8') as f:
    f.write(str(similarity_collect))

sc.stop()