# _Pregel_

### pregel.py:

导入所需的库

In [1]:
import collections
import threading

定义Vertex类

In [2]:
class Vertex:
    """A graph vertex together with the per-superstep state the engine needs.

    The base class defines no ``update()`` method; concrete vertex types
    must subclass it and provide one, because the worker threads call
    ``vertex.update()`` on every active vertex.
    """

    def __init__(self, id, value, neighbors):
        # Identity, current value, and the list of vertices this one links to.
        self.id, self.value, self.neighbors = id, value, neighbors
        # Inbox holds (sender, message) pairs delivered by the engine; outbox
        # holds (recipient, message) pairs produced during update(). Two
        # separate list objects are created so the buffers never alias.
        self.incoming_messages, self.outgoing_messages = [], []
        # Vertices start active and at superstep 0.
        self.is_active = True
        self.superstep = 0


Pregel计算模型的实现

In [3]:
class Pregel():
    """Minimal bulk-synchronous (Pregel-style) graph computation engine.

    Each superstep calls ``update()`` on every active vertex, using one
    ``Worker`` thread per partition. It then delivers the messages produced
    during that superstep so they are visible in the next one. The
    computation stops once no vertex is active. Vertices are never
    reactivated here: receiving a message does not change ``is_active``.

    Note: in CPython, threads running pure-Python ``update()`` code do not
    execute in parallel because of the GIL. The partitioning models
    Pregel's workers rather than providing a real speed-up.
    """

    def __init__(self, vertices, num_workers):
        """Store the vertices to process and the number of worker partitions."""
        self.vertices = vertices
        self.num_workers = num_workers

    def run(self):
        """Run supersteps until every vertex has become inactive."""
        self.partition = self.partition_vertices()
        while self.check_active():
            self.superstep()
            self.spread_messages()

    def partition_vertices(self):
        """Group the vertices into lists keyed by the index from ``worker()``."""
        partition = collections.defaultdict(list)
        for vertex in self.vertices:
            partition[self.worker(vertex)].append(vertex)
        return partition

    def worker(self, vertex):
        """Map a vertex to a worker index in ``range(num_workers)``."""
        # For objects that do not override __hash__ (such as Vertex), hash()
        # is identity-based. The assignment is therefore arbitrary but stable
        # for the vertex's lifetime.
        return hash(vertex) % self.num_workers

    def superstep(self):
        """Run one superstep: start one Worker per partition, then wait for all."""
        workers = []
        for vertex_list in self.partition.values():
            worker = Worker(vertex_list)
            workers.append(worker)
            worker.start()
        for worker in workers:
            worker.join()

    def spread_messages(self):
        """Advance every vertex's superstep counter and deliver pending messages.

        Every inbox is reset first. Then each ``(recipient, message)`` pair in
        a vertex's outbox is appended to the recipient's inbox as
        ``(sender, message)``. Outboxes are emptied after delivery so each
        message is delivered exactly once. Without this, a vertex that does
        not rewrite its outbox (for example one that has just deactivated)
        would re-send stale messages on every later superstep.
        """
        for vertex in self.vertices:
            vertex.superstep += 1
            vertex.incoming_messages = []
        for vertex in self.vertices:
            for recv_vertex, msg in vertex.outgoing_messages:
                recv_vertex.incoming_messages.append((vertex, msg))
            # Rebind rather than clear(), in case update() handed out the list.
            vertex.outgoing_messages = []

    def check_active(self):
        """Return True if at least one vertex is still active."""
        return any(vertex.is_active for vertex in self.vertices)

定义工作线程类 Worker（每个实例在一个超级步中处理一个顶点分区）

In [4]:
class Worker(threading.Thread):
    """Thread that applies one superstep to a fixed slice of vertices.

    ``run`` (the thread entry point) simply delegates to ``superstep``.
    """

    def __init__(self, vertices):
        """Remember the vertices this worker is responsible for."""
        super().__init__()
        self.vertices = vertices

    def run(self):
        """Thread entry point: process this worker's vertices once."""
        self.superstep()

    def superstep(self):
        """Call ``update()`` on every vertex in this slice that is still active."""
        # The filter is lazy, so each vertex's is_active flag is read just
        # before that vertex would be updated.
        still_active = (v for v in self.vertices if v.is_active)
        for v in still_active:
            v.update()

### pagerank.py:

导入所需的包：

In [5]:
##from pregel import Vertex, Pregel
import time
from numpy import mat, eye, zeros, ones, linalg
import random

定义工作线程数和顶点数

In [6]:
# Number of worker partitions for the Pregel engine, and number of graph vertices.
num_workers, num_vertices = 10, 10000

定义 PageRank 顶点类

