# Graph: Directed Graphs with Algorithms

**Graph** provides a directed graph structure with O(1) adjacency lookups and classical graph algorithms.

**Key Features:**
- Directed edges with labels, properties, and traversal conditions
- O(1) adjacency queries via pre-computed lists
- Cycle detection, topological sort, path finding
- Serialization with automatic adjacency rebuilding

In [1]:
from lionpride.core import Edge, EdgeCondition, Graph, Node

# Create empty graph
graph = Graph()
print(f"Graph: {len(graph)} nodes, {len(graph.edges)} edges")

Graph: 0 nodes, 0 edges


## 1. Building Graphs: Nodes and Edges

In [2]:
# Add nodes (using dict content - Node requires dict, BaseModel, or Serializable)
n1 = Node(content={"name": "Start"})
n2 = Node(content={"name": "Process"})
n3 = Node(content={"name": "End"})

graph.add_node(n1)
graph.add_node(n2)
graph.add_node(n3)

# Add edges (directed: head → tail)
e1 = Edge(head=n1.id, tail=n2.id, label=["step1"])
e2 = Edge(head=n2.id, tail=n3.id, label=["step2"])

graph.add_edge(e1)
graph.add_edge(e2)

print(f"Graph: {len(graph)} nodes, {len(graph.edges)} edges")
print(f"Chain: {n1.content['name']} → {n2.content['name']} → {n3.content['name']}")

Graph: 3 nodes, 2 edges
Chain: Start → Process → End


## 2. Adjacency Queries: Predecessors & Successors

In [3]:
# Get predecessors (nodes pointing TO this node)
preds = graph.get_predecessors(n2)
print(f"Predecessors of {n2.content['name']}: {[n.content['name'] for n in preds]}")

# Get successors (nodes this node points TO)
succs = graph.get_successors(n2)
print(f"Successors of {n2.content['name']}: {[n.content['name'] for n in succs]}")

# Get all edges connected to node
edges = graph.get_node_edges(n2, direction="both")
print(f"Edges at {n2.content['name']}: {len(edges)} (1 in, 1 out)")

Predecessors of Process: ['Start']
Successors of Process: ['End']
Edges at Process: 2 (1 in, 1 out)
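
The O(1) lookups rely on adjacency being maintained as edges are added, rather than scanning every edge per query. The following is a minimal pure-Python sketch of that idea. `TinyDigraph` and its attribute names are hypothetical and are not lionpride's implementation.

```python
from collections import defaultdict


class TinyDigraph:
    """Hypothetical stand-in for illustration; not lionpride's Graph."""

    def __init__(self):
        self.edges = {}                    # edge_id -> (head, tail)
        self.out_edges = defaultdict(set)  # node -> ids of outgoing edges
        self.in_edges = defaultdict(set)   # node -> ids of incoming edges

    def add_edge(self, edge_id, head, tail):
        # Update both indexes at insertion time so later lookups need no scan.
        self.edges[edge_id] = (head, tail)
        self.out_edges[head].add(edge_id)
        self.in_edges[tail].add(edge_id)

    def successors(self, node):
        # .get avoids creating empty entries for unknown nodes.
        return sorted(self.edges[e][1] for e in self.out_edges.get(node, ()))

    def predecessors(self, node):
        return sorted(self.edges[e][0] for e in self.in_edges.get(node, ()))


g = TinyDigraph()
g.add_edge("e1", "Start", "Process")
g.add_edge("e2", "Process", "End")
print(g.predecessors("Process"))
print(g.successors("Process"))
```

Finding a node's edge set is a single dictionary access; listing its neighbours then costs time proportional to that node's degree.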


## 3. Head & Tail Nodes

In [4]:
# Heads: nodes with no incoming edges (sources)
heads = graph.get_heads()
print(f"Source nodes: {[n.content['name'] for n in heads]}")

# Tails: nodes with no outgoing edges (sinks)
tails = graph.get_tails()
print(f"Sink nodes: {[n.content['name'] for n in tails]}")

Source nodes: ['Start']
Sink nodes: ['End']
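
Sources and sinks follow directly from the definitions above: a head never appears as an edge's tail, and a tail never appears as an edge's head. A small sketch over plain tuples, where `heads_and_tails` is a hypothetical helper and not part of the library:

```python
def heads_and_tails(nodes, edges):
    """Return (sources, sinks) for a list of nodes and (head, tail) pairs."""
    has_incoming = {tail for _, tail in edges}
    has_outgoing = {head for head, _ in edges}
    heads = [n for n in nodes if n not in has_incoming]
    tails = [n for n in nodes if n not in has_outgoing]
    return heads, tails


nodes = ["Start", "Process", "End"]
edges = [("Start", "Process"), ("Process", "End")]
print(heads_and_tails(nodes, edges))
```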


## 4. Cycle Detection

In [5]:
# Check if graph is acyclic (no cycles)
print(f"Is acyclic: {graph.is_acyclic()}")

# Create a cycle to test
cyclic_graph = Graph()
a = Node(content={"name": "A"})
b = Node(content={"name": "B"})
cyclic_graph.add_node(a)
cyclic_graph.add_node(b)
cyclic_graph.add_edge(Edge(head=a.id, tail=b.id))
cyclic_graph.add_edge(Edge(head=b.id, tail=a.id))  # Creates cycle

print(f"Cyclic graph (A ↔ B) is acyclic: {cyclic_graph.is_acyclic()}")

Is acyclic: True
Cyclic graph (A ↔ B) is acyclic: False
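
For intuition, DFS-based cycle detection can be sketched with three-color marking: reaching a node that is still on the current DFS stack means a back edge, and therefore a cycle. This is an illustrative standalone function over plain tuples, not the library's code, and it uses recursion, so very deep graphs would need an iterative variant.

```python
def is_acyclic(nodes, edges):
    """Return True if the directed graph has no cycle (three-color DFS)."""
    succ = {n: [] for n in nodes}
    for head, tail in edges:
        succ[head].append(tail)

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the DFS stack, finished
    color = {n: WHITE for n in nodes}

    def visit(n):
        color[n] = GRAY
        for m in succ[n]:
            if color[m] == GRAY:
                return False  # back edge to a node on the stack: cycle
            if color[m] == WHITE and not visit(m):
                return False
        color[n] = BLACK
        return True

    # The generator re-checks color lazily, so finished nodes are skipped.
    return all(visit(n) for n in nodes if color[n] == WHITE)


print(is_acyclic(["Start", "Process", "End"],
                 [("Start", "Process"), ("Process", "End")]))
print(is_acyclic(["A", "B"], [("A", "B"), ("B", "A")]))
```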


## 5. Topological Sort

In [6]:
# Topological sort: linear ordering where all edges point forward
# Only works on DAGs (Directed Acyclic Graphs)
sorted_nodes = graph.topological_sort()
print("Topological order:")
for node in sorted_nodes:
    print(f"  {node.content['name']}")

# Cyclic graphs raise ValueError
try:
    cyclic_graph.topological_sort()
except ValueError as e:
    print(f"\nCyclic graph error: {e}")

Topological order:
  Start
  Process
  End

Cyclic graph error: Cannot topologically sort graph with cycles
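
The summary at the end of this notebook names Kahn's algorithm for `topological_sort()`. A minimal standalone sketch of that algorithm is shown here, assuming nodes are hashable labels and edges are `(head, tail)` pairs. The function is illustrative and is not the library's code; its error message simply mirrors the output above.

```python
from collections import deque


def topological_sort(nodes, edges):
    """Kahn's algorithm: repeatedly emit nodes whose in-degree is zero."""
    succ = {n: [] for n in nodes}
    indegree = {n: 0 for n in nodes}
    for head, tail in edges:
        succ[head].append(tail)
        indegree[tail] += 1

    queue = deque(n for n in nodes if indegree[n] == 0)
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for m in succ[n]:
            indegree[m] -= 1
            if indegree[m] == 0:  # all of m's predecessors are emitted
                queue.append(m)

    # Nodes on a cycle never reach in-degree zero, so they are never emitted.
    if len(order) != len(nodes):
        raise ValueError("Cannot topologically sort graph with cycles")
    return order


print(topological_sort(["Start", "Process", "End"],
                       [("Start", "Process"), ("Process", "End")]))
```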


## 6. Path Finding (BFS)

In [7]:
# Find shortest path (minimum edge count).
# find_path is a coroutine; top-level await works in Jupyter/IPython.
path = await graph.find_path(n1, n3)

if path:
    print(f"Path from {n1.content['name']} to {n3.content['name']}:")
    for edge in path:
        head = graph.nodes[edge.head]
        tail = graph.nodes[edge.tail]
        print(f"  {head.content['name']} → {tail.content['name']} (label: {edge.label})")
else:
    print("No path found")

# Reverse direction (no path exists)
reverse_path = await graph.find_path(n3, n1)
print(f"\nReverse path exists: {reverse_path is not None}")

Path from Start to End:
  Start → Process (label: ['step1'])
  Process → End (label: ['step2'])

Reverse path exists: False
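
BFS finds a minimum-edge-count path because it explores nodes in order of distance from the start. The sketch below is synchronous and works on plain `(head, tail, label)` tuples. It is a hypothetical illustration of the technique, not the library's `find_path`. It records the edge used to reach each node and walks those records backwards to rebuild the path.

```python
from collections import deque


def find_path(edges, start, end):
    """Return the list of edges on a shortest path, or None if unreachable."""
    out = {}
    for edge in edges:
        out.setdefault(edge[0], []).append(edge)

    came_from = {start: None}  # node -> edge used to reach it
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == end:
            path = []
            while came_from[node] is not None:
                edge = came_from[node]
                path.append(edge)
                node = edge[0]  # step back to the edge's head
            return path[::-1]
        for edge in out.get(node, []):
            if edge[1] not in came_from:  # first visit is the shortest
                came_from[edge[1]] = edge
                queue.append(edge[1])
    return None


chain = [("Start", "Process", "step1"), ("Process", "End", "step2")]
print(find_path(chain, "Start", "End"))
print(find_path(chain, "End", "Start"))
```

In this sketch, a search whose start and end are the same node returns an empty list rather than `None`.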


## 7. Diamond DAG Example

In [8]:
# Diamond structure: A → B, A → C, B → D, C → D
dag = Graph()
a = Node(content={"name": "A"})
b = Node(content={"name": "B"})
c = Node(content={"name": "C"})
d = Node(content={"name": "D"})

for node in [a, b, c, d]:
    dag.add_node(node)

dag.add_edge(Edge(head=a.id, tail=b.id))
dag.add_edge(Edge(head=a.id, tail=c.id))
dag.add_edge(Edge(head=b.id, tail=d.id))
dag.add_edge(Edge(head=c.id, tail=d.id))

print("Diamond DAG:")
print(f"  Heads: {[n.content['name'] for n in dag.get_heads()]}")
print(f"  Tails: {[n.content['name'] for n in dag.get_tails()]}")
print(f"  Multiple predecessors of D: {[n.content['name'] for n in dag.get_predecessors(d)]}")
print(f"  Multiple successors of A: {[n.content['name'] for n in dag.get_successors(a)]}")

Diamond DAG:
  Heads: ['A']
  Tails: ['D']
  Multiple predecessors of D: ['B', 'C']
  Multiple successors of A: ['B', 'C']


## 8. EdgeCondition: Conditional Traversal

In [9]:
# Custom condition: gate traversal based on properties
class ThresholdCondition(EdgeCondition):
    async def apply(self, threshold: float = 10.0) -> bool:
        value = self.properties.get("value", 0.0)
        return value <= threshold


# Create graph with conditional edge
cond_graph = Graph()
x = Node(content={"name": "X"})
y = Node(content={"name": "Y"})
cond_graph.add_node(x)
cond_graph.add_node(y)

# Edge with condition (value=15 > threshold=10 → blocked)
cond_edge = Edge(head=x.id, tail=y.id, condition=ThresholdCondition(properties={"value": 15.0}))
cond_graph.add_edge(cond_edge)

# Path finding with conditions
path_unchecked = await cond_graph.find_path(x, y, check_conditions=False)
path_checked = await cond_graph.find_path(x, y, check_conditions=True)

print(f"Path without checking conditions: {path_unchecked is not None}")
print(f"Path with conditions (blocked): {path_checked is not None}")

Path without checking conditions: True
Path with conditions (blocked): False
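
Conceptually, condition checking adds one awaited test before an edge is followed during the search. The sketch below shows that gating step in isolation. `Threshold` and `reachable` are hypothetical names modelled loosely on the cell above, not lionpride's API. It uses `asyncio.run` so that it also works outside a notebook.

```python
import asyncio
from collections import deque


class Threshold:
    """Hypothetical gate: passes only while value <= threshold."""

    def __init__(self, value):
        self.value = value

    async def apply(self, threshold=10.0):
        return self.value <= threshold


async def reachable(edges, start, end, check_conditions=False):
    """BFS over (head, tail, condition_or_None) triples."""
    seen = {start}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == end:
            return True
        for head, tail, cond in edges:
            if head != node or tail in seen:
                continue
            # A failing condition blocks this edge, not the whole search.
            if check_conditions and cond is not None and not await cond.apply():
                continue
            seen.add(tail)
            queue.append(tail)
    return False


edges = [("X", "Y", Threshold(15.0))]
unchecked = asyncio.run(reachable(edges, "X", "Y"))
checked = asyncio.run(reachable(edges, "X", "Y", check_conditions=True))
print(unchecked, checked)
```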


## 9. Serialization: Preserving Graph Structure

In [10]:
# Serialize to dict
data = graph.to_dict(mode="python")
print(f"Serialized keys: {list(data.keys())}")
print(f"Nodes in serialized data: {len(data['nodes']['items'])}")

# Deserialize from dict
restored = Graph.from_dict(data)
print(f"\nRestored graph: {len(restored)} nodes, {len(restored.edges)} edges")
print(f"Adjacency lists rebuilt: {len(restored._out_edges)} out-edge sets")

# Verify graph operations work after deserialization
restored_heads = restored.get_heads()
print(f"Heads after deserialization: {[n.content['name'] for n in restored_heads]}")

Serialized keys: ['id', 'created_at', 'metadata', 'nodes', 'edges']
Nodes in serialized data: 3

Restored graph: 3 nodes, 2 edges
Adjacency lists rebuilt: 3 out-edge sets
Heads after deserialization: ['Start']
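
One way to get "automatic adjacency rebuilding" is to serialize only nodes and edges, then recompute the adjacency index whenever a graph is constructed. The sketch below assumes that design. `TinyGraph` is a hypothetical class, and the library's actual serialized layout (shown in the output above) differs.

```python
import json


class TinyGraph:
    """Hypothetical graph whose adjacency index is derived, never stored."""

    def __init__(self, nodes=(), edges=()):
        self.nodes = list(nodes)
        self.edges = [tuple(e) for e in edges]
        self._rebuild_adjacency()

    def _rebuild_adjacency(self):
        # Derived state: recomputed from nodes and edges on construction.
        self.out_edges = {n: set() for n in self.nodes}
        self.in_edges = {n: set() for n in self.nodes}
        for head, tail in self.edges:
            self.out_edges[head].add(tail)
            self.in_edges[tail].add(head)

    def to_dict(self):
        return {"nodes": list(self.nodes), "edges": [list(e) for e in self.edges]}

    @classmethod
    def from_dict(cls, data):
        return cls(data["nodes"], data["edges"])

    def heads(self):
        return [n for n in self.nodes if not self.in_edges[n]]


original = TinyGraph(["Start", "Process", "End"],
                     [("Start", "Process"), ("Process", "End")])
payload = json.dumps(original.to_dict())         # adjacency is not serialized
restored = TinyGraph.from_dict(json.loads(payload))
print(restored.heads(), len(restored.out_edges))
```

Keeping the index out of the payload means the serialized form cannot disagree with the edge list it was derived from.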


## Summary

**Graph** provides:

**Core Operations:**
- `add_node()`, `remove_node()` - Node management
- `add_edge()`, `remove_edge()` - Edge management
- Direct access: `graph.nodes[node_id]`, `graph.edges[edge_id]`

**Adjacency Queries (O(1) via pre-computed lists):**
- `get_predecessors(node)` - Nodes pointing to this node
- `get_successors(node)` - Nodes this node points to
- `get_heads()` - Source nodes (no incoming edges)
- `get_tails()` - Sink nodes (no outgoing edges)

**Algorithms:**
- `is_acyclic()` - DFS cycle detection (O(V+E))
- `topological_sort()` - Kahn's algorithm for DAGs (O(V+E))
- `await find_path(start, end)` - BFS shortest path by edge count (O(V+E))

**Advanced Features:**
- `EdgeCondition` - Runtime traversal gates (conditional path finding)
- Serialization - `to_dict()` / `from_dict()` with automatic adjacency rebuilding
- Thread-safe operations via `@synchronized` decorator

**Use Cases:** Workflow graphs, dependency resolution, task scheduling, state machines
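
As a point of comparison for the dependency-resolution use case, Python's standard library (3.9+) ships `graphlib.TopologicalSorter`, which orders tasks from a mapping of each task to its prerequisites. The task names below are hypothetical, and this example uses only the standard library, not lionpride.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks; each key maps to the set of tasks it depends on.
dependencies = {
    "deploy": {"test"},
    "test": {"build"},
    "build": set(),
}
order = list(TopologicalSorter(dependencies).static_order())
print(order)

# A circular dependency is reported as CycleError (a ValueError subclass).
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
print(cycle_detected)
```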