Causal-aware execution runtime for production Python systems.
cegraph tracks causal dependencies between Python functions, records execution traces, runs counterfactual simulations, and dispatches adaptive fallback when constraints are violated. Built for production MLOps, dynamic decisioning, and root-cause analysis.
3.77% overhead · 16/16 tests · v0.1.0 · Python 3.10+ · Zero heavy deps
Production models degrade silently. Isolate the causal node, simulate counterfactuals, and route to fallback — no downtime.
from cegraph import causal_node, Context

# preprocessor, model, and input_data are assumed to be defined by your application.
@causal_node(sensitivity=["feature_distribution"])
def encode(raw: dict) -> dict:
    return preprocessor.transform(raw)

@causal_node(constraint=lambda x: x > 0.5)
def predict(features: dict) -> float:
    return model.predict(features)

with Context(buffer_size=5000) as ctx:
    score = predict(encode(input_data))

When `predict` degrades, `optimize()` returns `fallback_cache` or `fallback_bypass` automatically.
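To make the fallback idea concrete, here is a minimal sketch in plain Python of trying the primary function first, then a cached value, then a bypass function, and finally raising an error. It is not cegraph's implementation: `dispatch_with_fallback`, `cache`, and `bypass` are hypothetical names, and the strategy labels are borrowed from the sentence above purely for illustration.

```python
from typing import Callable, Optional, Tuple

def dispatch_with_fallback(
    fn: Callable[[dict], float],
    constraint: Callable[[float], bool],
    features: dict,
    cache: dict,
    bypass: Optional[Callable[[dict], float]] = None,
) -> Tuple[str, float]:
    """Return (strategy, value), trying primary, then cache, then bypass."""
    key = repr(sorted(features.items()))
    try:
        value = fn(features)
        if constraint(value):
            cache[key] = value  # remember the last good result for this input
            return "primary", value
    except Exception:
        pass  # treat a crash the same way as a constraint violation
    if key in cache:
        return "fallback_cache", cache[key]
    if bypass is not None:
        return "fallback_bypass", bypass(features)
    raise RuntimeError("constraint violated and no fallback available")

# Hypothetical models: one healthy, one degraded below the 0.5 threshold.
healthy = lambda f: 0.9
degraded = lambda f: 0.1
above_half = lambda x: x > 0.5

cache: dict = {}
r1 = dispatch_with_fallback(healthy, above_half, {"a": 1}, cache)
r2 = dispatch_with_fallback(degraded, above_half, {"a": 1}, cache)
r3 = dispatch_with_fallback(degraded, above_half, {"b": 2}, cache, bypass=lambda f: 0.6)
print(r1, r2, r3)
```

The second call reuses the value cached by the first, and the third has no cached value for its input, so it falls through to the bypass function.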
"What if I raise price 5%?" answered with confidence scores — no A/B test required.
cf = counterfactual(
    base_trace=ctx.tracer.records,
    interventions={"pricing_model": {"multiplier": 1.05}},
    n_perturbations=100,
)
print(f"Impact: {cf.overall_impact:.3f}, Confidence: {cf.confidence:.2f}")
print(f"Ranked nodes: {cf.node_impacts}")

Replace correlation-and-timing debugging with explicit causal traces, cutting incident diagnosis from hours to minutes.
summary = ctx.tracer.summary()
for node, stats in summary.items():
    print(f"{node}: count={stats['count']}, mean={stats['mean_latency']:.2f}ms, p95={stats['p95_latency']:.2f}ms")

pip install cegraph

📦 PyPI: https://pypi.org/project/cegraph
📚 Docs: https://cegraph-docs.pages.dev
Requires Python 3.10+ and numpy 1.24+.
A complete 4-node pipeline — runnable as-is:
from cegraph import causal_node, CausalGraph, Context, counterfactual, optimize

@causal_node(sensitivity=["symbol"])
def fetch_price(symbol: str) -> dict:
    return {"price": 100.0, "symbol": symbol}

@causal_node(sensitivity=["price"])
def apply_tax(data: dict) -> float:
    return data["price"] * 1.1

@causal_node(sensitivity=["price"])
def apply_discount(base: float) -> float:
    return base * 0.95

@causal_node(constraint=lambda x: x > 0)
def finalize(price: float) -> float:
    return round(price, 2)

graph = CausalGraph()
graph.connect(fetch_price, apply_tax)
graph.connect(apply_tax, apply_discount)
graph.connect(apply_discount, finalize)
graph.validate()

with Context(buffer_size=1000) as ctx:
    data = fetch_price("AAPL")
    taxed = apply_tax(data)
    discounted = apply_discount(taxed)
    result = finalize(discounted)
    print(f"Result: {result}")

Output:
>>> Result: 104.5
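The `graph.validate()` call in the pipeline above is described as checking for cycles. As a rough illustration of what such a check involves, here is a depth-first search over a plain adjacency dict that returns the full cycle path when one exists. This is a sketch under assumptions, not cegraph's code: `find_cycle` and the dict-of-lists representation are hypothetical.

```python
from typing import Dict, List, Optional

def find_cycle(edges: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle as a path of node names, or None if the graph is acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color: Dict[str, int] = {}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = GRAY
        path.append(node)
        for nxt in edges.get(node, []):
            state = color.get(nxt, WHITE)
            if state == GRAY:  # back edge: nxt is already on the current path
                return path[path.index(nxt):] + [nxt]
            if state == WHITE:
                found = visit(nxt)
                if found:
                    return found
        path.pop()
        color[node] = BLACK
        return None

    for start in list(edges):
        if color.get(start, WHITE) == WHITE:
            found = visit(start)
            if found:
                return found
    return None

# The same four nodes as the pipeline above, by name.
pipeline = {
    "fetch_price": ["apply_tax"],
    "apply_tax": ["apply_discount"],
    "apply_discount": ["finalize"],
}
acyclic_result = find_cycle(pipeline)
cyclic_result = find_cycle({**pipeline, "finalize": ["fetch_price"]})
print(acyclic_result)
print(" -> ".join(cyclic_result))
```

Connecting `finalize` back to `fetch_price` closes the loop, so the second call reports the whole path from `fetch_price` around to itself.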
| API | Description | Key Parameters |
|---|---|---|
| `@causal_node()` | Decorate a function for causal tracking | `sensitivity`, `constraint`, `low_sensitivity` |
| `CausalGraph` | Directed graph with cycle/type validation | `.connect(src, dst)`, `.validate()` |
| `Context(graph, buffer_size, sample_rate)` | Session scope, binds tracer to all calls | `graph`, `buffer_size`, `sample_rate` |
| `CausalTracer` | Lock-free ring buffer, O(1) append, lossy overflow | `buffer_size`, `sample_rate` |
| `TraceRecord` | `__slots__` dataclass: per-node execution record | `node_name`, `latency_ms`, `input_hash`, ... |
| `counterfactual()` | Deterministic perturbation engine (seed=42) | `base_trace`, `interventions`, `n_perturbations` |
| `optimize()` | Fallback dispatch: cache → bypass → error | `objective`, `constraints` |
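The table describes `counterfactual()` as a deterministic perturbation engine with a fixed seed. The general idea can be sketched with the standard library alone: jitter a base value many times with a seeded generator, apply the intervention to each sample, and summarize the resulting change. Everything below is an assumption for illustration (`simulate_intervention`, the Gaussian noise model, and the returned mean and spread); it does not reproduce how cegraph computes impact or confidence.

```python
import random
import statistics

def simulate_intervention(base_value, multiplier, n_perturbations=100, noise=0.01, seed=42):
    """Return (mean impact, spread) of applying `multiplier` to jittered copies of base_value."""
    rng = random.Random(seed)  # fixed seed: identical inputs always give identical results
    impacts = []
    for _ in range(n_perturbations):
        jitter = 1.0 + rng.gauss(0.0, noise)  # small random perturbation of the input
        perturbed = base_value * jitter
        impacts.append(perturbed * multiplier - perturbed)  # change caused by the intervention
    return statistics.fmean(impacts), statistics.pstdev(impacts)

# "What if I raise price 5%?" on a base price of 100.0.
first = simulate_intervention(100.0, 1.05)
second = simulate_intervention(100.0, 1.05)
print(f"mean impact: {first[0]:.3f}, spread: {first[1]:.3f}")
print(first == second)  # True: the seeded generator makes the run reproducible
```

Seeding a private `random.Random` instance rather than the module-level generator keeps the result reproducible even when other code in the process also draws random numbers.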
| Exception | Raised when |
|---|---|
| `CausalCycleError` | Cycle detected in graph. Message includes full path. |
| `CausalTypeError` | Return type of src node != param type of dst node. |
| `CausalConstraintViolation` | Node constraint returned False. |
| `TracerOverflowWarning` | Ring buffer full, overwriting oldest records. |
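The overflow behaviour in the last row (a full ring buffer overwrites its oldest records) can be shown with a small fixed-size buffer. This is a single-threaded sketch under assumptions, not the `CausalTracer` implementation: `RingBuffer` is a hypothetical class, it makes no attempt at lock-free concurrency, and it emits a built-in `RuntimeWarning` as a stand-in for `TracerOverflowWarning`.

```python
import warnings

class RingBuffer:
    """Fixed-size buffer with O(1) append that overwrites the oldest record when full."""

    def __init__(self, size):
        self._slots = [None] * size
        self._size = size
        self._count = 0  # total number of appends so far

    def append(self, record):
        if self._count >= self._size:
            warnings.warn("ring buffer full, overwriting oldest record", RuntimeWarning)
        self._slots[self._count % self._size] = record  # overwrite in place
        self._count += 1

    def records(self):
        """Return the retained records, oldest first."""
        if self._count <= self._size:
            return self._slots[: self._count]
        start = self._count % self._size  # index of the oldest surviving record
        return self._slots[start:] + self._slots[:start]

buf = RingBuffer(size=3)
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    for i in range(5):
        buf.append(i)

print(buf.records())  # [2, 3, 4]
print(len(caught))    # 2
```

Five appends into three slots drop the two oldest records and raise one warning per overwrite, which is the lossy trade-off that keeps append cost constant.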
Native (baseline) ████████████████████ 100%
cegraph @causal_node ████████████████████▌ 103.77% ✓
OpenTelemetry (typical) ████████████████████████ ~120%
Full APM agent (typical) ████████████████████████████ ~140%
Measured over 10,000 iterations using time.perf_counter().
Full benchmark: tests/test_overhead.py
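For readers who want to run a similar comparison on their own code, the measurement approach named above (timing many iterations with `time.perf_counter()`) can be sketched as follows. This is not the content of `tests/test_overhead.py`: `traced` is a hypothetical stand-in for an instrumenting decorator, `work` is an arbitrary workload, and the printed figure will vary by machine and will not match the 3.77% reported here.

```python
import functools
import time

def traced(fn):
    """Hypothetical instrumenting decorator: records the latency of every call."""
    latencies = []

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        latencies.append(time.perf_counter() - start)
        return result

    wrapper.latencies = latencies
    return wrapper

def work(n=200):
    """Arbitrary CPU-bound workload used as the baseline."""
    return sum(i * i for i in range(n))

def time_calls(fn, iterations=10_000):
    """Total wall-clock seconds for `iterations` calls of fn."""
    start = time.perf_counter()
    for _ in range(iterations):
        fn()
    return time.perf_counter() - start

baseline = time_calls(work)
instrumented = time_calls(traced(work))
overhead_pct = (instrumented / baseline - 1.0) * 100.0
print(f"overhead: {overhead_pct:.2f}%")
```

Relative overhead depends heavily on how much work the wrapped function does: the cheaper the function, the larger the fixed cost of instrumentation looks, so it is worth benchmarking with a workload close to the real one.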
See CONTRIBUTING.md for development setup, code style, and PR process.
MIT — see LICENSE for details.