sharding design
This page is the durable design doc for Phase F of the Rust server FastAPI parity port (noetl/ai-meta#49). R1 of Phase F (noetl/server#40) codifies the survey findings into the design that R2–R5 implement.
Scope: the cluster-level orchestration that lets noetl-server run as N replicas, each owning a slice of executions, while every playbook still observes single-server semantics.
Out of scope (candidates for later, not now): multi-region; per-tenant sharding; dynamic re-sharding (adding shards on the fly); cross-shard joins.
- Shard key: `hash(execution_id) % N` on the full 64-bit snowflake.
- Routing: gateway-aware for Phase F. The ingress load balancer extracts `execution_id` from the request and dispatches to the owning shard via consistent hashing. Server-aware proxying is a Phase G optimization.
- Per-execution tables (`event`, `command`, `execution`, `outbox`, `variables`) partition by `execution_id`.
- Cluster-wide tables (`catalog`, `credential`, `keychain`, `runtime`) live on a single Postgres master for Phase F; per-shard read replicas as Phase G if replication lag bites.
- Load-bearing prerequisite: app-side snowflake IDs must land before R4 (DB sharding). Per `observability.md` Principle 3, `execution_id` is generated in the application, not by the DB. Currently the Rust server still calls `noetl.snowflake_id()`; this wants to flip first.
The survey (Umbrella-Rust-Server-Port Phase F section) found that every per-execution operation is already keyed by `execution_id`:

- `publish_command_notification` (`execute.rs:535`) already derives NATS subjects as `noetl.commands.{system|shared}.<execution_id>`, mirroring the Python `pool_routing` scheme. The pattern extends trivially to `noetl.commands.{shard_N}.<execution_id>`.
- The orchestrator (`engine/orchestrator.rs::WorkflowOrchestrator`) is stateless with respect to execution: every trigger loads events fresh from the DB. No in-memory state means sharding adds routing complexity only, not state-migration complexity.
- Advisory locks for command claiming (`events.rs::claim_command`) are scoped to `command_id`, which lives on one shard. No cross-shard locking is needed.
- Transient variables (`noetl.variables`) are already keyed by `execution_id`.
Sharding is an orchestration problem.
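
The sketch below shows how the NATS subject scheme could carry the shard, and how a consumer could recover it. It assumes the `{shard_N}` segment is rendered as a literal `shard_<index>` token (for example `shard_3`). `command_subject` and `parse_command_subject` are hypothetical helpers, not the server's current API.

```rust
/// Hypothetical: build the per-execution subject for a shard, replacing the
/// `{system|shared}` pool segment with an assumed `shard_<index>` token.
fn command_subject(shard_index: u32, execution_id: i64) -> String {
    format!("noetl.commands.shard_{}.{}", shard_index, execution_id)
}

/// Hypothetical inverse: recover (shard_index, execution_id) from a subject.
/// Returns None for non-shard subjects such as the existing pool subjects.
fn parse_command_subject(subject: &str) -> Option<(u32, i64)> {
    let rest = subject.strip_prefix("noetl.commands.shard_")?;
    let (shard, exec) = rest.split_once('.')?;
    Some((shard.parse().ok()?, exec.parse().ok()?))
}
```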
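```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Owning shard for an execution: hash(execution_id) % N.
/// NOTE: DefaultHasher is not stable across Rust releases (see below).
fn shard_for(execution_id: i64, shard_count: u32) -> u32 {
    let mut h = DefaultHasher::new();
    execution_id.hash(&mut h);
    (h.finish() % shard_count as u64) as u32
}
```

Why `hash(execution_id) % N`, not `execution_id % N`: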
The snowflake `i64` has layout `[timestamp: 41][machine_id: 10][sequence: 12]`. A naive modulo on the raw value gives hot spots because the low-order bits (the 12-bit sequence) cycle predictably within each ms on each machine, and the high-order bits (timestamp) advance together across all producers. Modulo on a hash distributes evenly.
Why not `timestamp % N`: all executions started in the same millisecond hit the same shard.

Why not `machine_id % N`: executions cluster by the machine that generated the snowflake (the gateway pod, in practice).
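
The sketch below makes the hot-spot argument concrete by composing IDs with the documented bit layout. The `compose`, `histogram` and `low_traffic_ids` helpers and the sample timestamps are illustrative and do not come from NoETL.

At low traffic the sequence field is 0 for every ID, so the low 12 bits are all zero. For any power-of-two `N` up to 4096, `execution_id % N` then always returns 0. Hashing first spreads the same IDs across shards.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Compose a snowflake: [timestamp: 41][machine_id: 10][sequence: 12].
/// `ts_ms` is relative to an assumed custom epoch so the value stays positive.
fn compose(ts_ms: i64, machine_id: i64, sequence: i64) -> i64 {
    (ts_ms << 22) | ((machine_id & 0x3FF) << 12) | (sequence & 0xFFF)
}

/// Naive placement: modulo on the raw ID.
fn raw_shard(id: i64, n: u32) -> u32 {
    ((id as u64) % (n as u64)) as u32
}

/// Hash-then-modulo placement (same shape as `shard_for`).
fn hashed_shard(id: i64, n: u32) -> u32 {
    let mut h = DefaultHasher::new();
    id.hash(&mut h);
    (h.finish() % n as u64) as u32
}

/// Count how many IDs land on each of `n` shards under placement `f`.
fn histogram(ids: &[i64], n: u32, f: fn(i64, u32) -> u32) -> Vec<usize> {
    let mut counts = vec![0usize; n as usize];
    for &id in ids {
        counts[f(id, n) as usize] += 1;
    }
    counts
}

/// Low traffic: one execution per millisecond from one gateway pod,
/// so the sequence field is always 0.
fn low_traffic_ids(count: i64) -> Vec<i64> {
    (0..count).map(|ms| compose(123_456_789 + ms, 7, 0)).collect()
}
```

With four shards, `histogram(&low_traffic_ids(1_000), 4, raw_shard)` puts all 1,000 IDs on shard 0, while `hashed_shard` uses every shard.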
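Stability across restarts: `DefaultHasher`'s algorithm is unspecified and may change between Rust releases. Two server builds (or a restart onto a newer toolchain) could therefore disagree about which shard owns an execution. Pick a stable hash function before R2 lands: xxhash, or SipHash with a fixed key (from a crate; `std::hash::SipHasher` itself is deprecated).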
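
A fixed mixing function also meets the stability requirement if a dependency-free option is preferred. The sketch below swaps in the SplitMix64 finalizer, which is a different technique from the xxhash and SipHash candidates named above. `mix64` and `shard_for_stable` are hypothetical names.

The function is only shifts, XORs and wrapping multiplies with fixed constants. Any component that needs the mapping can reimplement it with 64-bit unsigned arithmetic, and a test can pin its outputs.

```rust
/// SplitMix64 finalizer: a fixed 64-bit mixer. Its output depends only on
/// these constants, so it cannot drift across Rust releases.
fn mix64(x: u64) -> u64 {
    let mut z = x;
    z = (z ^ (z >> 30)).wrapping_mul(0xbf58_476d_1ce4_e5b9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94d0_49bb_1331_11eb);
    z ^ (z >> 31)
}

/// Hypothetical stable variant of `shard_for`: hash(execution_id) % N.
fn shard_for_stable(execution_id: i64, shard_count: u32) -> u32 {
    assert!(shard_count > 0, "shard_count must be at least 1");
    (mix64(execution_id as u64) % u64::from(shard_count)) as u32
}
```

Whichever hash is chosen, `% N` is modulo placement, not a consistent-hash ring. Changing `N` remaps most executions to a different shard, which is consistent with dynamic re-sharding being out of scope for Phase F.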
Gateway-aware routing (Phase F): the ingress load balancer extracts `execution_id` from the request and dispatches to the owning shard's replica group using consistent hashing. Implementation options:

- Kong: Lua plugin to extract `execution_id` and route.
- nginx: `map` directive + upstream group per shard.
- GCP Cloud Load Balancer / Envoy: header-based routing + custom request-extraction logic.
The shard assignment scheme lives in two places (LB config + the server's own `shard_for` function); a drift-guard test in `repos/server/tests` validates they agree.
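
A golden-vector test is one way to keep the two copies honest. The LB side exports sample `execution_id,shard` pairs computed by its own routing code, and the server test recomputes them.

The sketch below assumes a CSV-style fixture. The `find_drift` name and the fixture format are hypothetical. The server's shard function is passed in so the check does not depend on which hash is chosen.

```rust
/// Hypothetical drift guard. `fixture` holds `execution_id,shard` lines
/// exported from the LB side (format assumed). Returns the execution IDs
/// whose LB-assigned shard disagrees with the server's `shard_fn`.
fn find_drift<F>(fixture: &str, shard_count: u32, shard_fn: F) -> Result<Vec<i64>, String>
where
    F: Fn(i64, u32) -> u32,
{
    let mut drifted = Vec::new();
    for (lineno, line) in fixture.lines().enumerate() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue; // skip blank lines and comments
        }
        let (id, shard) = line
            .split_once(',')
            .ok_or_else(|| format!("line {}: expected `execution_id,shard`", lineno + 1))?;
        let id = id.trim().parse::<i64>().map_err(|e| format!("line {}: {}", lineno + 1, e))?;
        let shard = shard.trim().parse::<u32>().map_err(|e| format!("line {}: {}", lineno + 1, e))?;
        if shard_fn(id, shard_count) != shard {
            drifted.push(id);
        }
    }
    Ok(drifted)
}
```

A real test would assert that `find_drift(..)` returns `Ok(vec![])`.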
Trade-off: simpler operationally (one network hop, centralized routing logic), but the LB becomes a single point of failure for sharding. Mitigated by running the LB itself in HA mode.
Server-aware alternative (Phase G): any server replica accepts any request. If `shard_for(execution_id)` doesn't match the local shard, the server proxies the request to the owning shard's HTTP endpoint. Requires inter-shard mTLS + a static routing table or service-mesh integration.
Lands as a safety net only if the gateway-aware path proves fragile in production. Phase F doesn't need it.
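
The per-request decision each replica would make is small. The sketch below assumes a static routing table keyed by dense shard indices `0..N` and a shard function shared with the gateway. `Dispatch`, `dispatch` and the table shape are hypothetical. mTLS and the actual HTTP forwarding are left out.

```rust
use std::collections::HashMap;

/// What a replica should do with a request under server-aware proxying.
#[derive(Debug, PartialEq)]
enum Dispatch {
    /// This replica's shard owns the execution: handle it locally.
    Local,
    /// Forward to the owning shard's HTTP endpoint (base URL).
    Proxy(String),
}

/// Hypothetical routing decision. `routes` maps shard index -> base URL and
/// is assumed to cover indices 0..N; `shard_fn` is the shared shard mapping.
fn dispatch<F>(
    execution_id: i64,
    local_shard: u32,
    routes: &HashMap<u32, String>,
    shard_fn: F,
) -> Result<Dispatch, String>
where
    F: Fn(i64, u32) -> u32,
{
    if routes.is_empty() {
        return Err("routing table is empty".to_string());
    }
    let owner = shard_fn(execution_id, routes.len() as u32);
    if owner == local_shard {
        return Ok(Dispatch::Local);
    }
    routes
        .get(&owner)
        .map(|base| Dispatch::Proxy(base.clone()))
        .ok_or_else(|| format!("no route for shard {}", owner))
}
```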
Partition by `execution_id`:

| Table | Key columns | Partition scheme | Citation |
|---|---|---|---|
| `noetl.event` | `(event_id, execution_id, catalog_id, ...)` | by `execution_id` hash | `db/queries/events.rs` |
| `noetl.command` | `(command_id, event_id, execution_id, ...)` | by `execution_id` hash | `db/queries/commands.rs` |
| `noetl.execution` | `(execution_id, catalog_id, status, ...)` | by `execution_id` hash | `db/queries/executions.rs` |
| `noetl.outbox` | `(outbox_id, execution_id, subject, ...)` | by `execution_id` hash | `db/queries/outbox.rs` |
| `noetl.variables` | `(var_id, execution_id, var_name, ...)` | by `execution_id` hash | `db/queries/variables.rs` |
Partitioning approach (R4 decision):
- Citus — managed partitioning; transparent to the application; joins across shards work. Operationally simplest.
- Per-shard separate schemas — explicit; easier to debug; no joins; manual fan-out for cluster-wide queries.
- Per-shard separate DBs — most isolated; highest ops cost; needs an aggregation layer for any cross-shard view.
Lean recommendation: start with Citus. Migration to per-shard DBs is mechanical if Citus proves inadequate later.
Cluster-wide tables (not partitioned):

| Table | Why cluster-wide | Phase F strategy |
|---|---|---|
| `noetl.catalog` | Global playbook catalog; read-only from each execution's perspective | Single-master Postgres; every shard reads from it |
| `noetl.credential` | Global credential store; read-only from each execution | Same |
| `noetl.keychain` | Keyed by `catalog_id`, not `execution_id` | Same |
| `noetl.runtime` | Worker pool registration / heartbeat; cluster-wide | Same |
Phase G optimization (only if replication lag becomes the bottleneck): add per-shard read-only replicas of the cluster-wide tables. Writes still go to the master; reads go local.
Execution-scoped endpoints (routed to the owning shard by `execution_id`):

| Endpoint | `execution_id` source | Handler |
|---|---|---|
| `POST /api/execute` | body (server generates snowflake; subject to app-side-snowflake migration) | `handlers/execute.rs::execute_playbook` |
| `POST /api/events` | body (string-wire i64) | `handlers/events.rs::handle_event` |
| `POST /api/events/batch` | body | `handlers/events.rs::handle_batch_events` |
| `GET /api/commands/{event_id}` | path → DB lookup of `execution_id` | `handlers/events.rs::get_command` |
| `POST /api/commands/{event_id}/claim` | path → DB lookup | `handlers/events.rs::claim_command` |
| `GET /api/executions/{execution_id}` | path | `handlers/executions.rs::get_execution` |
| `GET /api/executions/{execution_id}/status` | path | `handlers/executions.rs::get_status` |
| `POST /api/executions/{execution_id}/cancel` | path | `handlers/executions.rs::cancel` |
| `GET /api/executions/{execution_id}/cancellation-check` | path | `handlers/executions.rs::cancellation_check` |
| `POST /api/executions/{execution_id}/finalize` | path | `handlers/executions.rs::finalize` |
| `GET /api/executions/{execution_id}/events/stream` (SSE) | path | `handlers/executions.rs::stream_events` |
| `GET` / `POST` / `DELETE /api/vars/{execution_id}` | path | `handlers/variables.rs` |
| `POST /api/internal/outbox/claim` | per-execution batch (claim scopes by `execution_id`) | `handlers/internal.rs::outbox_claim` |
| `POST /api/internal/outbox/mark-published` | body (carries `execution_id`) | `handlers/internal.rs::outbox_mark_published` |
| `POST /api/internal/outbox/mark-failed` | body | `handlers/internal.rs::outbox_mark_failed` |
| `POST /api/internal/events/project` | body | `handlers/internal.rs::events_project` |
Endpoints that are not routed by `execution_id` (cluster-wide or per-replica):

| Endpoint | Why | Handler |
|---|---|---|
| `GET /api/executions` (list) | cluster-wide query (aggregates across all shards in Phase F via a fan-out wrapper, or use cluster-wide read-only view via Citus) | `handlers/executions.rs::list` |
| `GET` / `POST /api/catalog/*` | global catalog | `handlers/catalog.rs` |
| `POST` / `GET /api/credentials/*` | global credential store | `handlers/credentials.rs` |
| `POST` / `GET /api/keychain/{catalog_id}/*` | keyed by `catalog_id` | `handlers/keychain.rs` |
| `POST /api/worker/pool/{register,deregister,heartbeat}` | worker pools are global | `handlers/runtime.rs` |
| `GET /api/worker/pools` | cluster-wide read | `handlers/runtime.rs::list_pools` |
| `GET /api/internal/outbox/pending-count` | cluster-wide count (KEDA scaler trigger) | `handlers/internal.rs::outbox_pending_count` |
| `GET /metrics` | per-replica Prometheus surface | `handlers/metrics.rs` |
| `GET /healthz`, `GET /readyz` | per-replica health probes | (route definition) |
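
For the gateway, the inventory reduces to a small classification: where, if anywhere, the routing key lives in a request path. The sketch below is in Rust; a Kong, nginx or Envoy implementation would express the same thing in its own config language.

`RouteKey` and `classify` are hypothetical. The HTTP method is ignored because the inventory routes each path the same way for every method. Treating `outbox/claim` as body-carried is an assumption.

The `{event_id}` endpoints need the DB lookup the table lists, which an LB cannot perform on its own.

```rust
/// Where a gateway can find the routing key for a request path
/// (hypothetical classification derived from the endpoint inventory).
#[derive(Debug, PartialEq)]
enum RouteKey {
    /// `execution_id` is a path segment: route directly.
    ExecutionPath(i64),
    /// `execution_id` must be read from the request body.
    Body,
    /// Path carries `event_id`; `execution_id` needs a DB lookup.
    EventLookup(i64),
    /// Not execution-scoped: cluster-wide or per-replica endpoint.
    Unsharded,
}

fn classify(path: &str) -> RouteKey {
    // Drop any query string, then split "/api/x/y" into ["api", "x", "y"].
    let path = path.split('?').next().unwrap_or(path);
    let segments: Vec<&str> = path.trim_matches('/').split('/').collect();
    match segments.as_slice() {
        ["api", "executions", id, ..] | ["api", "vars", id] => match id.parse::<i64>() {
            Ok(id) => RouteKey::ExecutionPath(id),
            Err(_) => RouteKey::Unsharded,
        },
        ["api", "commands", event_id, ..] => match event_id.parse::<i64>() {
            Ok(id) => RouteKey::EventLookup(id),
            Err(_) => RouteKey::Unsharded,
        },
        ["api", "execute"]
        | ["api", "events"]
        | ["api", "events", "batch"]
        // assumed: outbox claim also carries execution scope in the body
        | ["api", "internal", "outbox", "claim" | "mark-published" | "mark-failed"]
        | ["api", "internal", "events", "project"] => RouteKey::Body,
        // list endpoint, catalog, credentials, keychain, pools, metrics, probes
        _ => RouteKey::Unsharded,
    }
}
```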
`GET /api/executions` (list) is the awkward case: clients expect a single global view of all executions, but each shard only knows its own slice. R4 decision: aggregate via Citus's cross-shard query (if Citus is picked) or via a small in-server fan-out helper that queries all shard replicas in parallel and merges the result. Either is bounded in cost because the list is paginated.
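
The fan-out variant stays cheap because each shard only has to return one page. The sketch below assumes keyset pagination on `execution_id`, newest first; snowflake IDs are roughly time-ordered because the timestamp occupies the high bits. `merge_pages` is a hypothetical name, and the parallel per-shard queries are omitted.

Any row in the global top `limit` must also be in its own shard's top `limit`. That is why one page per shard suffices. With offset pagination, each shard would instead have to return `offset + limit` rows.

```rust
/// Merge per-shard pages into one global page. Each shard page holds at
/// most `limit` execution IDs below the same cursor, newest (largest) first.
/// Returns the global page plus the cursor for the next request.
fn merge_pages(shard_pages: Vec<Vec<i64>>, limit: usize) -> (Vec<i64>, Option<i64>) {
    let mut all: Vec<i64> = shard_pages.into_iter().flatten().collect();
    all.sort_unstable_by(|a, b| b.cmp(a)); // newest first
    all.truncate(limit);
    // A full page may have more behind it: next request asks each shard
    // for rows with execution_id strictly below the last one returned.
    let next_cursor = if all.len() == limit { all.last().copied() } else { None };
    (all, next_cursor)
}
```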
Per `observability.md` Principle 3, `execution_id` is generated in the application using a Rust snowflake library, not by the DB. Currently the Rust server still calls `noetl.snowflake_id()` (a Postgres function) on every event / command create. This wants to flip before R4 (DB sharding). Reasons:
- Per-shard DB sequences tie IDs to the shard that minted them. If each shard's `noetl.snowflake_id()` continues to generate from its own DB, an execution's ID is minted by whichever shard ran the INSERT, and its `machine_id` portion encodes that shard. The shard that `shard_for(execution_id)` picks is a separate matter. Defeats the routing.
- Cross-shard generation needs global coordination. If we instead want IDs to span shards, we'd need a single master to generate all snowflakes (write bottleneck), or some inter-shard coordination scheme (complex).
- Application-side generation is the right shape anyway. Per `observability.md`, spans need the ID at span-creation time; retries are idempotent only with a stable ID; tests need deterministic IDs; cross-component publish doesn't have to wait for the INSERT round trip.
Migration shape (a small follow-up sub-issue, to be spun out during R2 or earlier as a standalone):

- Add a Rust snowflake helper to `repos/server/src/snowflake.rs` (or pull `snowflaked`/`sonyflake` from crates.io).
- Derive `machine_id` from the `WORKER_ID` env var (set per pod by the deployment manifest), hashed to 10 bits.
- Generate IDs in the application; pass them explicitly to every INSERT. Leave the DB-side `noetl.snowflake_id()` function in place as a fallback for ad-hoc admin SQL writes (per `observability.md`'s per-component migration order).
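
Below is a minimal sketch of such a helper. It assumes the documented bit layout and a caller-supplied millisecond clock relative to some custom epoch. The epoch, the `Snowflake` struct and the use of FNV-1a to hash `WORKER_ID` are assumptions, not the planned `snowflake.rs`.

One property of the "hashed to 10 bits" step is worth keeping in mind. There are only 1024 possible values, so two pods can hash to the same `machine_id`; by the birthday bound the odds pass 50% at around 38 pods. Colliding pods can mint duplicate IDs within the same millisecond.

```rust
/// Derive a 10-bit machine_id from a WORKER_ID string with FNV-1a
/// (fixed constants, so the mapping is stable across builds).
fn machine_id_from_worker_id(worker_id: &str) -> u64 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325; // FNV-1a 64-bit offset basis
    for b in worker_id.bytes() {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0000_0100_0000_01b3); // FNV-1a 64-bit prime
    }
    h & 0x3FF // keep the low 10 bits
}

/// Hypothetical generator for [timestamp: 41][machine_id: 10][sequence: 12].
/// `now_ms` (ms since an assumed custom epoch) is passed in by the caller,
/// which keeps the logic deterministic and testable.
struct Snowflake {
    machine_id: u64,
    last_ms: Option<u64>,
    sequence: u64,
}

impl Snowflake {
    fn new(worker_id: &str) -> Self {
        Snowflake { machine_id: machine_id_from_worker_id(worker_id), last_ms: None, sequence: 0 }
    }

    /// Returns None if the clock went backwards or all 4096 sequence values
    /// for `now_ms` are used; the caller should wait and retry.
    fn next_id(&mut self, now_ms: u64) -> Option<i64> {
        match self.last_ms {
            Some(last) if now_ms < last => return None, // refuse: could duplicate
            Some(last) if now_ms == last => {
                if self.sequence == 0xFFF {
                    return None; // sequence exhausted for this millisecond
                }
                self.sequence += 1;
            }
            _ => {
                self.last_ms = Some(now_ms);
                self.sequence = 0;
            }
        }
        let ts = now_ms & ((1u64 << 41) - 1);
        Some(((ts << 22) | (self.machine_id << 12) | self.sequence) as i64)
    }
}
```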
| Round | Scope | Output |
|---|---|---|
| R1 (this round) | Sharding design doc + endpoint inventory codification | This page + sub-issue noetl/server#40 closed. ai-meta umbrella Forward section refreshed. |
| R1.5 (prerequisite) | App-side snowflake ID migration | New repos/server/src/snowflake.rs helper; WORKER_ID-derived machine_id; explicit ID generation at every INSERT call site. DB-side noetl.snowflake_id() stays as the ad-hoc fallback. |
| R2 | Server-side `shard_id()` helper + (optional) `route_to_shard()` proxy | New `repos/server/src/state.rs` helpers; HTTP proxy fallback for mis-routed requests (gives a safety net while gateway routing rolls out). Stable hash function chosen (xxhash or fixed-key SipHasher). |
| R3 | Gateway-side dispatch + load balancer config | Ingress LB extracts execution_id and routes by consistent hashing. Drift-guard test validates LB config + shard_for agree. |
| R4 | DB sharding + per-shard schema migration | Partition event / command / execution / outbox / variables by execution_id via Citus (lean recommendation) or per-shard schemas. Replicate catalog / credential / keychain / runtime (or keep on shared read-only path). Schema migration dry-run on kind first. |
| R5 | Cutover + validation | Spin up N=2 shard replicas in kind; run Phase D e2e harness across shards; confirm execution affinity (all events for one execution hit one shard); prod canary + traffic split on the ingress. |
DB partition scheme: Citus vs per-shard schemas vs per-shard DBs. R4 decision (recorded 2026-06-04): per-shard physical DBs, NOT Citus. Reason: Citus adds a Postgres-extension dep across the deployed image (kind + GKE); per-shard physical DBs give cluster-level isolation ("one slow query in shard 1 doesn't block shard 0") with no extension. Tradeoff: more ops cost (N Postgres clusters) and a cluster-wide fan-out helper for the `GET /api/executions` list endpoint (R4-4).
R4 ships with a flat `NOETL_SHARDS` env var carrying semicolon-separated DSN strings + a `NOETL_CLUSTER_DSN` env var for the cluster-wide master. See R4-1 (noetl/server#49, v2.13.0) for the implementation.
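
The shard index in this shape is positional, so a parser should refuse empty slots rather than skip them. Skipping would silently shift every later shard. The sketch below shows this; `parse_shards` is hypothetical, and the actual R4-1 parser in noetl/server#49 may differ.

```rust
/// Hypothetical parse of `NOETL_SHARDS`: semicolon-separated DSNs where the
/// position in the list is the shard index. A single trailing ';' is allowed.
fn parse_shards(raw: &str) -> Result<Vec<String>, String> {
    let trimmed = raw.trim().trim_end_matches(';');
    if trimmed.is_empty() {
        return Err("NOETL_SHARDS must list at least one DSN".into());
    }
    trimmed
        .split(';')
        .enumerate()
        .map(|(i, dsn)| {
            let dsn = dsn.trim();
            if dsn.is_empty() {
                // An empty slot would shift every later shard index.
                Err(format!("NOETL_SHARDS entry {} is empty", i))
            } else {
                Ok(dsn.to_string())
            }
        })
        .collect()
}
```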
This is the right shape for kind dev + single-cloud prod but the wrong shape for multi-cloud + per-shard credential rotation + runtime topology changes. The correct long-term shape is:
| Component | Role |
|---|---|
| `NOETL_CLUSTER_DSN` env var | bootstrap: connects to cluster master |
| `NOETL_KEYCHAIN_MASTER_KEY` env var | decrypts `noetl.keychain` (already exists per credential service) |
| `noetl.keychain` (on cluster master) | per-shard creds + per-external-storage creds, encrypted |
| `noetl.shard_endpoint` (NEW table, on cluster master) | shard topology; rows reference keychain aliases |
| Per-shard pools | built at startup by resolving `shard_endpoint` rows + decrypting keychain entries |
Proposed `noetl.shard_endpoint` schema:

```sql
CREATE TABLE noetl.shard_endpoint (
  shard_index      int PRIMARY KEY,
  host             text NOT NULL,
  port             int DEFAULT 5432,
  database         text DEFAULT 'noetl',
  auth_kind        text NOT NULL,  -- 'gcp_workload_identity' |
                                   -- 'aws_iam' | 'password' |
                                   -- 'mtls_cert'
  credential_alias text REFERENCES noetl.keychain(alias),
  active           bool NOT NULL DEFAULT true,
  notes            text,
  created_at       timestamptz NOT NULL DEFAULT now()
);
```

`noetl.result_ref` (already exists for Arrow IPC pointers) gains:

```sql
ALTER TABLE noetl.result_ref
  ADD COLUMN credential_alias text REFERENCES noetl.keychain(alias),
  ADD COLUMN auth_kind text;
```

So the same model covers BOTH:
- NoETL-internal shard endpoints (cross-cloud, per-shard creds).
- External storage pointers (GCS / S3 / MinIO / etc. — each row carries its own credential alias).
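
One invariant matters for routing when G1's loader builds per-shard pools from these rows. `shard_for` computes `hash % N`, so the rows must cover shard indices `0..N` with no gaps.

The sketch below shows that check, plus parsing of the `auth_kind` values listed in the schema comment. `EndpointRow`, `AuthKind` and `build_topology` are hypothetical names. The struct carries only a subset of columns, and keychain decryption and pool construction are omitted.

```rust
use std::str::FromStr;

/// The `auth_kind` values listed in the proposed schema comment.
#[derive(Debug, PartialEq, Clone, Copy)]
enum AuthKind {
    GcpWorkloadIdentity,
    AwsIam,
    Password,
    MtlsCert,
}

impl FromStr for AuthKind {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "gcp_workload_identity" => Ok(AuthKind::GcpWorkloadIdentity),
            "aws_iam" => Ok(AuthKind::AwsIam),
            "password" => Ok(AuthKind::Password),
            "mtls_cert" => Ok(AuthKind::MtlsCert),
            other => Err(format!("unknown auth_kind {:?}", other)),
        }
    }
}

/// Subset of a `noetl.shard_endpoint` row.
struct EndpointRow {
    shard_index: i32,
    host: String,
    auth_kind: String,
}

/// Hypothetical loader check: sort by shard_index, require 0..N with no gaps
/// or duplicates (shard_for computes hash % N), and parse auth_kind.
fn build_topology(mut rows: Vec<EndpointRow>) -> Result<Vec<(String, AuthKind)>, String> {
    rows.sort_by_key(|r| r.shard_index);
    let mut topology = Vec::with_capacity(rows.len());
    for (expected, row) in rows.into_iter().enumerate() {
        if row.shard_index != expected as i32 {
            return Err(format!("shard_index {} found where {} was expected", row.shard_index, expected));
        }
        topology.push((row.host, row.auth_kind.parse::<AuthKind>()?));
    }
    if topology.is_empty() {
        return Err("no shard endpoints configured".into());
    }
    Ok(topology)
}
```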
Why deferred to Phase G:
- R4 needs to ship + kind-validate the routing + handler cutover in a single-cloud configuration first. The env-var DSN shape is sufficient for that proof-of-shape.
- Adding `noetl.shard_endpoint` + the keychain-resolution loader is a multi-PR refactor of `ShardingConfig` + bootstrap. Doing it before R4-3/4/5 lands gates the rest of R4 on a design that can mature on its own track.
- Per `agents/rules/execution-model.md` § "Secrets and credentials rule", tenant database DSNs are business-logic credentials and belong in the keychain. The current `NOETL_SHARDS` env var is non-final for production multi-cloud; it's documented as such here.
Phase G shape (when it lands):
| Round | Scope |
|---|---|
| G1 | `noetl.shard_endpoint` schema + cluster-master loader; `result_ref.credential_alias` migration. |
| G2 | `ShardingConfig::from_cluster_db()`; the env-var path becomes a kind-dev fallback only. |
| G3 | `/api/internal/shards/topology` endpoint for system-pool admin tooling. |
| G4 | Hot-reload: refresh shard map on signal without restart. |
| G5 | Cross-cloud kind validation (kind master + simulated remote shard via container network). |
R1's deliverable is durable design content, so there is no runtime validation (it's a wiki commit). Each round from R2 onwards ships its own kind validation per `agents/rules/deployment-validation.md`.
- `Umbrella-Rust-Server-Port` on ai-meta wiki: the cross-repo phase tracker.
- `event-envelope`: the wire format; shard-routing-transparent.
- `runtime-shape`: the 4-binary runtime.
- `agents/rules/data-access-boundary.md`: Reason 2 ("Sharding readiness") cites this design.
- `agents/rules/observability.md`: Principle 3 (the app-side snowflake prerequisite).
- `agents/rules/execution-model.md`: the gateway / worker / playbook boundary that sharding doesn't change.