# 0 GraphLite Monitor
Single-process simulation (one worker).

In [4]:
class Edge:
    """Directed, weighted edge ``source -> target``; the weight is stored as ``value``."""

    def __init__(self, source, target, weight):
        self.source, self.target, self.value = source, target, weight
        self.globalvar = {}   # per-edge dict; not read anywhere in this section

    def __str__(self):
        triple = (self.source, self.target, self.value)
        return str(triple)
        
class Message:
    """A value sent from vertex ``source`` to vertex ``target`` for the next superstep."""

    def __init__(self, source, target, value):
        self.source = source
        self.target = target
        self.value = value

    def __str__(self):
        return str((self.source, self.target, self.value))

    # Containers (e.g. the outMsgs list printed by Vertex.__str__) format their
    # items with repr(); without this they would show "<Message object at 0x...>".
    __repr__ = __str__

class Vertex:
    """Base vertex program; subclasses override ``compute``.

    ``globalvar`` is a single dict shared by every vertex (it lives on the
    class). It holds the current ``'superstep'`` and the vertex-side
    ``'aggregators'`` list.
    """
    globalvar = {}            # set by master, it is a copy of master var; or aggregator

    def __init__(self, vid):
        self.vid = vid
        self.outEdges = []    # out-edges only: A->B makes B a neighbour of A, not the reverse
        self.halt = False
        self.outMsgs = []

    def voteToHalt(self):
        """Mark this vertex as halted (sets ``halt``)."""
        self.halt = True

    def compute(self, inMsgs):
        """Default program: at superstep 0 print the out-edges on one line, then halt."""
        if self.getSuperStep() != 0:
            return
        for edge in self.outEdges:
            print(edge, end=' ')
        print("")
        self.voteToHalt()

    def sendMessageToNeighbour(self, target, value):
        """Queue ``value`` for vertex ``target``."""
        self.outMsgs.append(Message(self.vid, target, value))

    def sendMessageToAllNeighbours(self, value):
        """Queue ``value`` for the target of every out-edge, in edge order."""
        self.outMsgs.extend(Message(self.vid, edge.target, value) for edge in self.outEdges)

    def __str__(self):
        neighbours = [(edge.target, edge.value) for edge in self.outEdges]
        return str((self.vid, neighbours, self.outMsgs, self.halt))

    def getSuperStep(self):
        """Current superstep number, as published by the graph."""
        return Vertex.globalvar['superstep']

    def accumulate(self, idx, value):
        """Feed ``value`` into this superstep's local value of aggregator ``idx``."""
        aggregators = Vertex.globalvar['aggregators']
        aggregators[idx].accumulate(value)

    def getAggrGlobal(self, idx):
        """Read the global value of aggregator ``idx`` (merged from the previous superstep)."""
        aggregators = Vertex.globalvar['aggregators']
        return aggregators[idx].global_v
        
class Aggregator:
    """Summing aggregator; subclasses override ``merge``/``accumulate`` for other reductions.

    ``local_v`` is what vertices write during the current superstep (via
    ``accumulate``); ``global_v`` is the merged value from the last superstep,
    which vertices only read.
    """

    def __init__(self, initLocalValue=None, initGlobalValue=None):
        self.initLocalValue = initLocalValue
        self.initGlobalValue = initGlobalValue
        self.init()

    def init(self):
        """Reset both the local and the global value to their initial values."""
        self.local_v = self.initLocalValue     # current local var, in vertex can be write.
        self.global_v = self.initGlobalValue   # last global var, in vertex can only read.

    def merge(self, value):
        """Fold ``value`` into the global value (default: addition)."""
        self.global_v += value

    def accumulate(self, value):
        """Fold ``value`` into the local value (default: addition)."""
        self.local_v += value

    def copy(self):
        """Return a shallow copy that keeps this object's (sub)class.

        Building a plain ``Aggregator()`` here would lose the subclass, so a
        copied ``IsUpdate`` (logical OR) would silently fall back to ``+=``.
        """
        cls = type(self)
        dup = cls.__new__(cls)              # skip __init__: subclasses may require arguments
        dup.__dict__.update(self.__dict__)  # init values plus current local/global values
        return dup
        
