
executor crate architecture

Kadyapam edited this page Jun 1, 2026 · 24 revisions

noetl-cli workspace crates

Since v2.17.1+ the CLI is a Cargo workspace. The root crate (noetl, producing the noetl and ntl binaries) is unchanged; sibling crates host shared infrastructure that other repos (noetl-worker, noetl-server) depend on.

This page documents the workspace layout + each member crate's purpose + the deliberate architectural decision that the CLI keeps its own control loop rather than sharing one with the worker (§ H.10).

Workspace members

| Crate | Roadmap phase | Purpose |
| --- | --- | --- |
| noetl (root) | | The CLI binary. Produces noetl + ntl. |
| noetl-executor | R-1.1 | Shared execution utilities: YAML playbook types, template engine, condition evaluator, CommandSource trait, EventSink trait. Adopted by noetl-worker. |
| noetl-arrow-cache | R-2.1 | Same-node zero-copy IPC cache. Rust mirror of Python's ArrowIpcSharedMemoryCache; producers push Arrow IPC byte streams into POSIX shared memory, colocated consumers attach via JSON IpcHint. |
| noetl-arrow-flight-client | R-2.3 Phase B | Rust consumer for the noetl-server's Arrow Flight do_get endpoint. FlightResolver::resolve(ref_uri) turns a noetl://... URI into typed RecordBatches over gRPC, with zero JSON serialisation in the consumer leg. Pairs with the Python server in noetl/noetl#643. |

Source layout

repos/cli/
  Cargo.toml                    # workspace root + noetl binary
  src/
    main.rs
    playbook_runner.rs          # CLI tree walker (control loop stays here)
    ...
  executor/                     # workspace member: noetl-executor
    Cargo.toml
    src/
      lib.rs
      playbook.rs               # YAML playbook types
      template.rs               # render_template + Rhai/JSON helpers
      condition.rs              # evaluate_condition + evaluate_rhai_condition
      capabilities.rs           # validate_capabilities + ValidationReport
      runtime.rs                # ExecutionContext + CredentialResolver trait
      events.rs                 # EventSink trait + ExecutorEvent + EventEmitter
      tools_bridge.rs           # noetl-tools registry bridge (scaffold)
      worker/
        mod.rs
        source.rs               # Command + CommandSource (worker-only)
  arrow-cache/                  # workspace member: noetl-arrow-cache (R-2.1)
    Cargo.toml
    README.md
    src/
      lib.rs                    # ArrowIpcSharedMemoryCache + IpcHint re-exports
      hint.rs                   # IpcHint (JSON shape matches Python 1:1)
      cache.rs                  # POSIX shm cache; put/get/delete/sweep_expired
  arrow-flight-client/          # workspace member: noetl-arrow-flight-client (R-2.3 Phase B)
    Cargo.toml
    README.md
    src/
      lib.rs                    # FlightResolver + FlightError typed variants
      tests.rs                  # in-test Rust Flight server stub for round-trip

noetl-arrow-cache

Same-node zero-copy IPC for the NoETL columnar data plane — R-2.1 of the Rust migration roadmap (Appendix H of the global hybrid cloud blueprint).

What it gives you: a small in-process cache keyed by an IpcHint token. Producers push Arrow IPC byte streams into shared memory; colocated consumers read them back without going through the durable storage path. The hint is JSON-serialisable and crosses process boundaries through any transport; the bytes themselves stay on the same machine.

Cross-stack compatibility: byte-for-byte mirror of Python's ArrowIpcSharedMemoryCache in noetl/core/storage/ipc_cache.py.

  • IpcHint JSON shape matches the Pydantic model 1:1 (same kind / shm_name / schema_digest / byte_length / row_count / producer / node_id / lease_expires_at / media_type fields).
  • POSIX shm primitive on both sides (Python via multiprocessing.shared_memory.SharedMemory, Rust via the shared_memory crate — same shm_open + mmap syscalls).
  • shm_name format {namespace[:12]}_{stamp:8-hex}_{digest:8-hex} matches Python exactly.
  • Namespace sanitisation mirrors Python's [^A-Za-z0-9_] regex.
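
The naming rule above can be sketched in a few lines of Rust. This is only an illustration under stated assumptions: the source gives the sanitisation regex but not the replacement character (`_` is assumed here), sanitisation is assumed to run before the 12-character truncation, and `stamp` / `digest` are treated as ready-made 32-bit values because their derivation is not described on this page. The function names are hypothetical, not the crate's API.

```rust
/// Replace every character outside [A-Za-z0-9_] with '_'.
/// Assumption: '_' is the replacement character; the page only gives the regex.
fn sanitise_namespace(namespace: &str) -> String {
    namespace
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
        .collect()
}

/// Build `{namespace[:12]}_{stamp:8-hex}_{digest:8-hex}`.
/// Assumption: `stamp` and `digest` are 32-bit values computed elsewhere.
fn shm_name(namespace: &str, stamp: u32, digest: u32) -> String {
    // Truncate the sanitised namespace to at most 12 characters.
    let ns: String = sanitise_namespace(namespace).chars().take(12).collect();
    // Zero-padded, lowercase, 8 hex digits each.
    format!("{ns}_{stamp:08x}_{digest:08x}")
}
```

Keeping the rule in one small pure function makes it easy to test against names produced by the Python side, which is where a mismatch would otherwise show up as a failed attach.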

Public surface:

| API | Purpose |
| --- | --- |
| ArrowIpcSharedMemoryCache::new() / with_config(CacheConfig) | Construct. Defaults read NOETL_IPC_CACHE_BUDGET_BYTES, NOETL_NODE_ID / NODE_NAME / K8S_NODE_NAME / HOSTNAME env vars (same as Python). |
| put_arrow_ipc(payload, schema_digest, row_count, lease_seconds) → IpcHint | Copy bytes into shm, register a lease, return the hint to give the consumer. |
| get(&IpcHint) → Vec<u8> | Read bytes back. Errors on expired lease + cross-node hint. |
| delete(name) → bool | Unlink the shm region. Idempotent. |
| sweep_expired(now, grace_seconds) → usize | Lazy GC; piggy-backed on put. |
| used_bytes() → u64 | Live-byte accounting. |

Where it fits in the data plane:

  • Producer step: a noetl-tools step that produces a large RecordBatch (DuckDB / Postgres / HTTP CSV / etc.) calls noetl_tools::arrow_codec::encode_record_batch to get the IPC bytes, then cache.put_arrow_ipc(...) to publish them. The returned IpcHint rides the event log alongside the durable result_uri.
  • Consumer step (colocated, same node): reads the hint, calls cache.get(&hint), decodes via noetl_tools::arrow_codec::decode_record_batches. No network round-trip, no disk I/O.
  • Consumer step (different node): sees the hint's node_id doesn't match its own, falls back to the durable copy via the normal result_uri path.
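
The consumer-side choice between the shared-memory path and the durable path can be sketched as below. The struct is a hypothetical subset of the hint, not the crate's IpcHint type: only node_id and lease_expires_at are modelled, the lease is assumed to be Unix seconds, and falling back to the durable copy on an expired lease is an assumption (the page only says that get errors in that case).

```rust
/// Hypothetical subset of the hint; the real IpcHint carries more fields.
struct HintView {
    node_id: String,
    /// Assumption: lease expiry expressed as Unix seconds.
    lease_expires_at: u64,
}

#[derive(Debug, PartialEq)]
enum ReadPath {
    /// Colocated and lease still live: read the bytes from shared memory.
    SharedMemory,
    /// Different node or expired lease: resolve the durable result_uri instead.
    Durable,
}

/// Decide which path a consumer should take for a given hint.
fn choose_read_path(hint: &HintView, local_node_id: &str, now_unix: u64) -> ReadPath {
    if hint.node_id == local_node_id && now_unix < hint.lease_expires_at {
        ReadPath::SharedMemory
    } else {
        ReadPath::Durable
    }
}
```

Checking the hint before calling into the cache keeps the cross-node case an ordinary branch rather than an error path.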

The Arrow Flight gRPC endpoint on noetl-worker (R-2.2) is the follow-up that exposes the same bytes over the network for non-colocated consumers.

What this crate is NOT:

  • Not a durable store — entries evict on lease expiry + budget pressure. The durable copy is the authority; this cache is acceleration for the colocated hot path.
  • Not an Arrow IPC codec — encoding lives in noetl_tools::arrow_codec; this crate moves opaque bytes.
  • Not network-aware — IpcHint.node_id tells consumers to fall back when the producer is on a different node.

noetl-arrow-flight-client

Rust consumer for the noetl-server's Arrow Flight do_get endpoint — R-2.3 Phase B of the Rust migration roadmap. Pairs with the Python server in noetl/noetl#643 (Phase A); the wire-format contract is documented on the noetl wiki at arrow_flight_result_fetch.

What it gives you: an async FlightResolver that turns a noetl://execution/<eid>/result/<step>/<id> URI into typed RecordBatches over gRPC. Zero JSON serialisation in the consumer leg; the wire format matches the same Arrow IPC stream the worker stages in shm on the R-2.2 colocated fast path, so cross-node + same-node consumers share a single decoder.
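
To make the reference URI shape concrete, here is a small std-only parser for noetl://execution/<eid>/result/<step>/<id>. It is a sketch, not the crate's parsing code: ResultRef and parse_result_ref are hypothetical names, and treating <eid> as an i64 is an assumption based on the execution_id alignment described in the R-1.2 section of this page.

```rust
/// Hypothetical parsed form of a result reference URI.
#[derive(Debug, PartialEq)]
struct ResultRef {
    /// Assumption: execution ids are numeric (i64).
    execution_id: i64,
    step: String,
    id: String,
}

/// Parse `noetl://execution/<eid>/result/<step>/<id>`; None on any mismatch.
fn parse_result_ref(uri: &str) -> Option<ResultRef> {
    let rest = uri.strip_prefix("noetl://execution/")?;
    let parts: Vec<&str> = rest.split('/').collect();
    match parts.as_slice() {
        // Exactly four segments, with the literal "result" in second position.
        [eid, "result", step, id] if !step.is_empty() && !id.is_empty() => Some(ResultRef {
            execution_id: eid.parse().ok()?,
            step: step.to_string(),
            id: id.to_string(),
        }),
        _ => None,
    }
}
```

A caller would typically validate the URI this way before opening a gRPC stream, so malformed references fail locally instead of as a server error.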

use noetl_arrow_flight_client::{FlightError, FlightResolver};

// Runs inside an async fn that returns a Result; `ref_uri` is a noetl://... reference URI.
let resolver = FlightResolver::connect("grpc://noetl.noetl.svc.cluster.local:8083").await?;
match resolver.resolve(ref_uri).await {
    Ok(batches) => { /* zero-copy Arrow RecordBatches in `batches` */ }
    Err(FlightError::NonTabular { .. }) => { /* fall back to HTTP /api/result/resolve */ }
    Err(FlightError::Server { .. }) => { /* hard error */ }
    Err(FlightError::Transport { .. }) => { /* network error */ }
}

A resolve_rows(ref_uri) convenience flattens batches into Vec<serde_json::Value> for callers that want JSON-shaped row dicts; prefer resolve when columnar access is enough.

Boundary discipline: the resolver is a thin RPC client. All scrubbing + tier dispatch + auth happen server-side; the credential scrub the server applies (per the R-2.x scrubber) round-trips through this client unchanged.

Phase scope:

  • ✅ Phase A — Python server do_get (noetl/noetl#643).
  • ✅ Phase B — this crate (Rust client + tests, noetl/cli#41).
  • Worker call-site wiring deferred until a concrete cross-node consumer surfaces (the worker currently RECEIVES inputs from the broker rather than fetching them; the first real caller is more likely to be the Rust noetl-server once it gains its own result-store backend).
  • Phase C planned: multi-endpoint FlightInfo for sharded result tiers, mTLS + token auth for non-cluster-internal callers.

noetl-executor

What lives in noetl-executor and why

| Module | Purpose | Used by |
| --- | --- | --- |
| playbook | Pydantic-like YAML types: Playbook, Step, Tool, NextFormat, RuntimeCapabilities, etc. All field accessors pub. | CLI + worker |
| template | render_template, render_template_with_result, get_json_path, json_to_rhai, rhai_to_json_string. Takes &HashMap<String, String> views of the per-execution variables + step results so each binary owns its own context shape. | CLI + worker |
| condition | evaluate_condition (simple {{ a == b }} / 'in' / truthy) and evaluate_rhai_condition (full Rhai expression eval). Same context-view contract as template. | CLI + worker |
| capabilities | validate_capabilities + ValidationReport / ValidationError. Pure function: returns the report rather than bail!ing so the CLI can format errors with playbook_path and the worker with execution_id. | CLI + worker |
| runtime | ExecutionContext (executor-side variant with async step_results + Arc<dyn CredentialResolver>); CredentialResolver trait. | worker (CLI uses its own ExecutionContext in playbook_runner.rs) |
| events | ExecutorEvent (mirrors the Python noetl.runtime.events.report_event envelope), EventSink trait, NoopSink, EventEmitter. | CLI + worker |
| tools_bridge | Scaffold for replacing the CLI's inline tool implementations with calls into the noetl-tools registry. Filled in incrementally per Strategy B (one tool kind per sub-PR). | CLI (worker already uses noetl-tools directly) |
| worker::source | Command envelope + CommandSource trait. Worker-only: the CLI's tree walker doesn't consume this. | worker (R-1.3) |

What does NOT live in noetl-executor

| What | Where it lives | Why |
| --- | --- | --- |
| The CLI's recursive tree walker (run, execute_step, execute_next_steps, execute_router_arcs) | repos/cli/src/playbook_runner.rs | Natural fit for local YAML execution; flattening into a pull-model iterator would lose local-debug clarity |
| The CLI's inline tool implementations (execute_tool, execute_shell_command, execute_http_request, execute_duckdb_query, etc.) | repos/cli/src/playbook_runner.rs (today) | Migrated incrementally to noetl-tools registry via tools_bridge per Strategy B; ~870 LoC of inline tool dispatch |
| The worker's NATS pull loop | repos/worker/src/ | Different shape than tree walker; tied to NATS durable-consumer semantics |
| RunOutcome (the CLI's JSON output envelope) | repos/cli/src/playbook_runner.rs | Not a YAML input type; worker has a different output envelope (event-log writes) |

Architectural finding (§ H.10 of Appendix H)

Mid-implementation discovery, documented in § H.10 of the global hybrid cloud blueprint:

  • The CLI is a recursive tree walker. It loads the YAML, walks the workflow, evaluates next arcs / case conditions / then blocks in place, dispatches each step inline. Control flow is the call stack.
  • The worker is a pull-model consumer. It subscribes to a NATS durable consumer, pulls one command at a time, executes it, emits events, repeats. No tree. No recursion.
  • The original "unified CommandSource trait — both binaries plug in their own impl" was the wrong abstraction for the CLI. Flattening the tree walker into a pull-model iterator loses local-debug clarity, complicates case / then state management, and breaks integration tests written against the tree shape.

noetl-executor was re-scoped from a control-loop crate to a utilities-and-types crate. The CLI's tree walker stays. The worker's pull loop stays. Both call into the executor for the same template rendering, condition evaluation, credential resolution, capability validation, and event shape.
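
The contrast between the two control loops, and the way both can still share one utility, can be sketched as follows. Everything here is hypothetical and std-only: is_enabled stands in for a shared helper such as the condition evaluator (it is not the crate's evaluate_condition), and Step is a toy shape rather than the playbook type.

```rust
use std::collections::{HashMap, VecDeque};

/// Stand-in for a shared utility; both loops below call the same function.
fn is_enabled(vars: &HashMap<String, String>, key: &str) -> bool {
    vars.get(key).map(String::as_str) == Some("true")
}

/// Toy step: a name, a guard variable, and nested follow-up steps.
struct Step {
    name: &'static str,
    when: &'static str,
    next: Vec<Step>,
}

/// CLI style: recursive tree walk; control flow is the call stack.
fn walk(step: &Step, vars: &HashMap<String, String>, ran: &mut Vec<&'static str>) {
    if !is_enabled(vars, step.when) {
        return;
    }
    ran.push(step.name);
    for child in &step.next {
        walk(child, vars, ran);
    }
}

/// Worker style: pull one flat (name, guard) command at a time; no tree, no recursion.
fn pull_loop(
    queue: &mut VecDeque<(&'static str, &'static str)>,
    vars: &HashMap<String, String>,
) -> Vec<&'static str> {
    let mut ran = Vec::new();
    while let Some((name, when)) = queue.pop_front() {
        if is_enabled(vars, when) {
            ran.push(name);
        }
    }
    ran
}
```

The two loops have different shapes, but the guard semantics live in one place, which is the split the re-scoped crate aims for.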

Cross-references

| Topic | Where |
| --- | --- |
| The migration roadmap | Appendix H of the global hybrid cloud blueprint |
| The architectural finding | § H.10 of the same doc |
| The Polars-pattern endpoint (pip install noetl ships Rust runtime + Python wrapper) | § H.9 |
| Apache Arrow data plane | § H.4 + § H.11 (local-mode Feather buffer) |
| Tracking issues | noetl/cli#19 (this CLI's R-1.1 sub-issue) · noetl/ai-meta#30 (umbrella) |

Sub-PR landing history

| Sub-PR | Scope | PR |
| --- | --- | --- |
| R-1.1 PR-1 | Crate skeleton (lib.rs, runtime, events, source/dispatch placeholders) | #20 |
| R-1.1 PR-2a | YAML types → noetl-executor::playbook | #21 |
| R-1.1 PR-2b | Utilities (template + condition + capabilities); § H.10 restructure (placeholder LocalPlaybookSource removed; CommandSource → worker::source) | #22 |
| R-1.1 PR-2c-1 | noetl-tools = "2.8.7" dep + tools_bridge scaffold | #23 |
| R-1.1 PR-2c-2 | tools_bridge adapters: BridgeContext, to_tools_context, to_tools_config (all 8 Tool variants), from_tools_result; dispatch_via_registry becomes async with per-tool-kind match arms | #24 |
| R-1.1 PR-2c-3 | Tool::Rhai arm wired through noetl-tools::RhaiTool; new to_tools_context_for_rhai helper groups CLI flat variables into nested Maps for Rhai field access; CLI's inline execute_rhai_script + rhai_http_request deleted (~220 LoC) | #25 |
| R-1.1 PR-2c-4 | Tool::Shell arm wired through noetl-tools::ShellTool; per-command dispatch loop preserves CLI's "fresh bash invocation per command" semantics; new shell_command_config(&str) helper; CLI's inline execute_shell_command deleted (~79 LoC) | #26 |
| R-1.1 PR-2c-5 | Tool::Http arm wired through noetl-tools::HttpTool; new resolve_auth_to_bearer helper (CLI AuthConfig → Bearer token via noetl-tools::auth::GcpAuth); new http_tool_config helper that injects the Bearer header into request headers; new reshape_http_result helper that maps noetl-tools' {status_code, headers, body} envelope back to the CLI's pre-existing {status, body} shape so playbook steps keep branching on <step>.body.status. Tool::Auth arm also routes through resolve_auth_to_bearer so both paths share the GCP ADC code path. CLI's inline execute_http_request + get_auth_token deleted (~148 LoC) | #27 |
| R-1.1 PR-2c-6 | Tool::DuckDb arm wired through noetl-tools::DuckdbTool; new duckdb_tool_config helper (translates CLI's Vec<String> params to noetl-tools' Vec<serde_json::Value> and maps db → db_path); new reshape_duckdb_result helper unwraps noetl-tools' {columns, rows, row_count} envelope back to the CLI's pre-existing rows-array shape (and {affected_rows} back to {"status": "ok"} for non-SELECT). Empty / whitespace-only query short-circuits to an empty outcome, mirroring the CLI's existing guard. Path resolution + mkdir -p stay at the CLI call site since the bridge has no knowledge of the playbook directory. CLI's inline execute_duckdb_query deleted (~55 LoC). Feature gain: params that were silently ignored (_params: &[String]) are now bound at ? placeholders | #28 |
| R-1.1 PR-2c-7 | Codifies the § H.10 finding for Tool::Playbook: sub-playbook execution stays in the CLI's tree walker (PlaybookRunner::new(path).run() is the recursion case; the bridge cannot replace it without re-opening § H.10). Replaces the silent BridgeOutcome::empty() stub with anyhow::bail! naming § H.10 so accidental misuse is loud rather than silent. Variable preparation (parent merge + DSL v2 input: / DSL v1 args: rendering + workload. prefix) DID move into a new noetl_executor::tools_bridge::prepare_sub_playbook_vars helper (pure, reusable, testable). CLI's call site uses the helper but keeps its PlaybookRunner recursion inline. No semantic divergences (no execution path changed) | #29 |
| R-1.1 PR-2c-8 | Final substantive sub-PR. Codifies that Tool::Auth and Tool::Sink stay inline by design but extracts the shareable pure logic into bridge helpers: auth_context_updates(provider, token, project) (replaces inline set_variable calls; preserves CLI's pre-PR-2c-8 ordering), format_sink_payload(format, raw) (json passthrough / yaml dump / csv conversion), and json_to_csv(json_str) (lifted verbatim from CLI). Dispatch arms for both kinds bail loudly with helper-pointing messages. CLI's inline json_to_csv deleted (~42 LoC). Removes the dispatch_via_registry_returns_empty_for_unwired_kind test: every Tool variant now either dispatches through the registry, bails with a § H.10 finding, or bails as unsupported. GCS → object_store migration tracked as a separate follow-up (R-2.x scope, not R-1.x). No semantic divergences | #30 |
| R-1.1 PR-2d | Wrap-up PR. Adds executor/tests/dispatch.rs with 12 end-to-end integration tests covering dispatch_via_registry for Rhai / Shell / DuckDb (full registry round-trip) plus the bail-loudly paths for Playbook / Auth / Sink / Unsupported. Adds executor/README.md summarising the crate's scope, module layout, and § H.10 rationale. Updates the crate's lib.rs top-level docs to include tools_bridge in the module-layout list with its full surface (helpers + which kinds bail vs dispatch). Closes noetl/cli#19 | #31 |

R-1.1 is complete after PR-2d ships. See § H of the global hybrid cloud blueprint for the wider R-1.x → R-5 roadmap. Next phases: R-1.2 (worker NATS pull loop adopts noetl-executor), R-1.3 (worker depends on the crate end-to-end), R-2 (Apache Arrow data plane), R-3 (object_store integration including the GCS sink migration deferred from PR-2c-8).

Documented semantic divergences (per-PR cross-reference)

Each tool replacement that crosses a behaviour line documents the deltas in the PR body and here.

| PR | Surface | CLI behaviour (pre-replacement) | noetl-tools behaviour | User-visible impact |
| --- | --- | --- | --- | --- |
| #25 | rhai timestamp() | chrono::Local::now().format("%H:%M:%S") → "14:23:45" | chrono::Utc::now().timestamp().to_string() → "1716847425" | Scripts that displayed timestamp() for human reading need updating |
| #25 | rhai HTTP helpers (http_get, http_post, http_delete, *_auth) | curl subprocess | Direct reqwest calls | Same surface; different error shape on network failures |
| #25 | rhai get_gcp_token | gcloud auth print-access-token shellout | gcp_auth crate (workload-identity aware) | Better on GKE pods; equivalent on hosts with gcloud configured |
| #26 | shell stdout streaming | Line-by-line to terminal as command runs | Collected; returned at end | Breaks real-time output UX for long-running shell steps: users see nothing until the command completes |
| #26 | shell result envelope | Captured stdout string | data: {exit_code, stdout, stderr} JSON | Transparent: bridge unwraps data["stdout"] and trims trailing \n |
| #27 | http transport | curl subprocess | reqwest direct | Same envelope on success; different error shape on transport failure (anyhow message vs curl exit code) |
| #27 | http auth (GCP ADC) | gcloud auth print-access-token shellout | gcp_auth crate | Better on GKE pods (workload-identity aware); equivalent on hosts with gcloud configured |
| #27 | http JSON body | Sent verbatim via curl -d; caller had to set Content-Type | Auto-detected as JSON; reqwest sets Content-Type | Most callers were already sending JSON with Content-Type: application/json; transparent |
| #27 | http result envelope | {"status": int, "body": <json-or-string>} | data: {"status_code": int, "headers": {...}, "body": <json>} | Transparent: bridge's reshape_http_result maps back to the CLI's {status, body} shape so 4xx/5xx come back inside the envelope (not as anyhow errors) |
| #27 | Tool::Auth GCP token | gcloud auth print-access-token shellout (separate code path from rhai get_gcp_token) | gcp_auth crate via shared resolve_auth_to_bearer helper | Both Tool::Http and Tool::Auth now share the GCP ADC code path, which eliminates the divergence between the two |
| #28 | duckdb SELECT result envelope | JSON array of row objects (pretty-printed) | data: {"columns": [...], "rows": [...], "row_count": N} | Transparent: bridge's reshape_duckdb_result maps back to the CLI's rows-array shape |
| #28 | duckdb non-SELECT result envelope | Literal string {"status": "ok"} | data: {"affected_rows": N} | Transparent: bridge maps back; affected_rows dropped (CLI never exposed it) |
| #28 | duckdb params binding | _params: &[String] accepted but silently ignored | Bound as JSON values at ? placeholders | Feature gain: playbooks that intended their params: field to take effect now see it applied (no breakage for playbooks with stale params + no ? placeholders since DuckDB ignores extra params) |
| #28 | duckdb path resolution + mkdir | resolve_duckdb_path + fs::create_dir_all(parent) in execute_duckdb_query | Open as-given, no mkdir | CLI keeps owning these at the call site before dispatch; bridge has no knowledge of the playbook directory |
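
The "transparent" rows above all follow the same pattern: the bridge takes the richer noetl-tools envelope and maps it back to the shape the CLI exposed before. A minimal sketch of that pattern for the HTTP and shell rows follows. The struct and function names are hypothetical typed stand-ins (the real helpers work on JSON values), the body is simplified to a string, and trimming exactly one trailing newline is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical typed view of the noetl-tools HTTP envelope {status_code, headers, body}.
struct ToolsHttpResult {
    status_code: u16,
    headers: HashMap<String, String>,
    body: String,
}

/// Hypothetical typed view of the CLI's pre-existing {status, body} shape.
#[derive(Debug, PartialEq)]
struct CliHttpResult {
    status: u16,
    body: String,
}

/// Map the richer envelope back to the CLI shape; headers are dropped.
/// 4xx/5xx statuses stay inside the envelope rather than becoming errors.
fn reshape_http(result: ToolsHttpResult) -> CliHttpResult {
    let ToolsHttpResult { status_code, body, .. } = result;
    CliHttpResult { status: status_code, body }
}

/// Shell row: keep only stdout and trim a trailing newline.
/// Assumption: exactly one trailing '\n' is removed.
fn unwrap_shell_stdout(stdout: &str) -> &str {
    stdout.strip_suffix('\n').unwrap_or(stdout)
}
```

Doing the reshape at the bridge boundary is what lets existing playbooks keep branching on the old field names while the tool implementation underneath changes.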

Stats — R-1.1 complete (after PR-2d)

  • playbook_runner.rs: 2,688 → 1,606 lines (-1,082 net across PR-2a + PR-2b + PR-2c-3 + PR-2c-4 + PR-2c-5 + PR-2c-6 + PR-2c-7 + PR-2c-8)
  • New code in noetl-executor: 7 modules + 1 worker submodule + integration tests, ~3,500 LoC
  • noetl-executor unit tests: 0 → 80 across PR-1 + PR-2a + PR-2b + PR-2c-1 → PR-2c-8
  • noetl-executor integration tests: 0 → 12 (PR-2d)
  • Workspace-wide tests: 174 passing (80 noetl-executor unit + 12 noetl-executor integration + 41 noetl + 41 ntl)
  • Tool kinds wired through the registry: 4 (Rhai, Shell, Http, DuckDb query)
  • Tool kinds staying inline per § H.10 with helpers extracted: 3 (Playbook, Auth, Sink)

R-1.2 — publishing + worker adoption

After R-1.1 wrapped, the crate was published to crates.io and noetl-worker began adopting it. The CLI's release pipeline had broken silently during R-1.1 (the bin's path-only dep on the unpublished noetl-executor failed cargo publish validation); R-1.2 PR-1 fixed that as the opening move.

Sub-PR landing history (noetl/cli)

| Sub-PR | Scope | crates.io | PR |
| --- | --- | --- | --- |
| R-1.2 PR-1 | noetl-executor published with publish = true + crates.io metadata; bin's dep becomes { path = "executor", version = "0.1" }; release workflow publishes noetl-executor before the bin. Unblocks the noetl release pipeline that had been failing for 3 consecutive runs (bin was stuck at 2.17.1 while local Cargo.toml was at 2.24.0). | noetl-executor 0.1.0 (2026-05-30T06:04Z); noetl 2.24.0 (2026-05-30T06:14Z, first publish since 2.17.1) | #32 |
| R-1.2 PR-2a | Align all execution_id representations to i64 across events::ExecutorEvent, events::EventEmitter, runtime::ExecutionId (was pub struct ExecutionId(pub String) → now pub type ExecutionId = i64), and worker::source::Command. Required because the worker's CommandNotification.execution_id + WorkerEvent.execution_id + Python's noetl.event.execution_id / noetl.command.execution_id bigint columns all use i64; the executor's String shapes were 0.1.x placeholders that became load-bearing once the worker started adopting. Breaking change → minor bump 0.1.0 → 0.2.0. Semantic-release picked up the "Breaking change" language and bumped the bin to noetl 3.0.0. | noetl-executor 0.2.0 (2026-05-30T06:34Z); noetl 3.0.0 (2026-05-30T06:37Z) | #33 |
| R-1.2 PR-2b | Add structured Condition + 12-variant Operator (Eq / Ne / Gt / Lt / Gte / Lte / Contains / Matches (regex) / Truthy / Falsy / In / NotIn) + evaluate_structured_condition(cond, ctx, result) to executor::condition alongside the existing template-style helpers. The worker (R-1.2 PR-2c) carries structured case / when blocks on its NATS command envelopes; this PR moves the operator semantics into the shared crate so the CLI's tree walker and the worker's pull loop agree. Adds public types → minor bump 0.2.0 → 0.2.1. Bin auto-bumped to noetl 3.1.0. | noetl-executor 0.2.1 (2026-05-30T06:56Z); noetl 3.1.0 (2026-05-30T06:58Z) | #34 |
| R-1.2 PR-2d-1 | Redesign worker::source::CommandSource trait with ack lifecycle + richer Command to fit what the worker's real NATS pull loop needs (vs the placeholder 0.2.x shape). New types: ClaimOutcome enum (Claimed(Command) / AlreadyClaimed / RetryLater(String) / Failed(String)), Pulled<H> struct ({ outcome, ack }), associated AckHandle type, ack(h) / nack(h) methods. Command enriched with render_context: HashMap<String, Value> (#[serde(default)]) + attempts: u32 (#[serde(default)]). In-crate MockSource test helper records ack/nack lifecycle for downstream worker tests. Breaking → major bump 0.2.1 → 0.3.0. Semantic-release picked up the feat!: prefix; bin auto-bumped to noetl 4.0.0. | noetl-executor 0.3.0; noetl 4.0.0 | #35 |
| R-1.2 PR-EE-1 | First of four-PR cross-repo event envelope reconciliation series (alignment between noetl-executor ExecutorEvent, noetl-server Rust EventRequest, Python EventEmitRequest, and worker WorkerEvent). Enriches ExecutorEvent with three optional fields the server schemas already expect: event_id: Option<i64> (app-side snowflake per observability.md Principle 3), worker_id: Option<String> (emitting pod id), meta: Option<serde_json::Value> (free-form: retries, parent_event_id, catalog_id). All #[serde(default, skip_serializing_if = "Option::is_none")]: older wire-format consumers see no new keys; producers that don't populate them omit them entirely. Plus #[serde(alias = "payload")] on context so the worker's pre-EE WorkerEvent.payload deserializes cleanly. Non-breaking → minor bump 0.3.0 → 0.3.1. Bin auto-bumped to noetl 4.1.0. | noetl-executor 0.3.1; noetl 4.1.0 | #37 |
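
To give a feel for the structured operators added in PR-2b, here is a std-only sketch of an evaluator over the twelve operator names. It is not the crate's evaluate_structured_condition: the Value enum, the numeric coercion of strings, the truthiness rules, and the choice to return None for inapplicable operands are all assumptions made for illustration, and Matches is left unevaluated because it would need a regex engine.

```rust
/// Simplified value type for this sketch (assumption, not the crate's type).
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Null,
    Bool(bool),
    Num(f64),
    Str(String),
    List(Vec<Value>),
}

/// The twelve operator names listed for PR-2b.
#[derive(Debug, Clone, Copy)]
enum Operator { Eq, Ne, Gt, Lt, Gte, Lte, Contains, Matches, Truthy, Falsy, In, NotIn }

/// Assumed truthiness: null, false, 0, "" and [] are falsy.
fn is_truthy(v: &Value) -> bool {
    match v {
        Value::Null => false,
        Value::Bool(b) => *b,
        Value::Num(n) => *n != 0.0,
        Value::Str(s) => !s.is_empty(),
        Value::List(items) => !items.is_empty(),
    }
}

/// Assumed coercion: numbers as-is, numeric strings parsed, everything else None.
fn as_num(v: &Value) -> Option<f64> {
    match v {
        Value::Num(n) => Some(*n),
        Value::Str(s) => s.parse().ok(),
        _ => None,
    }
}

fn compare(left: &Value, right: &Value, f: fn(f64, f64) -> bool) -> Option<bool> {
    Some(f(as_num(left)?, as_num(right)?))
}

/// Substring test for strings, membership test for lists.
fn contains(haystack: &Value, needle: &Value) -> Option<bool> {
    match (haystack, needle) {
        (Value::Str(h), Value::Str(n)) => Some(h.contains(n.as_str())),
        (Value::List(items), n) => Some(items.contains(n)),
        _ => None,
    }
}

/// Returns None when the operator cannot be applied to the operands.
fn evaluate(op: Operator, left: &Value, right: &Value) -> Option<bool> {
    match op {
        Operator::Eq => Some(left == right),
        Operator::Ne => Some(left != right),
        Operator::Gt => compare(left, right, |a, b| a > b),
        Operator::Lt => compare(left, right, |a, b| a < b),
        Operator::Gte => compare(left, right, |a, b| a >= b),
        Operator::Lte => compare(left, right, |a, b| a <= b),
        Operator::Contains => contains(left, right),
        Operator::In => contains(right, left),
        Operator::NotIn => contains(right, left).map(|found| !found),
        Operator::Truthy => Some(is_truthy(left)),
        Operator::Falsy => Some(!is_truthy(left)),
        Operator::Matches => None, // regex matching omitted in this std-only sketch
    }
}
```

Putting the operator table in one shared function is what lets a tree walker and a pull loop give the same answer for the same case / when block.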

Worker adoption history (noetl/worker)

The worker is now a consumer of noetl-executor as a crates.io dep. Each adoption PR slims the worker by reusing the shared implementation.

| Sub-PR | Scope | noetl-worker version | PR |
| --- | --- | --- | --- |
| R-1.2 PR-2c | First worker PR depending on noetl-executor. Adds noetl-executor = "0.2" dep; replaces inline Operator + Condition + the per-condition resolution helpers (resolve_value, resolve_json_value, json_path, compare_numeric, is_truthy, value_to_f64) with re-exports of noetl_executor::condition::{Operator, Condition} and delegation to evaluate_structured_condition from CaseEvaluator::evaluate_conditions. Keeps Case, CaseAction, CaseResult, CaseEvaluator (pull-loop control flow specific to the worker per § H.10). case_evaluator.rs 437 → 344 LoC (-93 net); 3 new tests lock in worker contract (first_match_wins, no_match_returns_none, and_semantics). Auto-bumped to noetl-worker 1.1.0. | noetl-worker 1.1.2 (catch-up published 2026-05-30 after the worker pipeline-trigger fix in noetl/worker#4 + #5) | noetl/worker#2 |
| R-1.2 PR-2d-2 | Final worker-side R-1.2 sub-PR. Bumps noetl-executor = "0.2" → "0.3"; adds new src/nats/source.rs with NatsCommandSource { subscriber, client, worker_id } implementing the 0.3.0 trait. AckHandle = NatsAckHandle { message, notification }, which embeds the notification metadata alongside the NATS message so Worker::process_commands can correlate execution_id / command_id / step on every ClaimOutcome variant (per observability.md Principle 4). CommandExecutor::execute refactored to take &noetl_executor::worker::source::Command. Worker::process_commands driven through source.next() + source.ack/nack() instead of inline subscriber + claim calls. Lossless WorkerCommand → ExecutorCommand translation with 8 dedicated tests including translate_carries_full_context_as_input_including_cases (locks in that tool_config + cases + args + render_context all ride through the input field). Observability addendum (commit e2b6d57): nats.pull + command.execute spans per observability.md Principle 1. Auto-bumped to noetl-worker 2.0.0 from the feat!: prefix. | noetl-worker 2.0.0 (released via the now-self-healing pipeline) | noetl/worker#6 |

Design decisions — CommandSource trait (PR-2d-1)

The 0.3.0 redesign codifies four API choices and one lifecycle invariant that surfaced when the worker's actual NATS pull loop was mapped against the placeholder 0.2.x shape. Each one trades simplicity for fidelity to the worker's real semantics.

| Decision | Choice | Rationale |
| --- | --- | --- |
| Pulled<H> shape | struct Pulled<H> { outcome, ack } (not a tuple) | Tuples force positional access (pulled.0, pulled.1) and are awkward to extend. A struct lets future additions (e.g. pull_time, redelivery_count) land as new fields without breaking callers. |
| AckHandle typing | Associated type on the trait (type AckHandle: Send + Sync), not a trait-level generic parameter | Keeps dyn CommandSource possible for callers that want trait-object dispatch (e.g. test fixtures that hold a Box<dyn CommandSource<AckHandle = ()>>). A generic on the trait itself would have made it !dyn-compatible. |
| Error carrier in ClaimOutcome::RetryLater(String) / Failed(String) | Plain String, not anyhow::Error or a custom error enum | Keeps ClaimOutcome Clone + Send + Sync + 'static without lifetime gymnastics, so it can sit in a Vec for tests / be cloned across tasks / serialize for diagnostics. Callers that need structured error info attach it to the outcome's diagnostic event. |
| Command.render_context + attempts | Both #[serde(default)] | Older wire formats (worker 1.1.x) that don't carry these fields still deserialize cleanly; new callers populate them explicitly. Forward compat for the inevitable event-envelope reconciliation. |
| Lifecycle invariant | "Each Pulled.ack handle must be consumed exactly once via ack(h) or nack(h) before the next next() call in the same task", documented in the trait docstring | Forces explicit lifecycle accounting without requiring the trait to track in-flight handles itself. Source impls that need stronger guarantees (idempotent ack, double-ack panics) own that themselves. |

The MockSource impl shipped in executor::worker::source::tests is a reusable test fixture — it records every ack/nack call in an Arc<Mutex<Vec<MockAck>>> so downstream consumers (worker, future local-mode adapters) can lift it for dispatcher-level unit tests.
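
The shape of these decisions can be sketched with a simplified, synchronous trait. This is an illustration only: the real trait is assumed to be async and to carry a richer Command, the MockSource below is a stand-in for the in-crate fixture rather than a copy of it, and whether AlreadyClaimed is acked or nacked is an assumption of the drain helper, not something this page specifies.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Trimmed-down command envelope (the real one carries more fields).
#[derive(Debug, Clone, PartialEq)]
struct Command {
    execution_id: i64,
    step: String,
    attempts: u32,
}

/// Plain String carriers keep the enum Clone + Send + Sync + 'static.
#[derive(Debug, Clone, PartialEq)]
enum ClaimOutcome {
    Claimed(Command),
    AlreadyClaimed,
    RetryLater(String),
    Failed(String),
}

/// A struct rather than a tuple, so fields can be added later.
struct Pulled<H> {
    outcome: ClaimOutcome,
    ack: H,
}

/// Synchronous simplification of the trait; the handle type is an associated type.
trait CommandSource {
    type AckHandle: Send + Sync;
    fn next(&mut self) -> Option<Pulled<Self::AckHandle>>;
    fn ack(&mut self, handle: Self::AckHandle);
    fn nack(&mut self, handle: Self::AckHandle);
}

#[derive(Debug, Clone, PartialEq)]
enum MockAck {
    Ack(u64),
    Nack(u64),
}

/// Test fixture that records every ack/nack in a shared log.
struct MockSource {
    queue: VecDeque<ClaimOutcome>,
    next_handle: u64,
    log: Arc<Mutex<Vec<MockAck>>>,
}

impl CommandSource for MockSource {
    type AckHandle = u64;

    fn next(&mut self) -> Option<Pulled<u64>> {
        let outcome = self.queue.pop_front()?;
        let ack = self.next_handle;
        self.next_handle += 1;
        Some(Pulled { outcome, ack })
    }

    fn ack(&mut self, handle: u64) {
        self.log.lock().unwrap().push(MockAck::Ack(handle));
    }

    fn nack(&mut self, handle: u64) {
        self.log.lock().unwrap().push(MockAck::Nack(handle));
    }
}

/// Consume each handle exactly once before pulling again; returns the executed count.
fn drain<S: CommandSource>(source: &mut S) -> usize {
    let mut executed = 0;
    while let Some(Pulled { outcome, ack }) = source.next() {
        match outcome {
            ClaimOutcome::Claimed(_command) => {
                executed += 1; // a real worker would execute the command here
                source.ack(ack);
            }
            ClaimOutcome::AlreadyClaimed => source.ack(ack),
            ClaimOutcome::RetryLater(_) | ClaimOutcome::Failed(_) => source.nack(ack),
        }
    }
    executed
}
```

Because the handle is moved into ack or nack, the compiler already prevents using it twice; the "before the next next() call" half of the invariant stays a documented convention.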

Deferred work surfaced during R-1.2

  • Event envelope reconciliation: ✅ series complete, all 4 PRs landed. All four producers / consumers now emit + accept the same wire format on /api/events. Tracked on noetl/ai-meta#30.
    • PR-EE-1 (noetl/cli#37, see its row in the table above): ExecutorEvent now carries optional event_id, worker_id, meta fields plus a payload serde alias, so the executor's envelope is a superset of what either server expects.
    • PR-EE-2 (noetl/server#6, noetl-server 2.0.0; pipeline-fix #7 auto-republished as 2.0.1): Rust EventRequest renamed name → event_type with #[serde(alias = "name")], added #[serde(alias = "context")] on payload, added optional event_id / status / created_at fields (server-side defaults preserve back-compat per server-wiki event-envelope).
    • PR-EE-4 (noetl/noetl#639, noetl 3.0.0): Python EventEmitRequest now accepts name / step / payload as validation aliases for event_type / node_name / context; worker_id lifted to top-level; EventType: Literal[...] → str (the Literal was already dead code; production Python uses dot-notation event types throughout).
    • PR-EE-3 (noetl/worker#11, noetl-worker 3.0.0): replaces WorkerEvent with ExecutorEvent re-export from noetl_executor::events; EventEmitter carries worker_id so every envelope is stamped at the source per observability.md Principle 4.
  • GCS sink → object_store — deferred from R-1.1 PR-2c-8; R-3 scope (data plane).
  • noetl-worker release pipeline gap — worker's semantic-release.yml initially didn't trigger release-worker on tag creation (same bug noetl/cli had pre-R-1.2 PR-1). Fixed in noetl/worker#4 (added gh workflow run step) + noetl/worker#5 (added missing actions: write permission). Pipeline now self-healing as of noetl-worker 1.1.2.
