In [5]:
import sys
sys.path.insert(0, r"C:\Users\PC\Desktop\graphglue")
from graphglue.core.graph import Graph


In [7]:
# ---------- Setup ----------
G = Graph(directed=True)
conditions = ["Healthy", "Stressed", "Disease"]
for c in conditions:
    G.add_layer(c, condition=c)

# Entities
proteins = [f"P{i}" for i in range(1, 151)]   # P1..P150
transcripts = [f"T{i}" for i in range(1, 61)]  # T1..T60 (treat as vertices)
enz_edge_entities = [f"edge_rxn_{i}" for i in range(1, 11)]  # edge-entities for reactions

# Seed some vertex attributes
for p in proteins[:10]:
    G.add_vertex(p, layer="Healthy", family="kinase")
for p in proteins[10:]:
    G.add_vertex(p, layer="Healthy")
for t in transcripts:
    G.add_vertex(t, layer="Healthy", kind="transcript")
for ee in enz_edge_entities:
    G.add_edge_entity(ee, layer="Healthy", role="enzyme")

# Propagate initial vertices to all layers (cheaply)
for lid in ["Stressed", "Disease"]:
    G._layers[lid]["vertices"].update(G._layers["Healthy"]["vertices"])

In [9]:
# ---------- Build PPI edges in all layers ----------
import random

def rand_weight(base=1.0, jitter=0.5):
    return max(0.05, base + (random.random() - 0.5) * 2 * jitter)

ppis = []
for _ in range(320):
    u, v = random.sample(proteins, 2)
    w = rand_weight(1.2, 0.6)
    e = G.add_edge(u, v, layer="Healthy", weight=w, edge_directed=False)
    ppis.append(e)

# Stress/disease layer variants (override per-layer weights)
for eid in ppis:
    # Stressed: mostly +10% with jitter
    G.add_edge_to_layer("Stressed", eid)
    G.set_edge_layer_attrs("Stressed", eid, weight=G.edge_weights[eid] * rand_weight(1.10, 0.1))
    # Disease: some edges get weaker; others stronger
    G.add_edge_to_layer("Disease", eid)
    factor = 0.7 if random.random() < 0.4 else 1.3
    G.set_edge_layer_attrs("Disease", eid, weight=G.edge_weights[eid] * rand_weight(factor, 0.15))

In [11]:
# ---------- Complexes as undirected hyperedges ----------
complexes = []
for _ in range(12):
    members = set(random.sample(proteins, random.choice([3, 4, 5])))
    hid = G.add_hyperedge(members=members, layer="Healthy", weight=rand_weight(1.0, 0.2))
    complexes.append(hid)
    # complex exists in all layers (same membership)
    for lid in ["Stressed", "Disease"]:
        G.add_edge_to_layer(lid, hid)

In [13]:
# ---------- Directed signaling cascades as hyperedges ----------
cascades = []
while len(cascades) < 8:
    head = set(random.sample(proteins, random.choice([1, 2])))
    tail = set(random.sample(proteins, random.choice([2, 3, 4])))
    if head & tail:
        continue  # resample until disjoint
    hid = G.add_hyperedge(head=head, tail=tail, layer="Healthy", weight=rand_weight(1.0, 0.4))
    cascades.append(hid)
    for lid in ["Stressed", "Disease"]:
        G.add_edge_to_layer(lid, hid)

In [15]:
# ---------- Reactions connecting vertices to edge-entities ----------
for ee in enz_edge_entities:
    s, t = random.sample(proteins, 2)
    G.add_edge(s, ee, layer="Healthy", edge_type="vertex_edge", weight=1.0 + random.random())
    G.add_edge(ee, t, layer="Healthy", edge_type="vertex_edge", weight=1.0 + random.random())
    # propagate across layers
    for lid in ["Stressed", "Disease"]:
        G._layers[lid]["edges"].update(G._layers["Healthy"]["edges"])

In [17]:
# ---------- Basic sanity ----------
print("vertices:", G.number_of_vertices(), "Edges:", G.number_of_edges())

# Only true "vertices" are counted by number_of_vertices() (proteins + transcripts)
expected_vertices = len(set(proteins)) + len(set(transcripts))  # 150 + 60 = 210
assert G.number_of_vertices() >= expected_vertices, f"Expected ≥{expected_vertices}, got {G.number_of_vertices()}"

