Skip to content

Umbrella Subscription Listener

Kadyapam edited this page Jun 12, 2026 · 16 revisions

Umbrella — Subscription / Listener Tool (RFC)

ai-task: noetl/ai-meta#90 · Opened: 2026-06-11 · Revised: 2026-06-11 (v3 — header/attribute-as-instruction directive layer) · Status: ✅ CLOSED — all 7 phases shipped + live-proven (2026-06-12). Phase 7 (scale hardening) lands: POST /api/execute/batch, an opt-in exactly-once dedup window, and per-subscription rate limits / backpressure — server v3.5.0 (server#189) + worker v5.19.0 (worker#79) + ops (ops#176) + e2e (e2e#48). Live on kind: batch 12→12 COMPLETED + per-message traceparent preserved; dedup duplicate→1 execution + audit event; rate-limit engaged + 10/10 messages → executions (no loss). Refinement follow-ups: #94 (s3 spool backend) + #93 (cross-restart drain) SHIPPED 2026-06-12 — tools v3.7.1 + worker v5.20.0, live-proven on kind/MinIO (outage→buffer→restart→startup auto-drain, ordered+idempotent, no loss); #92 (shared noetl-directives crate) SHIPPED 2026-06-12 — published noetl-directives 0.1.0 + tools v3.8.0 + gateway v3.4.0 (de-vendored, edge stays lean); the optional noetl-spool extraction is deferred (single consumer, no drift). #91 (live OIDC) validation complete + proven live 2026-06-12 — real Google-signed token verified against the live JWKS + full HTTP run on kind (gateway#30 + e2e#50 merged → #91 CLOSED; ai-meta pointers gateway f175a87 + e2e f7a24de); tools#57 (real-Pub/Sub pull) still open. Earlier: Phase 3 (2026-06-11)gateway push-ingress (Mode C) + auth-gated directive trust lands. POST /ingress/{listener} on the gateway terminates untrusted webhook / Pub-Sub-push traffic as a verify-and-forward gatekeeper (no DB on the ingress path): it verifies the delivery (HMAC / bearer / Pub-Sub OIDC, secret resolved from the Wallet by alias via the server's GET /api/internal/ingress/{listener}) and only then applies the header directives + forwards one POST /api/execute per delivery on the dedicated pool. The auth gate is a structural invariant (verify_then_plan): a tampered/unauth delivery is rejected before any directive is parsed, so an unauthenticated caller can never drive routing (RFC §7.5). Shipped across noetl-gateway v3.3.0 (gateway#28) + noetl-server v3.3.0 (server#182) + ops (ops#172) + e2e (e2e#43). Live on kind: HMAC 12/12 + bearer 12/12 assertions — verified deliveries → executions on the subscription pool → COMPLETED, allowlisted redirect honored only post-verification, tampered/unauth → 401 with no execution + no directive; Pub/Sub-push envelope + attributes-channel directive proven live; OIDC signature unit-proven (every negative). Phase 2 (2026-06-11): kind: Subscription is a first-class catalog type with an event-sourced lifecycle, the continuous listener runtime (Mode B) turns each message into one execution on a dedicated pool segment, and the header-directive engine (redirect / pool / idempotency / content / W3C trace) lands. Shipped across noetl-tools v3.3.0 (tools#52), noetl-server v3.2.0 (server#180), noetl-worker v5.16.0 (worker#73), ops (ops#171), e2e (e2e#42). Live on kind: 6 NATS messages → 6 executions on the dedicated subscription pool → all COMPLETED; 2 header-redirected; W3C traceparent into all 6 children's meta.trace; full lifecycle registered→activated→paused→resumed→draining→deactivated event-logged. Phase 1 (2026-06-11): the bounded-drain subscription tool (Mode A) in noetl-tools v3.2.0 (tools#50) + server v3.1.0 ToolKind (server#178) + worker v5.15.2 credential aliases (worker#71). Phases 3–7 remain design / RFC. Gateway push (Mode C) is Phase 3; spool Phase 4. · Primary repo: repos/tools (the tool surface) — spans server / gateway / worker / cli / ops. · Relates to: System Pool Design (#46), Secrets Wallet (#61), Rust Server Port (#49).

This page IS the RFC. It is grounded in the code as it exists on 2026-06-11 — every "today" claim cites a file + line. Read the Execution Model and Data Access Boundary pages first; the design is constrained by both.

Revision history:

  • v2 — (1) a first-class catalog type kind: Subscription that completely isolates this workload class (own type, runtime, command segment, lifecycle, scaling); (2) a configurable store-and-forward spool layer (§8). See §2.1, §4.1, §8.
  • v3 — a configurable header / attribute directive layer (§7): per-source metadata (Pub/Sub attributes, Kafka/NATS headers, HTTP headers) normalized into one headers map and, via an opt-in allowlist, honored as instructions — header-driven redirect/pool routing, idempotency key, content hints, and W3C trace/mesh propagation. Untrusted by default; push-ingress directive trust is gated on auth.

1. Problem statement

NoETL playbooks can fetch from a message source on a bounded cadence but cannot subscribe to one.

The nats tool's js_consume operation is deliberately a bounded pull-consumer fetch — it asks a named durable consumer for up to batch messages, waits at most timeout_ms (capped at 5000ms), and returns whatever it got (repos/tools/src/tools/nats.rs:8-20). The header comment states the constraint explicitly:

The tool deliberately does NOT expose long-lived subscriptions or push consumers — those would hold a worker slot indefinitely while waiting for an external event, which violates the NoETL execution model.

That constraint is correct and load-bearing. A worker slot is an atomic compute block (agents/rules/execution-model.md); the pull loop acquires a semaphore permit per command (repos/worker/src/worker.rs:234-250) and a block "must not hold a worker slot waiting for an external operation that takes more than a few seconds" (callback/hook rule).

So there is no first-class way for a playbook to say "subscribe to this Google Pub/Sub subscription / NATS stream / Kafka topic / inbound webhook, and run this playbook for every message". Users want exactly that, plus six hard requirements that a naive "just add a subscribe tool" design fails:

  1. Push and pull both. Pull = a runtime actively fetches + acks. Push = an endpoint receives messages/webhooks (Pub/Sub push, generic webhooks). Sources span Google Pub/Sub (both modes), NATS, Kafka, and must be extensible.
  2. IoT / high-volume isolation. A source can produce a firehose that would overwhelm the shared command stream and starve normal playbooks. This class must run on dedicated runtimes — an in-cluster KEDA pool AND an out-of-cluster option (Cloud Run) — so it never degrades the main server/worker path, while events still flow back for traceability.
  3. Same event model everywhere. Local (CLI), in-cluster, and out-of-cluster execution must all emit the same event-sourced records, so a listener is debuggable identically regardless of where it runs.
  4. Complete workload isolation as a first-class type (v2). This class is operationally and schematically different enough from request-driven playbooks that it deserves its own catalog type, not a flag on the ordinary playbook kind — its own validation, lifecycle, runtime, command segment, and scaling. See §2.1.
  5. Resilience / store-and-forward (v2). When a downstream the subscription depends on (target storage, a database, a Pub/Sub/Kafka topic it produces to) is unavailable, the workload must be able to accumulate incoming data in a fallback buffer (object store, or local disk in CLI mode) and replay it in order when the dependency recovers — all configurable. See §8.
  6. Header / attribute directives (v3). Each source carries a metadata channel (Pub/Sub attributes, Kafka/NATS headers, HTTP headers). When present, these can act as instructions that influence handling — redirect/routing, priority/pool, idempotency key, content hint, distributed-trace/mesh context — and it must be configurable and allowlisted, never implicit trust of arbitrary headers. See §7.

2. The core model

A listener is inherently long-lived. A worker tool is inherently atomic. These cannot be the same thing. The mistake to avoid is bolting a blocking subscribe() onto the Tool trait (repos/tools/src/registry.rs:132-144) — that reintroduces exactly the slot-holding the platform forbids.

The resolution is to split "listening" from "processing" and give each a home that matches its lifetime:

        SOURCE                INGRESS (long-lived)            PROCESSING (atomic)
   ┌───────────────┐
   │ Pub/Sub pull  │──┐
   │ NATS stream   │  │   ┌──────────────────────┐      ┌──────────────────────┐
   │ Kafka topic   │──┼──►│  Subscription runtime│ per  │  Normal NoETL         │
   └───────────────┘  │   │  normalize headers → │ msg  │  execution on a       │
                      │   │  directives (§7) →   │─────►│  DEDICATED pool:      │
   ┌───────────────┐  │   │  fetch + ack + spool │      │  POST /api/execute    │
   │ Pub/Sub push  │  │   └──────────────────────┘      │  → noetl.commands.    │
   │ Webhook POST  │──┴──►┌──────────────────────┐      │      <pool>.<eid>      │
   └───────────────┘      │  Gateway push-ingress│─────►│  → dedicated worker   │
                          │  (auth → directives) │ per  │  → tool dispatch      │
                          └──────────────────────┘ msg  │  → call.done event    │
                                                        └──────────────────────┘
                              ▲ spool on downstream-down │
                              │                          ▼
                          ┌────────────────────────────────────┐
                          │  Store-and-forward spool (§8)       │
                          │  object store / local disk;         │
                          │  noetl://spool/... refs; ordered     │
                          │  replay on recovery; event-logged   │
                          └────────────────────────────────────┘

2.1 kind: Subscription — a first-class catalog type (v2, recommended)

This RFC's recommended model is a dedicated catalog entry type, kind: Subscription, registered alongside the ordinary kind: Playbook. This is the answer to the old open question (distinct kind vs a trigger: block) — distinct kind wins, because the two workload classes differ on every axis that matters:

