In [None]:
%pip install spytial-diagramming
from spytial import *
from spytial.annotations import *

In [16]:
# Setup for performance metrics
import random

# Set this to True to run performance tests
RUN_PERF = False

perf_base = "spytial_perf"
def get_perf_path(structure, size):
    return perf_base + "_" + structure + "_" + f"{size}.json"
PI = 20
SIZES = [5, 10, 25, 50]


There are many ways to represent graphs, including explicitly encoding the edges in the graph node, adjacency matrices, adjacency lists, edge lists.

## Unweighted Graphs from Adjacency Matrix

![unweighted-dir-graph](./img/directed-graph.png)


** View from Adjacency List (**c**) to directed graph (**a**)

In [17]:
adj_matrix = [[0, 1, 0, 1, 0, 0],
              [0, 0, 0, 0, 1, 0],
              [0, 0, 0, 0, 1, 1],
              [0, 1, 0, 0, 0, 0],
              [0, 0, 0, 1, 0, 0],
              [0, 0, 0, 0, 0, 1]]

#ADJ_MATRIX_TO_EDGE_SELECTOR = "{i, j : int | (some xs, ys : list |  (xs->i->ys + ys->j->1  in idx))}"
ADJ_MATRIX_TO_EDGE_SELECTOR = "{i, j: int | 1 in (list.idx[i]).idx[j]}"

# These are 1 indexed instead of 0 indexed.
directedgraph = inferredEdge(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, name="")(adj_matrix)
directedgraph = hideAtom(selector="list")(directedgraph)
diagram(directedgraph)

### Performance - Unweighted Graph (Adjacency Matrix)

In [18]:
if RUN_PERF:
    STRUCTURE = "unweighted_graph_adjmatrix"
    for size in SIZES:
        # Generate random adjacency matrix
        adj_matrix = [[random.randint(0, 1) if i != j else 0 for j in range(size)] for i in range(size)]
        
        directedgraph = inferredEdge(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, name="")(adj_matrix)
        directedgraph = hideAtom(selector="list")(directedgraph)
        diagram(directedgraph, method="browser", perf_path=get_perf_path(STRUCTURE, size), perf_iterations=PI, headless=True, timeout=300)
else:
    print("Performance testing skipped. Set RUN_PERF = True to enable.")


Performance testing skipped. Set RUN_PERF = True to enable.


# Weighted Graphs


Adjacency matrix (**c**) to adjacency list (**b**).



In [19]:
@attribute(field='key')
class GNode:
    def __init__(self, key):
        self.key = key

EDGE_SELECTOR = "{u : (tuple.t0), w : (tuple.t2), v : (tuple.t1) | some(  u.~t0 & v.~t1 & w.~t2) }"

## GEt rid of the (some). Can just be u.~t0 = w.~t2 = v.~t1
@inferredEdge(selector=EDGE_SELECTOR, name='edge')
@hideAtom(selector='list + tuple + Graph + int + str')
class Graph:
    def __init__(self):
        self.adj = []  # adjacency list representation

    def addEdge(self, u: GNode, v: GNode, w: int):
        self.adj.append((u, v, w))


adj_list = [[j for j,val in enumerate(row) if val] for row in adj_matrix]  # 0-indexed

g = Graph()                       # create one graph once
nodes = {i: GNode(i) for i in range(len(adj_list))}  # reuse node objects
node_list = list(nodes.values())


for i, nbrs in enumerate(adj_list):
    for j in nbrs:
        g.addEdge(nodes[i], nodes[j], random.randint(1, 10))

diagram(g)


### Performance - Weighted Graph

In [20]:
if RUN_PERF:
    STRUCTURE = "weighted_graph"
    for size in SIZES:
        # Step 1: Generate random adjacency matrix (0s and 1s)
        adj_matrix = [[random.randint(0, 1) if i != j else 0 for j in range(size)] for i in range(size)]
        
        # Step 2: Create graph and add weighted edges based on the adjacency matrix
        g = Graph()
        nodes = {i: GNode(i) for i in range(size)}
        
        for i in range(size):
            for j in range(size):
                if adj_matrix[i][j] == 1:  # If there's an edge in the adj matrix
                    w = random.randint(1, 20)  # Assign random weight
                    g.addEdge(nodes[i], nodes[j], w)
        
        diagram(g, method="browser", perf_path=get_perf_path(STRUCTURE, size), perf_iterations=PI, headless=True, timeout=800)
else:
    print("Performance testing skipped. Set RUN_PERF = True to enable.")


Performance testing skipped. Set RUN_PERF = True to enable.


# Minimum Spanning Tree