# Edge-entities are tracked as entity_type == 'edge' (not included in number_of_vertices)
edge_entity_ids = set(enz_edge_entities)
edge_entity_count = sum(1 for _id, et in G.entity_types.items() if et == "edge" and _id in edge_entity_ids)
assert edge_entity_count == len(edge_entity_ids), f"Expected {len(edge_entity_ids)} edge-entities, got {edge_entity_count}"

# Edges: PPIs (320) + complexes (12) + cascades (8) + reaction links (10*2) = 360 minimum
assert G.number_of_edges() >= 320 + 12 + 8 + (10 * 2)


vertices: 210 Edges: 360


In [19]:
# ---------- Views & top edges by condition ----------
import polars as pl

for cond in conditions:
    EV = G.edges_view(layer=cond, resolved_weight=True)
    print(f"[{cond}] edges_view rows =", EV.height)
    top = (
        EV
        .filter(pl.col("kind") == "binary")
        .sort("effective_weight", descending=True)
        .select(["edge_id", "source", "target", "effective_weight"])
        .head(5)
    )
    print(f"\nTop 5 binary edges by effective_weight in {cond}:")
    print(top)

[Healthy] edges_view rows = 360

Top 5 binary edges by effective_weight in Healthy:
shape: (5, 4)
┌──────────┬────────────┬────────────┬──────────────────┐
│ edge_id  ┆ source     ┆ target     ┆ effective_weight │
│ ---      ┆ ---        ┆ ---        ┆ ---              │
│ str      ┆ str        ┆ str        ┆ f64              │
╞══════════╪════════════╪════════════╪══════════════════╡
│ edge_350 ┆ P75        ┆ edge_rxn_6 ┆ 1.985889         │
│ edge_352 ┆ P138       ┆ edge_rxn_7 ┆ 1.970894         │
│ edge_344 ┆ P140       ┆ edge_rxn_3 ┆ 1.942072         │
│ edge_355 ┆ edge_rxn_8 ┆ P135       ┆ 1.863431         │
│ edge_342 ┆ P54        ┆ edge_rxn_2 ┆ 1.793103         │
└──────────┴────────────┴────────────┴──────────────────┘
[Stressed] edges_view rows = 360

Top 5 binary edges by effective_weight in Stressed:
shape: (5, 4)
┌──────────┬────────┬────────┬──────────────────┐
│ edge_id  ┆ source ┆ target ┆ effective_weight │
│ ---      ┆ ---    ┆ ---    ┆ ---              │
│ str      ┆ s

In [20]:
# ---------- Layer analytics ----------
stats = G.layer_statistics()
print("\nLayer stats:", stats)

conserved = G.conserved_edges(min_layers=3)  # present in all 3 conditions
print("\nConserved edges (in all conditions):", len(conserved))

disease_specific = G.layer_specific_edges("Disease")
print("Disease-specific edges:", len(disease_specific))

changes = G.temporal_dynamics(["Healthy", "Stressed", "Disease"], metric="edge_change")
print("\nTemporal edge changes (Healthy→Stressed→Disease):", changes)
assert len(changes) == 2


Layer stats: {'Healthy': {'vertices': 220, 'edges': 360, 'attributes': {'condition': 'Healthy'}}, 'Stressed': {'vertices': 220, 'edges': 360, 'attributes': {'condition': 'Stressed'}}, 'Disease': {'vertices': 220, 'edges': 360, 'attributes': {'condition': 'Disease'}}}

Conserved edges (in all conditions): 360
Disease-specific edges: 0

Temporal edge changes (Healthy→Stressed→Disease): [{'added': 0, 'removed': 0, 'net_change': 0}, {'added': 0, 'removed': 0, 'net_change': 0}]


In [23]:
# ---------- Presence queries ----------
some_e = next(iter(G.edge_to_idx.keys()))
print("\nEdge presence for", some_e, ":", G.edge_presence_across_layers(edge_id=some_e))
some_p = random.choice(proteins)
print("vertex presence for", some_p, ":", G.vertex_presence_across_layers(some_p))


Edge presence for edge_0 : ['Healthy', 'Stressed', 'Disease']
vertex presence for P84 : ['Healthy', 'Stressed', 'Disease']


In [25]:
# ---------- Traversal checks ----------
q = random.choice(proteins)
print(f"\nNeighbors({q}) =>", G.neighbors(q)[:10])
print(f"Out({q}) =>", G.out_neighbors(q)[:10])
print(f"In({q}) =>", G.in_neighbors(q)[:10])