Axis kind: Playbook (request-driven) kind: Subscription (source-driven)
Trigger POST /api/execute from a user / gateway / parent A source: it activates and runs until deactivated
Lifetime Ephemeral — one DAG run, then gone Long-lived — a registered, activatable resource
Scheduler Orchestrated as a DAG of steps Activated on a runtime; per-message it dispatches an ordinary run
Runtime Shared worker pool (noetl.commands.shared) Dedicated pool + command segment (noetl.commands.subscription / a per-subscription segment)
Scaling KEDA on shared-command lag KEDA on source backlog (Pub/Sub undelivered / Kafka lag) + a dedicated execution pool
Lifecycle ops execute / cancel register → activate → pause / resume → drain → deactivate
Validation step/tool DAG schema source + mode + dispatch + headers + spool schema (no workflow: DAG required)
RBAC per-execution activation is a privileged op (it opens a long-lived ingress)

Modelling it as a flag on an ordinary playbook would entangle this lifecycle into the step orchestrator, force subscription concerns through the DAG validator, and put the firehose on the shared command stream by default. A distinct type keeps the isolation schematic (different catalog kind, different validation) and operational (different runtime, pool, segment, scaler) — exactly requirement 4.

A kind: Subscription entry lives in the catalog (noetl.catalog) like any other, but the control plane treats it differently: it is never dispatched as a one-shot DAG. Instead a subscription runtime activates it (pull/continuous) or the gateway registers its ingress route (push). The three prongs below are the activation modes of one kind: Subscription type.

2.2 Three activation modes (prongs), each reusing existing primitives

  • Mode A — bounded-drain (scheduled). mode: pull, activation: scheduled. The subscription's fetch runs as an atomic bounded drain — a registry tool (tool: subscription, op poll) modelled byte-for-byte on js_consume — driven on a cadence by the orchestrator (the system/projector pattern, System Pool Design). Fits the worker slot contract with zero new runtime. Ships first (Phase 1).
  • Mode B — continuous (dedicated runtime). mode: pull, activation: continuous. A long-lived subscription runtime (a new run-mode of the worker workspace, like the system pool reused the worker binary) holds the subscription open and turns each message into a normal POST /api/execute. Runs on an in-cluster KEDA-scaled dedicated pool or out-of-cluster on Cloud Run.
  • Mode C — push (gateway-fronted). mode: push. Webhooks and Pub/Sub push land on the gateway, which already is the authenticated ingress edge (repos/gateway/src/main.rs:230-288). The gateway verifies the token/secret, maps the request to an execution, returns 2xx. No long-lived process — the source holds the "subscription," NoETL receives.

In all three, the message becomes a normal execution_id-scoped run on the subscription's dedicated pool. Listening is decoupled from the atomic block model; the block model is never violated.

3. Fit with the current tool / dispatch / event-sourcing model

3.1 The bounded-drain tool (Mode A) is a registry tool like any other

Adding it touches the same surfaces every tool kind touches (repos/tools/src/tools/mod.rs:56-79, the 18 existing kinds):

  • New repos/tools/src/tools/subscription.rs implementing the Tool trait — name() -> "subscription", async execute() returning a ToolResult with the fetched batch in data.
  • registry.register(SubscriptionTool::new()) in create_default_registry().
  • Serde config struct parsed from the flattened ToolConfig.config (repos/tools/src/registry.rs:13-33).
  • CLI Tool enum variant for local parsing (repos/cli/executor/src/playbook.rs:336-390).
  • Observability triad per agents/rules/observability.md: a tool.dispatch.subscription span, a noetl_subscription_messages_fetched_total{source} counter, execution_id on every line.

It is js_consume generalised across backends. The worker dispatch match (repos/worker/src/executor/command.rs:205-457) needs no change — registry lookup is generic.

3.2 The subscription runtime (Mode B) reuses the worker binary's shape, not its loop

The system worker pool was built by reusing the existing noetl/worker Rust binary with distinct configuration — "NOT a new compiled binary" (System Pool Design, Phase 1). The subscription runtime follows the same philosophy but inverts the loop: where the worker pulls commands and runs blocks, the runtime pulls source messages and emits executions. It is a thin new run-mode (a noetl subscribe / noetl-subscription build of the same workspace) whose body is:

loop {
    let batch = source.fetch(batch_size, timeout).await?;   // Pub/Sub / NATS / Kafka client
    for msg in batch {
        let env = normalize_headers(msg);                   // §7.1 — uniform headers map
        let route = resolve_directives(env, spec.headers);  // §7.2 — allowlisted instructions
        match dispatch(env, route) {                        // POST /api/execute (target pool from route)
            Ok(_)            => source.ack(msg).await?,      // ack only after server 202s
            Err(downstream)  => spool_or_stop(msg, downstream).await?,   // §8
        }
    }
}

Crucially the runtime does not execute playbook logic and does not touch noetl.* tables — it calls the server API (agents/rules/data-access-boundary.md). It is an ingress producer. The actual work runs on the subscription's dedicated pool via noetl.commands.<pool>.<eid> (extending the pool_segment routing at repos/server/src/handlers/execute.rs:705-709, which today maps system/system, else shared).

3.3 Gateway push-ingress (Mode C) extends the existing callback edge

The gateway already terminates inbound async callbacks (POST /api/internal/callback, repos/gateway/src/main.rs:237) and runs per-request auth middleware (repos/gateway/src/auth/middleware.rs:20-114). A webhook/push endpoint is the same pattern with a different verifier (§6) — and directive trust is gated on that verification (§7.5). The gateway never touches domain data — it authenticates and forwards, exactly its charter (gateway = gatekeeper only, Execution Model).

3.4 Event-sourcing: the message's life is the execution's event log

Every ingested message produces an execution_id (repos/server/src/handlers/execute.rs:84-88, snowflake at the application boundary per agents/rules/observability.md Principle 3) and a playbook.started event. The source message id, normalized headers, applied directives, and trace context ride in the event meta. From there it is an ordinary run: command.issuedcommand.claimedcall.doneplaybook.completed (repos/server/src/db/models/event.rs:10-53). Subscription lifecycle, directive, and spool operations get their own event types (§4.3, §7.6, §8.4). The event log is the traceability story — the same in local, in-cluster, and out-of-cluster modes, because all three emit the same envelope. An out-of-cluster Cloud Run runtime emits via POST /api/events / POST /api/execute over HTTPS, never via a private DB connection.

4. Catalog + playbook-facing schema (YAML)

4.1 kind: Subscription — the recommended shape

One catalog type carries all three activation modes. The dispatch block names the ordinary playbook to run per message. runtime, pool, headers, spool, and scaling are the isolation + routing + resilience knobs.

# Continuous pull on a dedicated in-cluster pool (Mode B) — IoT scale.
apiVersion: noetl.io/v1
kind: Subscription
metadata:
  name: iot-sensor-stream
  path: subscriptions/iot-sensor-stream
spec:
  source: pubsub                          # pubsub | nats | kafka | webhook
  mode: pull
  activation: continuous                  # continuous (Mode B) | scheduled (Mode A)
  auth: "pubsub_iot"                       # Secrets Wallet alias — no default (no-default-connection.md)
  subscription: "projects/acme/subscriptions/sensors"

  runtime:
    target: dedicated-pool                # dedicated-pool | cloud-run | local
    pool: iot                             # dedicated runtime pool name
    batch: 500
    max_in_flight: 1000                   # outstanding un-acked / un-spooled cap (backpressure)

  dispatch:
    playbook: domain/ingest_sensor_reading
    payload_from: message.json            # message.body | message.json | message.attributes
    execution_pool: iot                   # default downstream segment → noetl.commands.iot.<eid>

  headers:                                # §7 — header-directive layer (optional; default off)
    normalize: true
    directives:                           # the ALLOWLIST — only these keys act as instructions
      - header: "x-noetl-pool"
        controls: dispatch.execution_pool
        allowed: ["iot", "priority"]
      - header: "x-idempotency-key"
        controls: idempotency_key
    trace:
      propagate: w3c                      # honor traceparent / tracestate / baggage
      baggage_allowlist: ["tenant"]
    passthrough: data                     # non-allowlisted headers → message.headers (data only)

  spool:                                  # §8 — store-and-forward (optional; default off)
    mode: hybrid                          # off | buffer_and_ack | hybrid
    backend: gcs                          # gcs | s3 | nats_object | local_disk
    bucket: "gs://acme-noetl-spool/iot"
    auth: "gcs_spool"
    circuit: { trip_after: 5, probe_after_ms: 30000 }
    ordering: per_key
    ordering_key: "device_id"
    retention: { max_age_hours: 72, max_bytes: "50Gi", on_full: stop_acking }
    drain: { on_recovery: ordered_then_live, rate_per_sec: 200, max_replay_attempts: 10 }

  scaling:
    keda: { trigger: pubsub, min: 1, max: 20, lag_threshold: 500 }
# Scheduled bounded drain (Mode A) — ships first, no new runtime.
apiVersion: noetl.io/v1
kind: Subscription
metadata: { name: orders-drain, path: subscriptions/orders-drain }
spec:
  source: nats
  mode: pull
  activation: scheduled                   # orchestrator drives a bounded fetch on cadence
  cadence: "*/1 * * * *"                  # or KEDA on consumer lag
  auth: "nats_main"
  stream: "ORDERS"
  consumer: "orders-drain"                # durable consumer must already exist
  runtime: { target: dedicated-pool, pool: subscription, batch: 50, timeout_ms: 3000 }
  dispatch: { playbook: domain/process_order, payload_from: message.json }
  # headers + spool omitted → defaults off; a scheduled drain naturally
  # stop-acks (the source holds the backlog) — see §8.5.
