PYSPARK PAGERANK 
===
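For this assignment I wrote several mappers and reducers; the reducers only perform addition.  
After the main program reads in the data, it proceeds as follows:  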
## 1. Initialize the rank values as the assignment requires  
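```=python
N = 10876   # number of nodes
B = 0.8     # damping factor beta
# node IDs are strings so they match the keys parsed from the text file
ranks = [(str(i), 1 / N) for i in range(1, N + 1)]   # every node starts at 1/N
ranks = sc.parallelize(ranks)
```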
## 2. Map the input edges into the adjacency-list format we need  
    [(1,[2,3]),(2,[4]),...]
![image.png](./image.png)  

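```=python
import re

def divergeTwoPart(line):
    parts = re.split(r'\s+', line)   # regex: split "src dst" on whitespace
    return (parts[0], [parts[1]])

def add(x, y):
    return x + y

source = source.map(divergeTwoPart)   # one (src, [dst]) pair per edge
source = source.reduceByKey(add)      # concatenate lists: (src, [dst1, dst2, ...])
```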


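Here divergeTwoPart splits each line into (source, [destination node]),  
and reduceByKey(add) then concatenates the lists of all edges that share the same source.  
A node's out-degree is simply len([destination nodes]), so it is not stored separately in the data structure.  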
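To make the two transformations concrete without a cluster, here is a plain-Python sketch of what `map(divergeTwoPart)` followed by `reduceByKey(add)` produces. `group_by_source` is a hypothetical helper for illustration only, and skipping blank lines and `#` comment lines is an assumption about the input file (SNAP edge lists begin with such header lines); the Spark code above does not filter them.

```python
import re
from collections import defaultdict

def diverge_two_part(line):
    # Split "src dst" on any run of whitespace, mirroring divergeTwoPart
    parts = re.split(r'\s+', line.strip())
    return (parts[0], [parts[1]])

def group_by_source(lines):
    # Local stand-in for source.map(divergeTwoPart).reduceByKey(add):
    # adding the one-element lists collects every destination of a source
    adjacency = defaultdict(list)
    for line in lines:
        if not line.strip() or line.startswith('#'):
            continue  # assumption: skip blank lines and '#' header comments
        src, dests = diverge_two_part(line)
        adjacency[src] += dests
    return dict(adjacency)

edges = ["1 2", "1 3", "2 4", "3\t1"]
adj = group_by_source(edges)
print(adj)                                   # {'1': ['2', '3'], '2': ['4'], '3': ['1']}
print({src: len(d) for src, d in adj.items()})  # out-degrees: {'1': 2, '2': 1, '3': 1}
```

Because the out-degree is just the length of each list, nothing beyond the adjacency list has to be carried through the iterations.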

## 3. Update the rank values for 20 iterations (formula from the lecture slides)  
![image-4.png](./image-4.png)  

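contributions emits r_i/d_i from every node i to each of its out-neighbours.  
reduceByKey(add) then sums all contributions that arrive at the same destination, and the result is multiplied by B (0.8).  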
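Written out, the step described above looks like this. The formula is reconstructed from the code rather than transcribed from the slide image, so the notation is an assumption: $r^{(t)}_i$ is node $i$'s rank after iteration $t$, $d_i$ its out-degree, $\beta$ = B = 0.8, and the sum runs over all edges $i \to j$.

$$
r'^{(t+1)}_j = \sum_{i \to j} \beta \, \frac{r^{(t)}_i}{d_i}
$$

Each term $r^{(t)}_i / d_i$ is one record emitted by contributions, the sum is reduceByKey(add), and the factor $\beta$ is the mapValues multiplication.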
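```=python
def contributions(source_dests_rank):
    # after the join each record is (source, (dests, rank))
    dests = source_dests_rank[1][0]
    rank = source_dests_rank[1][1]
    deg = len(dests)                 # out-degree d_i
    for dest in dests:
        yield (dest, rank / deg)     # r_i / d_i for every out-neighbour

for i in range(20):
    contribs = source.join(ranks).flatMap(contributions)
    # sum the contributions per destination, then multiply by B
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: rank * B)
```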
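The same update step can be sketched locally in plain Python, which is handy for checking the Spark result on a toy graph. `update_step` is a hypothetical helper; it mimics the inner join by skipping sources that have no rank entry. One consequence worth noticing: a node that nobody links to receives no contributions, so it is absent from the result, just as it would be absent from the RDD after `reduceByKey`.

```python
from collections import defaultdict

def contributions(source_dests_rank):
    # Same record shape as after the Spark join: (source, (dests, rank))
    _source, (dests, rank) = source_dests_rank
    deg = len(dests)
    for dest in dests:
        yield (dest, rank / deg)

def update_step(adjacency, ranks, beta=0.8):
    # Local stand-in for source.join(ranks).flatMap(contributions)
    #                     .reduceByKey(add).mapValues(lambda r: r * beta)
    totals = defaultdict(float)
    for src, dests in adjacency.items():
        if src not in ranks:          # inner join: sources without a rank are dropped
            continue
        for dest, share in contributions((src, (dests, ranks[src]))):
            totals[dest] += share
    return {node: beta * total for node, total in totals.items()}

adjacency = {'1': ['2', '3'], '2': ['3'], '3': ['1'], '4': ['3']}
start = {node: 0.25 for node in ['1', '2', '3', '4']}
print(update_step(adjacency, start))  # '4' does not appear: nothing links to it
```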

## 3. (cont.) Renormalize within each iteration (formula from the lecture slides)  
![image-5.png](./image-5.png)  

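Here I use a Spark accumulator, which simply holds S, the sum of all rank values.  
Each node's rank is then increased by (1-S)/N, as the formula prescribes.  
S is recomputed in every iteration, so the accumulator is reset to 0 afterwards.  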
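The renormalization described above can be written as follows. This is reconstructed from the code rather than copied from the slide: $r'_j$ is node $j$'s rank right after the $\beta$-scaled update, $N$ the number of nodes, and $S$ the value collected by the accumulator.

$$
S = \sum_{j} r'_j, \qquad r_j = r'_j + \frac{1 - S}{N}
$$

Because $N \cdot \frac{1-S}{N} = 1 - S$, the renormalized ranks sum to 1 again, provided every one of the $N$ nodes receives the correction.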
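```=python
accum = sc.accumulator(0.0)

def addS(rank):
    # rank is a (node, value) pair; add its value to the running total S
    global accum
    accum += rank[1]

for i in range(20):
    # ... the step-3 update that produces `ranks` goes here ...
    ranks.foreach(addS)        # action: sum every rank into accum
    S = accum.value            # read the total back on the driver
    ranks = ranks.mapValues(lambda rank: rank + (1 - S) / N)
    accum.add(-accum.value)    # reset accum to 0 for the next iteration
```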
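Putting steps 1 to 3 together, here is a plain-Python sketch of the whole loop, useful as a reference to compare against the Spark output on a small graph. `pagerank` is a hypothetical helper, and it deliberately differs from the Spark loop in one respect: every iteration starts all N nodes at 0.0, so nodes with no in-links stay in the vector and still receive (1-S)/N. In the Spark version such nodes drop out of `ranks` after `reduceByKey`, so they miss the correction and, through the inner `join`, stop contributing to their out-neighbours. Dangling nodes (no out-links) are simply absent from `adjacency` here; the mass they would have passed on is restored by the renormalization.

```python
def pagerank(adjacency, nodes, beta=0.8, iterations=20):
    """Power iteration with beta-scaled contributions and (1 - S) / N renormalization.

    adjacency: dict mapping a node to the list of nodes it links to
    nodes:     every node in the graph (all destinations must be listed here)
    """
    n = len(nodes)
    ranks = {node: 1.0 / n for node in nodes}          # step 1: uniform start
    for _ in range(iterations):
        # step 3: every node starts at 0.0, so nodes without in-links are kept
        new_ranks = {node: 0.0 for node in nodes}
        for src, dests in adjacency.items():
            share = ranks[src] / len(dests)            # r_i / d_i
            for dest in dests:
                new_ranks[dest] += beta * share
        # renormalize: s plays the role of the accumulator value S
        s = sum(new_ranks.values())
        ranks = {node: r + (1.0 - s) / n for node, r in new_ranks.items()}
    return ranks


adjacency = {'1': ['2', '3'], '2': ['3'], '3': ['1'], '4': ['3']}
ranks = pagerank(adjacency, ['1', '2', '3', '4'])
for node, rank in sorted(ranks.items(), key=lambda kv: kv[1], reverse=True):
    print(node, rank)
print(sum(ranks.values()))   # stays at 1 up to floating-point error
```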
## 4. Print the results  
![image-6.png](./image-6.png)
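Sorting every (node, rank) pair just to print ten of them does more work than needed. A sketch of an alternative, assuming the ranks are already in a Python list: `heapq.nlargest` keeps only the top k and returns fewer items without error when the list is shorter. On the Spark side, `RDD.takeOrdered(10, key=lambda kv: -kv[1])` performs the same selection without collecting the whole RDD to the driver. `top_k` and the toy values below are illustrative only.

```python
import heapq

def top_k(rank_pairs, k=10):
    # Return the k pairs with the largest rank, highest first;
    # if there are fewer than k pairs, all of them are returned
    return heapq.nlargest(k, rank_pairs, key=lambda pair: pair[1])

toy_ranks = [('a', 0.1), ('b', 0.4), ('c', 0.3)]   # made-up values
for pair in top_k(toy_ranks):
    print(pair)
# ('b', 0.4)
# ('c', 0.3)
# ('a', 0.1)
```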

In [6]:
from pyspark import SparkConf,SparkContext
from operator import add
import re

def divergeTwoPart(line):
    parts = re.split(r'\s+', line)   # regex: split "src dst" on whitespace
    return (parts[0],[parts[1]])

def contributions(source_dests_rank):
    dests=source_dests_rank[1][0] 
    rank=source_dests_rank[1][1]
    deg = len(dests)
    for dest in dests:
        yield (dest, rank / deg)

def add(x,y):
    return x+y

accum = sc.accumulator(0.0)
def addS(rank):
    global accum
    accum += rank[1]
    

In [7]:
sc.stop()
# This cell runs on the real dataset
conf=SparkConf().setMaster('local').setAppName('PageRank').set("spark.ui.port", "34050")
sc=SparkContext(conf=conf)
source = sc.textFile(r"./p2p-Gnutella04.txt")
# Initialization
N=10876
B=0.8
ranks = [ (str(i),1/N) for i in range(1,N+1) ]
ranks=sc.parallelize(ranks)
source=source.map(divergeTwoPart)
source=source.reduceByKey(add)


for i in range(20):
    contribs = source.join(ranks).flatMap(contributions)
    ranks = contribs.reduceByKey(add).mapValues(lambda rank : rank * B )
    ranks.foreach(addS)
    S=accum.value
    ranks = ranks.mapValues(lambda rank : rank +  (1 - S)/N)
    accum.add(-accum.value)



In [8]:
# This cell runs on the real dataset
ans = sorted(ranks.collect(), key=lambda s: s[1], reverse=True)
for pair in ans[:10]:   # slicing also handles graphs with fewer than 10 nodes
    print(pair)

('1054', 0.000629741636046356)
('1056', 0.000629582815958033)
('1536', 0.0005251083939889216)
('171', 0.0005126860232970683)
('453', 0.0004962075018740512)
('407', 0.0004857273277695018)
('263', 0.00048044391878490416)
('4664', 0.00047179820746095873)
('261', 0.0004639005279768693)
('410', 0.00046196765017376723)


In [88]:
sc.stop()
# This cell runs on the test dataset
conf=SparkConf().setMaster('local').setAppName('PageRank').set("spark.ui.port", "34050")
sc=SparkContext(conf=conf)
source = sc.textFile(r"./test.txt")
# Initialization
N=5
B=0.8
ranks = [ (str(i),1/N) for i in range(1,N+1) ]
ranks=sc.parallelize(ranks)
source=source.map(divergeTwoPart)
source=source.reduceByKey(add)


for i in range(10):
    contribs = source.join(ranks).flatMap(contributions)
    ranks = contribs.reduceByKey(add).mapValues(lambda rank : rank * B )
    ranks.foreach(addS)
    S=accum.value
    ranks = ranks.mapValues(lambda rank : rank +  (1 - S)/N)
    accum.add(-accum.value)



[('4', 0.32906630475610793), ('1', 0.1931454977244748), ('2', 0.16991320025033024), ('3', 0.16991320025033024), ('5', 0.13796179701875677)]


In [None]:
# This cell runs on the test dataset
ans = sorted(ranks.collect(), key=lambda s: s[1], reverse=True)
for pair in ans[:10]:   # slicing also handles graphs with fewer than 10 nodes
    print(pair)