Neighbors(P38) => ['P126', 'P48', 'P149', 'P129', 'P150', 'P41', 'P95']
Out(P38) => ['P126', 'P48', 'P149', 'P129', 'P150', 'P41', 'P95']
In(P38) => ['P126', 'P48', 'P149', 'P129', 'P150', 'P41', 'P95']


In [27]:
# ---------- Subgraph slice & copy ----------
H = G.subgraph_from_layer("Disease", resolve_layer_weights=True)
assert set(H.vertices()).issubset(set(G.vertices()))
assert set(H.edges()).issubset(set(G.edges()))
print("\nDisease subgraph: vertices =", H.number_of_vertices(), "edges =", H.number_of_edges())

K = G.copy()
assert set(K.vertices()) == set(G.vertices())
assert set(K.edges()) == set(G.edges())
# hyperedge shape preserved
any_hyper = next(e for e,k in G.edge_kind.items() if k == "hyper")
assert K.edge_kind.get(any_hyper) == "hyper"
# layer sets preserved
for lid in G.list_layers(include_default=True):
    assert K._layers[lid]["vertices"] == G._layers[lid]["vertices"]
    assert K._layers[lid]["edges"] == G._layers[lid]["edges"]
print("copy() OK")


Disease subgraph: vertices = 210 edges = 360
copy() OK


In [29]:
# ---------- Remove operations stress ----------
to_drop_vertices = random.sample(proteins, 5)
for n in to_drop_vertices:
    if n in G.entity_to_idx:
        G.remove_vertex(n)
print("\nAfter removing 5 proteins: vertices =", G.number_of_vertices(), "edges =", G.number_of_edges())

to_drop_edges = list(G.edge_to_idx.keys())[:10]
for eid in to_drop_edges:
    if eid in G.edge_to_idx:
        G.remove_edge(eid)
print("After removing 10 edges: vertices =", G.number_of_vertices(), "edges =", G.number_of_edges())


After removing 5 proteins: vertices = 205 edges = 336
After removing 10 edges: vertices = 205 edges = 326


In [31]:
# ---------- Audit & memory ----------
audit = G.audit_attributes()
print("\nAudit:", audit)
mem_bytes = G.memory_usage()
print("Approx memory usage (bytes):", int(mem_bytes))
assert mem_bytes > 0

print("\nReality-check finished ✅")


