* 以下假設 node i 指向 node j
* i -> j 

## Import pyspark

In [1]:
from pyspark import SparkConf, SparkContext
import pyspark

# Mapper

## Distri

* 算出每個 i 的 out-link 會指到哪些 j，個別分了多少部分給這些 j
* 假設 i 指到兩個 j，Distri 會輸出 **( i, ( j1, 1/2 ) ), ( i, ( j2, 1/2 ) )**，之後經 groupByKey 合併為 **(  i, [ (  j1, 1/2  ), (  j2, 1/2  ) ]  )**

In [2]:
def Distri(line):
    """Split a node's weight evenly across its out-links.

    Args:
        line: a ``(source, targets)`` pair where ``targets`` is the list of
            nodes that ``source`` links to.

    Returns:
        A list with one ``(source, (target, 1 / len(targets)))`` entry per
        target; empty when ``targets`` is empty.
    """
    source = line[0]
    targets = line[1]
    fanout = len(targets)
    # The division stays inside the comprehension so an empty target list
    # yields [] instead of dividing by zero.
    return [(source, (target, 1 / fanout)) for target in targets]

## Multi

* 算出每個 j 接收到的 in-link 的值分別為多少
* 格式為 **( j, value1 ), ( j, value2 )**

In [3]:
def Multi(line):
    """Turn a node's rank into the contributions it sends along its out-links.

    Args:
        line: a ``(node, values)`` pair. For a node that has out-links,
            ``values`` holds two entries: the list of ``(target, share)``
            pairs and the node's current rank. For a node without out-links
            it holds only the rank.

    Returns:
        A list of ``(target, share * rank)`` pairs, one per out-link; empty
        when the node has no out-links.
    """
    values = line[1]
    # A node without out-links carries only its rank and contributes nothing.
    if len(values) < 2:
        return []

    links, score = values[0], values[1]
    # The group is built by union + groupByKey, which does not promise the
    # order of elements inside a group, so accept either arrangement.
    if not isinstance(links, (list, tuple)):
        links, score = score, links

    return [(target, share * score) for target, share in links]

## Adder

* 這個 mapper 傳進去的值算出來為 **( j, [ value1, value2, ... ] )**
* 這裡會將每個 j 的 value 加起來
* 格式為 **( j, sum of the values )**

In [4]:
def Adder(line):
    """Add up all contributions received by one node.

    Args:
        line: a ``(node, contributions)`` pair where ``contributions`` is an
            iterable of numbers.

    Returns:
        A single-element list ``[(node, total)]``.
    """
    total = 0
    # Accumulate strictly left to right, starting from 0.
    for contribution in line[1]:
        total = total + contribution
    return [(line[0], total)]

## Prob

* 這個會運算 teleports 的公式
* probability 為 0.8
* 公式為 **rj new = sum of ( ri/di )*B + 1/N*( 1-B )**

In [5]:
def Prob(line):
    """Apply the teleport formula to one node's summed contributions.

    Computes ``value * 0.8 + (1 / N) * 0.2`` with ``N = 10876`` nodes.

    Args:
        line: a ``(node, value)`` pair.

    Returns:
        A single-element list ``[(node, new_value)]``.
    """
    node_count = 10876  # number of nodes in the graph
    damped = line[1] * 0.8
    teleport = (1 / node_count) * 0.2
    return [(line[0], damped + teleport)]

# Reducer

## Reducer

* 前面是算出每個 node 的初始值為 1/N
* 因為 i 和 j 會重複計算，所以用 Reducer 只取一個就好

## groupByKey()

* spark 內建的 groupByKey()
* 把 key 一樣的 value 放在一起

In [6]:
def Reducer(x, y):
    """Keep the first of two values sharing a key and drop the second.

    Args:
        x: the value to keep.
        y: the value to discard.

    Returns:
        ``x`` unchanged.
    """
    del y  # intentionally ignored: one value per key is enough
    return x

# Report

## list_diff

