
noetl executor adoption

Kadyapam edited this page Jun 1, 2026 · 14 revisions

noetl-executor adoption

Companion to the cli wiki's executor-crate-architecture page from the worker side. Documents which surfaces the worker imports from noetl-executor, which stay worker-local, and the sub-PR landing history.

Why the worker depends on noetl-executor

§ H.10 of the Rust migration roadmap established that the noetl CLI and noetl-worker are fundamentally different control loops — recursive tree walker vs pull-model consumer — but share the same:

  • YAML playbook types (Step, Tool, NextFormat, …)
  • Template rendering rules ({{ workload.x }} substitution)
  • Case-condition operator semantics (Eq / Gt / Contains / Matches / …)
  • Event envelope shape (ExecutorEvent mirrors Python's noetl.event table)
  • Tool dispatch bridge onto the noetl-tools registry

noetl-executor is the home for that shared surface. The CLI ships it as a workspace member crate in noetl/cli; the worker depends on it via crates.io.

What the worker imports

| Surface | Used by worker module | Notes |
| --- | --- | --- |
| noetl_executor::condition::Operator | executor::case_evaluator | 12-variant enum re-exported for backward compatibility. |
| noetl_executor::condition::Condition | executor::case_evaluator | Structured { left, op, right } envelope. |
| noetl_executor::condition::evaluate_structured_condition | executor::case_evaluator::CaseEvaluator::evaluate_conditions | Per-condition evaluation. |
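
To make the { left, op, right } shape concrete, here is a minimal sketch of per-condition evaluation. It is not the noetl_executor::condition API: the Value type, the five-operator subset, and the evaluate function are hypothetical stand-ins, and the treatment of mismatched operand types is an assumption.

```rust
// Illustrative only: simplified stand-ins, not the real
// noetl_executor::condition definitions.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Int(i64),
    Text(String),
}

// A small subset of operators; the shared crate's enum has 12 variants.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Operator {
    Eq,
    Ne,
    Gt,
    Lt,
    Contains,
}

#[derive(Debug, Clone)]
pub struct Condition {
    pub left: Value,
    pub op: Operator,
    pub right: Value,
}

/// Evaluate one { left, op, right } condition.
/// Eq / Ne compare structurally; ordering and Contains require matching
/// operand types and otherwise evaluate to false (an assumption of this sketch).
pub fn evaluate(cond: &Condition) -> bool {
    match (&cond.left, cond.op, &cond.right) {
        (l, Operator::Eq, r) => l == r,
        (l, Operator::Ne, r) => l != r,
        (Value::Int(l), Operator::Gt, Value::Int(r)) => l > r,
        (Value::Int(l), Operator::Lt, Value::Int(r)) => l < r,
        (Value::Text(l), Operator::Contains, Value::Text(r)) => l.contains(r.as_str()),
        _ => false,
    }
}
```

Keeping the evaluation of a single condition in one pure function is what lets the worker's CaseEvaluator keep its own control flow while delegating the comparison itself.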

Adopted in R-1.2 PR-2d-2 (noetl/worker#6, noetl-worker 2.0.0):

| Surface | Worker call site | Notes |
| --- | --- | --- |
| noetl_executor::worker::source::CommandSource | new nats::source::NatsCommandSource (wraps NatsSubscriber + ControlPlaneClient + worker_id) | Trait the wrapper implements; Worker::process_commands drives through source.next() + source.ack() / source.nack(). |
| noetl_executor::worker::source::ClaimOutcome | NATS source next() return + Worker::process_commands match arms | 4-state enum (Claimed(Command) / AlreadyClaimed / RetryLater(String) / Failed(String)); maps 1:1 onto the worker's pre-PR-2d-2 ClaimResult. |
| noetl_executor::worker::source::Pulled<H> | NATS source next() return | Generic wrapper { outcome, ack: H }; H = worker-local NatsAckHandle { message, notification } (NOT bare Message). See "AckHandle design" below. |
| noetl_executor::worker::source::Command | translated from crate::client::Command at the source seam; consumed by CommandExecutor::execute(&Command) | Enriched in 0.3.0 with render_context: HashMap<String, Value> + attempts: u32. Worker's Command.render_context() / meta.attempts map cleanly. |
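
The pull lifecycle described in the table can be sketched as follows. This is a synchronous simplification under stated assumptions: the real trait in noetl_executor::worker::source may be async and have different signatures, VecSource is a hypothetical in-memory source, and the choice of which outcomes are acked versus nacked is illustrative rather than taken from the worker.

```rust
use std::collections::VecDeque;

// Simplified stand-ins for the shapes named in the table above.
#[derive(Debug, Clone, PartialEq)]
pub struct Command {
    pub execution_id: i64,
    pub command_id: String,
    pub step: String,
    pub attempts: u32,
}

#[derive(Debug, Clone, PartialEq)]
pub enum ClaimOutcome {
    Claimed(Command),
    AlreadyClaimed,
    RetryLater(String),
    Failed(String),
}

/// Outcome plus the handle needed to ack or nack the delivery later.
pub struct Pulled<H> {
    pub outcome: ClaimOutcome,
    pub ack: H,
}

pub trait CommandSource {
    type AckHandle;
    fn next(&mut self) -> Option<Pulled<Self::AckHandle>>;
    fn ack(&mut self, handle: Self::AckHandle);
    fn nack(&mut self, handle: Self::AckHandle);
}

/// Hypothetical in-memory source; the ack handle is just a sequence number.
pub struct VecSource {
    pub queue: VecDeque<ClaimOutcome>,
    pub next_seq: u64,
    pub acked: Vec<u64>,
    pub nacked: Vec<u64>,
}

impl CommandSource for VecSource {
    type AckHandle = u64;

    fn next(&mut self) -> Option<Pulled<u64>> {
        let outcome = self.queue.pop_front()?;
        let ack = self.next_seq;
        self.next_seq += 1;
        Some(Pulled { outcome, ack })
    }

    fn ack(&mut self, handle: u64) {
        self.acked.push(handle);
    }

    fn nack(&mut self, handle: u64) {
        self.nacked.push(handle);
    }
}

/// Drain the source and return the steps that were dispatched.
/// Assumed policy: ack Claimed and AlreadyClaimed, nack the other two.
pub fn process_commands<S: CommandSource>(source: &mut S) -> Vec<String> {
    let mut dispatched = Vec::new();
    while let Some(Pulled { outcome, ack }) = source.next() {
        match outcome {
            ClaimOutcome::Claimed(cmd) => {
                dispatched.push(cmd.step);
                source.ack(ack);
            }
            ClaimOutcome::AlreadyClaimed => source.ack(ack),
            ClaimOutcome::RetryLater(_) | ClaimOutcome::Failed(_) => source.nack(ack),
        }
    }
    dispatched
}
```

Because the loop is generic over the source, the same process_commands body can be driven by a NATS-backed source in production and by an in-memory one in tests.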

AckHandle design — why NatsAckHandle { message, notification } not bare Message

The executor's ClaimOutcome doesn't carry execution_id / command_id / step on the AlreadyClaimed / RetryLater(String) / Failed(String) variants — only on Claimed(Command). But observability.md Principle 4 says every WARN/ERROR carries execution_id as a structured field.

Resolved by embedding the CommandNotification in the source's AckHandle:

```rust
/// Ack handle that pairs the NATS message with the notification it carried.
pub struct NatsAckHandle {
    pub message: Message,
    pub notification: CommandNotification,
}

impl CommandSource for NatsCommandSource {
    type AckHandle = NatsAckHandle;
    // ...
}
```

The notification metadata rides through the pull lifecycle alongside the NATS message handle, so Worker::process_commands has execution_id / command_id / step available on every variant's log line without forcing another noetl-executor breaking change.
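
As a rough illustration of why this helps, the sketch below builds the structured fields for a log line from the handle rather than from the outcome. All types are stand-ins, and log_fields, the label strings, and the field layout are hypothetical; the worker's actual logging calls are not shown in this page.

```rust
// Stand-in types: the real Message and CommandNotification carry more fields.
pub struct Message;

pub struct CommandNotification {
    pub execution_id: i64,
    pub command_id: String,
    pub step: String,
}

pub struct NatsAckHandle {
    pub message: Message,
    pub notification: CommandNotification,
}

// Payload of Claimed omitted for brevity in this sketch.
pub enum ClaimOutcome {
    Claimed,
    AlreadyClaimed,
    RetryLater(String),
    Failed(String),
}

/// Derive a label from the enum so call sites need no string literals.
/// The label spellings are assumptions.
pub fn outcome_label(outcome: &ClaimOutcome) -> &'static str {
    match outcome {
        ClaimOutcome::Claimed => "claimed",
        ClaimOutcome::AlreadyClaimed => "already_claimed",
        ClaimOutcome::RetryLater(_) => "retry_later",
        ClaimOutcome::Failed(_) => "failed",
    }
}

/// Build log fields. The ids come from the handle, so they are present
/// even for variants that carry no Command.
pub fn log_fields(outcome: &ClaimOutcome, handle: &NatsAckHandle) -> String {
    let n = &handle.notification;
    let mut line = format!(
        "outcome={} execution_id={} command_id={} step={}",
        outcome_label(outcome),
        n.execution_id,
        n.command_id,
        n.step
    );
    if let ClaimOutcome::RetryLater(reason) | ClaimOutcome::Failed(reason) = outcome {
        line.push_str(&format!(" reason={reason:?}"));
    }
    line
}
```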

Lossless WorkerCommand → ExecutorCommand translation

| Worker field | Executor field | Notes |
| --- | --- | --- |
| command_id() (from meta) | command_id | Worker computes a fallback if meta is missing. |
| execution_id | execution_id | Already i64 on both sides since R-1.2 PR-2a. |
| node_name | step | Worker's step() accessor returns &node_name. |
| action | tool_kind | E.g. "http", "postgres", "rhai". |
| context (full JSON) | input | Carries tool_config + cases + args + nested config. CommandExecutor::execute extracts each section separately. |
| render_context() | render_context | HashMap<String, Value> on both sides. |
| meta.attempts | attempts | Defaults to 0 if missing. |

The translate_carries_full_context_as_input_including_cases unit test in src/nats/source.rs locks in that nothing gets dropped at the seam — input carries the worker's entire context JSON so cases extraction still works via command.input.get("cases").
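
The field mapping can be sketched as a single function. This is a simplification, not the code in src/nats/source.rs: JSON values are modelled as plain strings to keep the example dependency-free, the struct layouts are assumptions, and the command_id fallback format is hypothetical (the page only says that a fallback exists).

```rust
use std::collections::HashMap;

// Simplified stand-ins; JSON values are modelled as strings.
pub struct WorkerMeta {
    pub command_id: Option<String>,
    pub attempts: Option<u32>,
}

pub struct WorkerCommand {
    pub execution_id: i64,
    pub node_name: String,
    pub action: String,
    pub context: HashMap<String, String>,
    pub render_context: HashMap<String, String>,
    pub meta: Option<WorkerMeta>,
}

pub struct ExecutorCommand {
    pub command_id: String,
    pub execution_id: i64,
    pub step: String,
    pub tool_kind: String,
    pub input: HashMap<String, String>,
    pub render_context: HashMap<String, String>,
    pub attempts: u32,
}

/// Translate without dropping anything: the whole context becomes `input`.
pub fn translate(cmd: &WorkerCommand) -> ExecutorCommand {
    let command_id = cmd
        .meta
        .as_ref()
        .and_then(|m| m.command_id.clone())
        // Hypothetical fallback format, used only when meta has no id.
        .unwrap_or_else(|| format!("{}:{}", cmd.execution_id, cmd.node_name));
    let attempts = cmd.meta.as_ref().and_then(|m| m.attempts).unwrap_or(0);
    ExecutorCommand {
        command_id,
        execution_id: cmd.execution_id,
        step: cmd.node_name.clone(),
        tool_kind: cmd.action.clone(),
        input: cmd.context.clone(),
        render_context: cmd.render_context.clone(),
        attempts,
    }
}
```

Passing the full context through as input, rather than picking out known keys, is what keeps later extraction of cases possible on the executor side.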

What stays worker-local

Worker-specific shapes that don't belong in the shared crate:

| Worker-local surface | Why |
| --- | --- |
| worker::Worker | Pull-loop control flow. Claim, dispatch, ack/nack: pull-model semantics that the CLI's tree walker doesn't share. |
| nats::subscriber::NatsSubscriber | NATS JetStream binding. CLI has no NATS surface. |
| client::control_plane::ControlPlaneClient | HTTP client to the Python control plane. CLI uses local YAML, not a server. |
| events::emitter::EventEmitter | Retry logic over ControlPlaneClient.emit_event. Worker-specific retry policy. |
| executor::case_evaluator::Case / CaseAction / CaseResult / CaseEvaluator | Pull-loop control flow. CaseAction::{Continue, Exit, SetVar, Goto, Retry, Fail} are dispatch-side decisions the CLI's tree walker handles differently. |
| client::control_plane::WorkerEvent | Former wire format to the Python server's /api/events endpoint. It diverged from noetl_executor::events::ExecutorEvent until R-1.2 PR-EE-3 (noetl-worker 3.0.0) replaced it with the shared envelope. |

R-1.2 sub-PR landing history (worker side)

Sub-PR Scope noetl-worker version PR
R-1.2 PR-2c First worker PR depending on noetl-executor. Adds noetl-executor = "0.2" dep; replaces inline Operator + Condition + per-condition resolution helpers with re-exports + delegation to evaluate_structured_condition. Keeps pull-loop control flow (Case / CaseAction / CaseResult / CaseEvaluator) per § H.10. case_evaluator.rs 437 → 344 LoC (-93 net); 3 new tests lock in worker contract. 1.1.0 → 1.1.2 (after pipeline fix) noetl/worker#2
R-1.2 PR-2d-2 Final worker-side R-1.2 sub-PR. Bumps noetl-executor = "0.3"; new src/nats/source.rs (+312 LoC) with NatsCommandSource impl-ing the trait. CommandExecutor::execute refactored to take the executor's Command. Worker::process_commands driven through source.next() + source.ack/nack(). Lossless WorkerCommand → ExecutorCommand translation locked in by 8 unit tests. Observability addendum: nats.pull + command.execute spans per observability.md Principle 1; execution_id structured field on every WARN/ERROR per Principle 4 (enabled by NatsAckHandle { message, notification } design). 2.0.0 (major bump from feat!: prefix) noetl/worker#6
R-1.2 PR-2e Prometheus metrics harness + /metrics endpoint per observability.md Principle 2. New src/metrics.rs (+358 LoC) defines a lazy-init global registry with 7 metrics: noetl_worker_pulls_total{outcome}, noetl_worker_pull_duration_seconds, noetl_worker_dispatch_duration_seconds{tool_kind}, noetl_worker_dispatch_errors_total{tool_kind}, noetl_worker_event_emit_duration_seconds{event_type}, noetl_worker_event_emit_retries_total{event_type}, noetl_worker_concurrent_dispatches. New src/metrics_server.rs (+167 LoC) exposes them via axum on dedicated port (default 0.0.0.0:9090, WORKER_METRICS_BIND env override). Instrumented at NatsCommandSource::next, CommandExecutor::execute (3 exit paths: success / tool error / case-fail), CommandExecutor::emit_event, and the worker semaphore. outcome_label(&ClaimOutcome) derives the label string from the enum directly — no string literals at call sites. README extended with observability section. Closes noetl/ai-meta#32. 2.1.0 (minor bump from feat: prefix) noetl/worker#8
R-1.2 PR-EE-3 Last PR in the four-PR cross-stack event envelope reconciliation series (after EE-1 / EE-2 / EE-4 on noetl/cli / noetl/server / noetl/noetl). Replaces the worker-local WorkerEvent (3 fields — event_type + execution_id + payload) with ExecutorEvent re-exported from noetl_executor::events (9 fields — adds top-level step, status, created_at, context plus optional event_id, worker_id, meta). EventEmitter now carries worker_id so every envelope is stamped at the source per observability.md Principle 4; private build_event helper stamps created_at = Utc::now(). All 7 helper methods + CommandExecutor::emit_event take step + status parameters; status derives from lifecycle (STARTED / COMPLETED / FAILED) except step.exit (takes case-defined status) and command.completed (takes tool terminal status). event_id left as None for now — server's gen_snowflake() DB default fires; app-side snowflake generation tracked as a follow-up. 4 new tests lock in the wire shape (mirrors Python's TestFullExecutorEnvelopeRoundTrips on the broker side). Breaking → feat!: → major bump 2.1.0 → 3.0.0. Closes noetl/worker#10. 3.0.0 noetl/worker#11
App-side snowflake event_id First EE-3 follow-up. New src/snowflake.rs (~290 LoC incl docs + tests) — in-tree FNV-1a + Mutex<State> snowflake generator whose bit layout mirrors noetl.core.common.get_snowflake_id 1:1 (41-bit ms ts since 2024-01-01Z + 10-bit node + 12-bit sequence; epoch + node configurable via NOETL_SNOWFLAKE_EPOCH_MS / NOETL_SNOWFLAKE_NODE_ID / NOETL_SHARD_ID — same env knobs the Python broker reads). EventEmitter + CommandExecutor hold Arc<SnowflakeGen> and populate event_id: Some(gen.next_id()) on every emitted envelope per observability.md Principle 3 — the id exists at span-creation time + survives retries. Worker::new constructs ONE generator per process from worker_id + env, logs the effective node_id at startup so shard collisions are diagnosable. No new external dep — chrono + std::sync::Mutex only. 9 new tests; one (id_layout_matches_python_helper_formula) reconstructs the Python helper's bit-packing formula from a Rust-generated id to lock in cross-stack compatibility. Constructor signatures changed → feat: + BREAKING CHANGE: footer → semantic-release auto-bumped 3.0.0 → 4.0.0. Closes noetl/worker#12. 4.0.0 noetl/worker#14
meta.attempts propagation Second EE-3 follow-up. Threads the executor Command.attempts: u32 (added in noetl-executor 0.3.0 PR-2d-1) through every emitted envelope's meta field so retry behaviour rides the event log end-to-end. Projectors no longer need to reach back into worker logs to compute per-step retry distributions. CommandExecutor::emit_event + EventEmitter::build_event + all 7 emit_* helpers gained an attempts: u32 parameter; all 7 call sites in CommandExecutor::execute pass command.attempts. Design choice: include meta: Some({"attempts": N}) always — even attempts = 0 is a meaningful signal ("first try"); projector reads uniformly without a presence check. ~25 bytes per event × typical volumes is below noise. 3 tests updated + 1 new (test_build_event_carries_attempts_zero_for_first_try). Constructor signatures changed → feat: + BREAKING CHANGE: footer → semantic-release auto-bumped 4.0.0 → 5.0.0. Closes noetl/worker#13. 5.0.0 noetl/worker#15
NATS consumer-lag metric PR-2e follow-up. PR-2e shipped 7 worker metrics but explicitly deferred consumer-lag because JetStream's consumer-info API is pull-style. This PR fills that gap. New src/nats/lag_poller.rs (~140 LoC) — periodic poll task spawned from Worker::run alongside the heartbeat + metrics-server tasks. Two new gauges: noetl_worker_nats_consumer_pending (backlog the worker hasn't claimed yet) + noetl_worker_nats_consumer_ack_pending (currently in-flight); both labeled stream + consumer. New NatsSubscriber::consumer_lag() accessor wraps Consumer::info(). Cadence is WORKER_NATS_LAG_POLL_INTERVAL env (default 5s; clamped to ≥1s — sub-second polling burns JetStream RPC for no observability win). KEDA can now drive worker-pool autoscaling off the worker's own /metrics endpoint via prometheus-trigger queries — no separate exporter needed. Errors fetching consumer info log at WARN and don't crash the worker (metrics non-critical to the dispatch path). 4 new tests; one (nats_consumer_lag_gauges_emit_in_prometheus_text) locks in the wire format KEDA scrapes. Behaviour-additive → feat: → semantic-release auto-bumped 5.0.0 → 5.1.0. Closes noetl/worker#16. 5.1.0 noetl/worker#17
Kind validation fixes (2026-05-31) Three latent worker bugs surfaced during the end-to-end kind-validation pass against the live Python broker. None had fired before because the Rust worker had only been exercised against MockSource in unit tests + anonymous-NATS dev configs. (1) noetl/worker#19: async_nats::connect(url) silently drops inline user:pass@ URL credentials; the fix extracts them and feeds ConnectOptions::with_user_and_password. It also reads the NATS_USER / NATS_PASSWORD env vars. (2) noetl/worker#21: /api/worker/pool/{register,heartbeat,deregister} payloads aligned with the Python broker's RuntimeRegistrationRequest schema (name, component_type, runtime, etc.); pre-fix the worker died at startup with "Field required: body.name". (3) noetl/worker#23: CommandNotification.command_id accepts numeric JSON (broker emits bigint snowflakes); a custom deserializer stringifies it to keep the in-memory String type stable. Combined cadence: 5.1.0 → 5.1.1 → 5.1.2. See noetl/ai-meta#30 validation summary. 5.1.1, 5.1.2 #19, #21, #23
call.done reference-only payload series (noetl/worker#24) Four-PR progression aligning the Rust worker's call.done payload with the Python broker's _validate_reference_only_payload contract (allowed keys at payload.result: {status, reference, context, command_id}). (1) #25 (5.1.3) — minimum-viable {status}-only emit; the kind-validation pass had been failing with "payload.result includes unsupported keys: data, duration_ms, exit_code, stderr, stdout". Restores forward progress at the cost of stripping all tool output from the event. (2) #26 (5.2.0) — restores data flow by emitting result.context (the broker permits inline context up to NOETL_EVENT_RESULT_CONTEXT_MAX_BYTES, default 100 KB). Downstream Jinja templates referencing {{ step.stdout }} work again for in-budget results. (3) #27 (5.2.1) — Rust-side pre-check against the budget so over-budget tool output is visibly dropped ({status}-only + WARN log carrying execution_id + context_bytes) rather than getting silently truncated by the broker. Locks INLINE_CONTEXT_MAX_BYTES = 102_400 in a regression test so the two sides stay in sync. (4) #28 (5.3.0, this entry) — closes the colocated-consumer slice via the new noetl-arrow-cache 0.1.0 crate (R-2.1). Worker::new constructs one Arc<ArrowIpcSharedMemoryCache> per process; over-budget contexts get staged via put_arrow_ipc with media_type = "application/json" and the returned IpcHint rides the event as result.reference. The {status}-only fallback still fires only when the cache put itself fails (out of space, name collision, etc.). Tests: build_call_done_result_stages_oversized_context_via_cache proves the reference path (no context, populated reference with media_type = application/json, valid shm_name + byte_length, and cache.used_bytes() reflects the staged bytes). Cross-node consumers (durable result_store path) stay as the follow-up on the same umbrella issue. 5.1.3, 5.2.0, 5.2.1, 5.3.0 #25, #26, #27, #28
Durable result-store cross-node slice (closes noetl/worker#24) Fifth + final PR in the call.done payload series. Over-budget contexts now PUT to the Python server's /api/result/{execution_id} endpoint (ControlPlaneClient::put_result) and the returned ResultRef (noetl://execution/<eid>/result/<step>/<id> URI + meta) rides the event as result.reference. Workers on a different K8s node can fetch the bytes via GET /api/result/resolve; colocated consumers keep the shm fast path via a nested ipc: IpcHint field on the ResultRef. Wire shape mirrors Python's noetl.core.storage.models.ResultRef exactly so consumer code that already assumes the kind: "result_ref" discriminator + ref/store/scope/meta/ipc fields works without producer discrimination. build_call_done_result is now async (#[tokio::test] for the branch tests) and has a 5-row fallback chain: inline → durable+ipc → durable-only → ipc-only (#28 shape) → status-only. Three new metrics on the global registry: noetl_worker_result_store_put_duration_seconds (histogram, success-only so p99 stays clean), noetl_worker_result_store_put_bytes_total + noetl_worker_result_store_put_errors_total (counters). Structured logs at INFO (success) / WARN (durable fail, shm fallback) / ERROR (combined fail) all carry execution_id per observability.md Principle 4. In-test axum mock of the result-store endpoint on a per-test random port lets the branch tests run without a real server. 68 → 69 lib tests pass. Worker-side credential scrubbing intentionally not added — the Python server's PUT /api/result endpoint already calls producer_scrub_payload at the boundary; worker-side scrubbing would only shorten the wire-transit window (follow-up). 5.4.0 #29
R-2.2: tabular tool outputs as Arrow IPC bytes in shm cache Bumps noetl-tools = "2.10" to pick up arrow_codec::try_encode_tabular_json (noetl/tools#7). In the over-budget branch of build_call_done_result, tries the tabular encoder first: Some(enc) → shm bytes are Arrow IPC stream, IpcHint.media_type = "application/vnd.apache.arrow.stream", IpcHint.row_count = enc.row_count, IpcHint.schema_digest = "arrow"; None → shm bytes stay JSON (unchanged from #28). Durable PUT stays JSON-only — the server's data: Any endpoint stores it as-is so cross-node consumers see no behaviour change. The 5-row fallback chain from #29 is preserved; only the shm payload encoding changes per-context. Colocated consumers switch on IpcHint.media_type to pick the right decoder (pyarrow.ipc.RecordBatchStreamReader for Arrow, JSON parse for the fallback) — mirrors the Python worker's same fast path. Tests: 69 → 70. New build_call_done_result_uses_arrow_ipc_for_tabular_tool_output exercises a 6000-row × 4-column DuckDB-shape payload (Int64 / Utf8 / Float64 / Boolean inference) and asserts that the Arrow IPC byte length is smaller than the JSON serialisation (the compression win is the whole point of R-2.2). Existing non-tabular tests still assert media_type = "application/json" so the fallback path is locked in. 5.5.0 #30
Producer-side credential scrubbing (closes a long-standing follow-up on #29) Adds src/scrub.rs — a Rust-side credential scrubber that runs once at the top of build_call_done_result so all three emit paths (inline result.context → broker event log, durable PUT → /api/result/, shm cache → colocated consumers) ride a scrubbed clone. Mirrors the shape Python's noetl.core.credential_refs.producer_scrub_payload uses: is_sensitive_key() matches against ~50 known-sensitive token names with case + separator normalisation + substring matching (so db_password matches against password); looks_like_secret_value() recognises Bearer / Basic / JWT / -----BEGIN PRIVATE KEY----- / vendor prefixes (sk-, sk-ant-, AIza, ya29., ghp_ / ghs_ / gho_ / ghu_ / ghr_, github_pat_, xox[bapsr]-) anywhere in a string preceded by a separator; scrub_in_place() walks JSON in place replacing matches with [REDACTED]. Deliberate skips: keychain-manifest-driven scrub (Rust worker doesn't propagate Python's _keychain_manifest yet; server-side scrub at the /api/result boundary catches the durable path) and broad "long alphanumeric" heuristic (false-positives on execution / event ids). Tests: 70 → 87. 15 new tests in scrub::tests (key matching, value patterns, false-positive guards, nested recursion, scrub_cloned non-mutation, DuckDB-shape rowset); 2 new integration tests in executor::command::tests proving the inline path AND the R-2.2 tabular path both emit credentials as [REDACTED]. 5.6.0 #31
CI fix release.yml trigger fix (add gh workflow run step in semantic-release.yml). Pipeline started self-healing from this point onward. (no version bump — fix-only) noetl/worker#4
CI fix actions: write + issues: write + pull-requests: write permissions in semantic-release.yml. Final piece of the pipeline self-heal. 1.1.2 (caught up the unpublished 1.1.0 + 1.1.1) noetl/worker#5
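
The bit layout named in the "App-side snowflake event_id" row above (41-bit millisecond timestamp since 2024-01-01Z, 10-bit node, 12-bit sequence) can be sketched as plain bit packing. This is not the code in src/snowflake.rs: the pack / unpack helpers are hypothetical, and the field order (timestamp in the high bits, then node, then sequence) is the conventional snowflake arrangement and is assumed here rather than stated in the row.

```rust
/// 2024-01-01T00:00:00Z in Unix milliseconds.
pub const EPOCH_MS: u64 = 1_704_067_200_000;
const TS_BITS: u32 = 41;
const NODE_BITS: u32 = 10;
const SEQ_BITS: u32 = 12;

/// Pack (timestamp, node, sequence) into a 63-bit positive id.
/// Assumed order: timestamp high, node middle, sequence low.
pub fn pack(unix_ms: u64, node: u64, seq: u64) -> i64 {
    let ts = unix_ms.saturating_sub(EPOCH_MS) & ((1u64 << TS_BITS) - 1);
    let node = node & ((1u64 << NODE_BITS) - 1);
    let seq = seq & ((1u64 << SEQ_BITS) - 1);
    ((ts << (NODE_BITS + SEQ_BITS)) | (node << SEQ_BITS) | seq) as i64
}

/// Recover (unix_ms, node, sequence) from an id produced by `pack`.
pub fn unpack(id: i64) -> (u64, u64, u64) {
    let id = id as u64;
    let seq = id & ((1u64 << SEQ_BITS) - 1);
    let node = (id >> SEQ_BITS) & ((1u64 << NODE_BITS) - 1);
    let ts = id >> (NODE_BITS + SEQ_BITS);
    (ts + EPOCH_MS, node, seq)
}
```

With 41 + 10 + 12 = 63 bits in use, the sign bit stays clear, so the id fits a signed bigint column as a positive number.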

Sub-PRs in flight or pending

  • Event envelope reconciliation (cross-repo) — ✅ series complete after noetl/worker#11 landed (noetl-worker 3.0.0). All four producers / consumers now emit + accept the same wire format on /api/events:
    • PR-EE-1 ✅ noetl/cli (#37, noetl-executor 0.3.1): ExecutorEvent enriched with optional event_id / worker_id / meta + payload serde alias.
    • PR-EE-2 ✅ noetl/server Rust (#6, noetl-server 2.0.1 after pipeline-fix #7): EventRequest renamed name → event_type with #[serde(alias = "name")]; payload accepts context alias; added optional event_id / status / created_at. See server-wiki event-envelope.
    • PR-EE-4 ✅ noetl/noetl Python (#639, noetl 3.0.0): EventEmitRequest accepts name / step / payload as validation aliases for event_type / node_name / context; worker_id lifted to top-level; EventType: Literal[...] → str (the Literal was already dead code; production Python uses dot-notation event types throughout).
    • PR-EE-3 ✅ this repo (#11, noetl-worker 3.0.0): the switch documented in the row above. Tracked on noetl/ai-meta#30.
  • App-side snowflake event_id — ✅ landed in noetl/worker#14 (noetl-worker 4.0.0). src/snowflake.rs generates the id; layout matches the Python helper so Rust + Python ids share the same noetl.event.event_id bigint column.
  • meta.attempts propagation — ✅ landed in noetl/worker#15 (noetl-worker 5.0.0). Every emitted envelope now carries meta.attempts so retry behaviour rides the event log end-to-end; projectors no longer need to reach back into worker logs to compute per-step retry distributions.
  • NATS consumer lag metric — ✅ landed in noetl/worker#17 (noetl-worker 5.1.0). Periodic poll task updates the noetl_worker_nats_consumer_pending + noetl_worker_nats_consumer_ack_pending gauges; KEDA prometheus-trigger queries the worker's own /metrics endpoint for autoscaling.
  • call.done reference-only contract (noetl/worker#24) — ✅ closed. Five PRs (#25 → #26 → #27 → #28 → #29) landed the full progression: status-only → inline context → Rust-side budget check → colocated shm reference (R-2.1) → durable result-store cross-node reference. Final wire shape is a 5-row fallback chain (build_call_done_result in src/executor/command.rs): inline context for ≤ 100 KB; ResultRef-shaped reference with optional nested ipc: IpcHint for over-budget; IpcHint-only when durable fails; {status} only when everything fails. Mirrors Python's noetl.core.storage.models.ResultRef exactly so consumers don't discriminate by producer.
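
The 5-row fallback chain from the last bullet can be sketched as a pure decision function. This is a simplification of build_call_done_result, not its code: the real function is async and attempts the durable PUT and the shm put itself, whereas here their results are passed in as booleans, and CallDoneShape and choose_shape are hypothetical names.

```rust
/// Inline budget in bytes (100 KB), matching the value named in the history table.
pub const INLINE_CONTEXT_MAX_BYTES: usize = 102_400;

#[derive(Debug, PartialEq)]
pub enum CallDoneShape {
    /// result.context carried inline.
    Inline,
    /// ResultRef reference with a nested ipc hint.
    DurableWithIpc,
    /// ResultRef reference without an ipc hint.
    DurableOnly,
    /// IpcHint-only reference.
    IpcOnly,
    /// {status} only; tool output is dropped.
    StatusOnly,
}

/// Pick the payload shape from the context size and from whether the
/// durable PUT and the shm cache put succeeded.
pub fn choose_shape(context_bytes: usize, durable_ok: bool, shm_ok: bool) -> CallDoneShape {
    if context_bytes <= INLINE_CONTEXT_MAX_BYTES {
        return CallDoneShape::Inline;
    }
    match (durable_ok, shm_ok) {
        (true, true) => CallDoneShape::DurableWithIpc,
        (true, false) => CallDoneShape::DurableOnly,
        (false, true) => CallDoneShape::IpcOnly,
        (false, false) => CallDoneShape::StatusOnly,
    }
}
```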

The R-1.2 worker chunk + every R-1.2 follow-up is done. R-2.1 (shm cache + durable result store) and R-2.2 (tabular outputs as Arrow IPC) are both fully landed on the worker side, and the producer-side credential scrub now runs on all three emit paths (inline / durable / shm). Remaining open work: live kind-cluster validation of R-2.1 + R-2.2 + credential scrub end-to-end with a DuckDB / Postgres playbook producing > 100 KB tabular output. Next major Appendix H phase after that: R-2.3 (Arrow Flight gRPC endpoint for high-volume cross-node fetch, replacing the JSON-over-HTTP /api/result/resolve path for tabular data), tracked on noetl/ai-meta#30.
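
For readers unfamiliar with the producer-side scrub mentioned above, here is a much-reduced sketch of the idea: normalise keys, match them against a sensitive-name list by substring, recognise a few secret-looking value prefixes, and replace matches in place. It is not src/scrub.rs. The Json enum is a dependency-free stand-in for a JSON value, the four-entry name list and the prefix-only value check are deliberate simplifications, and the function bodies are assumptions.

```rust
/// Minimal JSON stand-in so the sketch needs no external crate.
#[derive(Debug, Clone, PartialEq)]
pub enum Json {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
    Arr(Vec<Json>),
    Obj(Vec<(String, Json)>),
}

// Tiny illustrative subset; the worker's list is described as ~50 names.
const SENSITIVE: &[&str] = &["password", "secret", "token", "apikey"];

/// Lowercase the key and drop separators, so "DB-Password" becomes "dbpassword".
fn normalise(key: &str) -> String {
    key.chars()
        .filter(|c| c.is_ascii_alphanumeric())
        .map(|c| c.to_ascii_lowercase())
        .collect()
}

pub fn is_sensitive_key(key: &str) -> bool {
    let k = normalise(key);
    SENSITIVE.iter().any(|s| k.contains(*s))
}

/// Simplified value check: a few prefixes only.
pub fn looks_like_secret_value(v: &str) -> bool {
    v.starts_with("Bearer ")
        || v.starts_with("ghp_")
        || v.contains("-----BEGIN PRIVATE KEY-----")
}

/// Walk the value in place, replacing matches with "[REDACTED]".
pub fn scrub_in_place(v: &mut Json) {
    match v {
        Json::Str(s) => {
            if looks_like_secret_value(s.as_str()) {
                *s = "[REDACTED]".to_string();
            }
        }
        Json::Arr(items) => items.iter_mut().for_each(scrub_in_place),
        Json::Obj(fields) => {
            for (k, val) in fields.iter_mut() {
                if is_sensitive_key(k.as_str()) {
                    *val = Json::Str("[REDACTED]".to_string());
                } else {
                    scrub_in_place(val);
                }
            }
        }
        _ => {}
    }
}
```

Running the scrub once on a clone before the emit paths diverge means the inline, durable, and shm payloads cannot disagree about what was redacted.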

Companion sub-PRs on noetl/cli

These ship via noetl/cli's release pipeline; the worker picks them up by bumping the noetl-executor semver requirement.

| Sub-PR | Scope | crates.io version |
| --- | --- | --- |
| R-1.2 PR-1 | Publish noetl-executor 0.1.0 to crates.io; fix the cli's release pipeline. noetl/cli#32 | 0.1.0 |
| R-1.2 PR-2a | Align execution_id to i64 across events, runtime, worker::source. Breaking → 0.2.0. noetl/cli#33 | 0.2.0 |
| R-1.2 PR-2b | Add structured Condition + 12-variant Operator + evaluate_structured_condition to condition. Minor → 0.2.1. noetl/cli#34 | 0.2.1 |
| R-1.2 PR-2d-1 | Redesign worker::source::CommandSource trait with ack lifecycle + 4-state ClaimOutcome + Pulled<H> wrapper + associated AckHandle type. Enriches Command with render_context + attempts. Adds in-crate MockSource test helper. Breaking → 0.3.0. Bin auto-bumped to noetl 4.0.0. See the cli wiki's executor-crate-architecture page for the design-decisions table. noetl/cli#35 | 0.3.0 |

Versioning policy

The worker pins noetl-executor = "0.3" since R-1.2 PR-2d-2 (semver-compatible with any 0.3.x; the original R-1.2 PR-2c pin was "0.2"). Patch releases of noetl-executor are picked up automatically on the next worker build; minor / major bumps require an explicit Cargo.toml change in the worker.
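
The compatibility rule behind that policy is Cargo's default (caret) requirement: for a 0.MINOR requirement, only versions with the same minor number match. A minimal sketch of that rule, ignoring pre-release versions (matches_zero_minor_req is a hypothetical helper, not part of Cargo or of the worker):

```rust
/// Cargo's default rule for a `0.MINOR` requirement:
/// >= 0.MINOR.0 and < 0.(MINOR + 1).0. Pre-release versions are ignored here.
pub fn matches_zero_minor_req(req_minor: u64, version: (u64, u64, u64)) -> bool {
    let (major, minor, _patch) = version;
    major == 0 && minor == req_minor
}
```

For example, a "0.2" requirement accepts 0.2.1 but not 0.3.0, which is why a breaking 0.x release of the shared crate never reaches the worker without a manifest change.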

The cli + worker release cadences are independent. As of the last row in the landing history above, the worker is at 5.6.0; the cli binary was at 4.0.0 as of R-1.2 PR-2d-1.

Related