class Graph:
    """Single-process driver that runs vertex programs superstep by superstep.

    Messages are routed with ``inMsgs[msg.target]`` and each vertex reads
    ``inMsgs[v.vid]``, so ``vertexs[i].vid`` must equal ``i``.
    """

    def __init__(self, vertexs, edges):
        self.vertexs = vertexs
        self.edges = edges
        self.inMsgs = [[] for _ in self.vertexs]   # one inbox per vertex
        self.outMsgs = []
        self.aggregators = []                      # graph-side ("global") aggregators

    def copyGlobal(self):
        """Publish the superstep number and synchronise aggregators with the vertices.

        On the first call, vertices receive copies of the graph's aggregators.
        On every later call:
        1. each graph aggregator is reset and merges the local values the
           vertices accumulated in the previous superstep;
        2. the vertex copies are reset and merge the new global value.
        So ``getAggrGlobal`` in superstep k sees what was accumulated in superstep k-1.
        """
        Vertex.globalvar['superstep'] = self.superstep

        vertex_aggs = Vertex.globalvar.get('aggregators')
        if vertex_aggs is None:
            Vertex.globalvar['aggregators'] = [agg.copy() for agg in self.aggregators]
            return

        workers = [vertex_aggs]   # single process: exactly one worker's aggregator list

        # merge local(Vertex) to global(Graph)
        for gagg in self.aggregators:
            gagg.init()
        for i, gagg in enumerate(self.aggregators):
            for worker_aggs in workers:
                gagg.merge(worker_aggs[i].local_v)

        # copy global(Graph) to global(Vertex)
        for vagg in vertex_aggs:
            vagg.init()
        for i, gagg in enumerate(self.aggregators):
            for worker_aggs in workers:
                worker_aggs[i].merge(gagg.global_v)

    def splitMessage(self):
        """Empty every inbox, then deliver the collected outMsgs to their targets."""
        self.clearMessage(self.inMsgs)
        for msg in self.outMsgs:
            self.inMsgs[msg.target].append(msg)
        self.outMsgs[:] = []

    def clearMessage(self, msgs):
        """Empty each inner list of ``msgs`` in place."""
        for box in msgs:
            box[:] = []

    def collectOutMessage(self):
        """Move every vertex's outMsgs into the graph-level outMsgs list."""
        for v in self.vertexs:
            self.outMsgs.extend(v.outMsgs)
            v.outMsgs[:] = []

    def run(self, verbose=False):
        """Run supersteps until no vertex is active.

        A vertex is active if it has not voted to halt or if it received
        messages this superstep (Pregel-style reactivation).
        """
        self.superstep = 0
        # Start from a clean slate. Vertex.globalvar is shared by every graph,
        # so aggregator copies left by an earlier run would otherwise be merged
        # into this run's aggregators at superstep 0.
        Vertex.globalvar.pop('aggregators', None)
        for agg in self.aggregators:
            agg.init()
        for v in self.vertexs:
            v.halt = False
        self.outMsgs[:] = []

        all_halt = False
        while not all_halt:
            if verbose:
                print("superstep", self.superstep)

            self.copyGlobal()     # copy global var to some points, like aggregator
            self.splitMessage()   # move outMsgs to inMsgs

            all_halt = True
            for v in self.vertexs:
                msgs = self.inMsgs[v.vid]
                if msgs:
                    v.halt = False   # an incoming message reactivates a halted vertex
                if not v.halt:
                    all_halt = False
                    v.compute(msgs)

            self.collectOutMessage()  # collect all vertex.outMsgs to outMsgs

            if verbose and not all_halt:
                # The base Vertex has no ``value`` attribute; print None for such vertices.
                print([getattr(v, 'value', None) for v in self.vertexs])
            self.superstep += 1

    def __str__(self):
        string = "vertexs"
        for v in self.vertexs:
            string += "\t" + str(v) + "\n"
        return string
        
def createGraphFromG(G, Vertex=Vertex, Edge=Edge, Message=Message):  # just like input formater and create_grpah
    """Build a Graph from a dense matrix ``G`` indexed as ``G[i, j]``.

    One vertex is created per row (``len(G)`` of them). Every off-diagonal
    entry with ``G[i, j] >= 0`` becomes an edge ``i -> j`` with that weight;
    negative entries mean "no edge". ``Message`` is accepted but not used.
    """
    n = len(G)
    vertexs = [Vertex(vid) for vid in range(n)]
    edges = []
    for src in range(n):
        for dst in range(n):
            if src != dst and G[src, dst] >= 0:
                edge = Edge(src, dst, G[src, dst])
                edges.append(edge)
                vertexs[src].outEdges.append(edge)
    return Graph(vertexs, edges)

## 0.1 InputReader and Compare

