
subscribe

Kadyapam edited this page Jun 12, 2026 · 1 revision

noetl subscribe — local-mode subscription listener

noetl subscribe <spec.yaml> runs a kind: Subscription listener standalone in local mode — no Kubernetes, no NATS-dispatch server is required for the listening itself. It holds the subscription's source (NATS / Pub/Sub / Kafka) open in-process and turns each received message into one playbook run, emitting an event-sourced JSONL trail on disk.

This is Phase 6 of the subscription/listener RFC (noetl/ai-meta#90). Its goal is one event model across CLI-local, in-cluster (the WORKER_MODE=subscription worker runtime), and out-of-cluster (Cloud Run) deployments; only the event sink (file vs POST /api/events), the spool backend (local_disk vs nats_object/gcs), and the dispatch target vary.

What it reuses

noetl subscribe is glue around the same building blocks the in-cluster runtime uses, all shipped in noetl-tools v3.5.0:

  • the source clients (build_source → NATS / Pub/Sub-pull / Kafka-poll) and their poll() bounded drain;
  • the header-directive engine (RFC §7) — allowlisted redirect / pool / idempotency / content + W3C trace;
  • the store-and-forward spool engine (RFC §8) — circuit breaker, buffer, ordered replay, idempotency, dead-letter, retention.
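This page does not list the RFC §7 directive headers. As a rough illustration only, the sketch below shows what an allowlisted directive filter with W3C trace propagation could look like. The noetl-* header names and the ALLOWED_DIRECTIVES map are hypothetical, not the engine's real allowlist; only the traceparent layout (version-traceid-parentid-flags) follows the W3C Trace Context specification.

```python
import re

# Hypothetical directive header names; the real RFC §7 allowlist may differ.
ALLOWED_DIRECTIVES = {
    "noetl-redirect": "redirect",
    "noetl-pool": "pool",
    "noetl-idempotency-key": "idempotency",
    "noetl-content-type": "content",
}

# W3C Trace Context traceparent: version-traceid-parentid-flags, lowercase hex.
TRACEPARENT_RE = re.compile(r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def parse_traceparent(value):
    """Return (trace_id, parent_id, sampled), or None for an invalid header."""
    m = TRACEPARENT_RE.match(value.strip())
    if m is None:
        return None
    version, trace_id, parent_id, flags = m.groups()
    # Version ff is invalid; all-zero trace and parent ids are invalid too.
    if version == "ff" or trace_id == "0" * 32 or parent_id == "0" * 16:
        return None
    sampled = bool(int(flags, 16) & 0x01)
    return trace_id, parent_id, sampled


def extract_directives(headers):
    """Keep only allowlisted directive headers (matched case-insensitively)."""
    lowered = {name.lower(): value for name, value in headers.items()}
    directives = {}
    for name, value in lowered.items():
        key = ALLOWED_DIRECTIVES.get(name)
        if key is not None:
            directives[key] = value
    trace = parse_traceparent(lowered.get("traceparent", ""))
    if trace is not None:
        directives["trace"] = trace
    return directives
```

Anything not on the allowlist (for example an arbitrary X-* header) is simply dropped, which is the point of an allowlist: a publisher cannot steer the listener through headers the engine does not explicitly recognize.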

It emits the shared ExecutorEvent envelope through the noetl-events EventSink trait — the same envelope the worker emits, here written to a local FileEventSink (one JSON object per line).
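To make the one-object-per-line contract concrete, here is a minimal Python analogue of a file sink. It is not the noetl-events FileEventSink; the class name JsonlFileSink and the type / ts field names are assumptions, since the ExecutorEvent fields are not listed on this page.

```python
import json
import os
import time


class JsonlFileSink:
    """Hypothetical Python analogue of a JSONL event sink: append-only,
    one self-describing JSON object per line."""

    def __init__(self, path):
        self.path = path

    def emit(self, event_type, **fields):
        # "type" and "ts" are assumed field names, not the real ExecutorEvent shape.
        event = {"type": event_type, "ts": time.time(), **fields}
        line = json.dumps(event, separators=(",", ":"), sort_keys=True)
        # Append mode: earlier events are never rewritten.
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(line + "\n")
            f.flush()
            os.fsync(f.fileno())  # make the event durable before returning
        return event
```

json.dumps escapes embedded newlines inside string values, so every event stays on exactly one physical line, which is what keeps the file safely appendable and replayable.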

Dispatch model (RFC §5.3)

A received message either:

  • runs the target playbook in-process via the local PlaybookRunner (the same engine noetl exec --runtime local uses) — this is the pure-local default; or
  • POSTs /api/execute to a control plane when --dispatch server is set (the control plane is --server-url, or the resolved base URL by default).

The message body is merged to the top level of the dispatched playbook's workload (a JSON message {"order_id": 1} exposes {{ workload.order_id }}), and the full normalized envelope is available under {{ workload.message }} (id / data / headers / attributes / metadata).
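The merge rule can be sketched as follows. build_workload is a hypothetical helper, and two behaviors are assumptions rather than documented: body keys override keys already present in the workload, and a body that is not a JSON object stays reachable only through workload.message.

```python
import json


def build_workload(envelope, base_workload=None):
    """Hypothetical helper: merge a JSON-object body into the workload top level
    and expose the full normalized envelope as workload["message"]."""
    workload = dict(base_workload or {})
    data = envelope.get("data")
    if isinstance(data, (bytes, str)):
        try:
            data = json.loads(data)
        except ValueError:  # not JSON: only reachable via workload["message"]
            data = None
    if isinstance(data, dict):
        workload.update(data)  # assumption: body keys win over existing keys
    workload["message"] = envelope
    return workload
```

With the {"order_id": 1} message from the paragraph above, build_workload(envelope)["order_id"] is 1, which is the value {{ workload.order_id }} would render.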

Usage

```shell
noetl subscribe <spec.yaml> [options]
```

| Flag | Default | Meaning |
|---|---|---|
| `--dispatch <local\|server>` | `local` | In-process PlaybookRunner, or POST /api/execute. |
| `--server-url <url>` | resolved base URL | Control plane for --dispatch server. |
| `--events <path>` | `./<name>-events.jsonl` | The JSONL event-sourced trail. |
| `--spool-dir <dir>` | spec spool.path, else `./.noetl-spool/<slug>` | local_disk spool directory. |
| `--playbook-dir <dir>` | spec's directory | Base dir for relative dispatch.playbook refs. |
| `--credential <file>` | none | Local credential JSON injected for the source's auth: alias. |
| `--max-messages <N>` | `0` (continuous) | Stop after N handled messages (bounded run / proof). |
| `--once` | off | Drain the source once, then exit. |
| `--verbose` | off | Verbose dispatch output. |
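How --max-messages and --once bound a run can be pictured with the loop below. run_listener, poll, and handle are hypothetical names, and the exact interplay of the two flags (for example, whether --max-messages may stop partway through a batch) is an assumption. In this sketch poll() is expected to block for a bounded time per call (the spec's runtime.timeout_ms is a likely candidate); otherwise a continuous run would spin.

```python
def run_listener(poll, handle, max_messages=0, once=False):
    """Hypothetical run loop.

    poll() returns one bounded batch of messages (possibly empty);
    max_messages == 0 means no limit (continuous);
    once=True stops after a single drain.
    """
    handled = 0
    while True:
        for message in poll():
            handle(message)
            handled += 1
            if max_messages and handled >= max_messages:
                return handled  # bounded run reached its quota
        if once:
            return handled  # --once: one drain, then exit
```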

Set RUST_LOG=info (or warn) to see the listener's diagnostics; the command installs a stderr tracing subscriber honoring RUST_LOG.

The spec

noetl subscribe reads a kind: Subscription catalog spec — the same type the server validates and the worker runs. Example (NATS):

```yaml
apiVersion: noetl.io/v1
kind: Subscription
metadata:
  name: orders_local
spec:
  source: nats
  url: nats://localhost:4222
  user: noetl          # explicit; see "NATS credentials" below
  password: noetl
  stream: ORDERS_LOCAL
  consumer: orders-local        # the durable pull consumer must already exist
  runtime: { batch: 50, timeout_ms: 3000 }
  dispatch:
    playbook: ./process_order.yaml   # a file path resolved against the spec dir
    payload_from: message.json
    execution_pool: local
```

NATS credentials. The NATS source connects via async-nats ConnectOptions, which does not honor user:pass embedded in the URL. Supply explicit user / password fields, or an auth: <alias> plus --credential cred.json (a JSON file with url / user / password / token).
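If you only have a URL with embedded credentials, a small helper can move them into explicit fields for a --credential file. Both function names are hypothetical; the url / user / password keys are the ones listed above, and IPv6 hosts are not handled.

```python
import json
from urllib.parse import unquote, urlsplit, urlunsplit


def split_nats_url(url):
    """Return (url_without_credentials, user, password); user/password may be None."""
    parts = urlsplit(url)
    host = parts.hostname or ""
    netloc = f"{host}:{parts.port}" if parts.port else host
    clean = urlunsplit((parts.scheme, netloc, parts.path, parts.query, parts.fragment))
    # Percent-encoded characters (e.g. %40 for "@") are decoded.
    user = unquote(parts.username) if parts.username is not None else None
    password = unquote(parts.password) if parts.password is not None else None
    return clean, user, password


def write_credential_file(path, url):
    """Write a --credential JSON file with explicit url / user / password keys."""
    clean, user, password = split_nats_url(url)
    cred = {"url": clean}
    if user is not None:
        cred["user"] = user
    if password is not None:
        cred["password"] = password
    with open(path, "w", encoding="utf-8") as f:
        json.dump(cred, f, indent=2)
    return cred
```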

Runnable examples ship in examples/subscribe/ (orders_subscription.yaml, orders_spool_subscription.yaml, process_order.yaml).

The event-sourced trail

Every run produces a JSONL file where each line is one ExecutorEvent, identical in shape to the rows the in-cluster runtime writes to noetl.event:

  • Lifecycle: subscription.lifecycle (REGISTERED → ACTIVE → DRAINING → DEACTIVATED) brackets the run.
  • Per message: subscription.message.received (ingress, with normalized headers) → playbook.started → playbook.completed / playbook.failed (the child execution), plus subscription.message.directives_applied when a header directive fires.
  • Spool/circuit (when buffering, see below): subscription.circuit.opened / subscription.message.spooled / subscription.circuit.closed / subscription.spool.draining / subscription.message.replayed / subscription.message.dead_lettered.

Because the trail is append-only with one self-describing event per line, an entire run — including an outage — is replayable from the file alone.
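A replay can be as simple as folding the lines in file order. This sketch assumes field names the page does not confirm (type for the event name, phase on lifecycle events, message_id on per-message events), so adjust them to the real ExecutorEvent shape before relying on it; replay_trail and run_was_clean are hypothetical helpers.

```python
import json

# Last-seen state per message; event names are the ones listed on this page.
MESSAGE_STATES = {
    "subscription.message.received": "received",
    "subscription.message.spooled": "spooled",
    "subscription.message.replayed": "replayed",
    "playbook.started": "started",
    "playbook.completed": "completed",
    "playbook.failed": "failed",
    "subscription.message.dead_lettered": "dead_lettered",
}


def replay_trail(lines):
    """Fold a JSONL trail, in file order, into (lifecycle phases, message states).

    Assumed field names: "type", "phase" (lifecycle), "message_id" (per message).
    """
    phases, states = [], {}
    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines
        event = json.loads(line)
        kind = event["type"]
        if kind == "subscription.lifecycle":
            phases.append(event["phase"])
        elif kind in MESSAGE_STATES:
            states[event["message_id"]] = MESSAGE_STATES[kind]  # later events win
        # Other events (circuit, spool draining, ...) are ignored in this sketch.
    return phases, states


def run_was_clean(phases):
    """True if the lifecycle events bracket the run in the documented order."""
    return phases == ["REGISTERED", "ACTIVE", "DRAINING", "DEACTIVATED"]
```

An open file object works as input too, since iterating a file yields its lines: replay_trail(open("spool-trail.jsonl")).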

Store-and-forward spool (local_disk)

Declare a spool: block to make the listener resilient to a downstream outage. In local mode the backend is always local_disk (RFC §8.6) — a spec authored for the in-cluster nats_object / gcs backend runs locally unchanged (the backend is rewritten; ordering / circuit / idempotency logic is backend-agnostic).

```yaml
  spool:
    mode: buffer_and_ack
    backend: local_disk      # forced in local mode regardless of value
    ordering: global
    circuit:
      trip_after: 1
      probe_after_ms: 500
      probe_interval_ms: 400
      downstream:
        - { name: warehouse, type: tcp, target: "127.0.0.1:19099" }
    drain: { max_replay_attempts: 5, on_recovery: ordered_then_live }
```
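One plausible reading of the circuit: settings is sketched below: trip_after consecutive failures open the circuit, the first active probe runs probe_after_ms later, and failed probes repeat every probe_interval_ms. These meanings, the CircuitBreaker class, and the tcp_probe helper are assumptions for illustration, and only a single downstream is modeled. The probe for a type: tcp downstream is treated as a plain TCP connect to target. Time is passed in explicitly (now_ms) so the logic is easy to test.

```python
import socket


def tcp_probe(target, timeout=0.5):
    """Return True if a TCP connection to "host:port" succeeds."""
    host, port = target.rsplit(":", 1)
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False


class CircuitBreaker:
    """Assumed semantics: trip_after consecutive failures open the circuit;
    the first probe runs probe_after_ms after opening; failed probes are
    retried every probe_interval_ms; a successful probe closes the circuit."""

    def __init__(self, trip_after, probe_after_ms, probe_interval_ms, probe):
        self.trip_after = trip_after
        self.probe_after_ms = probe_after_ms
        self.probe_interval_ms = probe_interval_ms
        self.probe = probe
        self.failures = 0
        self.state = "closed"
        self.next_probe_ms = None

    def record_failure(self, now_ms):
        self.failures += 1
        if self.state == "closed" and self.failures >= self.trip_after:
            self.state = "open"  # from here on, messages would be spooled
            self.next_probe_ms = now_ms + self.probe_after_ms

    def record_success(self):
        self.failures = 0

    def tick(self, now_ms):
        """Run an active probe when one is due; close the circuit on success."""
        if self.state != "open" or now_ms < self.next_probe_ms:
            return self.state
        if self.probe():
            self.state = "closed"  # recovery: spool draining would start here
            self.failures = 0
            self.next_probe_ms = None
        else:
            self.next_probe_ms = now_ms + self.probe_interval_ms
        return self.state
```

For the spec above, the probe would be lambda: tcp_probe("127.0.0.1:19099").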

When the declared downstream is unreachable, the circuit opens and each message is buffered durably to the spool directory (recv_seq-ordered object keys, a SHA-256 + noetl://spool/... ref per item) instead of being dropped. When the downstream recovers (active probe), the spool drains and replays the buffered messages in receive order, idempotently (keyed on idempotency_key, falling back to message_id); a poison message is dead-lettered after max_replay_attempts. Circuit-breaker state persists to a local file under the spool dir's control/, so a restart mid-outage rehydrates the breaker phase.
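The buffering and replay rules in that paragraph can be sketched as below. The file layout (an items/ directory with 20-digit zero-padded recv_seq names), the message_id field, LocalDiskSpool, and the drain() signature are hypothetical; the noetl://spool/ ref is not reproduced because its format is not given here. For simplicity, retries run back to back, the set of delivered keys lives only in memory, and a dead-lettered message is only reported, not written anywhere.

```python
import hashlib
import json
import os


class LocalDiskSpool:
    """One JSON file per buffered message under <root>/items/, named by a
    zero-padded recv_seq so sorted file names equal receive order."""

    def __init__(self, root):
        self.items = os.path.join(root, "items")
        os.makedirs(self.items, exist_ok=True)
        names = sorted(os.listdir(self.items))
        # Resume numbering after a restart instead of overwriting old items.
        self.next_seq = int(names[-1].split(".")[0]) + 1 if names else 0

    def put(self, message):
        """Buffer a message durably; return (key, sha256 hex digest of the body)."""
        body = json.dumps(message, sort_keys=True).encode("utf-8")
        key = f"{self.next_seq:020d}.json"
        with open(os.path.join(self.items, key), "wb") as f:
            f.write(body)
            f.flush()
            os.fsync(f.fileno())
        self.next_seq += 1
        return key, hashlib.sha256(body).hexdigest()

    def pending(self):
        """Yield (key, message) in receive order."""
        for key in sorted(os.listdir(self.items)):
            with open(os.path.join(self.items, key), "rb") as f:
                yield key, json.loads(f.read())

    def remove(self, key):
        os.remove(os.path.join(self.items, key))


def drain(spool, dispatch, seen, max_replay_attempts=5):
    """Replay in receive order, skipping already-delivered idempotency keys and
    dead-lettering a message that fails max_replay_attempts times in a row."""
    replayed, dead_lettered = [], []
    for key, message in spool.pending():
        # Idempotency key, falling back to the message id.
        idem = message.get("idempotency_key") or message["message_id"]
        if idem not in seen:
            for _ in range(max_replay_attempts):
                if dispatch(message):
                    seen.add(idem)
                    replayed.append(idem)
                    break
            else:  # every attempt failed: poison message
                dead_lettered.append(idem)
        spool.remove(key)
    return replayed, dead_lettered
```

Zero-padding the sequence number is what lets a plain sorted() directory listing recover receive order after a restart.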

Worked example — outage → buffer → recovery → replay

```shell
# 1. stream + durable consumer + publish 6 messages (NATS CLI)
# 2. downstream DOWN (port 19099 closed); run the listener continuously:
noetl subscribe examples/subscribe/orders_spool_subscription.yaml \
  --events ./spool-trail.jsonl --spool-dir ./spooldir
#    → circuit.opened, 6 × message.spooled to ./spooldir, 0 dispatched
# 3. bring the downstream up (bind 127.0.0.1:19099):
#    → circuit.closed, spool.draining, 6 × message.replayed in order,
#      spool drained to 0
```
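For step 3, any process that accepts TCP connections on 127.0.0.1:19099 should satisfy a tcp-type probe. A throwaway Python stand-in might look like this; start_fake_downstream is a hypothetical helper, not part of the examples/subscribe/ set.

```python
import socket
import threading


def start_fake_downstream(host="127.0.0.1", port=19099):
    """Accept and immediately close TCP connections on host:port.

    Returns (bound_port, stop); calling stop() shuts the listener down
    within about 0.2 s, simulating the outage again.
    """
    srv = socket.create_server((host, port))  # Python 3.8+
    srv.settimeout(0.2)  # wake periodically to check the stop flag
    stop = threading.Event()

    def accept_loop():
        while not stop.is_set():
            try:
                conn, _ = srv.accept()
            except socket.timeout:
                continue
            except OSError:
                break
            conn.close()
        srv.close()

    threading.Thread(target=accept_loop, daemon=True).start()
    return srv.getsockname()[1], stop.set
```

Run it in a separate Python session while the listener is running; calling stop() afterwards takes the downstream down again, so the outage half of the walkthrough can be repeated.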