# Push: Pub/Sub push fronted by the gateway (Mode C).
apiVersion: noetl.io/v1
kind: Subscription
metadata: { name: billing-events, path: subscriptions/billing-events }
spec:
  source: pubsub
  mode: push
  ingress:
    gateway_path: /ingress/billing-events
    verify:
      type: pubsub_oidc                   # Google-signed OIDC JWT on the push request
      audience: "https://gw.noetl.acme/ingress/billing-events"
      service_account: "pubsub-push@acme.iam.gserviceaccount.com"
  dispatch:
    playbook: domain/handle_billing_event
    payload_from: message.json
    execution_pool: subscription
  spool:
    mode: buffer_and_ack                  # push/webhook can't redeliver → buffer is the durability tier
    backend: gcs
    bucket: "gs://acme-noetl-spool/billing"
    auth: "gcs_spool"
# Push: generic inbound webhook (Mode C), HMAC-verified.
apiVersion: noetl.io/v1
kind: Subscription
metadata: { name: stripe-webhook, path: subscriptions/stripe-webhook }
spec:
  source: webhook
  mode: push
  ingress:
    gateway_path: /ingress/stripe
    verify:
      type: hmac_sha256                   # signature over the raw body
      header: "Stripe-Signature"
      secret: "stripe_webhook_secret"     # Wallet alias — NOT an env var
  dispatch: { playbook: domain/handle_stripe_event, payload_from: message.body }
  spool: { mode: buffer_and_ack, backend: gcs, bucket: "gs://acme-noetl-spool/stripe", auth: "gcs_spool" }
# Kafka continuous pull on Cloud Run (Mode B, out-of-cluster).
apiVersion: noetl.io/v1
kind: Subscription
metadata: { name: edge-clickstream, path: subscriptions/edge-clickstream }
spec:
  source: kafka
  mode: pull
  activation: continuous
  auth: "kafka_edge"
  topic: "edge.clicks"
  group: "noetl-edge"
  runtime:
    target: cloud-run                     # runs OUTSIDE the main k8s cluster
    server_url: "https://api.noetl.acme"
    concurrency: 80                       # Cloud Run per-instance concurrency = backpressure knob
    max_instances: 50
  dispatch: { playbook: domain/ingest_click, payload_from: message.json, execution_pool: shared }
  spool: { mode: hybrid, backend: gcs, bucket: "gs://acme-noetl-spool/clicks", auth: "gcs_spool" }

4.2 The bounded-drain tool: subscription (Mode A internals)

activation: scheduled compiles to a normal playbook step using a new atomic tool kind — the unit Phase 1 ships. Also usable standalone inside any ordinary playbook:

- step: drain_telemetry
  tool:
    kind: subscription
    source: pubsub
    operation: poll                       # bounded fetch — returns immediately
    auth: "pubsub_telemetry"
    subscription: "projects/acme/subscriptions/telemetry-pull"
    batch: 100
    timeout_ms: 4000
    ack: on_success                       # on_success | auto | manual(return ack ids)
  next:
    - step: process_batch                 # downstream steps fan out over drain_telemetry.messages

4.3 Subscription lifecycle events

Activation is a state machine, recorded in the event log so the operator can replay "when did this subscription go down / drain / pause": subscription.registered, subscription.activated, subscription.paused, subscription.resumed, subscription.draining, subscription.deactivated. New event_type values alongside the existing taxonomy (repos/server/src/db/models/event.rs:10-53).

5. Architecture per execution target

5.1 In-cluster dedicated pool (KEDA-scaled)

The same KEDA shape the worker pools already use — one ScaledObject per pool, NATS-JetStream consumer-lag trigger (repos/ops/ci/manifests/keda/scaledobject-worker-rust-pool.yaml:36-63). Two complementary scalers:

  • Subscription runtime pool scales on source backlog (Pub/Sub num_undelivered_messages, Kafka consumer lag, NATS consumer lag). min 1 (a pull runtime must always hold the subscription); max sized to source partitions / Pub/Sub flow-control.
  • Dedicated execution pool for the subscription's runs scales on the noetl.commands.<pool> JetStream consumer lag — identical to scaledobject-worker-system-pool.yaml but a new iot / subscription segment via the same pool_segment routing.

Isolation boundary: the firehose lands on noetl.commands.iot.*, consumed by noetl_worker_pool_iot, scaled independently. A backlog on IoT traffic scales the IoT pool, never touching noetl_worker_pool_shared. The shared command stream and shared worker pool are unaffected — the in-cluster answer to requirements 2 + 4. Header-driven x-noetl-pool routing (§7.3) lands a message on whichever allowlisted segment the directive selects, so even within a subscription, priority traffic can land on a separate pool.

5.2 Out-of-cluster (Cloud Run)

For IoT scale that should not consume cluster capacity at all, the runtime runs as a Cloud Run service:

  • Pull mode: Cloud Run service with min-instances ≥ 1 holding the Pub/Sub/Kafka subscription, fetching + dispatching via HTTPS to the control-plane server_url. Scales on Cloud Run's own autoscaler.
  • Push mode: Cloud Run is the natural push target — Pub/Sub push or a webhook hits the URL directly (or via the gateway). Scales 0→N on request rate; --concurrency is per-instance backpressure; --max-instances the ceiling.

Isolation boundary: out-of-cluster means the firehose never enters the cluster network until it is already a well-formed execution request. The runtime authenticates to the control plane with a service-account bearer token (the NOETL_INTERNAL_API_TOKEN shape the system pool uses, System Pool Design "Auth flow").

Event flow back: the Cloud Run runtime emits POST /api/execute (plus subscription/directive/spool audit events via POST /api/events) over HTTPS. It holds no DB connection (agents/rules/data-access-boundary.md).

5.3 CLI local mode

Local mode today runs a playbook in-process with no server/NATS and emits a RunOutcome to stdout (repos/cli/src/playbook_runner.rs:47-378). The executor already has a pluggable EventSinkStdoutEventSink for CLI, NatsEventSink for the worker, same event shape, different sink (repos/cli/executor/src/events.rs).

Extend local mode with noetl subscribe <subscription.yaml>:

  • Holds the subscription in-process.
  • For each message runs the dispatch.playbook in-process (reusing the local PlaybookRunner), or posts to a configured server_url.
  • Honors the same header directives (§7) and emits event-sourced records via the EventSink: StdoutEventSink (pretty) or a new FileEventSink (one event/line JSONL).
  • Spool backend = local_disk (§8.6).

Same event model in local, in-cluster, and out-of-cluster — only the sink (stdout / file / POST /api/events), the spool backend, and the trace target vary.

6. Gateway auth + secret design (push / webhook)

The gateway is the only component that terminates untrusted inbound traffic, so push/webhook verification lives there. It stays a gatekeeper — verify, then forward; never touch domain data.

Where secrets live. Every verification secret (HMAC signing key, expected bearer token, Pub/Sub audience config) is a Secrets Wallet entry, resolved by alias — never a gateway env var. The wallet uses forward-only envelope encryption: a per-record DEK (AES-256-GCM) wrapped by a KEK from the configured provider (local dev / gcp-kms) (repos/server/src/crypto/envelope.rs, repos/server/src/services/{keychain,credential}.rs).