In [6]:
import numpy as np
INF = 1e10  # stand-in for "infinity": missing-edge value in SparseGraph and initial SSSP distance
class SparseGraph:
    """Dict-of-dicts adjacency storage: ``rows[source][target] = weight``.

    ``G[s, t]`` returns the stored weight, or this graph's ``INF`` when no such
    edge was added. ``len(G)`` is the vertex count given to the constructor.
    """
    INF = None   # class-level fallback only; every instance sets its own INF in __init__

    def __init__(self, v_count=None, e_count=None, INF=None):
        self.v_count = v_count
        self.e_count = e_count   # as given by the caller; addEdge does not update it
        self.rows = {}
        # Stored per instance (not on the class) so that constructing another
        # SparseGraph cannot change this graph's missing-edge value.
        self.INF = INF
        self.iteridx = 0         # not used in this section; kept for compatibility

    def addEdge(self, start, end, weight=None):
        """Set (or overwrite) the weight of edge ``start -> end``."""
        self.rows.setdefault(start, {})[end] = weight

    def __getitem__(self, i):
        start, end = i
        return self.rows.get(start, {}).get(end, self.INF)

    def __setitem__(self, i, v):
        start, end = i
        self.addEdge(start, end, v)

    def __len__(self):
        return self.v_count
        
def loadFileToG(filepath):
    """Read an edge-list file into a SparseGraph.

    Format: line 1 is the vertex count, line 2 the edge count, then one edge
    per line as ``source target [weight]``, separated by whitespace. The
    weight defaults to 1; when present it is parsed as a float. Blank lines
    are skipped.

    Raises:
        ValueError: if an edge line has fewer than two fields.
    """
    with open(filepath) as f:   # closes the file even if parsing fails
        vertex_count = int(f.readline())
        edge_count = int(f.readline())
        G = SparseGraph(vertex_count, edge_count, INF=INF)
        for line in f:
            # split() on any whitespace, so tabs, repeated spaces, trailing
            # spaces and the newline are all handled.
            res = line.split()
            if not res:
                continue      # blank line, e.g. a trailing newline at end of file
            if len(res) < 2:
                raise ValueError('edge line must have two element at least.')
            s, e, w = int(res[0]), int(res[1]), 1
            if len(res) >= 3:
                w = float(res[2])
            G[s, e] = w
    return G

def createGraphFromSparseG(G, Vertex=Vertex, Edge=Edge, Message=Message):  # just like input formater and create_grpah
    """Build a Graph from a SparseGraph ``G``.

    ``len(G)`` vertices are created (ids 0..len(G)-1), and every stored entry
    ``G.rows[s][t]`` with ``s != t`` becomes an edge ``s -> t``. Self-loops are
    dropped. ``Message`` is accepted but not used.
    """
    vertexs = [Vertex(vid) for vid in range(len(G))]
    edges = []
    for src, row in G.rows.items():
        for dst, weight in row.items():
            if src == dst:
                continue
            edge = Edge(src, dst, weight)
            edges.append(edge)
            vertexs[src].outEdges.append(edge)
    return Graph(vertexs, edges)

# G = loadFileToG('cpp_GraphLite/part2-input/SSSP-graph0')
# print len(G)
# print G.e_count
# graph = createGraphFromSparseG(G)

def compare(data, filepath, seq=' '):
    """Check computed rows against a reference file of numbers.

    Each non-blank line of ``filepath`` is stripped, split on ``seq`` and
    parsed as floats. Row ``i`` of ``data`` must have the same length and
    compare exactly equal element by element. Returns False when the row
    counts differ (printing both counts) or at the first mismatching row
    (printing it). Otherwise returns True.
    """
    with open(filepath) as f:
        expected = [line.strip() for line in f]
    expected = [line for line in expected if line]   # ignore blank lines (e.g. a trailing newline)
    rows = list(data)
    # zip() would silently stop at the shorter input, hiding missing or extra rows.
    if len(rows) != len(expected):
        print("row count differs:", len(rows), '!=', len(expected))
        return False
    for i, (d, l) in enumerate(zip(rows, expected)):
        rd = [float(e) for e in l.split(seq)]
        if len(d) != len(rd) or any(de != rde for de, rde in zip(d, rd)):
            print("not equal in line %d:" % i, d, '!=', rd)
            return False
    return True

# 1. SSSP (single-source shortest path)

## 1.1 Create a simple test

In [7]:
import numpy as np
def add_edge(G, v1, v2, w, index=None):
    """Set the weight of edge ``v1 -> v2`` in matrix ``G``, addressing vertices by name.

    ``index`` maps a vertex name to its row/column. When omitted it falls
    back to the module-level ``V``, so existing calls ``add_edge(G, 'A', 'B', 5)``
    keep working.
    """
    if index is None:
        index = V
    G[index[v1], index[v2]] = w
    
