In [370]:
import sys
import os
from pyspark.rdd import RDD
from pyspark import SparkConf, SparkContext
os.environ['JAVA_HOME'] = 'C:\Program Files\Java\jdk1.8.0_301'
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

$(i, j)$ pair 代表存在有向邊 $i \rightarrow j$

在 `mapper1` 中，讀取 `input.txt` 形成 $(i, j)$ pair

In [371]:
def mapper1(line):
    line = line.split()
    maplist = []
    maplist.append((line[0], line[1]))

    return maplist

在 `mapper2` 中，`x[0]` 為 node $i$，而 `x[1][0]` 為所有 node $i$ 的 out-linker。

故在數學式$r_j=\displaystyle\sum_{i\rightarrow j}\beta\frac{r_i}{d_i}+(1-\beta)\frac{1}{N}$ 中的 $d_i$ 即為 `len(x[1][0])`。

計算時，因為 $\beta$ 與 i 無關，因此我們將 $\beta$ 提出來，之後再運算。



在 `mapper2` 中，我們計算 $\frac{r_i}{d_i}$，並形成 $(j, \frac{r_i}{d_i}$) 的 pair，最後再利用 `reduceByKey` 來算出每一項的 $r_j$

In [372]:
def mapper2(x):
    ranks_list = []

    # x[0] 為 node i
    # x[1][0] 為所有 node i 射出的點，故 len(x[1][0]) 為 out-degree of node i
    for j in x[1][0]:
        # r_new = beta * (float(x[1][1])/len(x[1][0]))
        r_new = float(x[1][1])/len(x[1][0])

        # 我們最後是以 j 為 reduceByKey 的 key, 故這邊我們要 append((j, r_new))
        ranks_list.append((j, r_new))

    return ranks_list

在 `count` 中，因為有些點只有 in-link，沒有 out-link，故在計算 total node number $N$ 時會少算。

這邊我們重新讀檔，將所有的 node 加入 `node_list`，因為所有 node 皆是有編號的，因此我們利用最大的編號數來代表 total node number $N$ 。

In [373]:
def count(line):
    node_list = []
    line = line.split()

    # Add node into node_list
    # 若重複出現者，則不用加入
    if int(line[0]) not in node_list:
        node_list.append(int(line[0]))
    
    if int(line[1]) not in node_list:
        node_list.append(int(line[1]))
    
    return node_list

In [374]:
def reducer(x,y):
    return x+y

In [382]:
def output(res_list):
    f = open('Outputfile.txt', 'w')
    str = "{}\t{}\n"

    out_cnt = len(res_list)
    if out_cnt > 10:
        out_cnt = 10
    for i in range(out_cnt):
        f.write(str.format(res_list[i][1], round(res_list[i][0], 6)))
        print(str.format(res_list[i][1], round(res_list[i][0], 6)))
    print("Done")
        
    return

In [376]:
conf = SparkConf().setMaster("local").setAppName("PageRank")
sc = SparkContext(conf=conf)


經過 `mapper1` 後形成 $(i, j)$ pair。

透過 `groupByKey()` 形成 (node i, List of node's outlink)。

需注意 node 編號從 0 開始，因此最後 total node number 需要再 $+1$

一開始， $\forall r_j, \; r_j=1/N$，因此我們 `ranks` 採用 $(j, r_j)$ pair。

> 需注意因為 `input-test.txt` 的 node 編號從 1 開始，因此 `ranks` 的初始值應為 (i+1, 1/N)

In [383]:
links = sc.textFile("input.txt")
links = links.flatMap(mapper1)

# Calculate all page
# 如果有個點沒有 out-link，則不會算到!!
# links 型式: (page, list of page's outlink)
links = links.groupByKey()

N = sc.textFile("input.txt")
N = N.flatMap(count).max()
# node num start from 0
N = int(N)+1

# Create and initialize the ranks
ranks = sc.parallelize([(str(i), 1/N) for i in range(N)])
# # 若 input 為 `input-test.txt`，node 編號從 1 開始，因此需注意 ranks 初始值
# ranks = sc.parallelize([(str(i+1), 1/N) for i in range(N)])

10879


我們透過 `.join()` 形成 (node i, [[list of i's out-link], $r_j$]) 的 format。

經過 `mapper2` 和 `reducer` 後，即完成數學式 $r_j=\displaystyle\sum_{i\rightarrow j}\beta\frac{r_i}{d_i}+(1-\beta)\frac{1}{N}$ 中的 $\displaystyle\sum_{i\rightarrow j}\frac{r_i}{d_i}$。

接著，需乘上 $\beta$ 和後面的 $\frac{(1-\beta)}{N}$，形成$r^{new}_j$

我們透過 $\sum_j r^{new}_j \lt 1$ 來判斷是否出現 **dead-end**。 若我們發現 **dead-end** 的出現，則必須 renormalize:
$$
\forall j: r^{new}_j=r'^{new}_j+\frac{1-S}{N}
$$
, where $S=\sum r'^{new}_j$


In [384]:
Iteration = 20
beta = 0.8
constant = (1-beta)/N

for i in range (Iteration):
    ranks = links.join(ranks).flatMap(mapper2).reduceByKey(reducer)
    ranks = ranks.map(lambda x: (x[0], beta*x[1]+constant))

    # Detect whether dead-end occur
    ranks_sum = ranks.map(lambda x: x[1])
    s = ranks_sum.sum()
    if s < 1:
        # print("Detect Dead-end")
        ranks = ranks.map(lambda x: (x[0], x[1]+(1-s)/N))
        
# 將 key value 對換，這樣可以利用 sortByKey 來達成 output format
ranks = ranks.map(lambda x : (x[1], x[0]))
output(ranks.sortByKey(False).collect())

10861	0.00063

4240	0.00063

6899	0.000526

9526	0.000513

2118	0.000497

3419	0.000486

1311	0.000481

3186	0.000472

3541	0.000464

367	0.000462

Done


In [None]:
sc.stop()