# Self study 11

## Libraries
For this task you will need the Abstract Base Classes (abc) module from the Python standard library and the networkx library. The abc module provides the tools to define abstract classes in Python. You can find its documentation [here](https://docs.python.org/3/library/abc.html).
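The toy example below shows what abc enforces. Shape and Square are hypothetical classes that are not part of the framework. A class that still has unimplemented abstract methods cannot be instantiated. A subclass that implements all of them can be.

```python
from abc import ABC, abstractmethod


class Shape(ABC):
    @abstractmethod
    def area(self) -> float:
        """Subclasses must provide an implementation."""


class Square(Shape):
    def __init__(self, side: float):
        self.side = side

    def area(self) -> float:
        return self.side ** 2


try:
    Shape()  # fails: the abstract method area is not implemented
except TypeError as err:
    print("TypeError:", err)

print(Square(3).area())  # 9
```

Node and Edge below rely on the same mechanism. A problem-specific subclass only becomes usable once it implements every abstract method.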

In [None]:
from abc import ABC, abstractmethod
import math
import networkx as nx

## Introduction
In this session, we will test a simple implementation of a graph parallel framework. Full-fledged frameworks exist, such as [GraphX](https://spark.apache.org/graphx/), the graph processing library of Apache Spark. Installing Spark can be time consuming, however, so in this notebook we use a rudimentary implementation of a graph parallel framework instead. We assume that the graph is directed. The following code implements two abstract classes, Node and Edge.

Node represents a computational node (a vertex) in a graph processing framework. It implements the functions Agg, Msg and End described during the lecture, and delegates the problem-specific logic to three abstract methods:

* aggregate: it combines the internal node state with the received messages to compute the new internal state.
* prepare_message: it prepares the message to be sent to the neighbouring nodes.
* end: it returns True when the node is ready to stop, i.e. when it does not need to continue the computation.

In [None]:
class Node(ABC):
    def __init__(self, name, features, active = False):
        self.name = name
        self.features = features
        self.edges = []
        self.messages = []
        self.active = active
    
    def add_edge(self, edge):
        self.edges.append(edge)

    # Returns the message sent along every outgoing edge (send_msg calls it without arguments)
    @abstractmethod
    def prepare_message(self): pass

    @abstractmethod
    def aggregate(self): pass

    @abstractmethod
    def end(self): pass

    def agg(self):
        self.aggregate()
        self.messages = []

    def send_msg(self):
        if self.active:
            message = self.prepare_message()
            for edge in self.edges:
                edge.msg(message)

    def recv_message(self, message):
        self.messages.append(message)

    def print(self):
        print(self.name, self.features)

The Edge class tracks the graph edges and forwards messages from the source node (the node that owns the edge) to the target node (node2). It defines an abstract method, prepare_message. This method receives the message from the source node and transforms it before it is delivered to the target node.

In [None]:
class Edge(ABC):
    def __init__(self, node2, features):
        self.node2 = node2
        self.features = features

    @abstractmethod
    def prepare_message(self, message): pass

    def msg(self, message): 
        message = self.prepare_message(message)
        self.node2.recv_message(message)

The execute function implements the execution routine of a graph processing framework. It repeats the two phases of a superstep (message exchange and computation) until all the nodes are ready to complete the processing, i.e. until end returns True for every node. This code "simulates" the behaviour of a graph parallel framework on a single thread. The natural next step would be to execute all the send_msg calls in parallel and then, once they have all finished, execute all the agg calls in parallel.
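The sketch below illustrates the parallel variant described above, using threads from concurrent.futures. ToyNode and execute_parallel are hypothetical names, and the toy node simply propagates the largest value it has seen. Each inbox is protected by a lock because several senders may write to it at the same time. Waiting for every task of a phase to finish acts as the barrier between the two phases. Because of CPython's global interpreter lock, this shows the control flow rather than a real speed-up.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ToyNode:
    """Hypothetical node that propagates the largest value it has seen."""

    def __init__(self, name, value):
        self.name = name
        self.value = value
        self.neighbours = []          # targets of the outgoing edges
        self.inbox = []
        self.lock = threading.Lock()  # several senders may write concurrently
        self.active = True

    def send_msg(self):
        if self.active:
            for target in self.neighbours:
                with target.lock:
                    target.inbox.append(self.value)

    def agg(self):
        new_value = max([self.value] + self.inbox)
        self.active = new_value != self.value  # active only if something changed
        self.value = new_value
        self.inbox = []

    def end(self):
        return not self.active


def execute_parallel(nodes, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        done = False
        while not done:
            # Phase 1: message exchange; list() waits for every task (barrier)
            list(pool.map(lambda n: n.send_msg(), nodes))
            # Phase 2: computation, started only after all messages are delivered
            list(pool.map(lambda n: n.agg(), nodes))
            done = all(n.end() for n in nodes)


# Usage: a directed chain a -> b -> c
a, b, c = ToyNode("a", 5), ToyNode("b", 1), ToyNode("c", 3)
a.neighbours.append(b)
b.neighbours.append(c)
execute_parallel([a, b, c])
print([(n.name, n.value) for n in (a, b, c)])  # [('a', 5), ('b', 5), ('c', 5)]
```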

In [None]:
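def execute(nodes: list[Node]):
    # Repeat supersteps until every node reports that it is done
    done = False
    while not done:
        done = True
        # Phase 1 (message exchange): active nodes send along their out-edges
        for node in nodes:
            node.send_msg()
        # Phase 2 (computation): each node aggregates its received messages
        for node in nodes:
            node.agg()
            done = done and node.end()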

## Task: Single Source Shortest Path

In the first task, we implement the single-source shortest path (SSSP) algorithm seen during the lecture.

First, we define a class NodeSP that implements Node. Specifically:

* aggregate computes the minimum among the node's current value and the distances received from the other nodes. If this minimum differs from the current value, the internal state is updated and the node becomes active, so it will send messages to its neighbours in the next superstep.
* prepare_message prepares the message for the neighbour nodes. The message includes the new state value (i.e. the minimum distance from the source node).
* end returns True if the node did not change its internal state, False otherwise.
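The aggregate rule above can be isolated in a small pure function, which makes it easy to test on its own. sssp_step is a hypothetical helper and not part of the framework. It returns the new distance together with a flag that says whether the node should be active (i.e. send messages) in the next superstep.

```python
def sssp_step(current, messages):
    """Return (new_distance, active) for one SSSP aggregation step."""
    # Each message is a candidate distance (sender distance + edge weight)
    best = min([current, *messages])
    # The node becomes active only if it found a strictly shorter path
    return best, best != current


print(sssp_step(10000, [6]))  # (6, True): first path found
print(sssp_step(5, [7, 6]))   # (5, False): no improvement
print(sssp_step(5, []))       # (5, False): no messages received
```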

In [None]:
class NodeSP(Node):
    def __init__(self, name, features, active = False):
        super().__init__(name, features, active)

    def prepare_message(self):
        message = []
        message.append(self.features[0])
        return message

    def aggregate(self):
        self.messages.append(self.features[0])
        if min(self.messages) != self.features[0]:
            self.active = True
            self.features[0] = min(self.messages)  
        else:
            self.active = False

    def end(self):
        return not self.active


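EdgeSP implements Edge. Its prepare_message function adds the edge weight to the distance contained in the message and delivers the result to the target node as a plain number rather than a list.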
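Taken together, one superstep of NodeSP and EdgeSP performs the classic edge relaxation. The notation here is introduced for illustration. $\delta^{(t)}(v)$ is the value stored in features[0] of node $v$ after superstep $t$, $w_{uv}$ is the weight of edge $(u, v)$, and $A^{(t)}$ is the set of active nodes, with $A^{(0)}$ containing only the source:

$$
\delta^{(t+1)}(v) = \min\Big(\delta^{(t)}(v),\ \min_{(u,v) \in E,\ u \in A^{(t)}} \big(\delta^{(t)}(u) + w_{uv}\big)\Big)
$$

The inner minimum over an empty set is taken as $+\infty$, so a node that receives no message keeps its value. A node belongs to $A^{(t+1)}$ exactly when $\delta^{(t+1)}(v) < \delta^{(t)}(v)$. In the code, the initial value 10000 plays the role of $+\infty$ for every node except the source.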

In [None]:
class EdgeSP(Edge):
    def __init__(self, node2, features):
        super().__init__(node2, features)

    def prepare_message(self, message):
        message = message[0]+self.features[0]
        return message

We now run the algorithm on the graph studied on slide 38 of the lecture. First, we build the graph, giving every node except the source an initial distance of 10000, which stands in for infinity. Then we compute the shortest path distance from node B to all the other nodes.

In [None]:
a = NodeSP('a', [10000])
b = NodeSP('b', [0], True)
c = NodeSP('c', [10000])
d = NodeSP('d', [10000])

a.add_edge(EdgeSP(c, [5]))
a.add_edge(EdgeSP(d, [1]))
b.add_edge(EdgeSP(a, [2]))
b.add_edge(EdgeSP(c, [6]))
b.add_edge(EdgeSP(d, [4]))
d.add_edge(EdgeSP(c, [1]))

G = [a, b, c, d]

Next, we run the computation and print the shortest path distance between node B and each of the other nodes. The values are the same as those we obtained while studying the algorithm during the lecture.
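As an independent cross-check, the sketch below runs a sequential Dijkstra's algorithm on the same weighted edges. This is a different technique from the vertex-centric one used in this notebook, and it relies only on the standard library's heapq. The adjacency dictionary re-encodes the edges added to nodes a to d above, and dijkstra is a hypothetical helper. Its distances should agree with the values printed by the framework.

```python
import heapq


def dijkstra(adj, source):
    """Sequential single-source shortest paths for non-negative weights."""
    dist = {source: 0}
    heap = [(0, source)]
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist[u]:
            continue  # stale heap entry: a shorter path was already found
        for v, w in adj.get(u, []):
            if d + w < dist.get(v, float("inf")):
                dist[v] = d + w
                heapq.heappush(heap, (d + w, v))
    return dist


# Same edges as the slide-38 graph built above: node -> [(target, weight)]
adj = {
    "a": [("c", 5), ("d", 1)],
    "b": [("a", 2), ("c", 6), ("d", 4)],
    "d": [("c", 1)],
}
print(sorted(dijkstra(adj, "b").items()))
# [('a', 2), ('b', 0), ('c', 4), ('d', 3)]
```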

In [None]:
execute(G)

for node in G:
    node.print()

We can also apply the algorithm to the Lazega graph (already used in self-study session 4). We compute the shortest path distances from node 17 (the source node) to all the other nodes.

First, we load and prepare the graph, setting the weight of each edge to 1.

Next, we execute the program and print each node with its distance from node 17. A distance still equal to 10000 means that the node is not reachable from node 17.
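Every edge weight is 1 here, so the shortest path distance equals the number of hops, and a breadth-first search gives the same distances. The sketch below uses a hypothetical helper, bfs_hops, on a toy graph. It marks unreachable nodes with math.inf instead of 10000, so the sentinel cannot be confused with a real distance.

```python
import math
from collections import deque


def bfs_hops(adj, nodes, source):
    """Hop distance from source in a directed graph; math.inf if unreachable."""
    dist = {n: math.inf for n in nodes}
    dist[source] = 0
    queue = deque([source])
    while queue:
        u = queue.popleft()
        for v in adj.get(u, []):
            if dist[v] == math.inf:  # the first visit is along a shortest path
                dist[v] = dist[u] + 1
                queue.append(v)
    return dist


# Toy directed graph: "z" points to the source, but nothing points to "z"
adj = {"s": ["x", "y"], "x": ["y"], "y": ["w"], "z": ["s"]}
dist = bfs_hops(adj, ["s", "x", "y", "w", "z"], "s")
print(dist)  # {'s': 0, 'x': 1, 'y': 1, 'w': 2, 'z': inf}
unreachable = [n for n, d in dist.items() if d == math.inf]
print(unreachable)  # ['z']
```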

In [None]:
# Adjust the path to where your copy of the Lazega graph (a GraphML file) is stored
lazega = nx.read_graphml('lazega.gml')

nodes = {}
source = "17"

for node in lazega.nodes:
    if node == source:
        nodes[node] = NodeSP(node, [0], True)
    else:
        nodes[node] = NodeSP(node, [10000])

for edge in lazega.edges:
    nodes[edge[0]].add_edge(EdgeSP(nodes[edge[1]], [1]))

lG = list(nodes.values())

execute(lG)

for node in lG:
    node.print()


## Task: PageRank

The following code implements PageRank. Each NodePR stores three features: its current PageRank value, its outdegree and the total number of nodes in the graph. The class NodePR implements the abstract methods of Node:

* aggregate sums the values of the received messages, then adds (1-d)/#nodes to the result.
* prepare_message sends d*(current PageRank value)/#outdegree as the message for the neighbour nodes.
* end returns True once the node has performed z iterations.

EdgePR extends Edge by implementing prepare_message. As seen during the lecture, edges have an empty feature vector and forward messages unchanged.
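Taken together, NodePR and EdgePR apply the usual PageRank update. The notation here is introduced for illustration. $N$ is the number of nodes (features[2]), $\mathrm{outdeg}(u)$ is the outdegree of $u$ (features[1]), and $PR^{(t)}(v)$ is the value stored in features[0] after $t$ iterations. Starting from $PR^{(0)}(v) = 1/N$, each superstep computes:

$$
PR^{(t+1)}(v) = \frac{1-d}{N} + d \sum_{(u,v) \in E} \frac{PR^{(t)}(u)}{\mathrm{outdeg}(u)}
$$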

In [None]:
class NodePR(Node):
    def __init__(self, name: str, features, d: float, z: int, active = False):
        super().__init__(name, features, active)
        self.d = d
        self.z = z
        self.i = 0

    def prepare_message(self):
        message = []
        message.append((self.d * self.features[0])/self.features[1])
        return message

    def aggregate(self):
        s = 0
        for message in self.messages:
            s = s + message[0]
        self.features[0] = (1-self.d)/self.features[2] + s
        self.i = self.i + 1

    def end(self):
        return self.i >= self.z
    
class EdgePR(Edge):
    def __init__(self, node2, features):
        super().__init__(node2, features)

    def prepare_message(self, message):
        return message

The following code prepares the graph of slide 41. 

In [None]:
acme = NodePR('acme', [1/4, 1, 4], 0.85, 10, True)
bob = NodePR('bob', [1/4, 2, 4], 0.85, 10, True)
carol = NodePR('carol', [1/4, 3, 4], 0.85, 10, True)
diana = NodePR('diana', [1/4, 1, 4], 0.85, 10, True)

acme.add_edge(EdgePR(diana, []))
bob.add_edge(EdgePR(acme, []))
bob.add_edge(EdgePR(carol, []))
carol.add_edge(EdgePR(acme, []))
carol.add_edge(EdgePR(bob, []))
carol.add_edge(EdgePR(diana, []))
diana.add_edge(EdgePR(carol, []))

G = [acme, bob, carol, diana]

We can now compute the PageRank values of the graph. The resulting values are the same as those we determined on slide 51.
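For comparison, the same computation can be written as a plain synchronous power iteration over an adjacency dictionary. This is a sketch with a hypothetical helper, pagerank_iter. With d = 0.85 and 10 iterations it performs the same updates as NodePR, so its output should match the values printed above up to floating-point rounding. The graph has no sink nodes, so the values keep summing to 1.

```python
def pagerank_iter(adj, d=0.85, iterations=10):
    """Synchronous PageRank; assumes every node has at least one out-edge."""
    n = len(adj)
    pr = {v: 1 / n for v in adj}
    for _ in range(iterations):
        new = {v: (1 - d) / n for v in adj}   # teleport term (1-d)/N
        for u, targets in adj.items():
            share = d * pr[u] / len(targets)  # what u sends on each out-edge
            for v in targets:
                new[v] += share
        pr = new                              # all nodes update together
    return pr


# Edges of the slide-41 graph built above
adj = {
    "acme": ["diana"],
    "bob": ["acme", "carol"],
    "carol": ["acme", "bob", "diana"],
    "diana": ["carol"],
}
ranks = pagerank_iter(adj)
for name, value in ranks.items():
    print(name, round(value, 4))
print("sum:", round(sum(ranks.values()), 6))  # sum: 1.0
```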

In [None]:
execute(G)

for node in G:
    node.print()

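We can now try PageRank on the Lazega graph. Real graphs commonly contain endpoint (or sink) nodes, i.e. nodes with outdegree zero. These nodes are problematic because they receive PageRank mass but never pass it on. Some mass therefore leaks out of the graph at every iteration, and the values no longer sum to 1. Teleportation, the random jump on which the damping factor is also based, is the classical workaround: from a sink, the random surfer jumps to a node chosen uniformly at random.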

The graph parallel framework we are studying has no teleport step. However, we can still simulate it: each sink node distributes its PageRank to all the nodes in the graph, itself included.
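The toy graph below, a -> b -> c with c as a sink, shows both the leak and the fix described above. The sketch uses a hypothetical helper, one_step, which applies the same update rule as NodePR. It runs a single iteration twice: once on the graph as is, where the mass held by c is lost, and once after c has been given an edge to every node, itself included.

```python
def one_step(adj, pr, d=0.85):
    """One synchronous PageRank update; nodes without out-edges send nothing."""
    n = len(pr)
    new = {v: (1 - d) / n for v in pr}
    for u, targets in adj.items():
        for v in targets:
            new[v] += d * pr[u] / len(targets)
    return new


start = {"a": 1 / 3, "b": 1 / 3, "c": 1 / 3}

# Without the fix, the sink "c" has no out-edges and its mass leaks away
leaky = {"a": ["b"], "b": ["c"], "c": []}
print(round(sum(one_step(leaky, start).values()), 4))  # 0.7167

# With the fix, the sink spreads its PageRank uniformly over all nodes
fixed = {"a": ["b"], "b": ["c"], "c": ["a", "b", "c"]}
print(round(sum(one_step(fixed, start).values()), 4))  # 1.0
```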

The prepare_graph function below converts a NetworkX directed graph into a graph that our graph parallel framework can process with PageRank. During the conversion, it connects each sink to all the graph nodes and sets the sink's outdegree feature to the number of nodes.

In [None]:
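def prepare_graph(graph: nx.DiGraph):
    n = graph.number_of_nodes()
    nodes = {}
    endpoints = []  # sink nodes (outdegree zero)
    for node in graph.nodes:
        # features = [initial PageRank 1/n, outdegree, number of nodes]; d = 0.85, z = 100
        nodes[node] = NodePR(node, [1/n, graph.out_degree(node), n], 0.85, 100, True)
        if graph.out_degree(node) == 0:
            endpoints.append(node)

    # Copy the directed edges of the original graph
    for u, v in graph.edges:
        nodes[u].add_edge(EdgePR(nodes[v], []))

    # Simulate teleportation: connect each sink to every node (itself included)
    # and set its outdegree to n, so that it spreads its PageRank uniformly
    for node in endpoints:
        for target in nodes:
            nodes[node].add_edge(EdgePR(nodes[target], []))
        nodes[node].features[1] = n

    return nodes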


We can now pre-process the Lazega graph and execute the PageRank algorithm.

In [None]:
nodes = prepare_graph(lazega)

G = list(nodes.values())

execute(G)

for node in G:
    node.print()


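As a sanity check, we compute the root mean squared error (RMSE) between our PageRank values and those computed by the NetworkX pagerank function, which uses the same damping factor (0.85) by default. The result suggests that the two sets of values are essentially the same.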
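The number printed below is the square root of the mean squared error (the RMSE). Unlike the MSE, the RMSE is expressed on the same scale as the PageRank values. The sketch below computes both measures over two dictionaries keyed by node. mse_rmse and the sample values are hypothetical and only illustrate the definitions.

```python
import math


def mse_rmse(reference, estimate):
    """Mean squared error and its square root over the keys of reference."""
    squared = [(reference[k] - estimate[k]) ** 2 for k in reference]
    mse = sum(squared) / len(squared)
    return mse, math.sqrt(mse)


ref = {"x": 0.5, "y": 0.3, "z": 0.2}
est = {"x": 0.4, "y": 0.3, "z": 0.3}
mse, rmse = mse_rmse(ref, est)
print(mse, rmse)
```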

In [None]:
pr = nx.pagerank(lazega)

error = 0
for node in pr:
    error = error + math.pow(pr[node]-nodes[node].features[0], 2)
print(math.sqrt(error/len(pr)))