nats tool
NatsTool (tool kind nats) gives playbook steps read/write access to NATS
JetStream, the Key-Value store, and the Object Store. It mirrors the surface
of the nats tool kind in the Python executor
(noetl/tools/nats/executor.py).
No long-lived subscriptions or push consumers are provided. Holding a worker
slot while waiting for a NATS message violates the NoETL execution model
(agents/rules/execution-model.md).
For a bounded JetStream pull-consumer drain (fetch up to N / until empty /
until timeout, ack, return), use the subscription tool's
NATS source — it shares the same bounded drain that js_consume uses. Use the
JetStream pull-loop built into the worker itself for the worker's own
command consumption.
Source: src/tools/nats.rs

    tool:
      kind: nats
      auth: my_nats_credential          # credential alias (preferred)
      # or, instead of auth, explicit fields:
      url: "nats://localhost:4222"
      user: alice
      password: "{{ secrets.nats_password }}"
      token: "{{ secrets.nats_token }}"
      operation: kv_get                 # required; see operation table below
      bucket: my_bucket                 # KV / Object Store operations
      key: my_key                       # KV operations
      value: "hello"                    # kv_put
      ttl: 3600                         # kv_put: informational (NATS enforces at bucket level)
      pattern: "prefix.*"               # kv_keys glob filter
      name: report.pdf                  # Object Store operations
      data: "text content"              # object_put (string or base64)
      encoding: utf-8                   # object_get / object_put: "utf-8" (default) or "base64"
      description: "Q2 report"          # object_put description
      stream: ORDERS                    # JetStream operations
      subject: orders.created           # js_publish / js_get_msg
      data: { order_id: 42 }            # js_publish payload
      headers:                          # js_publish headers (optional)
        X-Source: payments
      seq: 100                          # js_get_msg by sequence number
      last: true                        # js_get_msg: fetch last message

The auth field is a credential alias resolved via ctx.get_secret(). The
resolved JSON must contain at minimum a url field:

    {
      "url": "nats://nats.internal:4222",
      "user": "worker",
      "password": "s3cr3t",
      "token": null
    }

user + password and token are alternative ways to authenticate. When both are
present, token takes precedence.
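
As a rough sketch of the explicit-field form, the step below connects without a credential alias. It assumes the connection fields sit at the same level as operation, as in the field reference above; the step name read_flag is hypothetical, and the bucket and key are the placeholder values from that reference.

```yaml
steps:
  - name: read_flag                       # hypothetical step name
    tool:
      kind: nats
      url: "nats://localhost:4222"        # explicit connection instead of auth
      token: "{{ secrets.nats_token }}"   # token auth; no user/password needed
      operation: kv_get
      bucket: my_bucket
      key: my_key
```

The auth alias remains the preferred form; explicit fields are mainly useful for local testing.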

| Operation | Required fields | Optional fields | Result shape |
|---|---|---|---|
| kv_get | bucket, key | none | { status, bucket, key, value } |
| kv_put | bucket, key, value | ttl | { status, bucket, key, revision } |
| kv_delete | bucket, key | none | { status, bucket, key } |
| kv_keys | bucket | pattern | { status, bucket, keys[], count } |
| kv_purge | bucket, key | none | { status, bucket, key } |

- kv_get returns status: "not_found" (not an error) when the key does not
  exist.
- kv_put serializes value as JSON when it is an object or array; strings are
  written as raw bytes.
- kv_keys pattern is a shell-style glob (* matches any characters).
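
The Key-Value notes above can be illustrated with a short sequence of steps. This is a sketch, not a tested playbook: the step names, the profiles bucket, and the user_id template variable are all hypothetical, and only fields listed in the operation table are used.

```yaml
steps:
  - name: store_profile               # hypothetical step name
    tool:
      kind: nats
      auth: nats_credential
      operation: kv_put
      bucket: profiles                # hypothetical bucket
      key: "user.{{ user_id }}"       # user_id is an assumed template variable
      value:                          # object value, so it is serialized as JSON
        plan: pro
        seats: 5
  - name: list_profiles
    tool:
      kind: nats
      auth: nats_credential
      operation: kv_keys
      bucket: profiles
      pattern: "user.*"               # shell-style glob filter
  - name: read_profile
    tool:
      kind: nats
      auth: nats_credential
      operation: kv_get
      bucket: profiles
      key: "user.{{ user_id }}"
```

If the key is missing, read_profile returns status: "not_found" rather than failing, so downstream logic should check status before reading value.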

| Operation | Required fields | Optional fields | Result shape |
|---|---|---|---|
| object_get | bucket, name | encoding | { status, bucket, name, data, size } |
| object_put | bucket, name, data | encoding, description | { status, bucket, name, size } |
| object_delete | bucket, name | none | { status, bucket, name } |
| object_list | bucket | none | { status, bucket, objects[], count } |
| object_info | bucket, name | none | { status, bucket, name, size, description, chunks } |

With encoding: "base64", object_get returns the object data as a
base64-encoded string, and object_put decodes the data string from base64
before storing it.
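
A binary round trip through the Object Store might look like the sketch below. The step names, the reports bucket, and the report_pdf_base64 template variable are hypothetical; the variable is assumed to already hold the file contents as base64 text.

```yaml
steps:
  - name: upload_report                 # hypothetical step name
    tool:
      kind: nats
      auth: nats_credential
      operation: object_put
      bucket: reports                   # hypothetical bucket
      name: report.pdf
      data: "{{ report_pdf_base64 }}"   # assumed variable holding base64 text
      encoding: base64                  # decoded to raw bytes before storing
      description: "Q2 report"
  - name: download_report
    tool:
      kind: nats
      auth: nats_credential
      operation: object_get
      bucket: reports
      name: report.pdf
      encoding: base64                  # data comes back base64-encoded
```

For plain text objects, encoding can be omitted, since "utf-8" is the default.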

| Operation | Required fields | Optional fields | Result shape |
|---|---|---|---|
| js_publish | subject, data | headers | { status, stream, seq, duplicate } |
| js_get_msg | stream, one of: seq / last / subject | none | { status, stream, subject, seq, data } |
| js_stream_info | stream | none | { status, stream, config{}, state{} } |

js_publish sends with ack and returns the sequence number the server assigned.
js_get_msg with last: true fetches the last message on the stream (or on
subject when both are set).
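
The two lookup modes of js_get_msg can be sketched as follows. The step names are hypothetical; the stream, subject, and sequence number are the placeholder values from the field reference.

```yaml
steps:
  - name: latest_order            # hypothetical step name
    tool:
      kind: nats
      auth: nats_credential
      operation: js_get_msg
      stream: ORDERS
      subject: orders.created     # with last: true, scopes "last" to this subject
      last: true
  - name: order_by_seq            # hypothetical step name
    tool:
      kind: nats
      auth: nats_credential
      operation: js_get_msg
      stream: ORDERS
      seq: 100                    # direct lookup by stream sequence number
```

Both steps are single bounded reads that return immediately, which keeps them within the execution model's rule against holding a worker slot while waiting for messages.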

    steps:
      - name: cache_token
        tool:
          kind: nats
          auth: nats_credential
          operation: kv_put
          bucket: auth_cache
          key: "session/{{ execution_id }}"
          value: "{{ oauth_token }}"
          ttl: 3600
      - name: publish_ready
        tool:
          kind: nats
          auth: nats_credential
          operation: js_publish
          subject: "events.session.ready"
          data:
            execution_id: "{{ execution_id }}"
            session_key: "session/{{ execution_id }}"
          headers:
            X-Execution-ID: "{{ execution_id }}"

Each dispatch opens a nats.op tracing span with operation and
execution_id attributes. Duration is returned in the ToolResult.duration_ms
field. No per-operation Prometheus metrics are added in this version; histogram
instrumentation is tracked at the worker boundary level.

- Tool kinds overview: all tool kinds index
- noetl/worker wiki, noetl-executor-adoption: how the worker dispatches through the registry
- agents/rules/execution-model.md: why subscriptions are out of scope
- Python reference: noetl/tools/nats/executor.py
- PR: noetl/tools#12
- Umbrella issue: noetl/ai-meta#38