# Vertex name -> matrix index, and the inverse: F[i] is the name of vertex i.
V = {'A':0, 'B':1, 'C':2, 'D':3, 'E':4, 'F':5}
F = 'ABCDEF'
# Dense weighted adjacency matrix: G[i][j] is the weight of edge i -> j,
# -1 means "no edge", and the diagonal is 0.
G = [[0, 50, 10, -1, 45, -1],
     [-1, 0, 15, -1, 10, -1],
     [20, -1, 0, 15, -1, -1], 
     [-1, 20, -1, 0, 35, 3],
     [-1, -1, -1, 30, 0, -1],
     [-1, -1, -1, -1, -1, 0]]
G = np.array(G)   # numpy array so it can be indexed as G[i, j]

## 1.2 Bellman-Ford algorithm implemented in GraphLite
```py
dis[:] = INF
dis[source] = 0
for k in range(N):
    for v in vertexs:
        for u in v.inNeighbours:
            dis[v] = min(dis[v], dis[u]+G[u, v])
```

In [8]:
V0 = 0          # id of the SSSP source vertex (read by SSSPVertex.compute)
update = True   # NOTE(review): not referenced anywhere in this section; possibly a leftover

class IsUpdate(Aggregator):
    """Logical-OR aggregator: once a truthy value arrives it is kept."""

    def merge(self, value):
        # Same as ``global_v = global_v or value``: keep a truthy value, else take the new one.
        if not self.global_v:
            self.global_v = value

    def accumulate(self, value):
        # Same as ``local_v = local_v or value``.
        if not self.local_v:
            self.local_v = value

class SSSPVertex(Vertex):
    """Bellman-Ford-style SSSP program; ``value`` is the best known distance from ``V0``.

    Aggregator 0 (an IsUpdate) records whether any vertex improved its
    distance. From superstep 2 on, a False global value means nothing
    improved in the previous superstep, and every vertex votes to halt.
    """

    def compute(self, Msgs):
        step = self.getSuperStep()

        if step == 0:  # init
            if self.vid != V0:
                self.value = INF
                return
            self.value = 0
            for edge in self.outEdges:
                self.sendMessageToNeighbour(edge.target, edge.value)
            return

        # end condition (not checked at superstep 1: nobody accumulates in superstep 0)
        if step >= 2 and not self.getAggrGlobal(0):
            self.voteToHalt()
            return

        best = min([msg.value for msg in Msgs], default=self.value)
        if best < self.value:
            self.value = best
            self.accumulate(0, True)
            for edge in self.outEdges:
                self.sendMessageToNeighbour(edge.target, best + edge.value)
                
def MyCreateGraphFromG(G, Vertex=Vertex, Edge=Edge, Message=Message):
    """Build a graph from dense matrix ``G`` and attach aggregator 0, an IsUpdate flag starting at False."""
    update_flag = IsUpdate(initLocalValue=False, initGlobalValue=False)
    graph = createGraphFromG(G, Vertex=Vertex, Edge=Edge, Message=Message)
    graph.aggregators.append(update_flag)
    return graph

In [9]:
graph = MyCreateGraphFromG(G, Vertex=SSSPVertex)
# graph = createGraphFromG(G)
print(graph)
graph.run(verbose=True)
# Why the last three printed distance lists are identical:
# - superstep 3 makes the final improvements (B -> 45, F -> 28);
# - superstep 4: no vertex improves, so the IsUpdate flag collected in it stays False;
# - superstep 5: every vertex reads that False flag and votes to halt;
# - superstep 6: all vertices are halted, so nothing is printed and run() stops.

vertexs	(0, [(1, 50), (2, 10), (4, 45)], [], False)
	(1, [(2, 15), (4, 10)], [], False)
	(2, [(0, 20), (3, 15)], [], False)
	(3, [(1, 20), (4, 35), (5, 3)], [], False)
	(4, [(3, 30)], [], False)
	(5, [], [], False)

superstep 0
[0, 10000000000.0, 10000000000.0, 10000000000.0, 10000000000.0, 10000000000.0]
superstep 1
[0, 50, 10, 10000000000.0, 45, 10000000000.0]
superstep 2
[0, 50, 10, 25, 45, 10000000000.0]
superstep 3
[0, 45, 10, 25, 45, 28]
superstep 4
[0, 45, 10, 25, 45, 28]
superstep 5
[0, 45, 10, 25, 45, 28]
superstep 6


## 1.3 Dijkstra algorithm (plain sequential version)

