subscription tool
SubscriptionTool (tool kind subscription) lets a playbook step fetch a
bounded batch from a message source — NATS JetStream, Google Pub/Sub
(pull), or Kafka — ack it per policy, and return the normalized batch as the
step result. It generalises js_consume (the NatsTool operation) across
backends behind a reusable source-client abstraction.
It is not a long-lived listener. Every operation is a bounded drain that
returns within timeout_ms (hard-capped at 5000 ms), so it holds a worker
slot no longer than any other tool — honoring the NoETL execution model
(agents/rules/execution-model.md).
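The knob defaults and caps in the step reference below (batch 100, capped at 1000; timeout_ms 1000, capped at 5000) can be pictured as a small clamping step. This is a minimal sketch that assumes out-of-range values are clamped to the cap rather than rejected; resolve_knobs and the constant names are hypothetical, not the tool's actual code.

```rust
use std::time::Duration;

// Documented defaults and hard caps for the bounded-drain knobs.
const DEFAULT_BATCH: u32 = 100;
const MAX_BATCH: u32 = 1000;
const DEFAULT_TIMEOUT_MS: u64 = 1000;
const MAX_TIMEOUT_MS: u64 = 5000;

/// Hypothetical helper: fill in defaults for missing knobs, then clamp each
/// to its hard cap so a drain can never hold a worker slot past 5000 ms.
fn resolve_knobs(batch: Option<u32>, timeout_ms: Option<u64>) -> (u32, Duration) {
    let batch = batch.unwrap_or(DEFAULT_BATCH).min(MAX_BATCH);
    let timeout = timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS).min(MAX_TIMEOUT_MS);
    (batch, Duration::from_millis(timeout))
}
```

Clamping (rather than failing the step) keeps an over-generous playbook runnable while still honoring the execution-model bound.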
The continuous subscription runtime, gateway push ingress, store-and-forward
spool, and header-directive engine are later phases of the
subscription/listener RFC
(noetl/ai-meta#90); this tool is
Phase 1 (Mode A — bounded drain).
Source: src/tools/subscription.rs
· abstraction: src/tools/source/
Available in noetl-tools ≥ 3.2.0.
```yaml
- step: drain_orders
  tool:
    kind: subscription
    source: nats                # nats | pubsub | kafka
    operation: poll             # bounded fetch, returns immediately (the only Phase 1 op)
    auth: "nats_main"           # credential alias; there is no default connection
    # --- bounded-drain knobs (all sources) ---
    batch: 100                  # max messages this drain; default 100, capped at 1000
    timeout_ms: 4000            # max wait; default 1000, capped at 5000
    ack: on_success             # on_success (default) | auto | manual | none (bool also accepted)
    # --- NATS ---
    stream: ORDERS              # JetStream stream
    consumer: orders-drain      # durable pull consumer (MUST already exist)
    # --- Pub/Sub ---
    subscription: "projects/acme/subscriptions/telemetry-pull"
    endpoint: "localhost:8085"  # optional: Pub/Sub emulator host
    # --- Kafka ---
    brokers: "broker:9092"      # comma-separated string or array of host:port
    topic: edge.clicks
    group: noetl-edge
```

The ack knob selects the acknowledgement policy:

| ack | Behaviour |
|---|---|
| on_success (default) / auto / true | Ack every fetched message before returning. |
| manual / false | Do not ack; messages stay pending and the source redelivers them on the next drain. Pub/Sub ack ids ride back in ack_ids. |
| none | Never ack, surface no ack ids (a pure peek). |
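One way to read the ack table is as a parser that folds the string and boolean spellings into three modes. A minimal sketch: RawAck, AckMode and parse_ack are hypothetical names, and rejecting unknown strings (and matching them case-sensitively) are assumptions the table does not state.

```rust
/// Hypothetical stand-in for the raw `ack` value: a string or a YAML boolean.
enum RawAck<'a> {
    Str(&'a str),
    Bool(bool),
}

/// The three behaviours from the ack table.
#[derive(Debug, Clone, Copy, PartialEq)]
enum AckMode {
    OnSuccess, // on_success / auto / true: ack every fetched message
    Manual,    // manual / false: leave messages pending; Pub/Sub ack ids returned
    Peek,      // none: never ack and surface no ack ids
}

/// Map a configured ack value onto a mode; a missing value means on_success.
fn parse_ack(raw: Option<RawAck<'_>>) -> Result<AckMode, String> {
    match raw {
        None => Ok(AckMode::OnSuccess),
        Some(RawAck::Bool(true)) => Ok(AckMode::OnSuccess),
        Some(RawAck::Bool(false)) => Ok(AckMode::Manual),
        Some(RawAck::Str("on_success" | "auto")) => Ok(AckMode::OnSuccess),
        Some(RawAck::Str("manual")) => Ok(AckMode::Manual),
        Some(RawAck::Str("none")) => Ok(AckMode::Peek),
        // Assumption: unknown spellings are rejected rather than defaulted.
        Some(RawAck::Str(other)) => Err(format!("unknown ack mode: {other}")),
    }
}

impl AckMode {
    /// Whether fetched messages are acknowledged before the drain returns.
    fn acks(self) -> bool {
        matches!(self, AckMode::OnSuccess)
    }
    /// Whether ack ids are handed back in ack_ids (Pub/Sub, per the table).
    fn surfaces_ack_ids(self) -> bool {
        matches!(self, AckMode::Manual)
    }
}
```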
ToolResult.data:

```jsonc
{
  "status": "success",
  "source": "nats",
  "operation": "poll",
  "count": 3,
  "acked": true,
  "ack_mode": "on_success",
  "ack_ids": [],
  "messages": [
    {
      "id": "12",                        // stream seq / Pub/Sub messageId / topic:partition:offset
      "data": { "order_id": 42 },        // JSON-parsed body, or the UTF-8 string
      "headers": { "x-kind": "order" },  // normalized (lowercased) metadata channel
      "attributes": { ... },             // raw per-source metadata
      "metadata": { "subject": "...", "stream_seq": 12, "consumer_seq": 3, "delivered": 1, "pending": 0 }
    }
  ],
  "metrics": { "noetl_subscription_messages_fetched_total": 3, "source": "nats" }
}
```

Downstream steps fan out over drain_orders.messages. A step's result fields
bind as {{ <step>.<field> }}, for example drain_orders.count,
drain_orders.messages and drain_orders.acked.
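The headers/attributes split can be sketched as a normalize_headers-style helper that copies raw per-source metadata into one map with lowercased keys. This is a hypothetical stand-in for the real shared helper, whose signature is not documented here; letting the later of two case-variant keys win is an assumption of the sketch.

```rust
use std::collections::BTreeMap;

/// Hypothetical normalize_headers-style helper: lowercase every key so that
/// downstream steps read one uniform `headers` map whatever the source.
fn normalize_headers<'a, I>(raw: I) -> BTreeMap<String, String>
where
    I: IntoIterator<Item = (&'a str, &'a str)>,
{
    raw.into_iter()
        // Later entries overwrite earlier ones, so if two raw keys differ only
        // in case, the last one wins (an assumption of this sketch).
        .map(|(key, value)| (key.to_lowercase(), value.to_string()))
        .collect()
}
```

The raw map is still returned untouched as attributes, so nothing is lost by normalizing.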
Backends:

| source | Cargo feature | How it drains | Notes |
|---|---|---|---|
| nats | always on | JetStream pull-consumer fetch (the shared drain_pull_consumer, which js_consume also uses) | the durable consumer must already exist |
| pubsub | pubsub (default) | REST subscriptions:pull + :acknowledge, ADC token via gcp_auth | endpoint/PUBSUB_EMULATOR_HOST for the emulator; no gRPC dependency |
| kafka | kafka (default) | pure-Rust kafka crate consumer-group poll in spawn_blocking | Phase-1 limits: no record headers, soft batch cap, plaintext only |
pubsub and kafka are default features, so the shipped worker supports all
three; a minimal build can opt out.
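Each backend identifies a message differently, and the result envelope flattens that into the single string id (stream seq, Pub/Sub messageId, or topic:partition:offset). A sketch of that mapping; the Delivery enum and its field types (i32 partition, i64 offset) are hypothetical.

```rust
/// Hypothetical per-source delivery coordinates for one polled message.
enum Delivery<'a> {
    Nats { stream_seq: u64 },
    PubSub { message_id: &'a str },
    Kafka { topic: &'a str, partition: i32, offset: i64 },
}

impl Delivery<'_> {
    /// Value for the result's "source" field.
    fn source(&self) -> &'static str {
        match self {
            Delivery::Nats { .. } => "nats",
            Delivery::PubSub { .. } => "pubsub",
            Delivery::Kafka { .. } => "kafka",
        }
    }

    /// Flatten the coordinates into the envelope's string `id`.
    fn message_id(&self) -> String {
        match self {
            Delivery::Nats { stream_seq } => stream_seq.to_string(),
            Delivery::PubSub { message_id } => message_id.to_string(),
            Delivery::Kafka { topic, partition, offset } => format!("{topic}:{partition}:{offset}"),
        }
    }
}
```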
All three backends are validated live end-to-end on kind, each to the same
bar: publish/produce N messages → bounded drain → ack → execution COMPLETED →
event trail (call.done / command.completed / playbook.completed).
| source | Broker (kind) | Status |
|---|---|---|
| nats | in-cluster NATS JetStream | Live: examples/subscription_e2e (count=5, acked=true, COMPLETED). |
| pubsub | Pub/Sub emulator (ops ci/manifests/pubsub-emulator/) | Live: tests/fixtures/subscription_pubsub_drain (count=5, acked=true, COMPLETED). |
| kafka | single-broker KRaft apache/kafka (ops ci/manifests/kafka/) | Live: tests/fixtures/subscription_kafka_drain (count=5, acked=true, COMPLETED). |
Runners: noetl/e2e scripts/kind_validate_subscription_{pubsub,kafka}.sh.
Validated on server v3.1.0 + worker v5.15.2 + tools v3.2.0; no adapter code
change was needed — both new backends worked as-is against real brokers (the
pure-Rust kafka crate talks to Kafka 3.9 KRaft; the Pub/Sub REST backend
works against the emulator).
The architectural deliverable is the tools::source::SourceClient trait:

```rust
use async_trait::async_trait;

#[async_trait]
pub trait SourceClient: Send + Sync {
    /// Backend identifier, e.g. "nats", "pubsub" or "kafka".
    fn source_name(&self) -> &'static str;
    /// One complete bounded drain: connect, fetch, ack per policy, normalize.
    async fn poll(&self, opts: &PollOptions) -> Result<PollOutcome, ToolError>;
}
```

One poll call performs the whole atomic drain: connect, fetch until batch
messages are collected, the source is empty, or timeout_ms elapses (whichever
comes first), ack per PollOptions::ack, and return a normalized PollOutcome of
PolledMessages. The shared helpers decode_payload and normalize_headers shape
every backend's output into the same envelope.
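The stopping rule for one poll (batch reached, source empty, or timeout elapsed, whichever comes first, then ack) can be sketched in synchronous form. FetchOne, bounded_drain and VecSource are hypothetical: the real SourceClient::poll is async, fetches in backend-native batches, and waits for messages up to the deadline instead of treating "nothing available right now" as empty.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical synchronous stand-in for one backend's fetch/ack primitives.
trait FetchOne {
    /// Next available message, or None when nothing is available right now.
    fn fetch_one(&mut self) -> Option<String>;
    /// Acknowledge a batch of fetched messages.
    fn ack(&mut self, msgs: &[String]);
}

/// Bounded drain: stop at `batch` messages, when the source is empty, or when
/// `timeout` has elapsed, whichever comes first; then ack if `ack` is set.
fn bounded_drain<S: FetchOne>(src: &mut S, batch: usize, timeout: Duration, ack: bool) -> Vec<String> {
    let deadline = Instant::now() + timeout;
    let mut out = Vec::new();
    while out.len() < batch && Instant::now() < deadline {
        match src.fetch_one() {
            Some(msg) => out.push(msg),
            None => break, // source drained for now
        }
    }
    if ack && !out.is_empty() {
        src.ack(&out);
    }
    out
}

/// In-memory source for exercising the sketch; it only counts acks and does
/// not model redelivery of unacked messages.
struct VecSource {
    queue: VecDeque<String>,
    acked: usize,
}

impl FetchOne for VecSource {
    fn fetch_one(&mut self) -> Option<String> {
        self.queue.pop_front()
    }
    fn ack(&mut self, msgs: &[String]) {
        self.acked += msgs.len();
    }
}
```

Because the loop always terminates at the deadline, a continuous runtime can call it repeatedly without any single call outliving the worker-slot budget.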
Later RFC phases reuse the trait: a continuous runtime calls poll in a loop,
and gateway push ingress reuses the normalizers to turn a webhook into the same envelope.
The auth field is a credential alias resolved via ctx.get_secret(). For
nats, the credential JSON carries at least {"url": "nats://..."} (optional
user / password / token), the same shape NatsTool uses. There is no default
connection: an alias is required for an authenticated source
(agents/rules/no-default-connection.md).
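Alias resolution with no default connection can be sketched as a lookup that fails instead of falling back. The HashMap stands in for ctx.get_secret(), whose real signature is not shown here; NatsCredential and resolve_nats_auth are hypothetical names modeled on the credential fields listed above.

```rust
use std::collections::HashMap;

/// Hypothetical NATS credential payload: `url` is required, the rest optional.
#[derive(Debug)]
struct NatsCredential {
    url: String,
    user: Option<String>,
    password: Option<String>,
    token: Option<String>,
}

/// Resolve the step's `auth` alias. `secrets` stands in for ctx.get_secret().
fn resolve_nats_auth<'a>(
    alias: Option<&str>,
    secrets: &'a HashMap<String, NatsCredential>,
) -> Result<&'a NatsCredential, String> {
    // No default connection: a missing alias is an error, never a fallback URL.
    let alias = alias.ok_or_else(|| "auth alias is required".to_string())?;
    secrets
        .get(alias)
        .ok_or_else(|| format!("unknown credential alias: {alias}"))
}
```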
Observability:

- Span tool.dispatch.subscription (with source, operation, execution_id).
- noetl_subscription_messages_fetched_total{source} count in the result.
- execution_id on every log line.
See also:

- NatsTool: the nats tool; its js_consume shares the bounded NATS drain with this tool.
- Consumers: the worker's own JetStream pull loop.
- RFC: Umbrella — Subscription / Listener Tool