In [21]:

#"{u : (tuple.t0), w : (tuple.t2), v : (tuple.t1) | some(  u.~t0 & v.~t1 & w.~t2) }"
MST_SELECTOR = "{ u : (tuple.t0), w : (tuple.t2), v : (tuple.t1) | some(  u.~t0 & v.~t1 & w.~t2 & (int.((MSTGraph.mst_edges).idx)) ) }"
#MST_SELECTOR = "{u : (tuple.t0), w : (tuple.t2), v : (tuple.t1) | (some t : tuple & (int.((MSTGraph.mst_edges).idx)) | t.t0 = u and t.t1 = v and t.t2 = w) }"
#EDGE_SELECTOR = '{u : GNode, w : int, v : GNode | (some t : tuple | t.t0 = u and t.t1 = v and t.t2 = w) }'

@dont_inherit_directives
@inferredEdge(selector=EDGE_SELECTOR + " - " + MST_SELECTOR, name='e', color='black')
@hideAtom(selector='list + tuple + Graph + int + str')
@inferredEdge(selector=MST_SELECTOR, name='mst', color='orange')
class MSTGraph(Graph):
    def __init__(self):
        super().__init__()  # Initialize the parent Graph class
        self.mst_edges = []  # Store edges that are part of the MST

    def compute_mst(self):
        """Compute the MST using Kruskal's algorithm and store the edges."""
        parent = {}
        rank = {}

        # Helper functions for union-find
        def find(node):
            if parent[node] != node:
                parent[node] = find(parent[node])  # Path compression
            return parent[node]

        def union(node1, node2):
            root1 = find(node1)
            root2 = find(node2)
            if root1 != root2:
                if rank[root1] > rank[root2]:
                    parent[root2] = root1
                elif rank[root1] < rank[root2]:
                    parent[root1] = root2
                else:
                    parent[root2] = root1
                    rank[root1] += 1

        # Initialize union-find structures for ALL nodes that appear in edges
        all_nodes = set()
        for u, v, w in self.adj:
            all_nodes.add(u)
            all_nodes.add(v)
        
        for node in all_nodes:
            parent[node] = node
            rank[node] = 0

        # Sort edges by weight
        edges = sorted(self.adj, key=lambda x: x[2])  # (u, v, weight)

        # Kruskal's algorithm
        for u, v, w in edges:
            if find(u) != find(v):
                union(u, v)
                self.mst_edges.append((u, v, w))  # Add edge to MST



![mst](img/minimum-spanning-tree.png)

In [22]:


# Create the graph
mst_graph = MSTGraph()

a = GNode("a")
b = GNode("b")
c = GNode("c")
d = GNode("d")
e = GNode("e")
f = GNode("f")
g = GNode("g")
h = GNode("h")
i = GNode("i")

node_list = [a, b, c, d, e, f, g, h, i]
nodes = {node.key: node for node in node_list}


mst_graph.addEdge(a, b, 4)
mst_graph.addEdge(a, h, 8)
mst_graph.addEdge(c,d,7)
mst_graph.addEdge(c,f,4)
mst_graph.addEdge(c,i,2)
mst_graph.addEdge(b,c,8)
mst_graph.addEdge(b,h,11)
mst_graph.addEdge(d,e,9)
mst_graph.addEdge(d,f,14)
mst_graph.addEdge(e,f,10)
mst_graph.addEdge(f,g,2)
mst_graph.addEdge(g,h,1)
mst_graph.addEdge(g,i,6)
mst_graph.addEdge(h,i,7)

# Compute the MST
mst_graph.compute_mst()
diagram(mst_graph)


### Performance - Minimum Spanning Tree