In [10]:
import numpy as np
def SSSP_Dijkstra(G, s):
    """Single-source shortest paths on a dense matrix using Dijkstra's algorithm.

    ``G`` is a square 2-D array where ``G[u, v]`` is the weight of edge
    ``u -> v`` and -1 means "no edge". Weights are assumed non-negative.

    Returns ``(dis, before_v)``:
    - ``dis[v]`` is the shortest distance from ``s``, or -1 if ``v`` is unreachable;
    - ``before_v[v]`` is ``v``'s predecessor on that path. It is initialised to
      ``s``, so it is only meaningful for reachable vertices.
    """
    def find_min_dis_out_set(dis, in_set):
        """Return (index, distance) of the closest unsettled reached vertex, or (-1, -1)."""
        min_i = -1
        min_dis = -1
        for i in range(len(dis)):
            if in_set[i] or dis[i] == -1: continue
            if min_dis == -1 or min_dis > dis[i]:
                min_dis = dis[i]
                min_i = i
        return min_i, min_dis

    def update_dis_out_set(dis, before_v, in_set, min_i):
        """Relax every edge ``min_i -> i`` that leads to an unsettled vertex ``i``."""
        for i in range(len(dis)):
            if in_set[i]: continue
            if G[min_i, i] == -1: continue
            if dis[i] == -1 or dis[i] > dis[min_i] + G[min_i, i]:
                dis[i] = dis[min_i] + G[min_i, i]
                before_v[i] = min_i

    N = G.shape[0]
    in_set = [False] * N
    in_set[s] = True
    dis = G[s, :].copy()
    dis[s] = 0                # the source is at distance 0 whatever the diagonal G[s, s] holds
    before_v = [s] * N        # for traceback to find path

    for _ in range(N - 1):    # at most N-1 further vertices can be settled
        min_i, _min_dis = find_min_dis_out_set(dis, in_set)
        if min_i == -1:
            break             # every remaining vertex is unreachable
        in_set[min_i] = True
        update_dis_out_set(dis, before_v, in_set, min_i)
    return dis, before_v

def get_path(before_v, s, t, V):
    """Return the vertex names along the path ``s -> ... -> t``.

    Walks the predecessor list ``before_v`` (as returned by SSSP_Dijkstra)
    backwards from ``t`` until it reaches ``s``. ``V`` supplies the names in
    one of two forms: a mapping name -> index (like the module-level ``V``),
    which is inverted, or a sequence indexed by vertex id (like ``F``).
    """
    if hasattr(V, 'items'):
        names = {idx: name for name, idx in V.items()}
    else:
        names = V
    v = t
    path = [names[v]]
    while v != s:
        v = before_v[v]
        path.append(names[v])
    path.reverse()
    return path

In [12]:
# Run Dijkstra from vertex 0 ('A') and print each vertex's distance and path.
min_dis, before_v = SSSP_Dijkstra(G, 0)
for t in range(len(V)):
    print(min_dis[t], get_path(before_v, 0, t, V))

0 ['A']
45 ['A', 'C', 'D', 'B']
10 ['A', 'C']
25 ['A', 'C', 'D']
45 ['A', 'E']
28 ['A', 'C', 'D', 'F']


for k in range(N):
    v = find_min_dis_out_set(dis, in_set)
    update_dis_out_set(dis, in_set, v)

def find_min(min_length, v):
    if not v.value.inset:    # find min
        if min_length > v.value.length:
            min_length = v.value.length
    
for k in range(N):
    min_length = INF
    for v in vertexs:
        find_min(min_length, v)
    for v in 
    

## 1.5 Test on big data

In [13]:
def MyCreateGraphFromSparseG(G, Vertex=Vertex, Edge=Edge, Message=Message):
    """Build a graph from SparseGraph ``G`` and attach aggregator 0, an IsUpdate flag starting at False."""
    update_flag = IsUpdate(initLocalValue=False, initGlobalValue=False)
    graph = createGraphFromSparseG(G, Vertex=Vertex, Edge=Edge, Message=Message)
    graph.aggregators.append(update_flag)
    return graph

# Load the large edge-list input, run the SSSPVertex program on it, and print
# every 100th distance among the first 10000 vertices.
G = loadFileToG('cpp_GraphLite/part2-input/SSSP-graph0')
print(len(G))        # vertex count from the file header
print(G.e_count)     # edge count from the file header

graph = MyCreateGraphFromSparseG(G, Vertex=SSSPVertex)
# print graph
graph.run(verbose=False)
print([v.value for v in graph.vertexs[0:10000: 100]])

32767
32766
[0, 12.0, 14.0, 16.0, 16.0, 16.0, 18.0, 18.0, 18.0, 18.0, 18.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 20.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 22.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 24.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0, 26.0]


# 2 SimRank