In [1]:
import pathpyG as pp
import torch
from torch_geometric import EdgeIndex

In [2]:
# Sum of two graphs built from the same edge index AND the same node IDs:
# per the recorded output the result keeps 4 nodes and holds every edge twice (6 edges).
src_dst = [[0, 1, 1], [1, 2, 3]]
g1 = pp.Graph.from_edge_index(
    torch.tensor(src_dst),
    mapping=pp.IndexMap(['a', 'b', 'c', 'd']),
)
print(g1.data.edge_index)
g2 = pp.Graph.from_edge_index(
    torch.tensor(src_dst),
    mapping=pp.IndexMap(['a', 'b', 'c', 'd']),
)
print(g2.data.edge_index)
g = g1 + g2
print(g.data.edge_index)
print(g)

EdgeIndex([[0, 1, 1],
           [1, 2, 3]], sparse_size=(4, 4), nnz=3, sort_order=row)
EdgeIndex([[0, 1, 1],
           [1, 2, 3]], sparse_size=(4, 4), nnz=3, sort_order=row)
EdgeIndex([[0, 0, 1, 1, 1, 1],
           [1, 1, 2, 3, 2, 3]], sparse_size=(4, 4), nnz=6, sort_order=row)
Directed graph with 4 nodes and 6 edges

Graph attributes
	num_nodes		<class 'int'>



In [3]:
# Sum of two graphs with the same edge index but completely disjoint node IDs:
# per the recorded output the result has 8 nodes and 6 edges, with the second
# graph's indices shifted to 4..7.
src_dst = [[0, 1, 1], [1, 2, 3]]
g1 = pp.Graph.from_edge_index(
    torch.tensor(src_dst),
    mapping=pp.IndexMap(['a', 'b', 'c', 'd']),
)
print(g1.data.edge_index)
g2 = pp.Graph.from_edge_index(
    torch.tensor(src_dst),
    mapping=pp.IndexMap(['e', 'f', 'g', 'h']),
)
print(g2.data.edge_index)
g = g1 + g2
print(g.data.edge_index)
print(g)

EdgeIndex([[0, 1, 1],
           [1, 2, 3]], sparse_size=(4, 4), nnz=3, sort_order=row)
EdgeIndex([[0, 1, 1],
           [1, 2, 3]], sparse_size=(4, 4), nnz=3, sort_order=row)
EdgeIndex([[0, 1, 1, 4, 5, 5],
           [1, 2, 3, 5, 6, 7]], sparse_size=(8, 8), nnz=6, sort_order=row)
Directed graph with 8 nodes and 6 edges

Graph attributes
	num_nodes		<class 'int'>



In [4]:
# Sum of two graphs whose node IDs overlap only partly ('a' and 'b' are shared,
# 'g' and 'h' are new): per the recorded output the result has 6 nodes and 6 edges.
src_dst = [[0, 1, 1], [1, 2, 3]]
g1 = pp.Graph.from_edge_index(
    torch.tensor(src_dst),
    mapping=pp.IndexMap(['a', 'b', 'c', 'd']),
)
print(g1.data.edge_index)
g2 = pp.Graph.from_edge_index(
    torch.tensor(src_dst),
    mapping=pp.IndexMap(['a', 'b', 'g', 'h']),
)
print(g2.data.edge_index)
g = g1 + g2
print(g.data.edge_index)
print(g)

EdgeIndex([[0, 1, 1],
           [1, 2, 3]], sparse_size=(4, 4), nnz=3, sort_order=row)
EdgeIndex([[0, 1, 1],
           [1, 2, 3]], sparse_size=(4, 4), nnz=3, sort_order=row)
EdgeIndex([[0, 0, 1, 1, 1, 1],
           [1, 1, 2, 3, 4, 5]], sparse_size=(6, 6), nnz=6, sort_order=row)
Directed graph with 6 nodes and 6 edges

Graph attributes
	num_nodes		<class 'int'>



In [7]:
def num_labels(d):
    """Return how many distinct values the mapping ``d`` contains."""
    distinct_values = {value for value in d.values()}
    return len(distinct_values)

In [35]:
from typing import Tuple, List

In [45]:
def WL_test(g1: pp.Graph, g2: pp.Graph) -> Tuple[bool, List[int], List[int]]:
    """Run the Weisfeiler-Leman (colour refinement) test on two graphs.

    Both graphs are merged with ``g1 + g2`` and every node starts with label ``0``.
    In each round a node receives a new label determined by its own label and the
    sorted labels of the nodes returned by ``successors(node)``. Refinement stops
    as soon as a round no longer increases the number of distinct labels.

    Args:
        g1: First graph; its ``mapping`` must not be ``None``.
        g2: Second graph; its ``mapping`` must not be ``None`` and its node IDs
            must be disjoint from those of ``g1``.

    Returns:
        A tuple ``(passed, fingerprint_1, fingerprint_2)`` where the fingerprints
        are the sorted integer labels of the nodes of ``g1`` and ``g2`` and
        ``passed`` is ``True`` iff both fingerprints are equal. Equal fingerprints
        mean the test could not tell the graphs apart; this is necessary but not
        sufficient for isomorphism.

    Raises:
        ValueError: If a graph has no mapping or the node IDs of the two graphs overlap.
    """
    if g1.mapping is None or g2.mapping is None:
        raise ValueError('Graphs must contain IndexMap that assigns node IDs')
    if not set(g1.mapping.node_ids).isdisjoint(g2.mapping.node_ids):
        raise ValueError('node identifiers of graphs must not overlap')

    g_combined = g1 + g2
    nodes = list(g_combined.nodes)

    # Initialise the labels of all nodes to zero. Labels are integers throughout, so
    # the returned fingerprints have one consistent type even if refinement stops in
    # the very first round.
    fingerprint = {v: 0 for v in nodes}
    # Maps a signature (own label, sorted successor labels) to its consecutive label.
    labels = {}
    while True:
        new_fingerprint = {}
        for node in nodes:
            signature = (
                fingerprint[node],
                tuple(sorted(fingerprint[x] for x in g_combined.successors(node))),
            )
            if signature not in labels:
                # Previously unseen signature: assign the next consecutive number (1, 2, ...).
                labels[signature] = len(labels) + 1
            new_fingerprint[node] = labels[signature]
        # Each round can only split label classes, never merge them. An unchanged
        # number of distinct labels therefore means the partition is stable.
        if len(set(new_fingerprint.values())) == len(set(fingerprint.values())):
            break
        fingerprint = new_fingerprint

    fingerprint_1 = sorted(fingerprint[v] for v in g1.nodes)
    fingerprint_2 = sorted(fingerprint[v] for v in g2.nodes)
    return fingerprint_1 == fingerprint_2, fingerprint_1, fingerprint_2

In [46]:
# Two 3-node paths that differ only in node IDs and in the order the edges are listed;
# the recorded result shows WL_test returning True with identical fingerprints.
path_abc = [('a', 'b'), ('b', 'c')]
path_xyz = [('y', 'z'), ('x', 'y')]
g1 = pp.Graph.from_edge_list(path_abc)
g2 = pp.Graph.from_edge_list(path_xyz)
WL_test(g1, g2)

(True, [3, 4, 5], [3, 4, 5])

In [26]:
# Sanity check for num_labels: three keys, but only two distinct values.
d = dict(a=0, b=1, c=0)
num_labels(d)

2