Audit: {'extra_vertex_rows': ['edge_rxn_5', 'edge_rxn_8', 'edge_rxn_10', 'edge_rxn_1', 'edge_rxn_7', 'edge_rxn_3', 'edge_rxn_4', 'edge_rxn_6', 'edge_rxn_9', 'edge_rxn_2'], 'extra_edge_rows': [], 'missing_vertex_rows': [], 'missing_edge_rows': ['edge_175', 'edge_183', 'edge_233', 'edge_88', 'edge_195', 'edge_15', 'edge_13', 'edge_46', 'edge_332', 'edge_178', 'edge_355', 'edge_119', 'edge_266', 'edge_229', 'edge_63', 'edge_148', 'edge_143', 'edge_282', 'edge_238', 'edge_95', 'edge_170', 'edge_226', 'edge_10', 'edge_230', 'edge_191', 'edge_29', 'edge_26', 'edge_61', 'edge_49', 'edge_51', 'edge_20', 'edge_160', 'edge_260', 'edge_125', 'edge_349', 'edge_218', 'edge_11', 'edge_87', 'edge_244', 'edge_59', 'edge_217', 'edge_263', 'edge_269', 'edge_80', 'edge_156', 'edge_335', 'edge_169', 'edge_272', 'edge_157', 'edge_243', 'edge_340', 'edge_352', 'edge_253', 'edge_339', 'edge_99', 'edge_50', 'edge_318', 'edge_245', 'edge_304', 'edge_141', 'edge_82', 'edge_159', 'edge_337', 'edge_242', 'edge_1

In [33]:
events = G.history()           # list[dict]
df = G.history(as_df=True)     # Polars DF [DataFrame]

In [35]:
print(df.head())
events[:3]

shape: (5, 10)
┌─────────┬─────────────────────────────┬──────────┬─────────────────┬───┬────────┬───────────┬─────────┬────────────┐
│ version ┆ ts_utc                      ┆ mono_ns  ┆ op              ┆ … ┆ result ┆ vertex_id ┆ layer   ┆ attributes │
│ ---     ┆ ---                         ┆ ---      ┆ ---             ┆   ┆ ---    ┆ ---       ┆ ---     ┆ ---        │
│ i64     ┆ str                         ┆ i64      ┆ str             ┆   ┆ str    ┆ str       ┆ str     ┆ struct[1]  │
╞═════════╪═════════════════════════════╪══════════╪═════════════════╪═══╪════════╪═══════════╪═════════╪════════════╡
│ 1       ┆ 2025-10-13T11:34:07.839001Z ┆ 44764700 ┆ set_layer_attrs ┆ … ┆ null   ┆ null      ┆ null    ┆ null       │
│ 2       ┆ 2025-10-13T11:34:07.840061Z ┆ 45896500 ┆ set_layer_attrs ┆ … ┆ null   ┆ null      ┆ null    ┆ null       │
│ 3       ┆ 2025-10-13T11:34:07.841887Z ┆ 47243200 ┆ set_layer_attrs ┆ … ┆ null   ┆ null      ┆ null    ┆ null       │
│ 4       ┆ 2025-10-13T11:34:07.8

[{'version': 1,
  'ts_utc': '2025-10-13T11:34:07.839001Z',
  'mono_ns': 44764700,
  'op': 'set_layer_attrs',
  'layer_id': 'Healthy',
  'attrs': {'condition': 'Healthy'},
  'result': None},
 {'version': 2,
  'ts_utc': '2025-10-13T11:34:07.840061Z',
  'mono_ns': 45896500,
  'op': 'set_layer_attrs',
  'layer_id': 'Stressed',
  'attrs': {'condition': 'Stressed'},
  'result': None},
 {'version': 3,
  'ts_utc': '2025-10-13T11:34:07.841887Z',
  'mono_ns': 47243200,
  'op': 'set_layer_attrs',
  'layer_id': 'Disease',
  'attrs': {'condition': 'Disease'},
  'result': None}]

In [37]:
import sys, pathlib

# add repo root to Python path
sys.path.append(str(pathlib.Path.cwd().parent))
from graphglue.io import csv as graph_csv

In [39]:
csv1_path = "csv1_edges.csv"
pl.DataFrame({
    "source": ["A","A","B","C","D"],
    "target": ["B","C","C","D","A"],
    "weight": [1, 2, 3, 1, 5],
    "directed": [True, True, False, True, True],
    "layer": ["L1","L1","L1","L2","L2"],
}) #.write_csv(csv1_path)

#pl.read_csv(csv1_path).head()


source,target,weight,directed,layer
str,str,i64,bool,str
"""A""","""B""",1,True,"""L1"""
"""A""","""C""",2,True,"""L1"""
"""B""","""C""",3,False,"""L1"""
"""C""","""D""",1,True,"""L2"""
"""D""","""A""",5,True,"""L2"""


In [41]:
"""G = graph_csv.load_csv_to_graph(
    csv1_path,
    schema="auto",            # or 'edge_list'/'incidence'/'adjacency'/'hyperedge'/'lil'
    default_layer=None,       # fallback if no layer column is present
    default_directed=None,    # fallback if no directed column and cannot infer
    default_weight=1.0,
)

# Quick sanity: show first rows of an edges view (columns depend on your Graph implementation)
edges = G.edges_view(layer=None, include_directed=True, resolved_weight=True)
edges.head()
"""

'G = graph_csv.load_csv_to_graph(\n    csv1_path,\n    schema="auto",            # or \'edge_list\'/\'incidence\'/\'adjacency\'/\'hyperedge\'/\'lil\'\n    default_layer=None,       # fallback if no layer column is present\n    default_directed=None,    # fallback if no directed column and cannot infer\n    default_weight=1.0,\n)\n\n# Quick sanity: show first rows of an edges view (columns depend on your Graph implementation)\nedges = G.edges_view(layer=None, include_directed=True, resolved_weight=True)\nedges.head()\n'

In [43]:
# Count entities and edges (attribute names based on your class; adjust if different)
num_entities = G.global_entity_count        # vertices + edge-entities
num_edges    = G.global_edge_count          # binary + hyper

print("entities:", num_entities, "edges:", num_edges)

# A light “degree” summary from edges_view for binary edges only (skip hyper)
df = G.edges_view(include_directed=True, resolved_weight=True)

# Try to find endpoint column names robustly
cols = {c.lower(): c for c in df.columns}
# Common possibilities:
src_col = next((cols[c] for c in ["source","src","u","from"]), None)
dst_col = next((cols[c] for c in ["target","dst","v","to"]), None)

if src_col and dst_col:
    # out-degree (directed) / degree (undirected)
    out_deg = df.group_by(src_col).len().rename({src_col: "vertex", "len": "out_degree"})
    in_deg  = df.group_by(dst_col).len().rename({dst_col: "vertex", "len": "in_degree"})
    deg = out_deg.join(in_deg, on="vertex", how="outer").fill_null(0)
    deg = deg.with_columns((pl.col("out_degree")+pl.col("in_degree")).alias("total_degree"))
    deg.sort("total_degree", descending=True).head(10)
else:
    print("Skip degree summary: endpoint columns not found in edges_view output (likely hyperedge-only or different schema).")


entities: <bound method Graph.global_entity_count of <graphglue.core.graph.Graph object at 0x000001DAB96D3BC0>> edges: <bound method Graph.global_edge_count of <graphglue.core.graph.Graph object at 0x000001DAB96D3BC0>>


(Deprecated in version 0.20.29)
  deg = out_deg.join(in_deg, on="vertex", how="outer").fill_null(0)


In [45]:
# Add a new vertex and an edge on layer L3
G.add_vertex("E")
eid = G.add_edge("E", "A", layer="L3", directed=True, weight=2.5)

# Per-layer weight override example:
G.set_edge_layer_attrs("L3", eid, weight=3.0)

# Inspect the updated edges
G.edges_view(include_directed=True, resolved_weight=True).tail()


edge_id,kind,directed,global_weight,effective_weight,source,target,edge_type
str,str,bool,f64,f64,str,str,str
"""edge_355""","""binary""",True,1.863431,1.863431,"""edge_rxn_8""","""P135""","""vertex_edge"""
"""edge_357""","""binary""",True,1.271529,1.271529,"""edge_rxn_9""","""P120""","""vertex_edge"""
"""edge_358""","""binary""",True,1.576317,1.576317,"""P114""","""edge_rxn_10""","""vertex_edge"""
"""edge_359""","""binary""",True,1.345649,1.345649,"""edge_rxn_10""","""P124""","""vertex_edge"""
"""edge_360""","""binary""",True,2.5,2.5,"""E""","""A""","""regular"""


In [47]:
csv2_path = "csv2_edges_view.csv"
G.edges_view(layer=None, include_directed=True, resolved_weight=True)
csv2_path


'csv2_edges_view.csv'

In [49]:
def export_edge_list_csv(G, path, layer=None):
    df = G.edges_view(layer=layer, include_directed=True, resolved_weight=True)
    cols = {c.lower(): c for c in df.columns}
    src_col = next((cols[c] for c in ["source","src","u","from"]), None)
    dst_col = next((cols[c] for c in ["target","dst","v","to"]), None)
    dir_col = next((cols[c] for c in ["directed"]), None)
    w_eff   = next((cols[c] for c in ["effective_weight","weight","w"]), None)

    if not (src_col and dst_col):
        raise ValueError("No binary endpoint columns found; the view may be hyperedge-only. Try the generic edges_view export.")

    out = pl.DataFrame({
        "source": df[src_col],
        "target": df[dst_col],
        "weight": df[w_eff] if w_eff else pl.Series([1.0]*df.height),
        "directed": df[dir_col] if dir_col in df.columns else pl.Series([None]*df.height),
        "layer": pl.Series([layer]*df.height) if layer else pl.Series([None]*df.height),
    })
    out #.write_csv(path)

# Usage:
csv2_edge_list_path = "csv2_edge_list.csv"
export_edge_list_csv(G, csv2_edge_list_path, layer=None)
csv2_edge_list_path


'csv2_edge_list.csv'

In [51]:
# In-memory look at last few events
hist = G.history(as_df=True)   # DF [DataFrame]
hist.tail()

# Save to Parquet/CSV/JSON [JavaScript Object Notation]/NDJSON [Newline-Delimited JSON]
G.export_history("graph_history.parquet")


1277

In [53]:
import sys, pathlib
sys.path.append(str(pathlib.Path.cwd().parent))

from graphglue.io import excel as graph_excel
from graphglue.core.graph import Graph


In [55]:
G = graph_excel.load_excel_to_graph("graph_input.xlsx", schema="auto")

In [56]:
G.global_entity_count()

60

In [59]:
from graphglue.adapters.networkx import to_backend, to_nx, from_nx

In [61]:
nxG, man = to_nx(G, directed=True, hyperedge_mode="skip")

In [63]:
from graphglue.adapters.igraph import to_backend, to_igraph, from_igraph

In [65]:
nxG, man = to_igraph(G, directed=True, hyperedge_mode="skip")

In [67]:
G.shape

(60, 767)

In [69]:
parts = G.nx.louvain_communities(G)                          # warns on loss if neededparts = G.nx.louvain_communities(G, _nx_directed=False)      # force undirected NX backendparts = G.nx.louvain_communities(G, _nx_hyperedge="expand")  # expand hyperedges

In [71]:
path = G.nx.shortest_path(G, source="B701", target="MS6 8")
dist = G.nx.shortest_path_length(G, source="B701", target="MS6 8")

In [73]:
print(dist)
path

2


['B701', 'MS8 21', 'MS6 8']

In [75]:
G.nx.clear()   

In [89]:
# Deterministic smoke tests for the lazy NX (NetworkX) proxy

# ---------- G1: PATH GRAPH (for weighted/unweighted shortest paths) ----------
def build_path_graph() -> Graph:
    """
    Directed chain a→b→c→d→e→f with weights on each edge.
    - Weighted shortest path a→f = 1+2+3+1+4 = 11
    - Unweighted hops a→f = 5
    """
    G = Graph(directed=True)
    # vertices (+ labels for the label→ID mapping)
    G.add_vertex("a", name="alpha")
    G.add_vertex("b", name="bravo")
    G.add_vertex("c", name="charlie")
    G.add_vertex("d", name="delta")
    G.add_vertex("e", name="echo")
    G.add_vertex("f", name="phi")

    # pure chain (NO chords)
    G.add_edge("a", "b", weight=1)
    G.add_edge("b", "c", weight=2)
    G.add_edge("c", "d", weight=3)
    G.add_edge("d", "e", weight=1)
    G.add_edge("e", "f", weight=4)

    return G


# ---------- G2: COMMUNITY GRAPH (two cliques + weak bridge) ----------
def build_community_graph() -> Graph:
    """
    Two undirected cliques: K6 on {a..f} and K4 on {w,x,y,z}, joined by a single weak bridge e--x (weight=0.01).
    Louvain should give communities of sizes [4,6] (stable with seed); betweenness top in {'e','x'};
    PR (PageRank) top is 'e' in undirected view (highest degree).
    """
    G = Graph(directed=True)  # we’ll add undirected edges explicitly

    for v in ["a","b","c","d","e","f","w","x","y","z"]:
        G.add_vertex(v)

    # K6 clique on a..f (undirected, weight=1)
    k6 = ["a","b","c","d","e","f"]
    for i in range(len(k6)):
        for j in range(i+1, len(k6)):
            G.add_edge(k6[i], k6[j], weight=1, edge_directed=False)

    # K4 clique on w,x,y,z (undirected, weight=1)
    k4 = ["w","x","y","z"]
    for i in range(len(k4)):
        for j in range(i+1, len(k4)):
            G.add_edge(k4[i], k4[j], weight=1, edge_directed=False)

    # Single weak bridge e--x
    G.add_edge("e", "x", weight=0.01, edge_directed=False)

    return G

def run_tests():
    # ----- G1: shortest paths -----
    G1 = build_path_graph()

    # Weighted Dijkstra via labels -> expect 11.0
    dist_w = G1.nx.shortest_path_length(
        G1, source="alpha", target="phi", weight="weight", _nx_label_field="name"
    )
    print("[G1:dijkstra weighted] alpha->phi:", dist_w, "(expect 11.0)")

    # Unweighted hop count -> expect 5 (a-b-c-d-e-f)
    dist_hops = G1.nx.shortest_path_length(G1, source="a", target="f", weight=None)
    print("[G1:unweighted hops] a->f:", dist_hops, "(expect 5)")

    # Cache invalidation: add fast a->f edge (weight=2) and re-check -> expect 2.0
    G1.add_edge("a", "f", weight=2)
    dist_new = G1.nx.shortest_path_length(G1, source="a", target="f", weight="weight")
    print("[G1:after mutation] a->f:", dist_new, "(expect 2.0)")

    # ----- G2: communities / centrality / PR / components -----
    G2 = build_community_graph()

    # Louvain on undirected view -> expect sizes [4, 6]
    comms = G2.nx.louvain_communities(G2, _nx_directed=False, seed=42, weight="weight")
    sizes = sorted(len(c) for c in comms)
    print("[G2:louvain] sizes:", sizes, "(expect [4, 6])")

    # Betweenness centrality (unweighted, undirected) -> bridge endpoints dominate
    bc = G2.nx.betweenness_centrality(G2, _nx_directed=False, normalized=True)
    top_bc = max(bc, key=bc.get)
    print("[G2:betweenness] top:", top_bc, "(expect 'e' or 'x')")

    # PageRank (PR) on undirected view (unweighted edges) -> highest degree node is 'e'
    pr = G2.nx.pagerank(G2, _nx_directed=False)
    top_pr = max(pr, key=pr.get)
    print("[G2:pagerank] top:", top_pr, "(expect 'e')")

    # Connected components (undirected) -> single component of size 10
    comps = list(G2.nx.connected_components(G2, _nx_directed=False))
    print("[G2:connected components]:", [sorted(c) for c in comps], "(expect one component of size 10)")


if __name__ == "__main__":
    run_tests()


[G1:dijkstra weighted] alpha->phi: 11 (expect 11.0)
[G1:unweighted hops] a->f: 5 (expect 5)
[G1:after mutation] a->f: 2 (expect 2.0)
[G2:louvain] sizes: [4, 6] (expect [4, 6])
[G2:betweenness] top: e (expect 'e' or 'x')
[G2:pagerank] top: e (expect 'e')
[G2:connected components]: [['a', 'b', 'c', 'd', 'e', 'f', 'w', 'x', 'y', 'z']] (expect one component of size 10)


In [101]:
# Deterministic smoke tests for the lazy ig (igraph) proxy

from graphglue.core.graph import Graph

# ---------- G1: PATH GRAPH (for weighted/unweighted shortest paths) ----------
def build_path_graph() -> Graph:
    """
    Directed chain a→b→c→d→e→f with weights on each edge.
    - Weighted shortest path a→f = 1+2+3+1+4 = 11
    - Unweighted hops a→f = 5
    """
    G = Graph(directed=True)
    # vertices (+ labels for the label→ID mapping)
    G.add_vertex("a", name="alpha")
    G.add_vertex("b", name="bravo")
    G.add_vertex("c", name="charlie")
    G.add_vertex("d", name="delta")
    G.add_vertex("e", name="echo")
    G.add_vertex("f", name="phi")

    # pure chain (NO chords)
    G.add_edge("a", "b", weight=1)
    G.add_edge("b", "c", weight=2)
    G.add_edge("c", "d", weight=3)
    G.add_edge("d", "e", weight=1)
    G.add_edge("e", "f", weight=4)
    return G


# ---------- G2: COMMUNITY GRAPH (two cliques + weak bridge) ----------
def build_community_graph() -> Graph:
    """
    Two undirected cliques: K6 on {a..f} and K4 on {w,x,y,z}, joined by a single weak bridge e--x (weight=0.01).
    multilevel (Louvain-like) should give communities of sizes [4,6] (stable with seed);
    betweenness top in {'e','x'}; PageRank (PR) top is 'e'.
    """
    G = Graph(directed=True)  # add undirected edges explicitly

    for v in ["a","b","c","d","e","f","w","x","y","z"]:
        G.add_vertex(v)

    # K6 clique on a..f (undirected, weight=1)
    k6 = ["a","b","c","d","e","f"]
    for i in range(len(k6)):
        for j in range(i + 1, len(k6)):
            G.add_edge(k6[i], k6[j], weight=1, edge_directed=False)

    # K4 clique on w,x,y,z (undirected, weight=1)
    k4 = ["w","x","y","z"]
    for i in range(len(k4)):
        for j in range(i + 1, len(k4)):
            G.add_edge(k4[i], k4[j], weight=1, edge_directed=False)

    # Single weak bridge e--x
    G.add_edge("e", "x", weight=0.01, edge_directed=False)
    return G


# ---------- helpers ----------
def _unwrap_ig_distance(obj):
    """igraph returns [[dist]] for single source/target; unwrap to a scalar."""
    if isinstance(obj, (list, tuple)) and len(obj) == 1:
        inner = obj[0]
        if isinstance(inner, (list, tuple)) and len(inner) == 1:
            return inner[0]
        return inner
    return obj


def run_tests():
    # ----- G1: shortest paths -----
    G1 = build_path_graph()

    # Weighted Dijkstra via labels -> expect 11.0
    dist_w = G1.ig.shortest_paths_dijkstra(
        source="alpha", target="phi", weights="weight", _ig_guess_labels=False
    )
    dist_w = _unwrap_ig_distance(dist_w)
    print("[IG:G1 dijkstra weighted] alpha->phi:", dist_w, "(expect 11.0)")

    # Unweighted hop count -> expect 5 (alpha→phi)
    dist_hops = G1.ig.distances(
        source="alpha", target="phi", weights=None, _ig_guess_labels=False
    )
    dist_hops = _unwrap_ig_distance(dist_hops)
    print("[IG:G1 unweighted hops] alpha->phi:", dist_hops, "(expect 5)")

    # Cache invalidation: add fast a->f edge (weight=2) and re-check -> expect 2.0
    G1.add_edge("a", "f", weight=2)
    dist_new = G1.ig.shortest_paths_dijkstra(
        source="alpha", target="phi", weights="weight", _ig_guess_labels=False
    )
    dist_new = _unwrap_ig_distance(dist_new)
    print("[IG:G1 after mutation] alpha->phi:", dist_new, "(expect 2.0)")

    # ----- G2: communities / betweenness / PR / components -----
    G2 = build_community_graph()

    # multilevel (Louvain-like) on undirected view -> expect sizes [4, 6]
    vc = G2.ig.community_multilevel(weights="weight", _ig_directed=False)
    sizes = sorted(vc.sizes())
    print("[IG:G2 multilevel] sizes:", sizes, "(expect [4, 6])")

    # Betweenness centrality (undirected). igraph returns list aligned to vertex order.
    igG_und = G2.ig.backend(directed=False)  # no simple=True needed here
    names = igG_und.vs["name"] if "name" in igG_und.vs.attributes() else list(range(igG_und.vcount()))
    bc_vals = G2.ig.betweenness(directed=False, weights=None)
    top_bc = max(dict(zip(names, bc_vals)), key=lambda k: dict(zip(names, bc_vals))[k])
    print("[IG:G2 betweenness] top:", top_bc, "(expect 'e' or 'x')")

    # PageRank (undirected, unweighted) -> 'e' should dominate
    pr_vals = G2.ig.pagerank(directed=False)
    top_pr = max(dict(zip(names, pr_vals)), key=lambda k: dict(zip(names, pr_vals))[k])
    print("[IG:G2 pagerank] top:", top_pr, "(expect 'e')")

    # Connected components (undirected) -> single component of size 10
    comps = G2.ig.components(_ig_directed=False)  # VertexClustering
    comp_sizes = sorted(comps.sizes())
    print("[IG:G2 connected components]:", comp_sizes, "(expect [10])")

    # ----- Optional: verify simple collapse + aggregation policy -----
    G3 = Graph(directed=True)
    G3.add_vertex("u"); G3.add_vertex("v")
    # parallel undirected edges with attrs
    G3.add_edge("u", "v", weight=5, capacity=3, edge_directed=False)
    G3.add_edge("u", "v", weight=2, capacity=7, edge_directed=False)

    try:
        # Preferred: via proxy (needs the one-line proxy patch shown above)
        igG_simple = G3.ig.backend(
            directed=False, simple=True,
            needed_attrs={"weight", "capacity"},
            edge_aggs={"weight": "min", "capacity": "sum"}
        )
        e = igG_simple.es[0]
        print("[IG:G3 collapse agg via proxy] weight:", e["weight"], "capacity:", e["capacity"], "(expect 2, 10)")
    except Exception:
        # Fallback: ensure attrs exist, then collapse using igraph.simplify
        igG_raw = G3.ig.backend(directed=False, needed_attrs={"weight", "capacity"})
        igG_raw.simplify(multiple=True, loops=True,
                         combine_edges={"weight": "min", "capacity": "sum"})
        e = igG_raw.es[0]
        # after simplify, attrs may be missing if combine didn't carry them; handle safely
        w = e["weight"] if "weight" in igG_raw.es.attributes() else None
        c = e["capacity"] if "capacity" in igG_raw.es.attributes() else None
        print("[IG:G3 collapse agg via simplify] weight:", w, "capacity:", c, "(expect 2, 10)")


if __name__ == "__main__":
    run_tests()


[IG:G1 dijkstra weighted] alpha->phi: 11.0 (expect 11.0)
[IG:G1 unweighted hops] alpha->phi: 5 (expect 5)
[IG:G1 after mutation] alpha->phi: 2.0 (expect 2.0)
[IG:G2 multilevel] sizes: [4, 6] (expect [4, 6])
[IG:G2 betweenness] top: e (expect 'e' or 'x')
[IG:G2 pagerank] top: e (expect 'e')
[IG:G2 connected components]: [10] (expect [10])
[IG:G3 collapse agg via simplify] weight: 2 capacity: 10.0 (expect 2, 10)