In [23]:
if RUN_PERF:
    STRUCTURE = "mst_graph"
    for size in SIZES:
        mst_graph = MSTGraph()
        nodes = {i: GNode(i) for i in range(size)}
        
        # Create a connected graph with random weighted edges
        # First ensure connectivity with a path
        for i in range(size - 1):
            w = random.randint(1, 20)
            mst_graph.addEdge(nodes[i], nodes[i + 1], w)
        
        # Add additional random edges
        num_extra_edges = min(size, size * (size - 1) // 4)
        for _ in range(num_extra_edges):
            u = random.randint(0, size - 1)
            v = random.randint(0, size - 1)
            if u != v:
                w = random.randint(1, 20)
                mst_graph.addEdge(nodes[u], nodes[v], w)
        
        mst_graph.compute_mst()
        diagram(mst_graph, method="browser", perf_path=get_perf_path(STRUCTURE, size), perf_iterations=PI, headless=True, timeout=500)
else:
    print("Performance testing skipped. Set RUN_PERF = True to enable.")


Performance testing skipped. Set RUN_PERF = True to enable.


# Topological Sort

Alternately, we could use constraints to topologically sort a graph in a visualization.

In [24]:
dag_adj_matrix = [
    [0, 1, 1, 0, 0, 0],
    [0, 0, 1, 1, 0, 0],
    [0, 0, 0, 1, 1, 0],
    [0, 0, 0, 0, 1, 1],
    [0, 0, 0, 0, 0, 1],
    [0, 0, 0, 0, 0, 0]
]

# These are 1 indexed instead of 0 indexed.
dag = inferredEdge(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, name="")(dag_adj_matrix)

## This topologically sorts the dag left to right.
dag = orientation(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, directions=["right"])(dag)
dag = hideAtom(selector="list")(dag)
diagram(dag)

And this can even help identify when a graph is NOT a dag. Using the directed graph built from `adj_matrix` (from earlier)

In [25]:
directedgraph = orientation(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, directions=["right"])(directedgraph)
diagram(directedgraph, height=800)

# Strongly Connected Components

Similarly, groups can be used to identify strongly connected components of a graph.



In [26]:
adj_two_components = [
    [0, 1, 0, 0, 1, 0],
    [0, 0, 1, 0, 0, 0],
    [1, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 1, 0],
    [0, 0, 0, 0, 0, 1],
    [0, 0, 0, 1, 0, 0]
]

two_comp = inferredEdge(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, name="")(adj_two_components)
two_comp = hideAtom(selector="list")(two_comp)


## S
SCC_FOR_ALL_NODES = f"{{u, v : int | ((u->v +v->u) in ^{ADJ_MATRIX_TO_EDGE_SELECTOR}) }}"


SCC_COLLAPSED = f"{{ u, v : int | u <= v   and (u->v in (^{ADJ_MATRIX_TO_EDGE_SELECTOR} & ~(^{ADJ_MATRIX_TO_EDGE_SELECTOR}))) and (all k: int |(u->k in (^{ADJ_MATRIX_TO_EDGE_SELECTOR} & ~(^{ADJ_MATRIX_TO_EDGE_SELECTOR}))) implies u <= k) }}"
sss = SCC_FOR_ALL_NODES

two_comp = group(
    selector = sss,
    name="SCC"
)(two_comp)

diagram(two_comp, height=600)


### Performance - Strongly Connected Components

In [27]:
if RUN_PERF:
    STRUCTURE = "scc_graph"
    for size in SIZES:
        # Generate adjacency matrix with cycles for SCCs
        adj_matrix = [[random.randint(0, 1) if i != j else 0 for j in range(size)] for i in range(size)]
        
        two_comp = inferredEdge(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, name="")(adj_matrix)
        two_comp = hideAtom(selector="list")(two_comp)
        
        sss = f"(int->int) & {{u, v : int | ((u->v +v->u) in ^{ADJ_MATRIX_TO_EDGE_SELECTOR}) }}"
        two_comp = group(selector=sss, name="SCC")(two_comp)
        
        print(f"{STRUCTURE}({size} nodes): Rendering with perf_iterations={PI}...")
        diagram(two_comp, method="browser", perf_path=get_perf_path(STRUCTURE, size), perf_iterations=PI, headless=True, timeout=600)
else:
    print("Performance testing skipped. Set RUN_PERF = True to enable.")


Performance testing skipped. Set RUN_PERF = True to enable.


## Perf TOPO sort

In [28]:
if RUN_PERF:
    STRUCTURE = "topo_sort_dag"
    for size in SIZES:
        # Generate a DAG adjacency matrix
        dag_adj_matrix = [[0] * size for _ in range(size)]
        
        # Create edges that go from lower to higher indices to ensure it's a DAG
        for i in range(size):
            for j in range(i + 1, size):
                if random.random() < 0.3:  # 30% chance of edge
                    dag_adj_matrix[i][j] = 1
        
        dag = inferredEdge(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, name="")(dag_adj_matrix)
        dag = orientation(selector=ADJ_MATRIX_TO_EDGE_SELECTOR, directions=["right"])(dag)
        dag = hideAtom(selector="list")(dag)
        
        print(f"{STRUCTURE}({size} nodes): Rendering with perf_iterations={PI}...")
        diagram(dag, method="browser", perf_path=get_perf_path(STRUCTURE, size), perf_iterations=PI, headless=True, timeout=600)
else:
    print("Performance testing skipped. Set RUN_PERF = True to enable.")


Performance testing skipped. Set RUN_PERF = True to enable.
