
sharding design

Kadyapam edited this page Jun 4, 2026 · 2 revisions

Sharding Design (Phase F)

This page is the durable design doc for Phase F of the Rust server FastAPI parity port (noetl/ai-meta#49). R1 of Phase F (noetl/server#40) codifies the survey findings into the design that R2–R5 implement.

Scope: the cluster-level orchestration that lets noetl-server run as N replicas, each owning a slice of executions, while every playbook still observes single-server semantics.

Out of scope (candidates for later phases, not now): multi-region; per-tenant sharding; dynamic re-sharding (adding shards on the fly); cross-shard joins.

TL;DR

  1. Shard key: hash(execution_id) % N on the full 64-bit snowflake.
  2. Routing: gateway-aware for Phase F. The ingress load balancer extracts execution_id from the request and dispatches to the owning shard via consistent hashing. Server-aware proxying is a Phase G optimization.
  3. Per-execution tables (event, command, execution, outbox, variables) partition by execution_id.
  4. Cluster-wide tables (catalog, credential, keychain, runtime) live on a single Postgres master for Phase F; per-shard read replicas come in Phase G if replication lag bites.
  5. Load-bearing prerequisite: app-side snowflake IDs must land before R4 (DB sharding). Per observability.md Principle 3, execution_id is generated in the application, not by the DB. Currently the Rust server still calls noetl.snowflake_id(); this must flip first.

Why the architecture is already ready

The survey (Umbrella-Rust-Server-Port Phase F section) found that every per-execution operation is already keyed by execution_id:

  • publish_command_notification (execute.rs:535) already derives NATS subjects as noetl.commands.{system|shared}.<execution_id>, mirroring the Python pool_routing scheme. Pattern extends trivially to noetl.commands.{shard_N}.<execution_id>.
  • The orchestrator (engine/orchestrator.rs::WorkflowOrchestrator) holds no per-execution state: every trigger loads events fresh from the DB. With no in-memory state, sharding adds routing complexity only, not state-migration complexity.
  • Advisory locks for command claiming (events.rs::claim_command) are scoped to command_id, which lives on one shard. No cross-shard locking needed.
  • Transient variables (noetl.variables) are already keyed by execution_id.
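A minimal sketch of how the subject pattern in the first bullet could extend to shards. Rendering the {shard_N} placeholder as shard_<index>, and both helper names, are assumptions for illustration, not code from execute.rs.

```rust
/// Hypothetical helper: per-shard NATS command subject.
/// Assumes the {shard_N} placeholder renders as "shard_<index>".
pub fn shard_subject(shard_index: u32, execution_id: i64) -> String {
    format!("noetl.commands.shard_{}.{}", shard_index, execution_id)
}

/// Hypothetical helper: the subscription a shard's consumers would use.
/// In NATS, "*" matches exactly one token, so this receives every
/// execution subject published for that shard.
pub fn shard_subscription(shard_index: u32) -> String {
    format!("noetl.commands.shard_{}.*", shard_index)
}
```

Keeping execution_id as the last token means a shard can subscribe with a single-token wildcard while per-execution subjects keep their current shape.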

Sharding is therefore an orchestration (routing) problem, not a state-migration problem.

Shard assignment

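```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map an execution to its owning shard: hash(execution_id) % shard_count.
/// NOTE: DefaultHasher is not guaranteed stable across Rust releases (see below).
fn shard_for(execution_id: i64, shard_count: u32) -> u32 {
    assert!(shard_count > 0, "shard_count must be at least 1");
    let mut h = DefaultHasher::new();
    execution_id.hash(&mut h);
    // Hash the full 64-bit snowflake, then reduce into [0, shard_count).
    (h.finish() % u64::from(shard_count)) as u32
}
```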

Why hash(execution_id) % N, not execution_id % N:

The snowflake i64 has layout [timestamp: 41][machine_id: 10][sequence: 12]. A naive modulo on the raw value gives hot spots: the low-order bits (the 12-bit sequence) cycle predictably within each millisecond on each machine, and the high-order bits (the timestamp) advance in lockstep across all producers. Taking the modulo of a well-mixed hash instead spreads IDs evenly across shards.
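Worked out on that layout, with t, m and s the timestamp, machine_id and sequence fields:

```latex
\[
\mathrm{id} = t \cdot 2^{22} + m \cdot 2^{12} + s, \qquad 0 \le s < 2^{12}
\]
\[
N \mid 2^{12} \;\Longrightarrow\; \mathrm{id} \bmod N = s \bmod N
\]
```

So whenever N divides 4096 (2, 4, 8, ...), execution_id % N depends only on the sequence. In snowflake generators that restart the sequence at 0 every millisecond (the usual behaviour, assumed here), a lightly loaded cluster mints mostly s = 0, and raw modulo sends nearly every execution to shard 0.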

Why not timestamp % N: all executions started in the same millisecond hit the same shard, so request bursts concentrate on one shard at a time.

Why not machine_id % N: executions cluster by the machine that generated the snowflake (the gateway pod, in practice).

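Stability across releases: DefaultHasher's algorithm is unspecified and may change between Rust releases, so a toolchain upgrade could silently re-home executions, and the LB side could never reproduce it. Pick a stable hash function (xxHash, or SipHash with a fixed key; std::hash::SipHasher is deprecated, so in practice that means a crate such as siphasher) before R2 lands. Feed it the ID's fixed-endian bytes rather than going through the Hash trait, whose integer encoding uses native byte order by default.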
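A sketch of a stable drop-in for shard_for, assuming a std-only build. It swaps in MurmurHash3's fmix64 finalizer, a small, fixed, publicly specified bit mixer, as a stand-in for the xxHash or fixed-key SipHash named above; shard_for_stable is a hypothetical name. Whichever hash is chosen also has to be reproduced on the LB side, which favours something this small and fully specified.

```rust
/// MurmurHash3's 64-bit finalizer (fmix64): a fixed bijective bit mixer.
/// Std-only stand-in here for xxHash / fixed-key SipHash.
fn fmix64(mut k: u64) -> u64 {
    k ^= k >> 33;
    k = k.wrapping_mul(0xff51_afd7_ed55_8ccd);
    k ^= k >> 33;
    k = k.wrapping_mul(0xc4ce_b9fe_1a85_ec53);
    k ^= k >> 33;
    k
}

/// Hypothetical stable variant of shard_for: the result depends only on the
/// ID's bits and the constants above, not on the toolchain or platform.
pub fn shard_for_stable(execution_id: i64, shard_count: u32) -> u32 {
    assert!(shard_count > 0, "shard_count must be at least 1");
    // Reinterpret the i64 bit pattern as u64 (snowflakes are non-negative).
    (fmix64(execution_id as u64) % u64::from(shard_count)) as u32
}
```

Mixing the u64 bit pattern directly, instead of going through the Hash trait, keeps the result independent of platform byte order.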

Routing strategy

Phase F: gateway-aware

The ingress load balancer extracts execution_id from the request and dispatches to the owning shard's replica group using consistent hashing. Implementation options:

  • Kong: Lua plugin to extract execution_id and route.
  • nginx: map directive + upstream group per shard.
  • GCP Cloud Load Balancer / Envoy: header-based routing + custom request-extraction logic.

The shard assignment scheme lives in two places (LB config + the server's own shard_for function); a drift-guard test in repos/server/tests validates they agree.
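One way the drift guard could be shaped, as a hedged sketch: routing_mismatches and snowflake_sample are hypothetical helpers. In a real test the LB side would be derived from the generated LB config (for example the nginx map), not written as a Rust closure; the sample deliberately includes zero-sequence IDs, the case raw modulo gets wrong.

```rust
/// Hypothetical drift-guard helper: compare the server's shard function with a
/// model of the LB's routing over a sample, returning (id, server, lb) mismatches.
pub fn routing_mismatches<S, L>(server: S, lb: L, shard_count: u32, sample: &[i64]) -> Vec<(i64, u32, u32)>
where
    S: Fn(i64, u32) -> u32,
    L: Fn(i64) -> u32,
{
    sample
        .iter()
        .filter_map(|&id| {
            let (s, l) = (server(id, shard_count), lb(id));
            if s == l { None } else { Some((id, s, l)) }
        })
        .collect()
}

/// Snowflake-shaped sample: a few timestamps, machine_ids and sequence values,
/// including sequence 0.
pub fn snowflake_sample() -> Vec<i64> {
    let mut ids = Vec::new();
    for t in [1_i64, 1_000, 86_400_000, 1 << 40] {
        for m in [0_i64, 1, 512, 1023] {
            for s in [0_i64, 1, 4095] {
                ids.push((t << 22) | (m << 12) | s);
            }
        }
    }
    ids
}
```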

Trade-off: simpler operationally (one network hop, centralized routing logic), but the LB becomes a single point of failure for sharding. Mitigated by running the LB itself in HA mode.

Phase G (deferred): server-aware proxy fallback

Any server replica accepts any request; if shard_for(execution_id) doesn't match the local shard, the server proxies the request to the owning shard's HTTP endpoint. Requires inter-shard mTLS + a static routing table or service-mesh integration.

Lands as a safety net only if the gateway-aware path proves fragile in production. Phase F doesn't need it.
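A minimal sketch of the local-or-proxy decision behind this fallback (the route_to_shard() helper listed for R2). The Route type, the endpoint list and its URLs are assumptions; the HTTP forwarding itself and inter-shard mTLS are left out.

```rust
/// Outcome of the proxy-fallback check (hypothetical type).
#[derive(Debug, PartialEq, Eq)]
pub enum Route {
    /// This replica owns the execution: handle the request locally.
    Local,
    /// Another shard owns it: forward to that shard's base URL.
    Proxy { shard: u32, base_url: String },
}

/// Decide where a request for `execution_id` should be served.
/// `owner_of` is the cluster's shard function; `endpoints[i]` is shard i's
/// internal HTTP base URL (assumed to come from a static routing table).
pub fn route_to_shard(
    execution_id: i64,
    local_shard: u32,
    endpoints: &[String],
    owner_of: impl Fn(i64, u32) -> u32,
) -> Route {
    assert!(!endpoints.is_empty(), "routing table must list at least one shard");
    let owner = owner_of(execution_id, endpoints.len() as u32);
    if owner == local_shard {
        Route::Local
    } else {
        Route::Proxy { shard: owner, base_url: endpoints[owner as usize].clone() }
    }
}
```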

Per-execution tables (shardable)

Partition by execution_id:

| Table | Key columns | Partition scheme | Citation |
|---|---|---|---|
| noetl.event | event_id, execution_id, catalog_id, ... | by execution_id hash | db/queries/events.rs |
| noetl.command | command_id, event_id, execution_id, ... | by execution_id hash | db/queries/commands.rs |
| noetl.execution | execution_id, catalog_id, status, ... | by execution_id hash | db/queries/executions.rs |
| noetl.outbox | outbox_id, execution_id, subject, ... | by execution_id hash | db/queries/outbox.rs |
| noetl.variables | var_id, execution_id, var_name, ... | by execution_id hash | db/queries/variables.rs |

Partitioning approach (R4 decision):

  • Citus — managed partitioning; transparent to the application; joins across shards work. Operationally simplest.
  • Per-shard separate schemas — explicit; easier to debug; no joins; manual fan-out for cluster-wide queries.
  • Per-shard separate DBs — most isolated; highest ops cost; needs an aggregation layer for any cross-shard view.

Lean recommendation (R1): start with Citus. Migration to per-shard DBs is mechanical if Citus proves inadequate later. R4 later reversed this and chose per-shard physical DBs; see the open-question entry below.

Cluster-wide tables (NOT shardable)

| Table | Why cluster-wide | Phase F strategy |
|---|---|---|
| noetl.catalog | Global playbook catalog; read-only from each execution's perspective | Single-master Postgres; every shard reads from it |
| noetl.credential | Global credential store; read-only from each execution | Same |
| noetl.keychain | Keyed by catalog_id, not execution_id | Same |
| noetl.runtime | Worker pool registration / heartbeat; cluster-wide | Same |

Phase G optimization (only if replication lag becomes the bottleneck): add per-shard read-only replicas of the cluster-wide tables. Writes still go to the master; reads go local.

Endpoint inventory

Per-execution (route by execution_id)

| Endpoint | execution_id source | Handler |
|---|---|---|
| POST /api/execute | body (server generates snowflake; subject to app-side-snowflake migration) | handlers/execute.rs::execute_playbook |
| POST /api/events | body (string-wire i64) | handlers/events.rs::handle_event |
| POST /api/events/batch | body | handlers/events.rs::handle_batch_events |
| GET /api/commands/{event_id} | path → DB lookup of execution_id | handlers/events.rs::get_command |
| POST /api/commands/{event_id}/claim | path → DB lookup | handlers/events.rs::claim_command |
| GET /api/executions/{execution_id} | path | handlers/executions.rs::get_execution |
| GET /api/executions/{execution_id}/status | path | handlers/executions.rs::get_status |
| POST /api/executions/{execution_id}/cancel | path | handlers/executions.rs::cancel |
| GET /api/executions/{execution_id}/cancellation-check | path | handlers/executions.rs::cancellation_check |
| POST /api/executions/{execution_id}/finalize | path | handlers/executions.rs::finalize |
| GET /api/executions/{execution_id}/events/stream (SSE) | path | handlers/executions.rs::stream_events |
| GET / POST / DELETE /api/vars/{execution_id} | path | handlers/variables.rs |
| POST /api/internal/outbox/claim | per-execution batch (claim scopes by execution_id) | handlers/internal.rs::outbox_claim |
| POST /api/internal/outbox/mark-published | body (carries execution_id) | handlers/internal.rs::outbox_mark_published |
| POST /api/internal/outbox/mark-failed | body | handlers/internal.rs::outbox_mark_failed |
| POST /api/internal/events/project | body | handlers/internal.rs::events_project |
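For the rows whose source is the path, gateway-side extraction can be a pure string match. A hedged sketch (execution_id_from_path is hypothetical): body-sourced IDs would additionally need JSON parsing of the string-wire i64, and /api/commands/{event_id} cannot be resolved from the path alone because it needs a DB lookup.

```rust
/// Hypothetical extractor: execution_id for the path-sourced per-execution routes.
/// Returns None for cluster-wide routes and for routes that need the body or a
/// DB lookup (e.g. /api/commands/{event_id}).
pub fn execution_id_from_path(path: &str) -> Option<i64> {
    // Drop any query string before splitting into segments.
    let path = path.split('?').next().unwrap_or(path);
    let mut segs = path.trim_start_matches('/').split('/');
    match (segs.next(), segs.next(), segs.next()) {
        (Some("api"), Some("executions"), Some(id)) | (Some("api"), Some("vars"), Some(id)) => {
            // Snowflakes are non-negative; anything else is not an execution_id.
            id.parse::<i64>().ok().filter(|v| *v >= 0)
        }
        _ => None,
    }
}
```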

Cluster-wide (any shard answers)

| Endpoint | Why | Handler |
|---|---|---|
| GET /api/executions (list) | cluster-wide query (aggregates across all shards in Phase F via a fan-out wrapper, or a cluster-wide read-only view via Citus) | handlers/executions.rs::list |
| GET/POST /api/catalog/* | global catalog | handlers/catalog.rs |
| POST/GET /api/credentials/* | global credential store | handlers/credentials.rs |
| POST/GET /api/keychain/{catalog_id}/* | keyed by catalog_id | handlers/keychain.rs |
| POST /api/worker/pool/{register,deregister,heartbeat} | worker pools are global | handlers/runtime.rs |
| GET /api/worker/pools | cluster-wide read | handlers/runtime.rs::list_pools |
| GET /api/internal/outbox/pending-count | cluster-wide count (KEDA scaler trigger) | handlers/internal.rs::outbox_pending_count |
| GET /metrics | per-replica Prometheus surface | handlers/metrics.rs |
| GET /healthz, GET /readyz | per-replica health probes | (route definition) |

GET /api/executions (list) is the awkward case: clients expect a single global view of all executions, but each shard only knows its own slice. R4 decision: aggregate via Citus's cross-shard query (if Citus is picked) or via a small in-server fan-out helper that queries all shard replicas in parallel and merges the result. Either is bounded in cost because the list is paginated.
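The merge step of the fan-out helper, sketched under assumptions: the list is ordered newest first by a hypothetical created_at_ms column with execution_id as tie-breaker, and paging is keyset-based. If every shard returns its first limit rows past the same cursor, the global first limit rows are guaranteed to be among them (a row in the global top limit has fewer than limit rows ahead of it, so it is also in its own shard's top limit), so each page costs at most N × limit rows.

```rust
/// One row of the GET /api/executions list (hypothetical, trimmed shape).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecutionRow {
    pub execution_id: i64,
    pub created_at_ms: i64,
}

/// Merge per-shard pages into one global page, newest first.
/// Each inner Vec is one shard's reply: up to `limit` rows after the cursor.
pub fn merge_shard_pages(pages: Vec<Vec<ExecutionRow>>, limit: usize) -> Vec<ExecutionRow> {
    let mut all: Vec<ExecutionRow> = pages.into_iter().flatten().collect();
    // Descending by (created_at_ms, execution_id): execution_id breaks ties so
    // the order is total and a cursor can resume exactly where a page ended.
    all.sort_by(|a, b| (b.created_at_ms, b.execution_id).cmp(&(a.created_at_ms, a.execution_id)));
    all.truncate(limit);
    all
}
```

The next page's cursor would be the last returned row's (created_at_ms, execution_id) pair, which each shard applies as a row-comparison filter.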

Load-bearing prerequisite: app-side snowflake IDs

Per observability.md Principle 3, execution_id is generated in the application using a Rust snowflake library, not by the DB.

Currently the Rust server still calls noetl.snowflake_id() (a Postgres function) on every event / command create. This must flip before R4 (DB sharding). Reasons:

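  1. Per-shard DB sequences tie ID generation to a shard. If each shard's noetl.snowflake_id() keeps generating from its own DB, an ID is minted by whichever shard's DB handled the insert, while shard_for(execution_id) hashes it to an arbitrary owner, so the generating shard and the owning shard disagree. Each DB's machine_id would also have to be kept distinct, or two shards could mint the same ID. Defeats the routing.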
  2. Cross-shard generation needs global coordination. If we instead want IDs to span shards, we'd need a single master to generate all snowflakes (write bottleneck), or some inter-shard coordination scheme (complex).
  3. Application-side generation is the right shape anyway. Per observability.md, spans need the ID at span-creation time; retries are idempotent only with a stable ID; tests need deterministic IDs; cross-component publish doesn't have to wait for the INSERT round trip.

Migration shape (a small follow-up sub-issue that will spin out during R2, or earlier as a standalone issue):

  1. Add a Rust snowflake helper to repos/server/src/snowflake.rs (or pull snowflaked / sonyflake from crates.io).
  2. Derive machine_id from WORKER_ID env var (set per pod by the deployment manifest), hashed to 10 bits.
  3. Generate IDs in the application; pass them explicitly to every INSERT. Leave the DB-side noetl.snowflake_id() function in place as a fallback for ad-hoc admin SQL writes (per observability.md's per-component migration order).
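A std-only sketch of steps 1 and 2. EPOCH_MS, folding WORKER_ID with FNV-1a, and all names are assumptions; the epoch must match whatever noetl.snowflake_id() uses so DB- and app-minted IDs still sort together. One caveat of hashing WORKER_ID into 10 bits: machine_ids can collide (roughly even odds once about 38 pods share the space), and two pods sharing a machine_id can mint the same ID in the same millisecond, so a startup uniqueness check is worth considering.

```rust
use std::sync::Mutex;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical custom epoch: 2024-01-01T00:00:00Z in ms.
const EPOCH_MS: u64 = 1_704_067_200_000;
const MACHINE_BITS: u32 = 10;
const SEQUENCE_BITS: u32 = 12;
const SEQUENCE_MASK: u64 = (1 << SEQUENCE_BITS) - 1;

/// FNV-1a (64-bit) over WORKER_ID, folded to 10 bits.
pub fn machine_id_from_worker_id(worker_id: &str) -> u16 {
    let mut h: u64 = 0xcbf2_9ce4_8422_2325; // FNV-1a offset basis
    for b in worker_id.bytes() {
        h ^= u64::from(b);
        h = h.wrapping_mul(0x0000_0100_0000_01b3); // FNV-1a 64-bit prime
    }
    (h & ((1 << MACHINE_BITS) - 1)) as u16
}

pub struct Snowflake {
    machine_id: u64,
    /// (last timestamp used, last sequence used), timestamps relative to EPOCH_MS.
    state: Mutex<(u64, u64)>,
}

impl Snowflake {
    pub fn new(machine_id: u16) -> Self {
        assert!(u64::from(machine_id) < (1 << MACHINE_BITS), "machine_id must fit in 10 bits");
        Snowflake { machine_id: u64::from(machine_id), state: Mutex::new((0, 0)) }
    }

    /// WORKER_ID is set per pod by the deployment manifest.
    pub fn from_env() -> Self {
        let worker_id = std::env::var("WORKER_ID").expect("WORKER_ID must be set");
        Snowflake::new(machine_id_from_worker_id(&worker_id))
    }

    fn now_ms() -> u64 {
        let since_unix = SystemTime::now().duration_since(UNIX_EPOCH).expect("system clock before 1970");
        (since_unix.as_millis() as u64).saturating_sub(EPOCH_MS)
    }

    /// Layout: [timestamp: 41][machine_id: 10][sequence: 12].
    pub fn next_id(&self) -> i64 {
        let mut state = self.state.lock().unwrap();
        let (last_ms, last_seq) = *state;
        // Never step backwards if the wall clock is adjusted.
        let mut now = Self::now_ms().max(last_ms);
        let mut seq = 0;
        if now == last_ms {
            seq = (last_seq + 1) & SEQUENCE_MASK;
            if seq == 0 {
                // 4096 IDs already minted this millisecond: wait for the clock to
                // pass last_ms (a long wait if the clock jumped far backwards).
                while now <= last_ms {
                    now = Self::now_ms();
                }
            }
        }
        *state = (now, seq);
        ((now << (MACHINE_BITS + SEQUENCE_BITS)) | (self.machine_id << SEQUENCE_BITS) | seq) as i64
    }
}
```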

Phase F decomposition

| Round | Scope | Output |
|---|---|---|
| R1 (this round) | Sharding design doc + endpoint inventory codification | This page + sub-issue noetl/server#40 closed. ai-meta umbrella Forward section refreshed. |
| R1.5 (prerequisite) | App-side snowflake ID migration | New repos/server/src/snowflake.rs helper; WORKER_ID-derived machine_id; explicit ID generation at every INSERT call site. DB-side noetl.snowflake_id() stays as the ad-hoc fallback. |
| R2 | Server-side shard_id() helper + (optional) route_to_shard() proxy | New repos/server/src/state.rs helpers; HTTP proxy fallback for mis-routed requests (gives a safety net while gateway routing rolls out). Stable hash function chosen (xxhash or fixed-key SipHasher). |
| R3 | Gateway-side dispatch + load balancer config | Ingress LB extracts execution_id and routes by consistent hashing. Drift-guard test validates LB config + shard_for agree. |
| R4 | DB sharding + per-shard schema migration | Partition event / command / execution / outbox / variables by execution_id via Citus (lean recommendation) or per-shard schemas. Replicate catalog / credential / keychain / runtime (or keep on shared read-only path). Schema migration dry-run on kind first. |
| R5 | Cutover + validation | Spin up N=2 shard replicas in kind; run Phase D e2e harness across shards; confirm execution affinity (all events for one execution hit one shard); prod canary + traffic split on the ingress. |

Open question NOT decided in R1

  • DB partition scheme: Citus vs per-shard schemas vs per-shard DBs. R4 decision (recorded 2026-06-04): per-shard physical DBs, NOT Citus. Reason: Citus adds a Postgres-extension dep across the deployed image (kind + GKE); per-shard physical DBs give cluster-level isolation ("one slow query in shard 1 doesn't block shard 0") with no extension. Tradeoff: more ops cost (N Postgres clusters) and a cluster-wide fan-out helper for the GET /api/executions list endpoint (R4-4).

Phase G — keychain-backed shard endpoints (deferred decision)

R4 ships with a flat NOETL_SHARDS env var carrying semicolon-separated DSN strings + an NOETL_CLUSTER_DSN env var for the cluster-wide master. See R4-1 (noetl/server#49, v2.13.0) for the implementation.
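A hedged sketch of parsing that env-var shape; ShardingConfig here is a hypothetical struct, not the R4-1 code in noetl/server#49. The list position is taken as the shard index, which is why empty entries are rejected rather than skipped.

```rust
/// Hypothetical shape of the env-var sharding config.
#[derive(Debug)]
pub struct ShardingConfig {
    /// DSN of the cluster-wide master (catalog, credential, keychain, runtime).
    pub cluster_dsn: String,
    /// Shard DSNs; position in the list is the shard index.
    pub shard_dsns: Vec<String>,
}

impl ShardingConfig {
    /// Parse NOETL_CLUSTER_DSN and semicolon-separated NOETL_SHARDS values.
    /// Skipping an empty entry in "a;;b" would silently renumber every later
    /// shard and re-home its executions, so it is an error instead.
    pub fn parse(cluster_dsn: &str, shards: &str) -> Result<Self, String> {
        let cluster_dsn = cluster_dsn.trim();
        if cluster_dsn.is_empty() {
            return Err("NOETL_CLUSTER_DSN is empty".into());
        }
        let shard_dsns: Vec<String> = shards.split(';').map(|s| s.trim().to_string()).collect();
        if let Some(i) = shard_dsns.iter().position(|s| s.is_empty()) {
            return Err(format!("NOETL_SHARDS entry {i} is empty"));
        }
        Ok(ShardingConfig { cluster_dsn: cluster_dsn.to_string(), shard_dsns })
    }

    pub fn from_env() -> Result<Self, String> {
        let get = |k: &str| std::env::var(k).map_err(|_| format!("{k} is not set"));
        Self::parse(&get("NOETL_CLUSTER_DSN")?, &get("NOETL_SHARDS")?)
    }

    pub fn shard_count(&self) -> u32 {
        self.shard_dsns.len() as u32
    }
}
```

A DSN that itself contains ';' (for example inside a password) would break this format.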

This is the right shape for kind dev + single-cloud prod but the wrong shape for multi-cloud + per-shard credential rotation + runtime topology changes. The correct long-term shape is:
| Layer | Where it lives |
|---|---|
| NOETL_CLUSTER_DSN | env var bootstrap; connects to cluster master |
| NOETL_KEYCHAIN_MASTER_KEY | env var; decrypts noetl.keychain (already exists per credential service) |
| noetl.keychain (on cluster master) | per-shard creds + per-external-storage creds, encrypted |
| noetl.shard_endpoint (NEW table, on cluster master) | shard topology; rows reference keychain aliases |
| Per-shard pools | built at startup by resolving shard_endpoint rows + decrypting keychain entries |

Proposed noetl.shard_endpoint schema:

```sql
CREATE TABLE noetl.shard_endpoint (
    shard_index       int         PRIMARY KEY CHECK (shard_index >= 0),
    host              text        NOT NULL,
    port              int         NOT NULL DEFAULT 5432,
    database          text        NOT NULL DEFAULT 'noetl',
    auth_kind         text        NOT NULL
        CHECK (auth_kind IN ('gcp_workload_identity', 'aws_iam',
                             'password', 'mtls_cert')),
    -- The FK requires a UNIQUE constraint on noetl.keychain(alias).
    credential_alias  text        REFERENCES noetl.keychain(alias),
    active            bool        NOT NULL DEFAULT true,
    notes             text,
    created_at        timestamptz NOT NULL DEFAULT now()
);
```
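A sketch of two checks a G1/G2 loader might run on these rows, under assumptions: ShardEndpoint is a hypothetical, trimmed row type, and the density rule (indices exactly 0..N-1) is inferred from shard_for's hash % N, which needs a gap-free index space. The active flag is deliberately not used to shrink N here, since changing N re-homes executions and dynamic re-sharding is out of scope.

```rust
use std::str::FromStr;

/// Typed mirror of noetl.shard_endpoint.auth_kind.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthKind {
    GcpWorkloadIdentity,
    AwsIam,
    Password,
    MtlsCert,
}

impl FromStr for AuthKind {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "gcp_workload_identity" => Ok(AuthKind::GcpWorkloadIdentity),
            "aws_iam" => Ok(AuthKind::AwsIam),
            "password" => Ok(AuthKind::Password),
            "mtls_cert" => Ok(AuthKind::MtlsCert),
            other => Err(format!("unknown auth_kind: {other}")),
        }
    }
}

/// Hypothetical, trimmed noetl.shard_endpoint row.
#[derive(Debug, Clone)]
pub struct ShardEndpoint {
    pub shard_index: i32,
    pub host: String,
    pub port: i32,
    pub auth_kind: AuthKind,
    pub credential_alias: Option<String>,
}

/// Require shard indices to be exactly 0..N-1 (no gaps, no duplicates); returns N.
pub fn topology_shard_count(rows: &[ShardEndpoint]) -> Result<u32, String> {
    if rows.is_empty() {
        return Err("noetl.shard_endpoint has no rows".into());
    }
    let mut idx: Vec<i32> = rows.iter().map(|r| r.shard_index).collect();
    idx.sort_unstable();
    for (expected, &got) in idx.iter().enumerate() {
        if got != expected as i32 {
            return Err(format!("expected shard_index {expected}, found {got}"));
        }
    }
    Ok(idx.len() as u32)
}
```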

noetl.result_ref (already exists for Arrow IPC pointers) gains:

```sql
ALTER TABLE noetl.result_ref
    ADD COLUMN credential_alias text REFERENCES noetl.keychain(alias),
    ADD COLUMN auth_kind        text;
```

So the same model covers BOTH:

  • NoETL-internal shard endpoints (cross-cloud, per-shard creds).
  • External storage pointers (GCS / S3 / MinIO / etc. — each row carries its own credential alias).

Why deferred to Phase G:

  • R4 needs to ship + kind-validate the routing + handler cutover in a single-cloud configuration first. The env-var DSN shape is sufficient for that proof-of-shape.
  • Adding noetl.shard_endpoint + the keychain-resolution loader is a multi-PR refactor of ShardingConfig + bootstrap. Doing it before R4-3/4/5 lands gates the rest of R4 on a design that can mature on its own track.
  • Per agents/rules/execution-model.md § "Secrets and credentials rule", tenant database DSNs are business-logic credentials and belong in the keychain. The current NOETL_SHARDS env var is non-final for production multi-cloud; it's documented as such here.

Phase G shape (when it lands):

| Round | Scope |
|---|---|
| G1 | noetl.shard_endpoint schema + cluster-master loader; result_ref.credential_alias migration. |
| G2 | ShardingConfig::from_cluster_db(); the env-var path becomes a kind-dev fallback only. |
| G3 | /api/internal/shards/topology endpoint for system-pool admin tooling. |
| G4 | Hot-reload: refresh shard map on signal without restart. |
| G5 | Cross-cloud kind validation (kind master + simulated remote shard via container network). |

Validation

R1's deliverable is durable design content. No runtime validation (it's a wiki commit). R2 onwards each ship their own kind-validation per agents/rules/deployment-validation.md.

See also