In [7]:
class PageRankVertex(Vertex):
    """Vertex that computes PageRank (damping 0.85) for 100 supersteps."""

    def update(self):
        """Perform one PageRank superstep for this vertex.

        For supersteps 0-99 the vertex sets its value to
        ``0.15 / num_vertices + 0.85 * (sum of incoming ranks)``. It then
        sends an equal share of the new value to each neighbour. From
        superstep 100 on it deactivates itself instead.

        A vertex with no neighbours (a dangling node) sends no messages, so
        its rank mass is dropped. This matches the all-zero column such a
        vertex gets in ``calc_pagerank``.
        """
        if self.superstep < 100:
            self.value = 0.15 / num_vertices + 0.85 * sum(
                pagerank for (_sender, pagerank) in self.incoming_messages
            )
            if self.neighbors:
                outgoing_pagerank = self.value / len(self.neighbors)
                self.outgoing_messages = [
                    (vertex, outgoing_pagerank) for vertex in self.neighbors
                ]
            else:
                # Dividing by len(neighbors) here would raise
                # ZeroDivisionError inside the worker thread. That would abort
                # the remaining vertices of that worker for this superstep.
                self.outgoing_messages = []
        else:
            self.is_active = False

In [8]:
def create_edges(vertices):
    """Assign every vertex a random list of out-neighbours (mutates ``vertices``).

    Each vertex gets ``k`` distinct neighbours sampled without replacement
    from ``vertices``; the vertex itself may be drawn, giving a self-loop.
    ``k`` is ``random.randint(1, 5)`` capped at ``len(vertices)``. The cap
    stops ``random.sample`` from raising ``ValueError`` on graphs with
    fewer than five vertices. It never triggers on larger graphs, so the
    random stream consumed there is unchanged.
    """
    for vertex in vertices:
        degree = min(random.randint(1, 5), len(vertices))
        vertex.neighbors = random.sample(vertices, degree)

In [9]:
def calc_pagerank(vertices):
    """Compute PageRank in closed form, as a reference for the Pregel result.

    Solves ``(I - 0.85 * G) x = P`` and returns ``0.15 * x``, where:

    * ``G[j, i] = 1 / out_degree(i)`` for every edge ``i -> j``;
    * ``P`` is the uniform column ``1 / N``.

    Vertices without neighbours contribute an all-zero column.

    Args:
        vertices: objects with an integer ``id`` and a ``neighbors`` list of
            such objects. Ids are assumed to be ``0 .. len(vertices) - 1``.

    Returns:
        An ``N x 1`` numpy matrix of PageRank values indexed by vertex id.
        It is empty (``0 x 1``) when ``vertices`` is empty.
    """
    n = len(vertices)
    if n == 0:
        return mat(zeros((0, 1)))
    identity = mat(eye(n))
    G = zeros((n, n))
    for vertex in vertices:
        num_neighbor = len(vertex.neighbors)
        for nv in vertex.neighbors:
            # Column ``vertex.id`` spreads that vertex's rank evenly over its out-edges.
            G[nv.id, vertex.id] = 1.0 / num_neighbor
    P = (1.0 / n) * mat(ones((n, 1)))
    # Solving the linear system is cheaper and numerically more stable than
    # forming the explicit inverse with ``.I``.
    return 0.15 * mat(linalg.solve(identity - 0.85 * G, P))

In [10]:
def pregel_pagerank(vertices):
    """Run PageRank on the Pregel engine and collect each vertex's final value.

    Args:
        vertices: Vertex subclasses whose ``update()`` computes PageRank,
            with their neighbour lists already populated.

    Returns:
        An ``N x 1`` numpy matrix of the vertices' final values, in the same
        order as ``vertices``.
    """
    engine = Pregel(vertices, num_workers)
    engine.run()
    ranks = [v.value for v in engine.vertices]
    return mat(ranks).transpose()

In [11]:
def main():
    """Build a random graph, compute PageRank two ways, and compare them.

    Prints three lines:

    * the elapsed time of the closed-form solver;
    * the elapsed time of the Pregel run;
    * the norm of the difference between the two PageRank vectors.
    """
    vertices = [PageRankVertex(j, 1.0 / num_vertices, []) for j in range(num_vertices)]
    create_edges(vertices)

    # perf_counter is monotonic and high-resolution, unlike time.time(), so it
    # is the appropriate clock for measuring elapsed durations.
    start = time.perf_counter()
    pr = calc_pagerank(vertices)
    elapsed_time = time.perf_counter() - start
    print("elapsed %s sec for calc PageRank" % elapsed_time)

    start = time.perf_counter()
    pr_pregel = pregel_pagerank(vertices)
    elapsed_time = time.perf_counter() - start
    print("elapsed %s sec for Pregel PageRank" % elapsed_time)

    # Both results are N x 1 matrices. linalg.norm gives the Frobenius norm,
    # which for a single column equals the Euclidean norm.
    diff = pr_pregel - pr
    print("The norm of the difference is: %s" % linalg.norm(diff))

In [12]:
# Run the comparison only when executed as a script or notebook cell
# (both have __name__ == "__main__"), not when imported as a module.
if __name__ == "__main__":
    main()

elapsed 9.747305154800415 sec for calc PageRank
elapsed 3.1489317417144775 sec for Pregel PageRank
The norm of the difference is: 2.320301222358135e-09
