subscribe
noetl subscribe <spec.yaml> runs a kind: Subscription listener
standalone in local mode: neither Kubernetes nor a NATS-dispatch server is
required for the listening itself. It holds the subscription's source (NATS /
Pub/Sub / Kafka) open in-process and turns each received message into one
playbook run, emitting an event-sourced JSONL trail on disk.
This is Phase 6 of the subscription/listener RFC
(noetl/ai-meta#90). The whole
point: the same event model across CLI-local, in-cluster (the
WORKER_MODE=subscription worker runtime), and out-of-cluster (Cloud Run) —
only the event sink (file vs POST /api/events), the spool backend
(local_disk vs nats_object/gcs), and the dispatch target vary.
noetl subscribe is glue around the same building blocks the in-cluster
runtime uses, all shipped in noetl-tools
v3.5.0:
- the source clients (build_source → NATS / Pub/Sub-pull / Kafka-poll) and their poll() bounded drain;
- the header-directive engine (RFC §7): allowlisted redirect / pool / idempotency / content + W3C trace;
- the store-and-forward spool engine (RFC §8): circuit breaker, buffer, ordered replay, idempotency, dead-letter, retention.
It emits the shared ExecutorEvent envelope through the
noetl-events EventSink trait — the same envelope the worker
emits, here written to a local FileEventSink (one JSON object per line).
A received message either:
- runs the target playbook in-process via the local PlaybookRunner (the same engine noetl exec --runtime local uses); this is the pure-local default; or
- POSTs /api/execute to a control plane when --dispatch server (and --server-url) are set.
The message body is merged to the top level of the dispatched playbook's
workload (a JSON message {"order_id": 1} exposes {{ workload.order_id }}),
and the full normalized envelope is available under {{ workload.message }}
(id / data / headers / attributes / metadata).
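The merge rule above can be illustrated with a minimal sketch. The helper name build_workload is hypothetical, and two details are assumptions rather than documented behavior: that a non-object body is simply not merged, and that the message key takes precedence over a body key of the same name.

```python
import json
from typing import Any


def build_workload(envelope: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: derive a playbook workload from a normalized envelope."""
    workload: dict[str, Any] = {}
    body = envelope.get("data")
    # Assumption: `data` may arrive as raw JSON text or as an already-parsed value.
    if isinstance(body, (str, bytes)):
        try:
            body = json.loads(body)
        except ValueError:
            body = None
    # Only a JSON object can be merged key-by-key into the top level.
    if isinstance(body, dict):
        workload.update(body)
    # The full envelope stays reachable as workload.message
    # (assumption: it overrides a body key that is also named "message").
    workload["message"] = envelope
    return workload


envelope = {
    "id": "msg-1",
    "data": '{"order_id": 1}',
    "headers": {},
    "attributes": {},
    "metadata": {},
}
workload = build_workload(envelope)
print(workload["order_id"])        # what {{ workload.order_id }} would resolve to
print(workload["message"]["id"])   # what {{ workload.message.id }} would resolve to
```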
noetl subscribe <spec.yaml> [options]
| Flag | Default | Meaning |
|---|---|---|
| --dispatch <local\|server> | local | In-process PlaybookRunner, or POST /api/execute. |
| --server-url <url> | resolved base URL | Control plane for --dispatch server. |
| --events <path> | ./<name>-events.jsonl | The JSONL event-sourced trail. |
| --spool-dir <dir> | spec spool.path, else ./.noetl-spool/<slug> | local_disk spool directory. |
| --playbook-dir <dir> | spec's directory | Base dir for relative dispatch.playbook refs. |
| --credential <file> | none | Local credential JSON injected for the source's auth: alias. |
| --max-messages <N> | 0 (continuous) | Stop after N handled messages (bounded run / proof). |
| --once | off | Drain the source once, then exit. |
| --verbose | off | Verbose dispatch output. |
Set RUST_LOG=info (or warn) to see the listener's diagnostics; the command
installs a stderr tracing subscriber honoring RUST_LOG.
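For scripted or test runs it can help to assemble the command line programmatically. The sketch below is a hypothetical wrapper (subscribe_argv is not part of the CLI); it uses only flags from the table above and sets RUST_LOG in the child environment. It builds and prints the command without launching it.

```python
import os
import shlex


def subscribe_argv(spec, *, events=None, spool_dir=None, max_messages=0, once=False):
    """Hypothetical wrapper: build an argv for a bounded `noetl subscribe` run."""
    argv = ["noetl", "subscribe", spec]
    if events:
        argv += ["--events", events]
    if spool_dir:
        argv += ["--spool-dir", spool_dir]
    if max_messages > 0:
        # 0 is the documented default (continuous), so only pass a positive bound.
        argv += ["--max-messages", str(max_messages)]
    if once:
        argv.append("--once")
    return argv


argv = subscribe_argv(
    "examples/subscribe/orders_subscription.yaml",
    events="./trail.jsonl",
    max_messages=6,
)
# RUST_LOG=info surfaces the listener's diagnostics on stderr.
env = {**os.environ, "RUST_LOG": "info"}
print(shlex.join(argv))
# To launch it: subprocess.run(argv, env=env, check=True)
```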
noetl subscribe reads a kind: Subscription catalog spec — the same type the
server validates and the worker runs. Example (NATS):
apiVersion: noetl.io/v1
kind: Subscription
metadata:
  name: orders_local
spec:
  source: nats
  url: nats://localhost:4222
  user: noetl                      # explicit; see "NATS credentials" below
  password: noetl
  stream: ORDERS_LOCAL
  consumer: orders-local           # the durable pull consumer must already exist
  runtime: { batch: 50, timeout_ms: 3000 }
  dispatch:
    playbook: ./process_order.yaml # a file path resolved against the spec dir
    payload_from: message.json
    execution_pool: local

NATS credentials. The NATS source connects via async-nats ConnectOptions, which does not honor user:pass embedded in the URL. Supply explicit user / password fields, or an auth: <alias> plus --credential cred.json (a JSON file with url / user / password / token).
Runnable examples ship in
examples/subscribe/
(orders_subscription.yaml, orders_spool_subscription.yaml,
process_order.yaml).
Every run produces a JSONL file where each line is one ExecutorEvent,
identical in shape to the rows the in-cluster runtime writes to noetl.event:
- Lifecycle: subscription.lifecycle (REGISTERED → ACTIVE → DRAINING → DEACTIVATED) brackets the run.
- Per message: subscription.message.received (ingress, with normalized headers) → playbook.started → playbook.completed / playbook.failed (the child execution), plus subscription.message.directives_applied when a header directive fires.
- Spool/circuit (when buffering, see below): subscription.circuit.opened / subscription.message.spooled / subscription.circuit.closed / subscription.spool.draining / subscription.message.replayed / subscription.message.dead_lettered.
Because the trail is append-only with one self-describing event per line, an entire run — including an outage — is replayable from the file alone.
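Since each line is a standalone JSON object, the trail can be inspected with a few lines of standard-library code. The sketch below counts events per type. The field name event_type is an assumption (the source does not spell out the ExecutorEvent schema), so it is passed as a parameter, and the sample events are illustrative, not real output.

```python
import json
import tempfile
from collections import Counter
from pathlib import Path


def summarize_trail(path, type_key="event_type"):
    """Count events per type in a JSONL trail (one JSON object per line)."""
    counts = Counter()
    with Path(path).open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue  # tolerate blank lines
            event = json.loads(line)
            counts[event.get(type_key, "<unknown>")] += 1
    return counts


# Illustrative sample only; the key name and values are assumptions.
sample = [
    {"event_type": "subscription.message.received"},
    {"event_type": "playbook.started"},
    {"event_type": "playbook.completed"},
]
with tempfile.TemporaryDirectory() as tmp:
    trail = Path(tmp) / "orders_local-events.jsonl"
    trail.write_text("\n".join(json.dumps(e) for e in sample) + "\n", encoding="utf-8")
    counts = summarize_trail(trail)
print(dict(counts))
```

Adjust type_key once the actual field name is confirmed against a real trail file.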
Declare a spool: block to make the listener resilient to a downstream
outage. In local mode the backend is always local_disk (RFC §8.6) — a spec
authored for the in-cluster nats_object / gcs backend runs locally
unchanged (the backend is rewritten; ordering / circuit / idempotency logic is
backend-agnostic).
spool:
mode: buffer_and_ack
backend: local_disk # forced in local mode regardless of value
ordering: global
circuit:
trip_after: 1
probe_after_ms: 500
probe_interval_ms: 400
downstream:
- { name: warehouse, type: tcp, target: "127.0.0.1:19099" }
drain: { max_replay_attempts: 5, on_recovery: ordered_then_live }

When the declared downstream is unreachable, the circuit opens and each message
is buffered durably to the spool directory (recv_seq-ordered object keys, a
SHA-256 + noetl://spool/... ref per item) instead of being dropped. When the
downstream recovers (active probe), the spool drains and replays the buffered
messages in receive order, idempotently (keyed on idempotency_key, falling
back to message_id); a poison message is dead-lettered after
max_replay_attempts. Circuit-breaker state persists to a local file under the
spool dir's control/, so a restart mid-outage rehydrates the breaker phase.
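The buffer-then-replay behaviour can be modelled with a toy, in-memory sketch. This is not the noetl-tools spool engine: the class SpoolModel and its methods are hypothetical, recovery is triggered by an explicit call rather than an active probe, and nothing is persisted to disk. It only illustrates receive-order replay, idempotency keyed on idempotency_key with a message_id fallback, and dead-lettering after max_replay_attempts.

```python
from collections import deque


class SpoolModel:
    """Toy model of buffer, ordered replay, idempotency and dead-letter."""

    def __init__(self, dispatch, max_replay_attempts=5):
        self.dispatch = dispatch            # callable(message); raises on failure
        self.max_replay_attempts = max_replay_attempts
        self.circuit_open = False
        self.recv_seq = 0
        self.spool = deque()                # (recv_seq, message), oldest first
        self.done = set()                   # idempotency keys already dispatched
        self.dead_letters = []
        self.events = []

    @staticmethod
    def key(message):
        # Idempotency key, falling back to the message id.
        return message.get("idempotency_key") or message["message_id"]

    def receive(self, message, downstream_up):
        self.recv_seq += 1
        if not downstream_up:
            if not self.circuit_open:
                self.circuit_open = True
                self.events.append("subscription.circuit.opened")
            self.spool.append((self.recv_seq, message))
            self.events.append("subscription.message.spooled")
            return
        if self.circuit_open:
            self.recover()                  # drain the backlog before live traffic
        self._dispatch_once(message)

    def recover(self):
        self.circuit_open = False
        self.events += ["subscription.circuit.closed", "subscription.spool.draining"]
        while self.spool:
            _, message = self.spool.popleft()   # oldest recv_seq first
            for attempt in range(1, self.max_replay_attempts + 1):
                try:
                    # A duplicate counts as replayed without a second dispatch.
                    self._dispatch_once(message)
                    self.events.append("subscription.message.replayed")
                    break
                except Exception:
                    if attempt == self.max_replay_attempts:
                        self.dead_letters.append(message)
                        self.events.append("subscription.message.dead_lettered")

    def _dispatch_once(self, message):
        k = self.key(message)
        if k in self.done:
            return
        self.dispatch(message)
        self.done.add(k)


handled = []
model = SpoolModel(handled.append)
for i in range(1, 7):
    model.receive({"message_id": f"m{i}"}, downstream_up=False)
model.recover()
print([m["message_id"] for m in handled])
```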
# 1. stream + durable consumer + publish 6 messages (NATS CLI)
# 2. downstream DOWN (port 19099 closed); run the listener continuously:
noetl subscribe examples/subscribe/orders_spool_subscription.yaml \
--events ./spool-trail.jsonl --spool-dir ./spooldir
# → circuit.opened, 6 × message.spooled to ./spooldir, 0 dispatched
# 3. bring the downstream up (bind 127.0.0.1:19099):
# → circuit.closed, spool.draining, 6 × message.replayed in order,
# spool drained to 0
- noetl exec: the local-runtime playbook execution path the in-process dispatcher reuses.
- noetl-events: the ExecutorEvent envelope + EventSink trait the FileEventSink implements.
- Subscription / Listener RFC (umbrella #90): the full model; §5.3 (CLI local mode), §7 (directives), §8 (spool).
- worker wiki: the in-cluster WORKER_MODE=subscription runtime that emits the same event model.
- tools wiki: subscription-tool: the source clients + spool engine noetl subscribe consumes.