### Getting input data and some initial values
1. 利用 sc.textFile去取得input data 存放於data RDD
2. N = 10876代表全部參與network的所有nodes數量
3. Beta B = 0.8
4. add_val = 1-Beta = 0.2
5. complment用於之後考慮dead_ends所得到的回補值

In [1]:
#initial
import time
import os
from pyspark import SparkContext, SparkConf
sc.stop()
conf = SparkConf().setMaster("local").setAppName("PageRank")
conf=SparkConf().set("spark.default.parallelism", 4)
sc = SparkContext(conf = conf)
data = sc.textFile("p2p-Gnutella04.txt")
N = 10876
B = 0.8
add_val = (1-B)
complement = 0

### Getting all the nodes which has receives from other nodes
1. 取得data後，利用mapper1取得所有links，並用兩次map與distinct取得所有參與這network的所nodes
2. 利用gruopByKey()與mapper2建立一RDD links_with_n，其<key,value>: key = node_i, value = (所有receiver_node pointed by node_i)
3. 另外在map中我用1跟-1來標記該node是sender還是receiver，在用reduce1加總來分辨該node的行為如果>=1則代表該node單純只有做sending的動作， 並把那些nodes紀錄於only_sending_nodes RDD
4. 創建ranks，並利用initial_ranks_mapper賦予個個key初值 = 1/N

In [2]:
# construct before iteration
def mapper1(x):
    y = x.split('\t')
    return y[0],y[1]
def mapper2(x):
    k = []
    for i in x[1]:
        k.append(i)
    return x[0],(k,len(x[1]))
def initial_ranks_mapper(x):
    return x,1/N
def reducer1(x,y):
    return x+y
def mapper0(x):
    if x[1]==0: return x[0]
links = data.map(mapper1)
links_with_n = links.groupByKey().map(mapper2)
rec_node1 = links.map(lambda x:(x[0],1)).distinct() #send nodes
rec_node2 = links.map(lambda x:(x[1],-1)).distinct() # rec nodes
rec_node = rec_node1.union(rec_node2).map(lambda x:x[0]).distinct()
only_sending_nodes = rec_node1.union(rec_node2).reduceByKey(reducer1).filter(lambda x:x[1]>=1)
ranks = rec_node.map(initial_ranks_mapper)

### considering the nodes that only do sending
1. 利用flatMap得到rec_ndoes_all，<key,value>: key:node   values:(對應到的一個contributor,該contribitor貢獻給多少nodes的總數)
2. 為了確保為了確保之後在中，每次join後，每個nodes都存在，以免在做回補complement時不會漏加，我利用#當作一個node，讓他指向所有單純sending的nodes，並賦予#這個node的probability = 0 ranks中。
3. 有了步驟1的前置動作，才能將那些only-do-sending nodes 加入rec_nodes_all中
4. 我認為這樣的好處是可以避免在每在iterarion中還要另外檢查，使得iteration中少了不少步驟。

In [3]:
def exc_map(x):
    return '#',(x[0],1)
ex = sc.parallelize(list('#')).map(lambda x:('#',(x,1)))
exc = sc.parallelize(list('#')).map(lambda x:(x,0)) #for only sending nodes
only_sending_nodes_ex = only_sending_nodes.map(exc_map).union(ex)
rec_nodes_all = links_with_n.flatMap(lambda x:[(x[0],(i,x[1][1])) for i in x[1][0]])
ranks = ranks.union(exc)
rec_nodes_all = rec_nodes_all.union(only_sending_nodes_ex)
#rec_nodes_all.cache().collect() # trigger action to cache()

### Calculate in Iteration
1. 有了前置準備後，先將rec_nodes_all 與 ranks做join，做出每個receives node所對應到的sending node的rank值
2. 再來先用mapper5做map，算出每個receives node所對應到的sending node的contribution 並乘上Beta
3. 然後再利用reducer1 將同樣key的value全部加起來，以完成下圖算式:
<a href="https://imgbb.com/"><img src="https://i.ibb.co/pd1KYQJ/1.jpg" alt="1" border="0" /></a>
4. 接下我們要求未考慮dead_ends的ranks sum，利用map先單純取出value，並用sum()將所有值加總，並加上(1-Beta)/N
5. 得到r_sum後，就可以求出考慮dead_ends所需分配回去的complement = (1-r_sum)/N
6. 算出complement後，將前面求得的ranks，加上complments與(1-Beta/N)，得到新的ranks已完成一次iteration
<a href="https://imgbb.com/"><img src="https://i.ibb.co/P9J0GDF/image.jpg" alt="image" border="0" /></a>
<a href="https://ibb.co/zhpHvLq"><img src="https://i.ibb.co/WWZpJdm/3.jpg" alt="3" border="0" /></a>
7. 為了減少map量，我是最後才將(1-Beta)/N的值與complement一起加回ranks
8. 另外，過程會用到#是對那些單純只有sending的nodes做標記，讓他們在join完後仍存在於RDD中，並再回加complents與(1-Beta)/N時還存在ranks中


In [4]:
def mapper4(x):
    if x[0]=='#':
        return x[0],0
    else: 
        return x[0],x[1]+complement+(add_val/N)
def mapper5(x):
    if x[1][0][0]=='#':
        return x[1][0][0],0
    else: 
        return x[1][0][0],x[1][1]/x[1][0][1]*B
tStart = time.time()
for i in range(20):
    #print("iteration:"+str(i)+"...")
    a = rec_nodes_all.join(ranks)
    b = a.map(mapper5).reduceByKey(reducer1)
    r_sum = b.map(lambda x:x[1]).sum() + add_val
    complement = (1-r_sum)/N
    ranks = b.map(mapper4)
    #print("iteration:"+str(i)+" end")
    #print(r_sum)
tEnd = time.time()#計時結束
#print("It cost %f sec" % (tEnd - tStart))
#大約15 mins

### Sorting and Output
1. 經過20次iteration後，利用sortBy()依據ranks值將結果由大到小排序。
2. 將排序結果利用collect()並將前10個pairs output出來。
3. 其中'%.3g '代表取小數點後有效的三位。
4. final output:
```
1056	0.000632 
1054	0.000629 
1536	0.000524 
171	0.000512 
453	0.000496 
407	0.000485 
263	0.00048 
4664	0.00047 
261	0.000463 
410	0.000462
```

In [5]:
#print("sorting...")
sorted_result_rdd = ranks.sortBy(lambda x:x[1],ascending=False)
#print top 10 pairs
#print("output..")
cnt = 0
for i in sorted_result_rdd.collect():
    if(cnt<10):
        print(i[0]+'\t'+('%.3g ' % i[1]))
    else:
        break;
    cnt+=1

1056	0.000632 
1054	0.000629 
1536	0.000524 
171	0.000512 
453	0.000496 
407	0.000485 
263	0.00048 
4664	0.00047 
261	0.000463 
410	0.000462 