verify.type Check the gateway performs Secret source
hmac_sha256 Recompute HMAC over the raw body, constant-time compare against the header value Wallet alias
bearer Constant-time compare Authorization: Bearer against expected token (secrets.compare_digest shape the server already uses for /api/internal/*) Wallet alias
pubsub_oidc Validate the Google-signed OIDC JWT: signature vs Google JWKS, aud == configured audience, email == configured push SA Config in the Subscription spec; Google JWKS public
none Reject — push ingress always verifies

Flow (extends repos/gateway/src/main.rs:230-288 public routes):

POST /ingress/{subscription}  (no session auth — a source, not a user)
  1. look up the Subscription spec by gateway_path
  2. resolve verify secret from the Wallet (alias → decrypted value)
  3. verify per the table above
       fail → 401/403, log subscription.message.rejected, source retries
  4. ON SUCCESS ONLY: parse header directives (§7) — never before
  5. forward POST {server}/api/execute  (or spool, if circuit open — §8)
  6. return 2xx so the source acks

Observability triad: span gateway.ingress.{subscription}, counters noetl_ingress_received_total{subscription} / noetl_ingress_rejected_total{subscription,reason}, execution_id on the forward.

7. Header / attribute directives (v3)

Every source carries a metadata channel alongside the payload — Pub/Sub message attributes, Kafka record headers, NATS headers, and HTTP headers for webhook/push ingress. These can act as instructions that influence handling — route the message elsewhere, raise its priority, supply an idempotency key, carry a trace context. This must be opt-in and allowlisted: NoETL never trusts arbitrary inbound headers to drive routing.

7.1 Normalization — one uniform headers map

Regardless of source, the runtime normalizes the metadata channel into a single message.headers map on the ingested envelope (lowercased keys, string values; multi-value headers arrayed). The raw per-source map is preserved as message.attributes for round-trips.

Source Channel Normalized into
Pub/Sub PubsubMessage.attributes (map) message.headers
Kafka record headers (key → bytes) message.headers (utf-8 decoded)
NATS message headers (multi-value) message.headers
Webhook / Pub/Sub push HTTP request headers message.headers

payload_from (§4.1) selects the body; message.headers is always available to the dispatched playbook and to the directive engine.

7.2 Directive mapping (configurable allowlist)

A headers: block declares which keys are recognized as instructions and what each controls. Only keys in this allowlist act as directives; all other headers pass through as data into message.headers and can never influence routing/pool/dedup.

headers:
  normalize: true                         # build message.headers from the source channel
  directives:                             # the ALLOWLIST — only these keys are instructions
    - header: "x-noetl-route"
      controls: dispatch.playbook          # header-driven redirect to a different target playbook
      allowed: ["domain/handle_billing", "domain/handle_fraud", "domain/handle_event"]
    - header: "x-noetl-pool"
      controls: dispatch.execution_pool    # route to a different worker pool / command segment
      allowed: ["iot", "priority", "shared"]
    - header: "x-priority"
      controls: priority
      map: { high: priority, normal: shared }
    - header: "x-idempotency-key"
      controls: idempotency_key            # feeds the dedup window (§10 OQ2) + spool item key
    - header: "content-type"
      controls: content_type               # parse hint for the playbook
  trace:
    propagate: w3c                          # honor traceparent / tracestate / baggage
    baggage_allowlist: ["tenant", "request_id"]
  passthrough: data                         # non-allowlisted headers → message.headers (data only)

Directive categories (the controls a header may bind to):

controls Effect Value constraint
dispatch.playbook Redirect — run a different target playbook than the subscription default allowed: list (no arbitrary playbooks)
dispatch.execution_pool Route the run to a different worker pool / command segment allowed: list
priority Map to a pool/segment by priority class map: value→pool
idempotency_key Feed the dedup window (§10 OQ2) + the spool item key free (a key, not a target)
content_type / schema_hint Tell the playbook how to parse the body free (hint only)
trace (§7.4) W3C trace context + allowlisted baggage baggage_allowlist

7.3 Redirect — header-driven dynamic dispatch

The runtime / gateway resolves the effective dispatch.playbook + execution_pool from the directives before calling POST /api/execute. An execution_pool override changes the pool_segment so the run lands on noetl.commands.<override>.<eid> — the same routing seam the subscription/system segments use (repos/server/src/handlers/execute.rs:705-709). So one subscription can fan messages to different playbooks and different pools based on a trusted header, while the allowed: value list prevents routing to anything not pre-approved.

# Redirect example — one subscription, header-driven targets.
apiVersion: noetl.io/v1
kind: Subscription
metadata: { name: events, path: subscriptions/events }
spec:
  source: pubsub
  mode: push
  ingress:
    gateway_path: /ingress/events
    verify: { type: pubsub_oidc, audience: "https://gw.noetl.acme/ingress/events", service_account: "pubsub-push@acme.iam.gserviceaccount.com" }
  dispatch:
    playbook: domain/handle_event           # DEFAULT target
    payload_from: message.json
    execution_pool: shared                   # DEFAULT pool
  headers:
    directives:
      - header: "x-noetl-route"
        controls: dispatch.playbook
        allowed: ["domain/handle_billing", "domain/handle_fraud", "domain/handle_event"]
      - header: "x-noetl-pool"
        controls: dispatch.execution_pool
        allowed: ["priority", "shared"]
    trace: { propagate: w3c }
  # A push carrying  x-noetl-route: domain/handle_fraud,  x-noetl-pool: priority
  # — on a request that PASSED pubsub_oidc — runs handle_fraud on the priority
  # pool instead of the default, and ONLY because both values are allowlisted.

7.4 Service-mesh / distributed-trace propagation

Today NoETL correlates with request_id (gateway, repos/gateway/src/sse.rs:266) and execution_id (rides every wire format, agents/rules/observability.md Principle 4). There is no W3C trace propagation yet (no traceparent/opentelemetry in the Rust tree). This layer adds it:

  • On ingest, if trace.propagate: w3c and a traceparent header is present, the runtime extracts the W3C trace context (traceparent + tracestate + allowlisted baggage).
  • It is stamped onto the execution's event meta.trace (repos/server/src/db/models/event.rs meta JSONB; repos/server/src/handlers/execute.rs:630 already threads meta) and set as a span attribute, then propagated on the command.issued NATS message and into any child executions the playbook spawns (sub-playbook dispatch carries the same trace context). So a message is traceable from the upstream mesh hop → gateway/runtime → execution → child runs → event log, joining NoETL's spans to the caller's distributed trace.
  • Mesh routing metadata (e.g. x-mesh-*) can be carried as baggage and surfaced to the playbook, but only allowlisted baggage keys cross the boundary.

execution_id remains the primary NoETL trace key; traceparent is the external join so cross-system traces stitch together. (Per Observability Principle 4, neither rides a Prometheus metric label — cardinality stays bounded; trace context lives in meta + spans.)

7.5 Security — untrusted by default

  • Allowlist only. Only directives[].header keys are honored as instructions; everything else is data. No implicit trust of arbitrary headers.
  • Value allowlists. allowed: / map: constrain even an allowlisted header so it can't pick an arbitrary playbook / pool / credential.
  • Push/webhook: directive trust is gated on auth. A push/webhook request's headers are honored as directives only after the request passes HMAC / bearer / OIDC verification (§6). Verification runs first (§6 flow step 3); a request that fails is rejected before any directive is parsed (step 4), so an unauthenticated caller can never drive routing. For pull sources (Pub/Sub/Kafka/NATS) the channel is authenticated by the subscription's own Wallet credential, so its attributes/headers are trusted to the same degree as the stream itself — still constrained to the allowlist.
  • Secrets Wallet boundary. Directive values are not secrets. A directive may name a credential alias (e.g. redirect to a downstream that needs auth) only if the alias is itself allowlisted — a header can never select an arbitrary credential. The alias resolves through the Wallet as usual; the decrypted value never rides a header or the event log.

7.6 Directives in the event log

Which directives were applied is recorded so handling is auditable: a subscription.message.directives_applied event (or the ingest event's meta) carries { message_id, applied: [{header, controls, effective_value}], route_override: {playbook, pool}, trace: {traceparent, tracestate} }. The normalized message.headers ride in the execution's meta. An auditor can see exactly how a header changed routing, which pool a message landed on, and the external trace it belongs to.

7.7 Across the three runtimes

The normalization + directive engine lives in the shared ingest path:

  • In-cluster runtime / Cloud Run: same engine; pull-source headers trusted via the subscription credential.
  • Gateway push: the engine runs after auth verification (§6, §7.5); only then are directives honored.
  • CLI local: same engine; headers from the local source client; directives + trace recorded to the FileEventSink.

8. Resilience — store-and-forward spool (v2)

The default backpressure model (§9) is stop-acking → source redelivers: cheapest and most durable, because the source (Pub/Sub / Kafka / JetStream) is the durable buffer. But it has two gaps:

  1. Bounded source retention. A multi-hour downstream outage can exceed Pub/Sub's redelivery window or a short-retention Kafka topic — data is lost to the source's own GC.
  2. Push/webhooks don't redeliver reliably. A generic webhook that gets a non-2xx may retry a few times then give up. Stop-acking has nothing to stop — the data is gone.

The spool is a configurable store-and-forward layer that closes both gaps: when a downstream the subscription depends on is unavailable, accumulate incoming messages in a durable fallback buffer and replay them in order on recovery.

8.1 What "downstream" means + circuit-breaker trip

"Downstream" = whatever the dispatch.playbook ultimately depends on to make progress: a target object store / DB it writes to, or a Pub/Sub / Kafka topic it produces to. Detection is a circuit breaker keyed per downstream dependency:

  • The runtime watches dispatch outcomes. A run that fails with a downstream-unavailable signal (connection refused / timeout / 5xx / call.error whose tool error is classified retryable — repos/worker/src/executor/command.rs already classifies terminal vs retryable) increments the breaker.
  • circuit.trip_after consecutive failures open the circuit → the subscription transitions to spooling: new messages go straight to the spool instead of attempting the failing downstream.
  • After circuit.probe_after_ms the breaker goes half-open — one probe dispatch. Success → closed → drain (§8.3). Failure → re-open.

Circuit state is persisted (NATS KV in-cluster, a small server endpoint or KV for Cloud Run, a local file in CLI) so it survives a runtime restart mid-outage.

8.2 Buffer-and-ack vs stop-acking — the durability tradeoff (configurable)

The crux, exposed as spool.mode:

spool.mode Behaviour when downstream is down Durability owner Cost Use when
off (default) Stop-acking. Don't ack; let the source redeliver later. The source None Pull sources with generous retention; short blips
buffer_and_ack Write the message to the spool backend, then ack the source. The spool backend (e.g. GCS 11-nines) 1 object write / msg Push/webhooks (can't redeliver); long outages; short-retention sources
hybrid Stop-acking first (absorb short blips on the source); escalate to buffer-and-ack when the source nears its redelivery/retention limit, or always-buffer for push. Source, then spool Writes only during sustained outage The general recommendation for pull at scale

buffer_and_ack survives arbitrarily long outages and non-redelivering sources, but moves durability from the battle-tested source to the spool backend (and adds a write per message). off is free and leans on the source. hybrid gets the best of both.

8.3 Spool backend, ordering, retention, drain

Backend (configurable) reuses existing object-store primitives — no new storage subsystem:

  • gcs / s3: spool object written via the tenant's bucket credential (an external system → keychain-resolved, direct access allowed, agents/rules/data-access-boundary.md). Mirrors how the artifact / result_fetch tools and the Result Store stage payloads under noetl:// refs (repos/server/src/services/result_store.rs, repos/tools/src/tools/artifact.rs).
  • nats_object: the NATS Object Store ops already in the nats tool (object_put / object_get / object_list).
  • local_disk: a directory, for CLI mode.

Each spooled item is one object with metadata { subscription, source, message_id, recv_seq (snowflake), sha256, ordering_key, attempts }. Payload bytes go to the object store; the metadata + a noetl://spool/... ref go to the event log (§8.4) — the same payload-ref split the Result Store already uses.

Ordering on replay. spool.ordering: global (strict receive order, serialises the drain), per_key (per ordering_key lanes — per device_id, Kafka partition, Pub/Sub ordering key; recommended for IoT), none (any order, max throughput).

Retention. max_age_hours + max_bytes; on_full: stop_acking (fall back to source-redelivery), drop_to_dlq, or alert_only.

Drain on recovery. Circuit closes → drain replays in ordering order at drain.rate_per_sec. drain.on_recovery: ordered_then_live (drain backlog fully before resuming live — preserves global order) or interleave (resume live + drain concurrently — lower latency, safe only with per_key/none). An item failing drain.max_replay_attempts times is a poison message → dead-lettered.

8.4 Spool + the event log (traceability)

Every spool operation is event-sourced: subscription.message.spooled { message_id, recv_seq, spool_ref: "noetl://spool/...", reason, sha256 }, subscription.circuit.opened / subscription.circuit.closed, subscription.spool.draining, subscription.message.replayed { message_id, recv_seq, execution_id }, subscription.message.dead_lettered. A reviewer can reconstruct an entire outage from the log; payload bytes live in the object store under the ref, metadata in noetl.event (same discipline as the Result Store).

8.5 Interaction with the scheduled bounded drain (Mode A)

Mode A naturally stop-acks — it fetches a bounded batch, and if the downstream is down the batch simply isn't acked and the next cadence re-fetches it. So Mode A defaults to spool: off (the source is the buffer). It only needs a spool for a non-redelivering / short-retention source, where buffer_and_ack applies identically.

8.6 Spool across the three runtimes

Runtime Spool backend Circuit state Event sink
In-cluster dedicated pool gcs / s3 / nats_object NATS KV POST /api/events
Cloud Run (out-of-cluster) gcs / s3 (keychain-auth, external) NATS KV or a server KV endpoint POST /api/events over HTTPS
CLI local local_disk local file FileEventSink (JSONL)

9. Scaling + backpressure

Mode Scale signal Backpressure mechanism
Bounded-drain (A) Orchestrator cadence + batch Bounded by construction; slow downstream → deeper backlog next drain (or spool, §8.5). No unbounded buffer.
In-cluster runtime (B) KEDA on source backlog max_in_flight caps outstanding un-acked/un-spooled messages; runtime stops fetching at the cap. Downstream down → stop-ack or spool (§8).
In-cluster execution pool (B) KEDA on noetl.commands.<pool> lag Dedicated pool absorbs the firehose independently of shared; header x-noetl-pool can split priority traffic further (§7.3).
Cloud Run runtime (B) Cloud Run autoscaler (req/CPU/concurrency) --concurrency bounds per-instance in-flight; --max-instances the ceiling.
Gateway push (C) Source push rate; gateway scales as a stateless deployment Reject 429/503 when downstream saturated → source backs off; OR buffer_and_ack to spool (§8.2) for non-redelivering sources.
CLI local n/a (single process) In-process max_in_flight; loop blocks fetching at the cap.

Unifying principle: NoETL never buffers an unbounded firehose in process memory. Un-acked messages stay in the source (the durable buffer built for it); when the source can't be the buffer (long outage, non-redelivering push), the spool (§8) is the durable buffer instead. Every mode's backpressure is "stop acking / stop fetching / return non-2xx / spool-and-ack" — at-least-once delivery, idempotency handled by the playbook keying on message_id (carried in event meta, on every spooled item, and feedable via the idempotency_key header directive, §7.2).

10. Open questions / risks

Resolved:

  • Catalog kind: Subscription vs a trigger: block. Resolved: distinct kind: Subscription (§2.1).

Still open:

  1. Runtime packaging: new run-mode of noetl/worker or new binary? Lean: new noetl subscribe run-mode in the worker workspace, shared client + source crates. Decide at Phase 2.
  2. Exactly-once vs at-least-once — now sharper because the spool replays AND a header can supply an idempotency_key. All sources are at-least-once; spool drain re-delivers. Do we offer an opt-in server-side dedup window (noetl.ingest_dedup(idempotency_key | message_id, subscription, seen_at) behind /api/internal/ingest)? Costs a DB write per message. Lean: document the idempotency contract (prefer the header idempotency_key, fall back to message_id); offer dedup opt-in for low-volume critical streams.
  3. Circuit-breaker scope — per-downstream-dependency vs per-subscription. Lean: per-declared-downstream, per-subscription default for the single-downstream case.
  4. Spool ordering vs throughput. Lean: per_key default with a documented ordering_key; none when order-independent.
  5. Spool object lifecycle, GC, cost ceiling at IoT volume. Lean: drained items deleted immediately; retention.max_bytes + on_full the ceiling; noetl_subscription_spool_bytes gauge.
  6. Poison messages under global ordering create a gap when dead-lettered. Lean: acceptable, with a subscription.message.dead_lettered event so the gap is auditable.
  7. Header-directive precedence + conflicts (v3). If a header redirects to a pool whose downstream circuit is open, does the redirected target get spooled, or the default? Lean: the spool/circuit decision happens on the resolved target, after the directive applies. Multi-value headers (Kafka allows duplicate keys): last-wins for directives, arrayed for data.
  8. idempotency_key directive vs message_id (v3). When both exist, which keys the dedup window + spool item? Lean: the explicit header wins; message_id is the fallback.
  9. Letting a header select among credential aliases (v3). Even allowlisted, is header-chosen auth acceptable? Needs a security review before it ships; default off — a header may pick a target, not a credential, in v1.
  10. Trace propagation depth (v3). Propagating traceparent into child executions is bounded by playbook nesting; confirm no unbounded fan-out of trace context and that it never becomes a metric label (Observability P4).
  11. Source-client dependency weight. Feature-gate per source.
  12. Pub/Sub push needs a public TLS OIDC endpoint. GKE Ingress in front of the gateway, or Cloud Run. Document in repos/ops.
  13. /api/execute per message at IoT scale. Lean: add POST /api/execute/batch in the scale-hardening phase; start per-message.
  14. Ack timing depends on spool.mode (ack-after-dispatch / ack-after-spool-write); ack-after-completion stays opt-in.
  15. /ingress/* attack surface. Mandatory per-subscription verification (no verify: none), strict body-size limits, per-subscription rate limits, directive parsing only post-auth (§7.5). Threat-model before Phase 3.

11. Phased implementation plan

Each phase is independently shippable and kind-validated (agents/rules/deployment-validation.md) before the next.

Phase Deliverable Repos Depends on
1 ✅ Bounded-drain subscription tool (Mode A). tool: kind: subscription, operation: poll for nats (wrap js_consume), pubsub pull, kafka poll. Atomic registry tool, feature-gated source clients. A scheduled-drain example. Shipped 2026-06-11 — noetl-tools v3.2.0 (tools#50) + ops#169. tools (primary), worker (dep bump), ops (example) — (reuses worker model)
2 ✅ kind: Subscription type + runtime (Mode B) + header-directive engine (§7). New catalog kind + validation; continuous run-mode; event-sourced lifecycle; subscription pool segment in server routing + dedicated KEDA pool. Header normalization + directive allowlist + redirect/pool/idempotency/content + W3C trace into the execution meta + child propagation. Shipped 2026-06-11 — tools v3.3.0 (tools#52), server v3.2.0 (server#180), worker v5.16.0 (worker#73), ops (ops#171), e2e (e2e#42). Live E2E green. CLI parse deferred to Phase 6 (the noetl subscribe local mode owns it; Phase-2 registration is a thin server POST). server (catalog kind + routing + trace in meta), worker (run-mode + directive engine), tools (directive engine + factory), ops (pool + KEDA), cli (Phase 6) Phase 1; #46 routing
3 ✅ Gateway push-ingress (Mode C) + auth-gated directive trust. POST /ingress/{listener} routes; hmac_sha256 / bearer / pubsub_oidc verifiers; Wallet-alias secrets resolved via GET /api/internal/ingress/{listener}; directives parsed only after verification (§7.5) via the verify_then_plan invariant; ingress + directive audit events; Pub/Sub push + webhook. Shipped 2026-06-11 — gateway v3.3.0 (gateway#28) + server v3.3.0 (server#182) + ops (ops#172) + e2e (e2e#43). Live E2E green (HMAC 12/12 + bearer 12/12; Pub/Sub-push envelope proven; OIDC signature unit-proven). gateway (primary), server (config endpoint), ops (gateway token env) Phase 2; #61 Wallet
4 ✅ Store-and-forward spool (§8). Configurable spool (off / buffer_and_ack / hybrid); per-downstream circuit breaker; nats_object + local_disk backends (gcs/s3 same trait, tracked); ordered replay (global / per_key / none) + idempotency + dead-letter + retention/GC; 6 spool/circuit event types; circuit state in NATS KV. Shipped + live-validated on kind 2026-06-12 — tools v3.4.0 (tools#54), server v3.4.1 (server#184 + server#185), worker v5.17.0 (worker#75), ops (ops#173), e2e (e2e#44 + e2e#45). Live outage proof green: 6 buffered during outage (0 dispatched, no loss) → 6 drained+replayed in order on recovery → circuit cycled → idempotency held. tools (spool engine + backends), worker (run-loop wiring + KV), server (config validation + lifecycle fix), ops (downstream + NATS env), e2e (outage proof) Phases 2–3 (matters most for push, Phase 3)
5 ✅ Out-of-cluster Cloud Run target. Container + Cloud Run recipe; service-account bearer auth; HTTPS event/directive/spool flow-back; gcs spool backend. Shipped 2026-06-12 — tools v3.5.0 (tools#56) GcsBackend, worker v5.18.0 (worker#77) gcs wiring + bearer + $PORT bind, server v3.4.2 (server#187) gcs-credential-optional (ADC), ops (ops#175) automation/cloud-run/, docs (docs#179), e2e (e2e#47). Live out-of-cluster proof on Cloud Run (noetl-demo-19700101): 6/6 Pub/Sub messages → dispatched over HTTPS (cloudflared tunnel) → COMPLETED on the subscription pool; 1 message buffered to the real GCS bucket under a live outage (sha256 envelope, reason=circuit_open). Finding: real Pub/Sub sync-pull needs timeout_ms ≥ 10s (tools#57). ops (primary), worker (run-mode parity), docs Phase 2; Phase 4 for spool-on-CloudRun
6 ✅ CLI local noetl subscribe + FileEventSink + local_disk spool. In-process subscription honoring directives; event-sourced JSONL; local-disk spool. Shipped 2026-06-12 — cli v4.11.0 (cli#60, closes cli#59). cli-only: the source clients + spool engine already ship in noetl-tools v3.5.0 (Phases 1–5), so no tools change / crate cascade. Live local proof green (against in-cluster NATS on kind): 5 msgs → in-process PlaybookRunner dispatch → 5 COMPLETED, 19-event JSONL trail; local_disk spool outage: downstream down → 6 message.spooled (0 dispatched, no loss) → recovery → circuit closed → 6 message.replayed in order → drained to 0. Deterministic outage→replay→idempotency unit-proven (real engine). cli (primary; reuses noetl-tools source + spool unchanged) Phases 1, 2, 4
7 ✅ Scale hardening. POST /api/execute/batch (N→N executions, partial-failure contained, per-message routing/trace/dedup preserved); opt-in exactly-once dedup window (noetl.subscription_dedup, bounded-by-age, race-safe INSERT … ON CONFLICT, scoped by subscription, event-logged, default off — RFC §10 OQ1); per-subscription rate limits / backpressure (deterministic token-bucket RateGovernor: max_in_flight clamps the batch, max_dispatch_per_sec throttles the fetch side so the source keeps the backlog — no loss; subscription.rate_limited event). Shipped + live-validated on kind 2026-06-12 — server v3.5.0 (server#189), worker v5.19.0 (worker#79), ops (ops#176), e2e (e2e#48). Live: batch 12→12 COMPLETED on the subscription pool + per-message traceparent preserved + runtime used execute_batch; dedup duplicate→1 execution + subscription.message.deduplicated; rate-limit engaged + all 10 messages → executions (no loss). No tools change → no crate cascade. server (batch + dedup table), worker (runtime: batch/dedup/rate-limit), ops, e2e Phases 2–4

Phase 1 is the minimum cut — real value (unified bounded subscription fetch across Pub/Sub/NATS/Kafka) using only the existing worker/dispatch/event model, no new runtime/gateway/header surface. The header-directive engine lands in Phase 2 (it is a dispatch-layer feature: routing/pool/idempotency/content + W3C trace into the execution); auth-gated directive trust for push lands in Phase 3 with the gateway ingress. The spool (Phase 4) lands right after push, where buffer_and_ack is non-optional.

When each phase is dispatched, open per-submodule sub-issues per issue-tracking.md Tier 2, each linking up to #90. All 7 phases are shipped + live-proven — #90 is CLOSED (2026-06-12). The RFC's seven planned phases (bounded-drain tool → kind: Subscription runtime → gateway push → store-and-forward spool → Cloud Run → CLI local → scale hardening) are all live on kind / Cloud Run. Refinement follow-ups were spun out as separately-tracked ai-task issues at close: #91 (live OIDC signature), #92 (shared noetl-directives/noetl-spool crates), #93 (cross-restart spool drain auto-trigger), #94 (s3 spool backend wiring), and noetl/tools#57 (real-Pub/Sub pull default). None is a planned RFC phase or a load-bearing gap in the shipped surface.

12. Recent activity

Date Event
2026-06-12 (#91 — live OIDC signature validation) The push-ingress pubsub_oidc verifier is proven against the REAL Google JWKS — closing the one positive-path gap Phase 3 deferred (unit tests sign with a self-minted key). A genuinely Google-signed OIDC token was minted by impersonating the #90 Phase 5 runtime SA (--audiences + --include-email → exact Pub/Sub-push claims), then run through the gateway's verifier against the live JWKS (fetched via the gateway's own fetch_google_jwks): valid → verified; wrong-aud → oidc_wrong_audience; wrong-SA → oidc_wrong_sa; tampered → oidc_bad_signature. Full HTTP run (gateway binary → kind server, real token to /ingress/oidcbilling): 4 received → 1 dispatched (valid → 202 + one COMPLETED child 323957201221718016 on the subscription pool) → 3 rejected (tampered 401 / wrong-aud 403 / missing 401), zero executions from rejects (noetl_ingress_dispatched_total{oidcbilling}=1). Test-only gateway#30 (#[ignore]d live test) + runner e2e#50. No GCP cost-bearing resources created; the scoped tokenCreator binding was removed after the run. Merged + #91 CLOSED (gateway f175a87 + e2e f7a24de; test-only, no release cut).
2026-06-12 (Phase 7 shipped + live proof green — #90 CLOSED) Scale hardening (RFC §9 / §10) landed + live-proven — all 7 phases complete, #90 closed. (1) server v3.5.0 (server#189, closes server#188): POST /api/execute/batch — N execute requests in one round-trip, N executions, partial-failure contained (a bad item is an error result at its index, the rest still run), each item keeping its own path/pool/trace/dedup so per-message traceability + directive routing + W3C trace are intact (reuses the single-execute execute_one path; server still owns every DB write). Plus the opt-in exactly-once dedup window (RFC §10 OQ1): noetl.subscription_dedup (idempotent startup DDL, bounded by age, server-owned, on the cluster pool so it's a single authority); execute takes an optional dedup: { key, window_secs } scoped by parent_execution_id (the subscription); a duplicate within the window collapses to the existing execution + a subscription.message.deduplicated audit event (no second playbook_started, no fan-out); race-safe via INSERT … ON CONFLICT DO NOTHING; default off (a DB write per message is too costly at IoT scale). Structural validation of dispatch.batch_dispatch/batch_max, dedup, limits. Metrics noetl_execute_outcomes_total + noetl_execute_batch_size. (2) worker v5.19.0 (worker#79, closes worker#78): when dispatch.batch_dispatch: true the runtime drains a backlog and dispatches via execute_batch in chunks of batch_max (each item its own DispatchItem with playbook/pool/trace/dedup); opt-in dedup stamps the dedup block (idempotency_key→message_id, OQ8); per-subscription rate limits (RFC §9) via a new deterministic token-bucket RateGovernor (src/ratelimit.rs) — max_in_flight clamps the poll batch, max_dispatch_per_sec throttles the fetch side so over the cap the runtime stops fetching (source keeps the backlog, redelivers — no loss); a subscription.rate_limited event marks the off→on edge. Metrics noetl_subscription_batch_dispatch_total / _batch_messages_total / _rate_limited_total. (3) ops (ops#176): subscription_scale_hardened.yaml example. (4) e2e (e2e#48): kind_validate_subscription_scale.sh + 3 fixtures. No tools change → no crate cascade. Live proof on kind (server v3.5.0 + worker v5.19.0): batch — 12 pre-loaded → children=12 completed=12 pooled=12 traced=12 (12→12, all COMPLETED on the subscription pool, per-message traceparent preserved), runtime used execute_batch (server handled 5 batch calls); dedup — a duplicate (same x-idempotency-key) + a distinct key → children=2 deduplicated_events=1 (the dup collapsed to one execution), and direct-curl proved within-window→duplicate / outside-window→allowed / dedup-off→no-collapse; rate-limit — burst of 10 at max_dispatch_per_sec=2rate_limited_events=1 children=10 completed=10 (limit engaged, every message became an execution — no loss). Unit: server batch/dedup/validation + worker RateGovernor (throttle→recover no-loss), spec parse, dedup-key resolution, batch client shapes. #90 CLOSED — refinement follow-ups spun out: #91 (live OIDC), #92 (shared directives/spool crates), #93 (cross-restart spool drain), #94 (s3 spool wiring), tools#57 (real-Pub/Sub pull).
2026-06-12 (Phase 6 shipped + live local proof green) CLI local noetl subscribe (RFC §5.3) landed + live-proven — Phases 1–6 now complete. cli v4.11.0 (cli#60, closes cli#59): a noetl subscribe <spec.yaml> command that runs a kind: Subscription listener standalone in local mode — no k8s, no NATS-dispatch server for the listening itself — reusing the same noetl_tools::tools::source clients + header-directive engine + noetl_tools::spool engine the in-cluster worker runtime uses, and emitting the same ExecutorEvent envelope to a local FileEventSink (one event/line JSONL → replayable trail). Local dispatch model (RFC §5.3): in-process via PlaybookRunner (the pure-local default) or POST /api/execute (--dispatch server). local_disk spool (RFC §8.6): circuit breaker + buffer + ordered replay + idempotency + dead-letter against a local dir, circuit state in a local file, the six spool/circuit events to the FileEventSink. New src/subscribe/{mod,spec,sink,dispatch,runtime,spool}.rs + examples/subscribe/. cli-only — the source + spool surface already ships in noetl-tools v3.5.0, so no tools change / crate cascade (bumps the noetl-tools lock 3.0.0 → 3.5.0 via the executor's "3" constraint). Wiki: cli subscribe. Tests: 12 subscribe tests + full bin suite (53) green, incl. a deterministic local outage → local_disk spool → ordered replay → idempotency proof exercising the real engine. Live proof (local mode, in-cluster NATS on kind): (1) drain — published 5 → received=5 dispatched=5 failed=0, 19-event JSONL trail (lifecycle×4 + received×5 + playbook.started×5 + completed×5), every line round-trips as ExecutorEvent; (2) local_disk spool outage — tcp downstream down → circuit.opened6 message.spooled to local_disk (recv_seq-ordered keys on disk), 0 dispatched (no loss) → downstream up → circuit.closed + spool.draining6 message.replayed in receive order → spool drained to 0 (pending_spooled=0). Finding: the NATS source connects via async-nats ConnectOptions, which does NOT honor user:pass embedded in the URL — specs use explicit user/password fields (documented in examples/subscribe/). #90 stays open for Phase 7 (scale hardening, volume-gated).
2026-06-12 (Phase 5 shipped + live out-of-cluster proof green) Out-of-cluster Cloud Run target (RFC §5.2) + the gcs spool backend landed + live-proven. (1) tools v3.5.0 (tools#56): noetl_tools::spool::GcsBackend — the GCS implementation of the Phase-4 SpoolBackend trait over the JSON API, reusing the existing GcpAuth (ADC) + reqwest (no new dependency); one bucket / many subscriptions by prefix, live+dlq split, recv_seq-ordered keys, idempotent put/delete; gcs feature (default-on). Live round-trip proven against a real GCS bucket. (2) worker v5.18.0 (worker#77): wires spool.backend: gcs into the WORKER_MODE=subscription run-loop (ADC / Workload Identity; in-memory circuit out-of-cluster); optional NOETL_INTERNAL_API_TOKEN bearer auth to the control plane; $PORT-aware metrics/health bind (Cloud Run startup probe, no new HTTP code). (3) server v3.4.2 (server#187): gcs/s3 spool credential made optional — absent → ADC/Workload Identity (the Cloud Run platform-bucket path), present → keychain alias for a tenant bucket (the Phase-4 validation wrongly required it). (4) ops (ops#175): automation/cloud-run/ — least-priv SA + spool bucket + Pub/Sub setup-gcp.sh, Cloud Build + gcloud run deploy deploy.sh (min=1 --no-cpu-throttling singleton), teardown.sh, declarative service.yaml, README. (5) docs (docs#179): the Cloud Run runtime architecture page. (6) e2e (e2e#47): Pub/Sub-source + gcs-spool fixture + hybrid Cloud Run validation driver. Live proof on Cloud Run (noetl-demo-19700101, server reached via a cloudflared tunnel to the kind cluster): the runtime deployed + ran ($PORT health bound, startup probe green), activated against the server over HTTPS (register+activate in the event log), spool runtime active backend=gcs; 6/6 Pub/Sub messages → one POST /api/execute each over HTTPS → COMPLETED on the subscription pool; GCS spool under a live outage — killing the tunnel opened the circuit and a message buffered durably to the real GCS bucket (…/spool/000…001-<id>, sha256 + reason=circuit_open). Live-vs-simulated: GCS backend, Cloud Run deploy/run, out-of-cluster HTTPS dispatch roundtrip, and GCS spool-write-under-outage are all live; cross-restart GCS drain was not auto-triggered (the documented in-memory-circuit limitation — drain+replay+idempotency was proven live in Phase 4 with the same engine). Finding: the pubsub source's synchronous pull (emulator-validated in Phase 1) needs timeout_ms ≥ 10s against real Pub/Sub — the 2s NATS default stalls; tracked tools#57. All test resources torn down (no cost-bearing resources left; runtime SA kept, free). #90 stays open (Phases 6–7).
2026-06-12 (Phase 4 shipped + live outage proof green) Store-and-forward spool + per-downstream circuit breaker (§8) landed + live-validated on kind. (1) tools v3.4.0 (tools#54): noetl_tools::spool — a pure per-downstream circuit breaker (trip-after-N / half-open / close; NATS-KV-serializable; one breaker per declared downstream, resolving OQ2), the SpoolItem envelope (SHA-256 + noetl://spool/<sub>/<recv_seq>/<id> ref — the Result-Store payload-ref split — + recv_seq-ordered object keys so a lexical list == receive order), a SpoolBackend trait + nats_object (reuses the NATS Object Store) + local_disk backends, and the engine (ordering global / per_key lanes / none; idempotency idempotency_key→message_id; poison→dead-letter after max_replay_attempts; retention max_age/max_bytes/on_full + GC — the OQ3 cost ceiling) + http/tcp/nats probes. 44 unit tests cover the data-loss-safety logic under simulated outage + a real-NATS nats_object integration test. (2) worker v5.17.0 (worker#75): wires the spool into the WORKER_MODE=subscription run-loop — probe → circuit → spool-or-dispatch → ack, NATS-KV circuit persistence (survives a restart mid-outage), drain-on-recovery, the 6 event types + spool metrics (noetl_subscription_spool_bytes gauge). buffer_and_ack (push default) + hybrid loss-safe; off = spool disabled. (3) server v3.4.1 (server#184 + server#185): spool: block validation at catalog registration + the lifecycle-status fix — spool/circuit events share the subscription's execution_id but must not corrupt its lifecycle status (the queries now match only the six lifecycle event types; without it, an open circuit 500'd subscription_get/activate). (4) ops (ops#173): toggleable spool-downstream-echo + runtime NATS env. (5) e2e (e2e#44 + e2e#45): kind_validate_subscription_spool.sh. Live proof on kind (6 messages): scale the downstream to 0 → subscription.circuit.opened → publish 6 → 6 subscription.message.spooled (recv_seq 1-6, each carrying the noetl://spool ref + sha256), 0 dispatched while open (no loss); scale back to 1 → subscription.circuit.closed + subscription.spool.draining6 subscription.message.replayed6 child executions COMPLETED on the subscription pool → spool drained to 0 → exactly 6 distinct children (idempotency held). The entire outage is reconstructable from noetl.event. Decisions: OQ2 per-downstream scope; OQ3 immediate-GC + max_bytes ceiling + gauge; OQ8 idempotency_key wins over message_id; OQ14 ack-after-dispatch stop-ack tracked (buffer_and_ack/hybrid are loss-safe). Deferred/tracked: gcs/s3 backends (same trait, need bucket creds — Cloud-Run path), gateway edge spool + shared noetl-directives/noetl-spool crate extraction, hybrid stop-ack-blip optimisation. #90 stays open (Phases 5–7).
2026-06-11 RFC drafted + filed as #90; board 3 (Todo); wiki page created. No code.
2026-06-11 (v2) First-class kind: Subscription catalog type (resolves the trigger-block question — full schematic + operational isolation) + a configurable store-and-forward spool layer (§8). 7-phase re-cut (spool = Phase 4).
2026-06-11 (v3) Configurable header / attribute directive layer (§7) — per-source metadata normalized into one headers map; opt-in allowlist of directives for redirect/pool routing, idempotency key, content hints, and W3C trace/mesh propagation; untrusted by default, push-ingress directive trust gated on auth (§7.5). Engine lands in Phase 2; auth-gated push directives in Phase 3.
2026-06-11 (Phase 1 shipped) Bounded-drain subscription tool (Mode A) landed. New atomic registry tool subscription (operation: poll) + the reusable SourceClient abstraction (PolledMessage/PollOptions/PollOutcome/AckMode + decode_payload/normalize_headers) with three backends: NATS (refactors js_consume into the shared drain_pull_consumer), Pub/Sub pull (REST + gcp_auth, emulator support), Kafka poll (pure-Rust, feature-gated). Worker dispatches via the generic registry — no dispatch-match change. NATS poll path validated live against the in-cluster NATS JetStream broker (create stream → publish → drain → ack → second-drain-empty); Pub/Sub emulator-gated; Kafka adapter-level. Ships in noetl-tools v3.2.0 (tools#50, closes tools#49) + worker bump worker#70 + ops example (ops#169). Wiki: SubscriptionTool.
2026-06-11 (full playbook E2E + 2 fixes) In-cluster playbook-dispatch E2E green. Built worker + server images carrying the tool, rolled on kind, ran examples/subscription_e2e: subscription poll drained count=5, acked=true, NATS consumer 5→0 pending, execution COMPLETED with call.done/command.completed/playbook.completed in the event log. The E2E surfaced two gaps the unit + live-NATS tests couldn't: (1) the server orchestrator validates tool.kind against a typed ToolKind enum → server v3.1.0 adds the Subscription variant (server#178); (2) the worker's apply_credential only knew postgres/bearer/api_key/basic → worker v5.15.2 resolves nats/pubsub/kafka aliases by merging connection fields into the tool config (worker#71). Cluster restored to a clean :dev build (re-smoke green).
2026-06-11 (Phase 3 shipped + live E2E green) Gateway push-ingress (Mode C) + auth-gated directive trust landed. (1) Gateway v3.3.0 (gateway#28): POST /ingress/{listener} — verify-and-forward gatekeeper (no DB on the ingress path). src/ingress/verify.rsHMAC-SHA256 over the raw body (constant-time, optional sha256=), bearer (constant-time), Google Pub/Sub OIDC (RS256 vs JWKS, aud+email/service_account+email_verified+exp). src/ingress/directives.rs — serde-only vendored port of the tools v3.3.0 engine (edge must not pull duckdb/kube); allowlist + value-allowlist preserved. verify_then_plan fuses verify + directive resolution so a failed verification yields no DispatchPlan — an unauthenticated caller can never drive routing (RFC §7.5). Pub/Sub-push envelopes unwrapped (attributes channel); auth headers stripped from the forwarded workload; first /metrics (noetl_ingress_*). (2) Server v3.3.0 (server#182): push catalog validation (ingress.verify required, none rejected) + GET /api/internal/ingress/{listener} (service-account-gated) resolving the verify secret from the Wallet by alias + idempotent registration. (3) Ops (ops#172): gateway NOETL_INTERNAL_API_TOKEN env. (4) E2E (e2e#43): kind_validate_subscription_push.shHMAC 12/12 + bearer 12/12 green: N signed deliveries → one execution per delivery on the subscription pool → COMPLETED; allowlisted x-noetl-route redirect honored only after verification; the auth gate — a tampered (bad-signature) and an unsigned/unauth delivery (both carrying the redirect header) → 401, no execution, no directive applied. Pub/Sub-push envelope unwrap + attributes-channel directive proven live (base64 data decoded, redirect via attribute); OIDC signature path unit-proven (every negative: bad-sig / expired / wrong-aud / wrong-SA / unknown-kid). Fast-follow tracked: extract a lean shared noetl-directives crate so gateway + tools consume one engine.
2026-06-11 (Phase 2 shipped + live E2E green) kind: Subscription type + continuous runtime (Mode B) + header-directive engine landed. (1) Server v3.2.0 (server#180): kind: Subscription first-class catalog type (source/mode/dispatch validation, no step-DAG); event-sourced lifecycle endpoints /api/subscriptions (register→activate→pause/resume→drain→deactivate, GET list/get); execution_pool override on /api/execute routing the whole run to noetl.commands.<pool>.<eid> (persisted in playbook_started meta, orchestrator reads it back); W3C trace into meta.trace + command notification + child-execution inheritance. (2) Worker v5.16.0 (worker#73): WORKER_MODE=subscription continuous runtime — build the SourceClient via the tools build_source factory, register+activate, loop poll()→one POST /api/execute per message on the dedicated pool, apply directives + emit directives_applied, drain+deactivate on SIGTERM. (3) Tools v3.3.0 (tools#52): the header-directive engine (DirectiveSpec/DispatchPlan — allowlisted redirect/pool/priority/idempotency/content + W3C trace, untrusted by default) + public build_source. (4) Ops (ops#171): dedicated subscription pool + runtime (Recreate strategy) + KEDA scaler. (5) E2E (e2e#42): kind_validate_subscription_runtime.sh13/13 green: 6 msgs → 6 executions on the dedicated pool, all COMPLETED; 2 header-redirected to a different playbook; W3C traceparent into all 6 children's meta.trace; full lifecycle event-logged. Three integration gaps the E2E surfaced + fixed in-PR (the Phase-1 pattern): noetl.resource FK seed for the subscription kind; noetl.event.created_at decoded as TIMESTAMP not TIMESTAMPTZ; SIGTERM (not just SIGINT) drives the runtime drain on K8s pod termination. Also: idempotent register (reuse per path), Recreate strategy (singleton drain handoff).
2026-06-11 (Pub/Sub + Kafka live E2E — parity reached) All three Phase-1 backends now at live-E2E parity. Stood up the two remaining brokers in kind and ran real bounded-drain playbooks: Pub/Sub (gcloud SDK emulator, ops#170) → publish 5 → drain count=5 acked=trueCOMPLETED + event trail; Kafka (single-broker KRaft apache/kafka:3.9.1, same ops PR) → produce 5 → drain count=5 acked=trueCOMPLETED + event trail. Fixtures + runners in e2e#41 (subscription_{pubsub,kafka}_drain.yaml + kind_validate_subscription_{pubsub,kafka}.sh). No adapter code change needed — both backends worked as-is against real brokers (pure-Rust kafka crate ↔ Kafka 3.9 KRaft; Pub/Sub REST ↔ emulator). The one fix: the <step>.output.<field> accessor never resolved (when: arcs skipped → drain stalled); corrected to <step>.<field> in both fixtures and the latent ops subscription_drain.yaml example. Validated on server v3.1.0 + worker v5.15.2 + tools v3.2.0.

13. Next concrete steps

  1. Review the kind: Subscription model (§2.1) and the header directive allowlist + security model (§7.2, §7.5) — confirm directives are honored only post-auth for push and only for allowlisted keys/values.
  2. Review the spool tradeoff matrix (§8.2) — confirm hybrid (pull) / buffer_and_ack (push) defaults. **Done 2026-06-12 — Phase 4 shipped
    • live outage proof green** (tools v3.4.0, server v3.4.1, worker v5.17.0). buffer_and_ack + hybrid are loss-safe; per-downstream circuit breaker (OQ2), noetl_object/local_disk backends, ordered replay + idempotency
    • dead-letter + retention/GC, all event-logged. Next: Phase 5 — out-of-cluster Cloud Run target Done 2026-06-12 — Phase 5 shipped + live out-of-cluster proof green (tools v3.5.0 gcs backend, worker v5.18.0, server v3.4.2, ops automation/cloud-run/, docs, e2e). Cloud Run runtime → Pub/Sub → HTTPS dispatch → COMPLETED + GCS spool-under-outage, all live on noetl-demo-19700101. Next: Phase 6 — CLI local noetl subscribe Done 2026-06-12 — Phase 6 shipped + live local proof green (cli v4.11.0, cli#60). noetl subscribe runs a kind:Subscription standalone in local mode → in-process PlaybookRunner dispatch (or POST /api/execute) → FileEventSink JSONL trail; local_disk spool with circuit-breaker + ordered replay. cli-only (reuses noetl-tools v3.5.0 source + spool unchanged). Live: 5/5 drained+dispatched+COMPLETED; 6/6 buffered-to-local_disk under outage → replayed-in-order on recovery. Phases 1–6 complete. Next: Phase 7 — scale hardening (volume-gated)POST /api/execute/batch, opt-in server-side dedup window, per-subscription rate limits; lands when real volume justifies it. #90 stays open for it. Open follow-ups: harden the pubsub source for real Pub/Sub (streaming pull / higher default timeout_mstools#57); persist Cloud-Run circuit state to a server KV endpoint (in-memory today); extract a lean shared noetl-directives / noetl-spool crate so gateway + tools + worker consume one engine; the s3 spool backend.
  3. Confirm Phase 1 scope — the subscription drain tool unifying js_consume + Pub/Sub-pull + Kafka-poll. Open noetl/tools#NN. Done 2026-06-11 — shipped as noetl-tools v3.2.0 (tools#50, closes tools#49). Remaining Phase 1 housekeeping: worker noetl-tools dependency bump to v3.2.0 (cascade after the crates.io publish), then a full in-cluster playbook-dispatch E2E on a worker carrying the tool.
  4. Phase 3 — gateway push-ingress (Mode C) + auth-gated directive trust. Done 2026-06-11POST /ingress/{listener} on the gateway (v3.3.0, gateway#28) verifies hmac_sha256 / bearer / pubsub_oidc (Wallet-alias secrets via the server's GET /api/internal/ingress/{listener}, server v3.3.0 server#182) and parses directives only after verification (§7.5, verify_then_plan). Live E2E green (HMAC 12/12 + bearer 12/12; Pub/Sub-push proven; OIDC unit-proven). Next: Phase 4 — store-and-forward spool (§8), which matters most for push (where buffer_and_ack is non-optional). Fast-follow: extract a lean shared noetl-directives crate so the gateway + tools consume one engine instead of the vendored copy.
  5. Decide open questions 2, 8, 9, 14 before Phase 3/4 — dedup, idempotency-key vs message_id, header-chosen credentials, ack timing. (OQ1 resolved: new WORKER_MODE=subscription run-mode of the worker binary. OQ7 resolved: explicit-pool wins over priority, multi-value last-wins.)
  6. Consider an agents/rules/streaming-isolation.md rule if the dedicated-pool + out-of-cluster + spool + directive-allowlist boundary is accepted as durable convention (parallels data-access-boundary.md).

14. Related

  • Execution Model — the atomic-block + callback rules this RFC is constrained by.
  • Data Access Boundary — why out-of-cluster runtimes call the API (and why the spool's payload goes to the tenant object store while its metadata/directives flow through the event log).
  • Observabilityexecution_id as the primary trace key; the v3 header layer adds the external W3C traceparent join (§7.4).
  • System Pool Design (#46) — the dedicated-pool + bearer-token-auth primitives reused here.
  • Secrets Wallet (#61) — push/webhook verification secrets, spool-bucket credentials, and the directive-alias boundary (§7.5).
  • Rust Server Port (#49) — owns the kind: Subscription catalog type, trace-in-meta, and /api/internal/* ingest/circuit endpoints.
  • repos/tools/src/tools/nats.rs:8-20 — the bounded-js_consume-not-subscriptions anchor.
  • repos/server/src/services/result_store.rs + repos/tools/src/tools/artifact.rs — the noetl:// payload-ref staging the spool reuses.
  • repos/gateway/src/sse.rs:266 + repos/server/src/handlers/execute.rs:630 — the request_id correlation + meta threading the trace layer builds on.

NoETL Dashboard

Active Umbrellas

Closed Umbrellas

Conventions

Per-repo wikis

Clone this wiki locally