subscription tool

Kadyapam edited this page Jun 11, 2026 · 2 revisions

SubscriptionTool — bounded-drain message subscription poll

SubscriptionTool (tool kind subscription) lets a playbook step fetch a bounded batch from a message source — NATS JetStream, Google Pub/Sub (pull), or Kafka — ack it per policy, and return the normalized batch as the step result. It is js_consume generalised across backends behind a reusable source-client abstraction.

It is not a long-lived listener. Every operation is a bounded drain that returns within timeout_ms (hard-capped at 5000 ms), so it holds a worker slot no longer than any other tool — honoring the NoETL execution model (agents/rules/execution-model.md). The continuous subscription runtime, gateway push ingress, store-and-forward spool, and header-directive engine are later phases of the subscription/listener RFC (noetl/ai-meta#90); this tool is Phase 1 (Mode A — bounded drain).

Source: src/tools/subscription.rs · abstraction src/tools/source/

Available in noetl-tools 3.2.0.


Playbook config shape

- step: drain_orders
  tool:
    kind: subscription
    source: nats                 # nats | pubsub | kafka
    operation: poll              # bounded fetch; returns within timeout_ms (the only Phase 1 op)
    auth: "nats_main"            # credential alias — no default connection
    # --- bounded-drain knobs (all sources) ---
    batch: 100                   # max messages this drain; default 100, capped 1000
    timeout_ms: 4000             # max wait; default 1000, capped 5000
    ack: on_success              # on_success (default) | auto | manual | none  (bool also accepted)
    # --- NATS ---
    stream: ORDERS               # JetStream stream
    consumer: orders-drain       # durable pull consumer (MUST already exist)
    # --- Pub/Sub ---
    subscription: "projects/acme/subscriptions/telemetry-pull"
    endpoint: "localhost:8085"   # optional — Pub/Sub emulator host
    # --- Kafka ---
    brokers: "broker:9092"       # comma-separated string or array of host:port
    topic: edge.clicks
    group: noetl-edge
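
The defaults and caps noted in the comments above (batch: default 100, capped at 1000; timeout_ms: default 1000, capped at 5000) can be sketched as a small resolver. This is a minimal illustration, not the tool's actual code: the constant names, `DrainBounds`, and `resolve_bounds` are hypothetical, and handling of a zero batch is deliberately left out because the page does not specify it.

```rust
// Hypothetical constants mirroring the documented defaults and caps.
const DEFAULT_BATCH: u32 = 100;
const MAX_BATCH: u32 = 1000;
const DEFAULT_TIMEOUT_MS: u64 = 1000;
const MAX_TIMEOUT_MS: u64 = 5000;

/// Drain bounds after defaults and caps are applied (hypothetical type).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct DrainBounds {
    pub batch: u32,
    pub timeout_ms: u64,
}

/// Use the default when a knob is absent, then clamp to the hard cap.
pub fn resolve_bounds(batch: Option<u32>, timeout_ms: Option<u64>) -> DrainBounds {
    DrainBounds {
        batch: batch.unwrap_or(DEFAULT_BATCH).min(MAX_BATCH),
        timeout_ms: timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS).min(MAX_TIMEOUT_MS),
    }
}
```

Under these assumptions, a step that asks for `timeout_ms: 60000` would still be held to 5000 ms, which is what keeps the drain bounded.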

Ack policy

| ack | Behaviour |
| --- | --- |
| on_success (default) / auto / true | Ack every fetched message before returning. |
| manual / false | Do not ack; messages stay pending and the source redelivers them on the next drain. Pub/Sub ack ids ride back in ack_ids. |
| none | Never ack, surface no ack ids (a pure peek). |
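
One way to picture the policy is as a small enum with two predicates. The sketch below is illustrative only: `AckMode`, `AckValue`, and the method names are hypothetical. Mapping `true` to `OnSuccess` and `false` to `Manual` is an assumption (the page only says they behave alike), and so is returning no ack ids for the modes that already acked.

```rust
/// Hypothetical enum for the four documented ack modes.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum AckMode {
    OnSuccess,
    Auto,
    Manual,
    None,
}

/// Hypothetical stand-in for the raw `ack` config value (string or bool).
pub enum AckValue<'a> {
    Str(&'a str),
    Bool(bool),
}

impl AckMode {
    /// Absent means the default (on_success); unknown strings are rejected.
    pub fn parse(value: Option<AckValue<'_>>) -> Result<AckMode, String> {
        match value {
            Option::None => Ok(AckMode::OnSuccess),
            Some(AckValue::Bool(true)) => Ok(AckMode::OnSuccess), // assumption
            Some(AckValue::Bool(false)) => Ok(AckMode::Manual),   // assumption
            Some(AckValue::Str("on_success")) => Ok(AckMode::OnSuccess),
            Some(AckValue::Str("auto")) => Ok(AckMode::Auto),
            Some(AckValue::Str("manual")) => Ok(AckMode::Manual),
            Some(AckValue::Str("none")) => Ok(AckMode::None),
            Some(AckValue::Str(other)) => Err(format!("unknown ack mode: {}", other)),
        }
    }

    /// on_success and auto ack every fetched message before returning.
    pub fn acks_before_return(self) -> bool {
        matches!(self, AckMode::OnSuccess | AckMode::Auto)
    }

    /// Only manual hands ack ids back to the caller; none is a pure peek.
    pub fn surfaces_ack_ids(self) -> bool {
        matches!(self, AckMode::Manual)
    }
}
```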

Result shape

ToolResult.data:

{
  "status": "success",
  "source": "nats",
  "operation": "poll",
  "count": 3,
  "acked": true,
  "ack_mode": "on_success",
  "ack_ids": [],
  "messages": [
    {
      "id": "12",                       // stream seq / Pub/Sub messageId / topic:partition:offset
      "data": { "order_id": 42 },       // JSON-parsed body, or the UTF-8 string
      "headers": { "x-kind": "order" }, // normalized (lowercased) metadata channel
      "attributes": { ... },            // raw per-source metadata
      "metadata": { "subject": "...", "stream_seq": 12, "consumer_seq": 3, "delivered": 1, "pending": 0 }
    }
  ],
  "metrics": { "noetl_subscription_messages_fetched_total": 3, "source": "nats" }
}

Downstream steps fan out over drain_orders.messages. A step's result fields bind as {{ <step>.<field> }}: drain_orders.count, drain_orders.messages, drain_orders.acked.
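
The headers field in the result is described as a normalized (lowercased) metadata channel. A rough sketch of that normalization follows. The function name `normalize_headers_sketch` is hypothetical, and the collision rule (a later entry overwrites an earlier one once names are lowercased) is an assumption, not something this page states.

```rust
use std::collections::BTreeMap;

/// Lowercase every header name so downstream steps can match on one spelling.
/// Assumption: if two names collide after lowercasing, the later entry wins.
pub fn normalize_headers_sketch<I, K, V>(raw: I) -> BTreeMap<String, String>
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<str>,
    V: Into<String>,
{
    let mut normalized = BTreeMap::new();
    for (name, value) in raw {
        // Header names are treated as ASCII here; values pass through unchanged.
        normalized.insert(name.as_ref().to_ascii_lowercase(), value.into());
    }
    normalized
}
```

With this in place, a source that delivers `X-Kind` and one that delivers `x-kind` both surface as `x-kind`, matching the example result above.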

Source backends

| source | Feature | How it drains | Notes |
| --- | --- | --- | --- |
| nats | always on | JetStream pull-consumer fetch (the shared drain_pull_consumer, which js_consume also uses) | the durable consumer must already exist |
| pubsub | pubsub (default) | REST subscriptions:pull + :acknowledge, ADC token via gcp_auth | endpoint/PUBSUB_EMULATOR_HOST for the emulator; no gRPC dependency |
| kafka | kafka (default) | pure-Rust kafka crate consumer-group poll in spawn_blocking | Phase-1 limits: no record headers, soft batch cap, plaintext only |

pubsub and kafka are default features, so the shipped worker supports all three; a minimal build can opt out.

Live validation

All three backends are validated live end-to-end on kind — publish/produce N → bounded drain → ack → execution COMPLETED → event trail (call.done/command.completed/playbook.completed) — to the same bar:

| source | Broker (kind) | Status |
| --- | --- | --- |
| nats | in-cluster NATS JetStream | Live: examples/subscription_e2e (count=5, acked=true, COMPLETED). |
| pubsub | Pub/Sub emulator (ops ci/manifests/pubsub-emulator/) | Live: tests/fixtures/subscription_pubsub_drain (count=5, acked=true, COMPLETED). |
| kafka | single-broker KRaft apache/kafka (ops ci/manifests/kafka/) | Live: tests/fixtures/subscription_kafka_drain (count=5, acked=true, COMPLETED). |

Runners: noetl/e2e scripts/kind_validate_subscription_{pubsub,kafka}.sh. Validated on server v3.1.0 + worker v5.15.2 + tools v3.2.0; no adapter code change was needed — both new backends worked as-is against real brokers (the pure-Rust kafka crate talks to Kafka 3.9 KRaft; the Pub/Sub REST backend works against the emulator).

The source-client abstraction

The architectural deliverable is the tools::source::SourceClient trait:

#[async_trait]
pub trait SourceClient: Send + Sync {
    fn source_name(&self) -> &'static str;
    async fn poll(&self, opts: &PollOptions) -> Result<PollOutcome, ToolError>;
}

One poll call performs the whole atomic drain: connect, fetch up to batch messages (stopping early when the source is empty or timeout_ms elapses), ack per PollOptions::ack, and return a normalized PollOutcome of PolledMessages. Shared helpers decode_payload and normalize_headers shape every backend's output into the same envelope. Later RFC phases reuse the trait: a continuous runtime calls poll in a loop; a gateway push reuses the normalizers to shape a webhook into the same envelope.
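
The bounded-drain contract can be illustrated with a deliberately simplified, synchronous in-memory source. Everything here is hypothetical: the real trait is async and takes `&self`, and `PollOptionsSketch`, `PollOutcomeSketch`, `SourceClientSketch`, and `MemorySource` are stand-ins invented for this sketch. It only shows the three stop conditions (batch reached, source empty, deadline passed) and the ack-or-redeliver behaviour.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Hypothetical, reduced stand-in for PollOptions.
pub struct PollOptionsSketch {
    pub batch: usize,
    pub timeout: Duration,
    pub ack: bool,
}

/// Hypothetical, reduced stand-in for PollOutcome.
#[derive(Debug)]
pub struct PollOutcomeSketch {
    pub messages: Vec<String>,
    pub acked: bool,
}

/// Synchronous simplification of the trait, for illustration only.
pub trait SourceClientSketch {
    fn source_name(&self) -> &'static str;
    fn poll(&mut self, opts: &PollOptionsSketch) -> Result<PollOutcomeSketch, String>;
}

/// In-memory queue standing in for a broker.
pub struct MemorySource {
    pub pending: VecDeque<String>,
}

impl SourceClientSketch for MemorySource {
    fn source_name(&self) -> &'static str {
        "memory"
    }

    fn poll(&mut self, opts: &PollOptionsSketch) -> Result<PollOutcomeSketch, String> {
        let deadline = Instant::now() + opts.timeout;
        let mut messages = Vec::new();
        // Stop at the batch cap, when the source is empty, or at the deadline.
        while messages.len() < opts.batch && Instant::now() < deadline {
            match self.pending.get(messages.len()) {
                Some(msg) => messages.push(msg.clone()),
                None => break,
            }
        }
        // Acked messages leave the queue; unacked ones are redelivered next drain.
        if opts.ack {
            self.pending.drain(..messages.len());
        }
        Ok(PollOutcomeSketch { messages, acked: opts.ack })
    }
}
```

A continuous runtime of the kind mentioned above would, under this model, simply call `poll` repeatedly, each call remaining a bounded unit of work.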

Credential shape

The auth field is a credential alias resolved via ctx.get_secret(). For nats the credential JSON carries at least {"url": "nats://..."} (optional user / password / token) — identical to NatsTool. No default connection: an alias is required for an authenticated source (agents/rules/no-default-connection.md).
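
The no-default-connection rule amounts to rejecting a step whose auth alias is missing or blank before any connection is attempted. A minimal sketch, assuming a hypothetical helper `require_auth_alias` (the real resolution goes through ctx.get_secret(), which is not modelled here):

```rust
/// Return the trimmed alias, or an error when none was supplied.
/// Hypothetical helper: it only checks presence, it does not resolve the secret.
pub fn require_auth_alias(auth: Option<&str>) -> Result<&str, String> {
    match auth.map(str::trim) {
        Some(alias) if !alias.is_empty() => Ok(alias),
        _ => Err("subscription tool requires an auth alias; no default connection".to_string()),
    }
}
```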

Observability

  • Span tool.dispatch.subscription (with source, operation, execution_id).
  • noetl_subscription_messages_fetched_total{source} count in the result.
  • execution_id on every log line.