* 找出在 list2 中但不在 list1 中的值（兩個 list 的差集）
* 用 Multi 會漏掉只有 out-link 的 node，用此 function 找出漏掉的是哪些 node

In [7]:
def list_diff(list1, list2):
    """Return the distinct items of ``list2`` that do not occur in ``list1``.

    Args:
        list1: items to exclude.
        list2: items to filter.

    Returns:
        A list of the remaining items, without duplicates and in set
        iteration order.
    """
    candidates = set(list2)
    excluded = set(list1)
    remaining = candidates - excluded
    return list(remaining)

* 這裡先做資料處理
* 格式為 **( i, [ j1, j2, ... ] )**

In [8]:
# Start Spark with a default parallelism of 4.
conf = (
    SparkConf()
    .set("spark.default.parallelism", 4)
    .setAppName("PageRank")
)
sc = SparkContext(conf=conf)

# Split every input line on TAB, keep the first two fields as a
# (source, target) pair, and gather the targets of each source into a list.
lines = (
    sc.textFile("p2p-Gnutella04.txt")
    .map(lambda line : line.split('\t'))
    .map(lambda line : (line[0], line[1]))
    .groupByKey()
    .mapValues(list)
)
lines.collect()  # show the grouped edge list

[('10', ['41', '136', '137', '138', '139', '140', '141', '142', '143', '144']),
 ('12', ['40', '41', '42', '43', '44', '45', '46', '47', '48', '49']),
 ('20', ['50', '51', '52', '53', '54', '55', '56', '57', '58', '59']),
 ('26', ['78', '89', '90', '91', '92', '93', '94', '95', '96', '97']),
 ('54',
  ['145', '146', '147', '148', '149', '150', '151', '152', '153', '154']),
 ('60', ['184']),
 ('77', ['80', '84', '85', '86', '88', '294', '309', '310', '311', '312']),
 ('83', ['63', '223', '224', '225', '226', '227', '228', '229', '230', '231']),
 ('88', ['84', '85', '86', '292', '293', '294', '295', '296', '297', '298']),
 ('108', ['350']),
 ('111', ['102']),
 ('112',
  ['329', '351', '352', '353', '354', '355', '356', '357', '358', '359']),
 ('113', ['86', '360', '361', '362', '363', '364', '365', '366', '367']),
 ('115',
  ['395', '396', '397', '398', '399', '400', '401', '402', '403', '404']),
 ('119',
  ['102', '103', '322', '378', '379', '380', '381', '382', '383', '384']),
 ('121',

* 算出每個 node 的初始 pagerank 的值
* 題目要求為 1/N
* 格式為 **( i, 1/N )**, **( j, 1/N )**

In [9]:
N = 10876  # number of nodes in the graph

keys = lines.keys().collect()
values = lines.values().collect()
length = len(keys)

# One (node, 1/N) seed per source node ...
Keys = [(str(source), 1/N) for source in keys]
# ... and one per out-link target, duplicates included.
Values = [(str(target), 1/N) for targets in values for target in targets]

rank1 = sc.parallelize(Keys)
rank2 = sc.parallelize(Values)

* 把 i 和 j 合在一起後，用 Reducer 刪掉重複的

In [10]:
# Merge the source-side and target-side seeds, then let Reducer keep a
# single entry per node.
ini = (
    rank1
    .union(rank2)
    .reduceByKey(Reducer)
)

* 這裡要做運算前的準備，算出 i 給哪些 j ，並算出 i 要分成幾份
* 格式為 **( i, [ ( j1, 1 / sum of j from i ), ( j2, 1 / sum of j from i ), ... ] )**

In [11]:
# Expand each source into (source, (target, share)) pairs and regroup them
# into one list of (target, share) pairs per source.
distribution = (
    lines
    .flatMap(Distri)
    .groupByKey()
    .mapValues(list)
)
distribution.collect()

[('10',
  [('41', 0.1),
   ('136', 0.1),
   ('137', 0.1),
   ('138', 0.1),
   ('139', 0.1),
   ('140', 0.1),
   ('141', 0.1),
   ('142', 0.1),
   ('143', 0.1),
   ('144', 0.1)]),
 ('12',
  [('40', 0.1),
   ('41', 0.1),
   ('42', 0.1),
   ('43', 0.1),
   ('44', 0.1),
   ('45', 0.1),
   ('46', 0.1),
   ('47', 0.1),
   ('48', 0.1),
   ('49', 0.1)]),
 ('20',
  [('50', 0.1),
   ('51', 0.1),
   ('52', 0.1),
   ('53', 0.1),
   ('54', 0.1),
   ('55', 0.1),
   ('56', 0.1),
   ('57', 0.1),
   ('58', 0.1),
   ('59', 0.1)]),
 ('26',
  [('78', 0.1),
   ('89', 0.1),
   ('90', 0.1),
   ('91', 0.1),
   ('92', 0.1),
   ('93', 0.1),
   ('94', 0.1),
   ('95', 0.1),
   ('96', 0.1),
   ('97', 0.1)]),
 ('54',
  [('145', 0.1),
   ('146', 0.1),
   ('147', 0.1),
   ('148', 0.1),
   ('149', 0.1),
   ('150', 0.1),
   ('151', 0.1),
   ('152', 0.1),
   ('153', 0.1),
   ('154', 0.1)]),
 ('60', [('184', 1.0)]),
 ('77',
  [('80', 0.1),
   ('84', 0.1),
   ('85', 0.1),
   ('86', 0.1),
   ('88', 0.1),
   ('294', 0.1),
 

* 這裡則是將前面的兩個數值合在一起
* 分別是每個 node 的值和每個 i 要給哪些 j
* 格式為 **( i, [ [ ( j1, 1 / sum of j from i ), ( j2, 1 / sum of j from i ), ... ], value of i ] )**；沒有 out-link 的 node 則只有 **( j, [ value of j ] )**

In [12]:
# Put each node's out-link shares and its current rank under the same key.
rank = (
    distribution
    .union(ini)
    .groupByKey()
    .mapValues(list)
)
rank.collect()

[('10',
  [[('41', 0.1),
    ('136', 0.1),
    ('137', 0.1),
    ('138', 0.1),
    ('139', 0.1),
    ('140', 0.1),
    ('141', 0.1),
    ('142', 0.1),
    ('143', 0.1),
    ('144', 0.1)],
   9.194556822361162e-05]),
 ('12',
  [[('40', 0.1),
    ('41', 0.1),
    ('42', 0.1),
    ('43', 0.1),
    ('44', 0.1),
    ('45', 0.1),
    ('46', 0.1),
    ('47', 0.1),
    ('48', 0.1),
    ('49', 0.1)],
   9.194556822361162e-05]),
 ('20',
  [[('50', 0.1),
    ('51', 0.1),
    ('52', 0.1),
    ('53', 0.1),
    ('54', 0.1),
    ('55', 0.1),
    ('56', 0.1),
    ('57', 0.1),
    ('58', 0.1),
    ('59', 0.1)],
   9.194556822361162e-05]),
 ('26',
  [[('78', 0.1),
    ('89', 0.1),
    ('90', 0.1),
    ('91', 0.1),
    ('92', 0.1),
    ('93', 0.1),
    ('94', 0.1),
    ('95', 0.1),
    ('96', 0.1),
    ('97', 0.1)],
   9.194556822361162e-05]),
 ('54',
  [[('145', 0.1),
    ('146', 0.1),
    ('147', 0.1),
    ('148', 0.1),
    ('149', 0.1),
    ('150', 0.1),
    ('151', 0.1),
    ('152', 0.1),
    ('153',

* 這裏就是做乘法的運算
* 算完後格式為 **( j , [ value1, value2, ... ] )**

In [13]:
# Compute the contribution sent over every link and collect the
# contributions received by each target node into a list.
multi = (
    rank
    .flatMap(Multi)
    .groupByKey()
    .mapValues(list)
)
multi.collect()

[('139', [9.194556822361162e-06, 1.1493196027951453e-05]),
 ('141',
  [9.194556822361162e-06,
   9.194556822361162e-06,
   1.0216174247067958e-05,
   9.194556822361162e-06,
   1.0216174247067958e-05,
   9.194556822361162e-06,
   9.194556822361162e-06]),
 ('143',
  [9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   4.3783603916005534e-06,
   9.194556822361162e-06]),
 ('144', [9.194556822361162e-06]),
 ('40',
  [9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06]),
 ('44',
  [9.194556822361162e-06,
   9.194556822361162e-06,
   7.662130685300969e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   1.0216174247067958e-05,
   9.194556822361162e-06,
   1.0216174247067958e-05,
   9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   9.194556822361162e-06,
   1.0216174247067958e-05,
  

* 這裏將乘法做完的 value 加起來
* 格式為 **( j, sum of the values )**

In [14]:
# Sum the contributions received by each target node.
adder = multi.flatMap(Adder)
# Collect once and reuse the list for display; calling collect() a second
# time would run the whole un-persisted lineage again.
array = adder.collect()
array

[('139', 2.0687752850312616e-05),
 ('141', 6.640513260594173e-05),
 ('143', 4.11565876810452e-05),
 ('144', 9.194556822361162e-06),
 ('40', 4.597278411180581e-05),
 ('44', 0.00015783989211719994),
 ('50', 0.00011033468186833395),
 ('53', 5.5167340934166975e-05),
 ('54', 6.436189775652814e-05),
 ('56', 9.194556822361162e-06),
 ('57', 6.538351518123493e-05),
 ('145', 4.597278411180581e-05),
 ('146', 0.00030546360998733187),
 ('147', 0.00036549093095617543),
 ('150', 5.618895835887377e-05),
 ('153', 9.194556822361162e-06),
 ('154', 2.7583670467083487e-05),
 ('86', 6.538351518123493e-05),
 ('88', 2.7583670467083487e-05),
 ('226', 4.597278411180581e-05),
 ('230', 6.436189775652814e-05),
 ('102', 0.0002666421478484737),
 ('356', 9.194556822361162e-05),
 ('361', 3.123230412675061e-05),
 ('483', 3.7799844714151446e-05),
 ('485', 2.7583670467083487e-05),
 ('488', 5.5167340934166975e-05),
 ('429', 9.398880307302522e-05),
 ('435', 2.7583670467083487e-05),
 ('502', 6.102819879169543e-05),
 ('503',

* 因為前面的 Multi 出來的格式的 key 為 j 
* 代表若是只有 out-link 的 node 不會繼續存在資料裡
* 所以這裡找出漏掉的 node 存進 list3 裡面，之後會再放進資料裡面

In [15]:
VALUE = rank2.collect()
KEY = rank1.collect()

# list1: every node that appears as a link target.
list1 = [str(entry[0]) for entry in VALUE]
# list2: every node that appears as a link source.
list2 = [str(entry[0]) for entry in KEY]
# list3: source nodes that are never a target, i.e. nodes without in-links.
list3 = list_diff(list1, list2)

* 這裏把加總完成的 value 做 probability 的運算
* 格式一樣為 **( j, sum of the values )**
* 公式為 **rj new = sum of ( ri/di )*B + 1/N*( 1-B )**

In [16]:
# Apply the teleport formula to every summed value.
result = adder.flatMap(Prob)
# Collect once and reuse the list for display instead of triggering a second
# Spark job over the same lineage.
result_array = result.collect()
result_array

[('139', 3.493931592497242e-05),
 ('141', 7.151321972947571e-05),
 ('143', 5.131438378955849e-05),
 ('144', 2.5744759102611254e-05),
 ('40', 5.5167340934166975e-05),
 ('44', 0.00014466102733848228),
 ('50', 0.00010665685913938949),
 ('53', 6.252298639205591e-05),
 ('54', 6.987863184994484e-05),
 ('56', 2.5744759102611254e-05),
 ('57', 7.069592578971027e-05),
 ('145', 5.5167340934166975e-05),
 ('146', 0.0002627600016345878),
 ('147', 0.0003107818584096627),
 ('150', 6.334028033182134e-05),
 ('153', 2.5744759102611254e-05),
 ('154', 4.0456050018389116e-05),
 ('86', 7.069592578971027e-05),
 ('88', 4.0456050018389116e-05),
 ('226', 5.5167340934166975e-05),
 ('230', 6.987863184994484e-05),
 ('102', 0.00023170283192350128),
 ('356', 9.194556822361162e-05),
 ('361', 4.3374956946122816e-05),
 ('483', 4.862898941604348e-05),
 ('485', 4.0456050018389116e-05),
 ('488', 6.252298639205591e-05),
 ('429', 9.35801561031425e-05),
 ('435', 4.0456050018389116e-05),
 ('502', 6.721167267807868e-05),
 ('503

* 這裏把剛剛漏掉的只有 out-link 的 node 放回去，並將其 value 也運行 probability 的計算
* 公式為 **rj new = sum of ( ri/di )*B + 1/N*( 1-B )**
* 但因為這些 node 沒有 in-link，所以將其值設為 **1/N*( 1-B )** 即可

In [17]:
# Nodes in list3 receive no contributions, so their value is just the
# teleport term 0.2 * (1/N).
result_array.extend((node, 0.2*(1/N)) for node in list3)

final_result = sc.parallelize(result_array)
# Collect once and reuse the list for display instead of collecting twice.
nodes = final_result.collect()
nodes

[('139', 3.493931592497242e-05),
 ('141', 7.151321972947571e-05),
 ('143', 5.131438378955849e-05),
 ('144', 2.5744759102611254e-05),
 ('40', 5.5167340934166975e-05),
 ('44', 0.00014466102733848228),
 ('50', 0.00010665685913938949),
 ('53', 6.252298639205591e-05),
 ('54', 6.987863184994484e-05),
 ('56', 2.5744759102611254e-05),
 ('57', 7.069592578971027e-05),
 ('145', 5.5167340934166975e-05),
 ('146', 0.0002627600016345878),
 ('147', 0.0003107818584096627),
 ('150', 6.334028033182134e-05),
 ('153', 2.5744759102611254e-05),
 ('154', 4.0456050018389116e-05),
 ('86', 7.069592578971027e-05),
 ('88', 4.0456050018389116e-05),
 ('226', 5.5167340934166975e-05),
 ('230', 6.987863184994484e-05),
 ('102', 0.00023170283192350128),
 ('356', 9.194556822361162e-05),
 ('361', 4.3374956946122816e-05),
 ('483', 4.862898941604348e-05),
 ('485', 4.0456050018389116e-05),
 ('488', 6.252298639205591e-05),
 ('429', 9.35801561031425e-05),
 ('435', 4.0456050018389116e-05),
 ('502', 6.721167267807868e-05),
 ('503

* 這裏做 renormalize 的運算
* 公式為 **rj new = rj + ( 1-S )/N, S = sum of rj**
* 算完後即為最後的 pagerank
* 格式為 **( node, pagerank )**

In [18]:
# S is the total rank currently held by all nodes.
S = 0
for node in nodes:
    S += node[1]
print(S)

# Spread the missing mass (1 - S) evenly over the N nodes. The correction is
# bound as a default argument so the lambda carries this value instead of
# looking up the global S later.
leaked_share = (1-S)/N
final_result = final_result.map(
    lambda line, leaked_share=leaked_share: (line[0], line[1]+leaked_share)
)
# Collect once and reuse the list for display.
array = final_result.collect()
array

0.5630011033468449


[('139', 7.511942779083808e-05),
 ('141', 0.00011169333159534138),
 ('143', 9.149449565542416e-05),
 ('144', 6.592487096847692e-05),
 ('40', 9.534745280003265e-05),
 ('44', 0.00018484113920434796),
 ('50', 0.00014683697100525517),
 ('53', 0.00010270309825792157),
 ('54', 0.00011005874371581052),
 ('56', 6.592487096847692e-05),
 ('57', 0.00011087603765557594),
 ('145', 9.534745280003265e-05),
 ('146', 0.00030294011350045347),
 ('147', 0.00035096197027552833),
 ('150', 0.00010352039219768702),
 ('153', 6.592487096847692e-05),
 ('154', 8.063616188425479e-05),
 ('86', 0.00011087603765557594),
 ('88', 8.063616188425479e-05),
 ('226', 9.534745280003265e-05),
 ('230', 0.00011005874371581052),
 ('102', 0.00027188294378936695),
 ('356', 0.0001321256800894773),
 ('361', 8.355506881198849e-05),
 ('483', 8.880910128190915e-05),
 ('485', 8.063616188425479e-05),
 ('488', 0.00010270309825792157),
 ('429', 0.00013376026796900817),
 ('435', 8.063616188425479e-05),
 ('502', 0.00010739178454394435),
 ('5

* 題目要求做 20 次的 iteration
* 因為前面已經做過一次，這裏再做 19 次就好

In [19]:
# The first iteration was done above, so 19 more give 20 in total.
for k in range(19):
    # Pair every node's current rank with its out-link shares, push the rank
    # along the links, sum per target and apply the teleport formula.
    rank = distribution.union(final_result).groupByKey().mapValues(list)
    multi = rank.flatMap(Multi).groupByKey().mapValues(list)
    adder = multi.flatMap(Adder)
    result = adder.flatMap(Prob)

    result_array = result.collect()
    # Nodes in list3 receive no contributions; give them the teleport term.
    result_array.extend((node, 0.2*(1/N)) for node in list3)

    final_result = sc.parallelize(result_array)
    # result_array is already on the driver, so reuse it rather than
    # collecting the freshly parallelized copy back again.
    nodes = result_array

    # S is the total rank currently held by all nodes.
    S = 0
    for node in nodes:
        S += node[1]
    print("S: ",S)

    # Spread the missing mass evenly. S is reset on every pass, so bind this
    # pass's correction as a default argument instead of letting the lambda
    # look up the global later.
    leaked_share = (1-S)/N
    final_result = final_result.map(
        lambda line, leaked_share=leaked_share: (line[0], line[1]+leaked_share)
    )
    array = final_result.collect()

S:  0.5785724137280063
S:  0.5775470460198333
S:  0.5774128804785479
S:  0.5774106950634783
S:  0.5774213162185932
S:  0.5774212503426898
S:  0.5774198901892439
S:  0.5774197142800691
S:  0.5774197208201517
S:  0.5774196921283059
S:  0.5774196872154703
S:  0.5774196872417219
S:  0.5774196870283019
S:  0.5774196870165097
S:  0.5774196870161719
S:  0.5774196870178886
S:  0.5774196870189514
S:  0.5774196870190638
S:  0.5774196870190499


* 將 pagerank 的值由大至小排列出來

In [20]:
# Swap each pair to (pagerank, node) so tuple ordering compares rank first,
# then sort in descending order.
output = final_result.map(lambda line : (line[1], line[0]) )
array = output.collect()
array.sort(reverse = True)

* 印出 pagerank 前 10 大的 node

In [21]:
# Print the ten highest-ranked nodes: node id padded to width 6, followed by
# the rank rounded to 15 decimal places.
for position in range(10):
    score, node = array[position]
    label = '{:6}'.format(str(node))
    print(label+str(round(score, 15)))

1056  0.00063219880959
1054  0.00062915571286
1536  0.000523910339753
171   0.000511622470602
453   0.00049565864767
407   0.000484844199639
263   0.000479619289318
4664  0.000470497551407
261   0.000462891586569
410   0.00046151003829


* 將輸出印至 Outputfile.txt 即可
* 這裏四捨五入取到小數點後 15 位

In [22]:
# Write the top entries (at most ten) in the same layout as the printed
# report. The with-block closes the file even if a write fails.
with open('Outputfile.txt', 'w') as f:
    for entry in array[:10]:
        f.write('{:6}'.format(str(entry[1]))+str(round(entry[0], 15))+'\n')

sc.stop()