noetl executor adoption

Kadyapam edited this page May 31, 2026 · 14 revisions

Worker-side companion to the cli wiki's executor-crate-architecture page. Documents which surfaces the worker imports from noetl-executor, which stay worker-local, and the sub-PR landing history.

Why the worker depends on noetl-executor

§ H.10 of the Rust migration roadmap established that the noetl CLI and noetl-worker are fundamentally different control loops — recursive tree walker vs pull-model consumer — but share the same:

  • YAML playbook types (Step, Tool, NextFormat, …)
  • Template rendering rules ({{ workload.x }} substitution)
  • Case-condition operator semantics (Eq / Gt / Contains / Matches / …)
  • Event envelope shape (ExecutorEvent mirrors Python's noetl.event table)
  • Tool dispatch bridge onto the noetl-tools registry

noetl-executor is the home for that shared surface. The CLI ships it as a workspace member crate in noetl/cli; the worker depends on it via crates.io.

What the worker imports

Adopted in R-1.2 PR-2c (noetl/worker#2, published as noetl-worker 1.1.2):

| Surface | Used by worker module | Notes |
|---|---|---|
| noetl_executor::condition::Operator | executor::case_evaluator | 12-variant enum, re-exported for backward compatibility. |
| noetl_executor::condition::Condition | executor::case_evaluator | Structured { left, op, right } envelope. |
| noetl_executor::condition::evaluate_structured_condition | executor::case_evaluator::CaseEvaluator::evaluate_conditions | Per-condition evaluation. |
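To make the structured { left, op, right } envelope concrete, here is a minimal std-only sketch. It is not the noetl_executor source: Val is a hypothetical stand-in for the crate's value type (presumably serde_json::Value), only the three operators this page names besides Matches are modelled (Matches is omitted to avoid a regex dependency), and treating a type mismatch as false is an assumption.

```rust
/// Hypothetical stand-in for a rendered value; the real crate presumably
/// evaluates serde_json::Value.
#[derive(Debug, Clone, PartialEq)]
enum Val {
    Num(f64),
    Str(String),
}

/// Illustrative subset of the 12-variant Operator.
#[derive(Debug, Clone, Copy)]
enum Operator {
    Eq,
    Gt,
    Contains,
}

/// Structured { left, op, right } envelope.
struct Condition {
    left: Val,
    op: Operator,
    right: Val,
}

/// Sketch of per-condition evaluation. Operand types the operator does not
/// support (e.g. Gt on strings) evaluate to false: an assumption, not the crate's rule.
fn evaluate_structured_condition(c: &Condition) -> bool {
    match (c.op, &c.left, &c.right) {
        (Operator::Eq, l, r) => l == r,
        (Operator::Gt, Val::Num(l), Val::Num(r)) => l > r,
        (Operator::Contains, Val::Str(l), Val::Str(r)) => l.contains(r.as_str()),
        _ => false,
    }
}
```

The worker-local CaseEvaluator then only decides what to do with each boolean; the per-condition semantics live in one place shared with the CLI.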

Adopted in R-1.2 PR-2d-2 (noetl/worker#6, noetl-worker 2.0.0):

| Surface | Worker call site | Notes |
|---|---|---|
| noetl_executor::worker::source::CommandSource | new nats::source::NatsCommandSource (wraps NatsSubscriber + ControlPlaneClient + worker_id) | Trait the wrapper implements; Worker::process_commands drives through source.next() + source.ack() / source.nack(). |
| noetl_executor::worker::source::ClaimOutcome | NATS source next() return + Worker::process_commands match arms | 4-state enum (Claimed(Command) / AlreadyClaimed / RetryLater(String) / Failed(String)); maps 1:1 onto the worker's pre-PR-2d-2 ClaimResult. |
| noetl_executor::worker::source::Pulled<H> | NATS source next() return | Generic wrapper { outcome, ack: H }; H = worker-local NatsAckHandle { message, notification } (NOT bare Message). See "AckHandle design" below. |
| noetl_executor::worker::source::Command | translated from crate::client::Command at the source seam; consumed by CommandExecutor::execute(&Command) | Enriched in 0.3.0 with render_context: HashMap<String, Value> + attempts: u32. The worker's Command.render_context() / meta.attempts map cleanly. |
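The pull lifecycle in this table can be sketched synchronously with std only. This is not the crate's definition: the real trait is likely async, Command is trimmed to two hypothetical fields, VecSource is a hypothetical in-memory source (distinct from the crate's MockSource), and the ack/nack choice per variant in the hypothetical drain loop is an assumption rather than the worker's documented policy.

```rust
use std::collections::VecDeque;

/// Trimmed stand-in for noetl_executor::worker::source::Command.
#[derive(Debug, Clone)]
pub struct Command {
    pub command_id: String,
    pub step: String,
}

/// The 4-state claim result; only Claimed carries the command.
#[derive(Debug)]
pub enum ClaimOutcome {
    Claimed(Command),
    AlreadyClaimed,
    RetryLater(String),
    Failed(String),
}

/// Pairs an outcome with whatever the source needs to ack/nack it later.
pub struct Pulled<H> {
    pub outcome: ClaimOutcome,
    pub ack: H,
}

/// Synchronous sketch of the trait shape with an associated AckHandle type.
pub trait CommandSource {
    type AckHandle;
    fn next(&mut self) -> Option<Pulled<Self::AckHandle>>;
    fn ack(&mut self, handle: Self::AckHandle);
    fn nack(&mut self, handle: Self::AckHandle);
}

/// Hypothetical in-memory source whose ack handle is a plain delivery id.
pub struct VecSource {
    queue: VecDeque<Pulled<u64>>,
    pub acked: Vec<u64>,
    pub nacked: Vec<u64>,
}

impl VecSource {
    pub fn new(items: Vec<Pulled<u64>>) -> Self {
        Self { queue: items.into_iter().collect(), acked: Vec::new(), nacked: Vec::new() }
    }
}

impl CommandSource for VecSource {
    type AckHandle = u64;
    fn next(&mut self) -> Option<Pulled<u64>> { self.queue.pop_front() }
    fn ack(&mut self, handle: u64) { self.acked.push(handle); }
    fn nack(&mut self, handle: u64) { self.nacked.push(handle); }
}

/// Drive loop in the spirit of Worker::process_commands (assumed policy:
/// ack on success or AlreadyClaimed, nack otherwise so the message is redelivered).
pub fn drain<S: CommandSource>(source: &mut S, mut execute: impl FnMut(&Command) -> bool) {
    while let Some(Pulled { outcome, ack }) = source.next() {
        match outcome {
            ClaimOutcome::Claimed(cmd) => {
                if execute(&cmd) { source.ack(ack) } else { source.nack(ack) }
            }
            ClaimOutcome::AlreadyClaimed => source.ack(ack),
            ClaimOutcome::RetryLater(_) | ClaimOutcome::Failed(_) => source.nack(ack),
        }
    }
}
```

Because the handle type is associated with the source, the loop never needs to know it is talking to NATS; a test source just swaps in u64.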
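
Adopted in R-1.2 PR-EE-3 (noetl/worker#11, noetl-worker 3.0.0):

| Surface | Worker call site | Notes |
|---|---|---|
| noetl_executor::events::ExecutorEvent | events::emitter::EventEmitter + CommandExecutor::emit_event | Re-exported; replaces the worker-local WorkerEvent. 9-field envelope; EventEmitter stamps worker_id and the private build_event helper stamps created_at. |
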
AckHandle design — why NatsAckHandle { message, notification } not bare Message

The executor's ClaimOutcome doesn't carry execution_id / command_id / step on the AlreadyClaimed / RetryLater(String) / Failed(String) variants; only Claimed(Command) has them, inside the Command. But observability.md Principle 4 requires every WARN/ERROR to carry execution_id as a structured field.

Resolved by embedding the CommandNotification in the source's AckHandle:

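```rust
// Worker-local ack handle: the NATS message to ack/nack, plus the
// notification metadata (execution_id / command_id / step) used for logging.
pub struct NatsAckHandle {
    pub message: Message,
    pub notification: CommandNotification,
}

impl CommandSource for NatsCommandSource {
    // Every Pulled<H> returned by next() carries a NatsAckHandle.
    type AckHandle = NatsAckHandle;
    // ...
}
```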

The notification metadata rides through the pull lifecycle alongside the NATS message handle, so Worker::process_commands has execution_id / command_id / step available on every variant's log line without forcing another noetl-executor breaking change.
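A std-only sketch of why the handle shape matters: the hypothetical log_line helper reads execution_id / command_id / step from the handle's notification, so even AlreadyClaimed and RetryLater produce a fully keyed line. Message, the CommandNotification field set, the log levels, and log_line itself are all assumptions for illustration; the real worker emits structured fields rather than formatting strings as done here.

```rust
/// Hypothetical placeholder for the NATS message type.
pub struct Message;

/// Hypothetical subset of the worker's CommandNotification.
pub struct CommandNotification {
    pub execution_id: i64,
    pub command_id: String,
    pub step: String,
}

pub struct NatsAckHandle {
    pub message: Message,
    pub notification: CommandNotification,
}

/// Simplified outcome; Claimed's payload is reduced to a command id here.
pub enum ClaimOutcome {
    Claimed(String),
    AlreadyClaimed,
    RetryLater(String),
    Failed(String),
}

/// Every arm gets the same identifying fields, taken from the handle
/// rather than from the outcome (which only Claimed could supply).
pub fn log_line(outcome: &ClaimOutcome, handle: &NatsAckHandle) -> String {
    let n = &handle.notification;
    let (level, msg) = match outcome {
        ClaimOutcome::Claimed(_) => ("INFO", "claimed".to_string()),
        ClaimOutcome::AlreadyClaimed => ("WARN", "already claimed".to_string()),
        ClaimOutcome::RetryLater(why) => ("WARN", format!("retry later: {why}")),
        ClaimOutcome::Failed(why) => ("ERROR", format!("claim failed: {why}")),
    };
    format!(
        "{level} {msg} execution_id={} command_id={} step={}",
        n.execution_id, n.command_id, n.step
    )
}
```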

Lossless WorkerCommand → ExecutorCommand translation

| Worker field | Executor field | Notes |
|---|---|---|
| command_id() (from meta) | command_id | Worker computes a fallback if meta is missing. |
| execution_id | execution_id | Already i64 on both sides since R-1.2 PR-2a. |
| node_name | step | Worker's step() accessor returns &node_name. |
| action | tool_kind | E.g. "http", "postgres", "rhai". |
| context (full JSON) | input | Carries tool_config + cases + args + nested config. CommandExecutor::execute extracts each section separately. |
| render_context() | render_context | HashMap<String, Value> on both sides. |
| meta.attempts | attempts | Defaults to 0 if missing. |

The translate_carries_full_context_as_input_including_cases unit test in src/nats/source.rs locks in that nothing gets dropped at the seam — input carries the worker's entire context JSON so cases extraction still works via command.input.get("cases").
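A minimal sketch of the seam translation, following the mapping table above. The types are hypothetical simplifications (flat String maps instead of JSON values), and the "{execution_id}:{node_name}" fallback for a missing command_id is invented for illustration; the page only says the worker computes a fallback.

```rust
use std::collections::HashMap;

/// Hypothetical simplified command metadata.
pub struct Meta {
    pub command_id: Option<String>,
    pub attempts: Option<u32>,
}

pub struct WorkerCommand {
    pub execution_id: i64,
    pub node_name: String,
    pub action: String,
    pub context: HashMap<String, String>,
    pub render_context: HashMap<String, String>,
    pub meta: Option<Meta>,
}

pub struct ExecutorCommand {
    pub command_id: String,
    pub execution_id: i64,
    pub step: String,
    pub tool_kind: String,
    pub input: HashMap<String, String>,
    pub render_context: HashMap<String, String>,
    pub attempts: u32,
}

/// Field-by-field mapping; nothing is dropped because the whole context
/// becomes `input`, so a "cases" entry is still reachable downstream.
pub fn translate(cmd: WorkerCommand) -> ExecutorCommand {
    let command_id = cmd
        .meta
        .as_ref()
        .and_then(|m| m.command_id.clone())
        // Hypothetical fallback format.
        .unwrap_or_else(|| format!("{}:{}", cmd.execution_id, cmd.node_name));
    let attempts = cmd.meta.as_ref().and_then(|m| m.attempts).unwrap_or(0);
    ExecutorCommand {
        command_id,
        execution_id: cmd.execution_id,
        step: cmd.node_name,
        tool_kind: cmd.action,
        input: cmd.context,
        render_context: cmd.render_context,
        attempts,
    }
}
```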

What stays worker-local

Worker-specific shapes that don't belong in the shared crate:

| Worker-local surface | Why |
|---|---|
| worker::Worker | Pull-loop control flow. Claim, dispatch, ack/nack: pull-model semantics that the CLI's tree walker doesn't share. |
| nats::subscriber::NatsSubscriber | NATS JetStream binding. The CLI has no NATS surface. |
| client::control_plane::ControlPlaneClient | HTTP client to the Python control plane. The CLI uses local YAML, not a server. |
| events::emitter::EventEmitter | Retry logic over ControlPlaneClient.emit_event. Worker-specific retry policy. |
| executor::case_evaluator::Case / CaseAction / CaseResult / CaseEvaluator | Pull-loop control flow. CaseAction::{Continue, Exit, SetVar, Goto, Retry, Fail} are dispatch-side decisions the CLI's tree walker handles differently. |
| client::control_plane::WorkerEvent | No longer worker-local: replaced in R-1.2 PR-EE-3 (noetl-worker 3.0.0) by noetl_executor::events::ExecutorEvent, which the worker now re-exports. |

R-1.2 sub-PR landing history (worker side)

| Sub-PR | Scope | noetl-worker version | PR |
|---|---|---|---|
| R-1.2 PR-2c | First worker PR depending on noetl-executor. Adds the noetl-executor = "0.2" dependency; replaces the inline Operator + Condition + per-condition resolution helpers with re-exports + delegation to evaluate_structured_condition. Keeps the pull-loop control flow (Case / CaseAction / CaseResult / CaseEvaluator) worker-local per § H.10. case_evaluator.rs 437 → 344 LoC (-93 net); 3 new tests lock in the worker contract. | 1.1.0 → 1.1.2 (after pipeline fix) | noetl/worker#2 |
| CI fix | Fixes the release.yml trigger (adds a gh workflow run step to semantic-release.yml). The pipeline self-heals from this point onward. | (no version bump; fix-only) | noetl/worker#4 |
| CI fix | Grants actions: write + issues: write + pull-requests: write permissions in semantic-release.yml. Final piece of the pipeline self-heal. | 1.1.2 (caught up the unpublished 1.1.0 + 1.1.1) | noetl/worker#5 |
| R-1.2 PR-2d-2 | Worker half of the PR-2d pair (after PR-2d-1, noetl/cli#35). Bumps noetl-executor = "0.3"; new src/nats/source.rs (+312 LoC) with NatsCommandSource implementing the CommandSource trait. CommandExecutor::execute refactored to take the executor's Command. Worker::process_commands driven through source.next() + source.ack/nack(). Lossless WorkerCommand → ExecutorCommand translation locked in by 8 unit tests. Observability addendum: nats.pull + command.execute spans per observability.md Principle 1; execution_id structured field on every WARN/ERROR per Principle 4 (enabled by the NatsAckHandle { message, notification } design). | 2.0.0 (major bump from feat!: prefix) | noetl/worker#6 |
| R-1.2 PR-2e | Prometheus metrics harness + /metrics endpoint per observability.md Principle 2. New src/metrics.rs (+358 LoC) defines a lazy-init global registry with 7 metrics: noetl_worker_pulls_total{outcome}, noetl_worker_pull_duration_seconds, noetl_worker_dispatch_duration_seconds{tool_kind}, noetl_worker_dispatch_errors_total{tool_kind}, noetl_worker_event_emit_duration_seconds{event_type}, noetl_worker_event_emit_retries_total{event_type}, noetl_worker_concurrent_dispatches. New src/metrics_server.rs (+167 LoC) exposes them via axum on a dedicated port (default 0.0.0.0:9090, overridable with WORKER_METRICS_BIND). Instrumented at NatsCommandSource::next, CommandExecutor::execute (3 exit paths: success / tool error / case-fail), CommandExecutor::emit_event, and the worker semaphore. outcome_label(&ClaimOutcome) derives the label string from the enum directly, so there are no string literals at call sites. README extended with an observability section. Closes noetl/ai-meta#32. | 2.1.0 (minor bump from feat: prefix) | noetl/worker#8 |
| R-1.2 PR-EE-3 | Last PR in the four-PR cross-stack event envelope reconciliation series (after EE-1 / EE-2 / EE-4 on noetl/cli / noetl/server / noetl/noetl). Replaces the worker-local WorkerEvent (3 fields: event_type + execution_id + payload) with ExecutorEvent re-exported from noetl_executor::events (9 fields: event_type, execution_id, step, status, created_at, context, plus optional event_id, worker_id, meta). EventEmitter now carries worker_id so every envelope is stamped at the source per observability.md Principle 4; a private build_event helper stamps created_at = Utc::now(). All 7 helper methods + CommandExecutor::emit_event take step + status parameters; status derives from the lifecycle (STARTED / COMPLETED / FAILED) except for step.exit (takes the case-defined status) and command.completed (takes the tool's terminal status). event_id is left as None for now, so the server's gen_snowflake() DB default fires; app-side snowflake generation is tracked as a follow-up. 4 new tests lock in the wire shape (mirroring Python's TestFullExecutorEnvelopeRoundTrips on the broker side). Breaking → feat!: → major bump 2.1.0 → 3.0.0. Closes noetl/worker#10. | 3.0.0 | noetl/worker#11 |
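The outcome_label idea from PR-2e can be sketched in a few lines: an exhaustive match over ClaimOutcome yields the outcome label, so adding a variant breaks the build until it gets a label. The label strings and the std-only ClaimOutcome stand-in are hypothetical, and so is the metrics_bind_addr helper, which resolves the documented WORKER_METRICS_BIND override against the 0.0.0.0:9090 default (falling back to the default on an unparsable value is an assumption).

```rust
use std::net::SocketAddr;

/// Minimal stand-in for the executor's 4-state ClaimOutcome (payloads simplified).
pub enum ClaimOutcome {
    Claimed(String),
    AlreadyClaimed,
    RetryLater(String),
    Failed(String),
}

/// Label for noetl_worker_pulls_total{outcome}; hypothetical label strings.
/// No wildcard arm, so a new variant is a compile error until labelled.
pub fn outcome_label(outcome: &ClaimOutcome) -> &'static str {
    match outcome {
        ClaimOutcome::Claimed(_) => "claimed",
        ClaimOutcome::AlreadyClaimed => "already_claimed",
        ClaimOutcome::RetryLater(_) => "retry_later",
        ClaimOutcome::Failed(_) => "failed",
    }
}

/// Resolves the metrics bind address from an optional override value.
pub fn metrics_bind_addr(env_value: Option<&str>) -> SocketAddr {
    const DEFAULT: &str = "0.0.0.0:9090";
    env_value
        .and_then(|v| v.parse().ok())
        .unwrap_or_else(|| DEFAULT.parse().expect("default bind address is valid"))
}
```

Taking the override as a parameter keeps the helper testable; at startup it could be called as metrics_bind_addr(std::env::var("WORKER_METRICS_BIND").ok().as_deref()).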

Sub-PRs in flight or pending

  • Event envelope reconciliation (cross-repo): ✅ series complete after noetl/worker#11 landed (noetl-worker 3.0.0). All four producers / consumers now emit and accept the same wire format on /api/events:
    • PR-EE-1 ✅ noetl/cli (#37, noetl-executor 0.3.1): ExecutorEvent enriched with optional event_id / worker_id / meta + a payload serde alias.
    • PR-EE-2 ✅ noetl/server Rust (#6, noetl-server 2.0.1 after pipeline-fix #7): EventRequest field renamed name → event_type with #[serde(alias = "name")]; payload accepts a context alias; added optional event_id / status / created_at. See server-wiki event-envelope.
    • PR-EE-4 ✅ noetl/noetl Python (#639, noetl 3.0.0): EventEmitRequest accepts name / step / payload as validation aliases for event_type / node_name / context; worker_id lifted to top-level; EventType: Literal[...] → str (the Literal was already dead code, since production Python uses dot-notation event types throughout).
    • PR-EE-3 ✅ this repo (#11, noetl-worker 3.0.0): the switch documented in the landing-history row above. Tracked on noetl/ai-meta#30.
  • App-side snowflake event_id: the worker currently passes event_id: None on every emit, so the server's gen_snowflake() DB default fires. observability.md Principle 3 calls for application-side generation so the id exists at span-creation time. Follow-up after PR-EE-3.
  • meta.attempts propagation: Command.attempts exists on the executor's 0.3.0 wire shape, but outgoing events leave meta: None. Surface it through meta in a follow-up so retry behaviour is grep-able from the event log.
  • NATS consumer lag metric: needs a periodic task polling the JetStream consumer info API. Follow-up from PR-2e.
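For reference, the 9-field envelope from PR-EE-3 and the current event_id: None / meta: None behaviour can be sketched with std-only types. Everything here is a hypothetical mirror, not the crate's code: the real ExecutorEvent is re-exported from noetl_executor::events and presumably uses serde_json / chrono types, the i64 type for event_id is assumed, String maps stand in for JSON, and SystemTime::now() stands in for chrono's Utc::now().

```rust
use std::collections::HashMap;
use std::time::SystemTime;

/// Hypothetical std-only mirror of the 9-field envelope.
#[derive(Debug)]
pub struct ExecutorEvent {
    pub event_type: String,
    pub execution_id: i64,
    pub step: String,
    pub status: String,
    pub created_at: SystemTime,
    pub context: HashMap<String, String>,
    pub event_id: Option<i64>,
    pub worker_id: Option<String>,
    pub meta: Option<HashMap<String, String>>,
}

/// Hypothetical emitter holding the worker_id it stamps on every envelope.
pub struct EventEmitter {
    worker_id: String,
}

impl EventEmitter {
    pub fn new(worker_id: impl Into<String>) -> Self {
        Self { worker_id: worker_id.into() }
    }

    /// build_event-style helper: stamps created_at and worker_id at the source.
    /// event_id stays None (the server's DB default assigns it) and meta stays
    /// None (attempts are not propagated yet).
    fn build_event(
        &self,
        event_type: &str,
        execution_id: i64,
        step: &str,
        status: &str,
        context: HashMap<String, String>,
    ) -> ExecutorEvent {
        ExecutorEvent {
            event_type: event_type.to_string(),
            execution_id,
            step: step.to_string(),
            status: status.to_string(),
            created_at: SystemTime::now(),
            context,
            event_id: None,
            worker_id: Some(self.worker_id.clone()),
            meta: None,
        }
    }
}
```

Closing either follow-up would change only build_event: generating an id locally instead of None, or filling meta with the command's attempts.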

Companion sub-PRs on noetl/cli

These ship via noetl/cli's release pipeline; the worker picks them up by bumping the noetl-executor semver requirement.

| Sub-PR | Scope | PR | crates.io version |
|---|---|---|---|
| R-1.2 PR-1 | Publish noetl-executor 0.1.0 to crates.io; fix the cli's release pipeline. | noetl/cli#32 | 0.1.0 |
| R-1.2 PR-2a | Align execution_id to i64 across events, runtime, worker::source. Breaking → 0.2.0. | noetl/cli#33 | 0.2.0 |
| R-1.2 PR-2b | Add structured Condition + 12-variant Operator + evaluate_structured_condition to condition. Minor → 0.2.1. | noetl/cli#34 | 0.2.1 |
| R-1.2 PR-2d-1 | Redesign the worker::source::CommandSource trait with an ack lifecycle + 4-state ClaimOutcome + Pulled<H> wrapper + associated AckHandle type. Enriches Command with render_context + attempts. Adds an in-crate MockSource test helper. Breaking → 0.3.0. Bin auto-bumped to noetl 4.0.0. See the cli wiki's executor-crate-architecture page for the design-decisions table. | noetl/cli#35 | 0.3.0 |
| R-1.2 PR-EE-1 | Enrich ExecutorEvent with optional event_id / worker_id / meta + a payload serde alias. | noetl/cli#37 | 0.3.1 |

Versioning policy

The worker pins noetl-executor = "0.3" (since R-1.2 PR-2d-2), which Cargo reads as >=0.3.0, <0.4.0. Any 0.3.x release, such as 0.3.1 from PR-EE-1, is semver-compatible and can be picked up without a Cargo.toml change; 0.4.0 would be a breaking release under Cargo's 0.x rules and requires an explicit Cargo.toml bump in the worker.
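Cargo reads a bare requirement such as "0.3" as a caret requirement, and for 0.x versions the minor number is the compatibility boundary. The helper below (caret_matches is illustrative, not part of Cargo or either repo) models only the two-component "MAJOR.MINOR" form and ignores pre-release tags, which is enough to show why 0.3.1 is picked up while 0.4.0 is not.

```rust
/// Hypothetical model of Cargo's caret rule for a "MAJOR.MINOR" requirement.
pub fn caret_matches(req: (u64, u64), ver: (u64, u64, u64)) -> bool {
    let (rmaj, rmin) = req;
    let (vmaj, vmin, _vpatch) = ver;
    if rmaj > 0 {
        // "1.2" means >=1.2.0, <2.0.0: only a major bump is breaking.
        vmaj == rmaj && vmin >= rmin
    } else if rmin > 0 {
        // "0.3" means >=0.3.0, <0.4.0: the minor number acts as the breaking component.
        vmaj == 0 && vmin == rmin
    } else {
        // "0.0" means >=0.0.0, <0.1.0.
        vmaj == 0 && vmin == 0
    }
}
```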

The cli and worker release cadences are independent. The worker is currently at 3.0.0 (after R-1.2 PR-EE-3); on the cli side, R-1.2 PR-2d-1 auto-bumped the noetl bin to 4.0.0.

Related