executor crate architecture
From v2.17.1 onward the CLI is a Cargo workspace. The root crate
(noetl, producing the noetl and ntl binaries) is unchanged;
sibling crates host shared infrastructure that other repos
(noetl-worker, noetl-server) depend on.
This page documents the workspace layout + each member crate's purpose + the deliberate architectural decision that the CLI keeps its own control loop rather than sharing one with the worker (§ H.10).
| Crate | Roadmap phase | Purpose |
|---|---|---|
| `noetl` (root) | - | The CLI binary. Produces `noetl` + `ntl`. |
| `noetl-executor` | R-1.1 | Shared execution utilities: YAML playbook types, template engine, condition evaluator, `CommandSource` trait, `EventSink` trait. Adopted by `noetl-worker`. |
| `noetl-arrow-cache` | R-2.1 | Same-node zero-copy IPC cache. Rust mirror of Python's `ArrowIpcSharedMemoryCache`; producers push Arrow IPC byte streams into POSIX shared memory, colocated consumers attach via JSON `IpcHint`. |
| `noetl-arrow-flight-client` | R-2.3 Phase B + C1 + C2.2 + C2.3 | Rust consumer for the noetl-server's Arrow Flight endpoint. `FlightResolver::resolve(ref_uri)` turns a `noetl://...` URI into typed RecordBatches over gRPC; `get_flight_info(ref_uri)` fetches the metadata (schema + row count + endpoints) without materialising the rows. `connect_with_tls(endpoint, FlightTlsConfig)` adds TLS so the client can talk to the TLS-fronted server (Phase C2.1). `connect_with(endpoint, FlightConfig)` accepts the full TLS + bearer-token bundle for Phase C2.3 auth: the client sends `Authorization: Bearer <token>` on every gRPC call, validated by the server's `BearerTokenMiddlewareFactory` (`NOETL_FLIGHT_BEARER_TOKENS`). Crates.io: 0.1.0 (Phase B, `resolve` / `resolve_rows`), 0.2.0 (Phase C1, adds `get_flight_info`), 0.3.0 (Phase C2.2, adds `FlightTlsConfig` builder + `connect_with_tls` + enables tonic's `tls` feature), 0.4.0 (Phase C2.3, adds `FlightAuth` + `FlightConfig` + `connect_with`). Pairs with the Python server in noetl/noetl#643 + #644 + #646 + #647. |

    repos/cli/
      Cargo.toml                  # workspace root + noetl binary
      src/
        main.rs
        playbook_runner.rs        # CLI tree walker (control loop stays here)
        ...
      executor/                   # workspace member: noetl-executor
        Cargo.toml
        src/
          lib.rs
          playbook.rs             # YAML playbook types
          template.rs             # render_template + Rhai/JSON helpers
          condition.rs            # evaluate_condition + evaluate_rhai_condition
          capabilities.rs         # validate_capabilities + ValidationReport
          runtime.rs              # ExecutionContext + CredentialResolver trait
          events.rs               # EventSink trait + ExecutorEvent + EventEmitter
          tools_bridge.rs         # noetl-tools registry bridge (scaffold)
          worker/
            mod.rs
            source.rs             # Command + CommandSource (worker-only)
      arrow-cache/                # workspace member: noetl-arrow-cache (R-2.1)
        Cargo.toml
        README.md
        src/
          lib.rs                  # ArrowIpcSharedMemoryCache + IpcHint re-exports
          hint.rs                 # IpcHint (JSON shape matches Python 1:1)
          cache.rs                # POSIX shm cache; put/get/delete/sweep_expired
      arrow-flight-client/        # workspace member: noetl-arrow-flight-client (R-2.3 Phase B)
        Cargo.toml
        README.md
        src/
          lib.rs                  # FlightResolver + FlightError typed variants
          tests.rs                # in-test Rust Flight server stub for round-trip

Same-node zero-copy IPC for the NoETL columnar data plane — R-2.1 of the Rust migration roadmap (Appendix H of the global hybrid cloud blueprint).
What it gives you: a small in-process cache keyed by an
IpcHint token. Producers push Arrow IPC byte streams into
shared memory; colocated consumers read them back without going
through the durable storage path. The hint is JSON-serialisable
and crosses process boundaries through any transport; the bytes
themselves stay on the same machine.
Cross-stack compatibility: byte-for-byte mirror of Python's
ArrowIpcSharedMemoryCache in noetl/core/storage/ipc_cache.py.
- `IpcHint` JSON shape matches the Pydantic model 1:1 (same `kind` / `shm_name` / `schema_digest` / `byte_length` / `row_count` / `producer` / `node_id` / `lease_expires_at` / `media_type` fields).
- POSIX shm primitive on both sides (Python via `multiprocessing.shared_memory.SharedMemory`, Rust via the `shared_memory` crate; same `shm_open` + `mmap` syscalls).
- `shm_name` format `{namespace[:12]}_{stamp:8-hex}_{digest:8-hex}` matches Python exactly.
- Namespace sanitisation mirrors Python's `[^A-Za-z0-9_]` regex.
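
The naming rule above can be sketched in a few lines of plain Rust. This is an illustration only, not the crate's code: the function names, the choice of `_` as the replacement character, the sanitise-then-truncate order, and modelling `stamp` and `digest` as `u32` are all assumptions made for the example.

```rust
/// Replace every character outside `[A-Za-z0-9_]` with `_`.
/// The replacement character is an assumption; the text above only names the regex.
fn sanitise_namespace(namespace: &str) -> String {
    namespace
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
        .collect()
}

/// Build `{namespace[:12]}_{stamp:8-hex}_{digest:8-hex}`.
/// `stamp` and `digest` are modelled as `u32` purely for illustration.
fn shm_name(namespace: &str, stamp: u32, digest: u32) -> String {
    let clean = sanitise_namespace(namespace);
    // Keep at most the first 12 characters, like Python's `namespace[:12]`.
    let prefix: String = clean.chars().take(12).collect();
    format!("{prefix}_{stamp:08x}_{digest:08x}")
}
```

Under these assumptions, `shm_name("noetl-worker/default", 0x1a2b, 0xdeadbeef)` yields `noetl_worker_00001a2b_deadbeef`.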
Public surface:
| API | Purpose |
|---|---|
| `ArrowIpcSharedMemoryCache::new()` / `with_config(CacheConfig)` | Construct. Defaults read `NOETL_IPC_CACHE_BUDGET_BYTES`, `NOETL_NODE_ID` / `NODE_NAME` / `K8S_NODE_NAME` / `HOSTNAME` env vars (same as Python). |
| `put_arrow_ipc(payload, schema_digest, row_count, lease_seconds) → IpcHint` | Copy bytes into shm, register a lease, return the hint to give the consumer. |
| `get(&IpcHint) → Vec<u8>` | Read bytes back. Errors on an expired lease or a cross-node hint. |
| `delete(name) → bool` | Unlink the shm region. Idempotent. |
| `sweep_expired(now, grace_seconds) → usize` | Lazy GC; piggy-backed on `put`. |
| `used_bytes() → u64` | Live-byte accounting. |
Where it fits in the data plane:
- Producer step: a `noetl-tools` step that produces a large RecordBatch (DuckDB / Postgres / HTTP CSV / etc.) calls `noetl_tools::arrow_codec::encode_record_batch` to get the IPC bytes, then `cache.put_arrow_ipc(...)` to publish them. The returned `IpcHint` rides the event log alongside the durable `result_uri`.
- Consumer step (colocated, same node): reads the hint, calls `cache.get(&hint)`, decodes via `noetl_tools::arrow_codec::decode_record_batches`. No network round-trip, no disk I/O.
- Consumer step (different node): sees the hint's `node_id` doesn't match its own, falls back to the durable copy via the normal `result_uri` path.
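
The consumer-side choice between the shm fast path and the durable copy could look roughly like the sketch below. `Hint`, `ReadPath`, and `choose_read_path` are hypothetical names for this example; the real `IpcHint` carries more fields, and treating an expired lease as a reason to fall back (rather than surfacing the error from `get`) is an assumption.

```rust
/// Trimmed-down stand-in for `IpcHint`; only the fields this sketch needs.
struct Hint {
    node_id: String,
    /// Seconds since the Unix epoch (the unit is an assumption).
    lease_expires_at: u64,
}

#[derive(Debug, PartialEq)]
enum ReadPath {
    /// Colocated and lease still live: read the bytes from shared memory.
    SharedMemory,
    /// Anything else: resolve the durable copy via `result_uri`.
    Durable,
}

fn choose_read_path(hint: Option<&Hint>, local_node_id: &str, now: u64) -> ReadPath {
    match hint {
        Some(h) if h.node_id == local_node_id && now < h.lease_expires_at => {
            ReadPath::SharedMemory
        }
        // No hint, a hint from another node, or an expired lease.
        _ => ReadPath::Durable,
    }
}
```

Because the durable copy stays the authority, every non-matching case collapses into the same fallback arm.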
The Arrow Flight gRPC endpoint on noetl-worker (R-2.2) is the follow-up that exposes the same bytes over the network for non-colocated consumers.
What this crate is NOT:
- Not a durable store: entries evict on lease expiry + budget pressure. The durable copy is the authority; this cache is acceleration for the colocated hot path.
- Not an Arrow IPC codec: encoding lives in `noetl_tools::arrow_codec`; this crate moves opaque bytes.
- Not network-aware: `IpcHint.node_id` tells consumers to fall back when the producer is on a different node.
Rust consumer for the noetl-server's Arrow Flight do_get
endpoint — R-2.3 Phase B of the Rust migration roadmap. Pairs
with the Python server in noetl/noetl#643 (Phase A);
the wire-format contract is documented on the noetl wiki at
arrow_flight_result_fetch.
What it gives you: an async FlightResolver that turns a
noetl://execution/<eid>/result/<step>/<id> URI into typed
RecordBatches over gRPC. Zero JSON serialisation in the
consumer leg; the wire format matches the same Arrow IPC stream
the worker stages in shm on the R-2.2 colocated fast path, so
cross-node + same-node consumers share a single decoder.

    use noetl_arrow_flight_client::{FlightResolver, FlightError};

    // Use `http://...` (plaintext h2c) or `https://...` (TLS-fronted, Phase C2).
    // The `grpc://` scheme some Flight clients accept is NOT valid for tonic:
    // HTTP/2's `:scheme` pseudo-header must be `http` or `https`.
    let resolver = FlightResolver::connect("http://noetl.noetl.svc.cluster.local:8083").await?;
    match resolver.resolve(ref_uri).await {
        Ok(batches) => { /* zero-copy Arrow RecordBatch ... */ }
        Err(FlightError::NonTabular { .. }) => { /* fall back to HTTP /api/result/resolve */ }
        Err(FlightError::Server { .. }) => { /* hard error */ }
        Err(FlightError::Transport { .. }) => { /* network */ }
    }

A `resolve_rows(ref_uri)` convenience flattens batches into
`Vec<serde_json::Value>` for callers that want JSON-shaped row
dicts; prefer `resolve` when columnar access is enough.
Phase C1 (crates.io 0.2.0+) adds get_flight_info(ref_uri) for
metadata-only fetches:

    let info = resolver.get_flight_info(ref_uri).await?;
    info.schema         // SchemaRef (pre-decoded from IPC bytes)
    info.total_records  // i64: row count without materialising
    info.total_bytes    // i64: encoded payload size
    info.endpoints      // Vec<FlightEndpointSummary>; ticket bytes match the input ref URI

Useful for sizing buffers, picking a backend based on row count, or skipping the fetch entirely for non-tabular refs.
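
As one possible illustration of "picking a backend based on row count", a caller might turn `total_records` into a fetch plan before touching any rows. `FetchPlan`, `plan_fetch`, and the threshold parameter are hypothetical and not part of the crate; the sketch only relies on the Arrow Flight convention that a negative `total_records` means the count is unknown.

```rust
#[derive(Debug, PartialEq)]
enum FetchPlan {
    /// Nothing to fetch.
    Skip,
    /// Small result: JSON-shaped rows (for example via `resolve_rows`) are fine.
    Rows,
    /// Large or unknown size: stay columnar (for example via `resolve`).
    Columnar,
}

/// Decide how to fetch from metadata alone.
/// `row_threshold` is a caller-chosen cut-off, assumed for this sketch.
fn plan_fetch(total_records: i64, row_threshold: i64) -> FetchPlan {
    if total_records == 0 {
        FetchPlan::Skip
    } else if total_records > 0 && total_records <= row_threshold {
        FetchPlan::Rows
    } else {
        // Covers both "above the threshold" and "unknown" (negative) counts.
        FetchPlan::Columnar
    }
}
```

Keeping this decision in the caller fits the boundary discipline of a thin RPC client: the resolver reports metadata, and policy lives elsewhere.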
Boundary discipline: the resolver is a thin RPC client. All
scrubbing + tier dispatch + auth happen server-side; the
credential scrub the server applies (per the R-2.x scrubber)
round-trips through this client unchanged. Per
`agents/rules/execution-model.md`, fetch policy (when/whether/which
backend) lives in the playbook layer: the `result_fetch` tool kind
in `noetl-tools` consumes this crate and exposes the policy knobs
to playbook authors.
Phase scope:
- ✅ Phase A: Python server `do_get` (noetl/noetl#643).
- ✅ Phase B: this crate's initial release (noetl/cli#41, crates.io 0.1.0).
- ✅ Phase C1: `get_flight_info` on both server + this crate (noetl/noetl#644 + noetl/cli#43, crates.io 0.2.0).
- ✅ Playbook surface: `result_fetch` tool kind in `noetl-tools` 2.11.0+.
- Phase C2 deferred: mTLS + token auth for non-cluster-internal callers. Fires when external-exposure deployment surfaces.
- Multi-endpoint sharding deferred: fires when the result-store grows beyond one Python pod.
| Module | Purpose | Used by |
|---|---|---|
| `playbook` | Pydantic-like YAML types: `Playbook`, `Step`, `Tool`, `NextFormat`, `RuntimeCapabilities`, etc. All field accessors `pub`. | CLI + worker |
| `template` | `render_template`, `render_template_with_result`, `get_json_path`, `json_to_rhai`, `rhai_to_json_string`. Takes `&HashMap<String, String>` views of the per-execution variables + step results so each binary owns its own context shape. | CLI + worker |
| `condition` | `evaluate_condition` (simple `{{ a == b }}` / `'in'` / truthy) and `evaluate_rhai_condition` (full Rhai expression eval). Same context-view contract as `template`. | CLI + worker |
| `capabilities` | `validate_capabilities` + `ValidationReport` / `ValidationError`. Pure function: returns the report rather than `bail!`ing so the CLI can format errors with `playbook_path` and the worker with `execution_id`. | CLI + worker |
| `runtime` | `ExecutionContext` (executor-side variant with async `step_results` + `Arc<dyn CredentialResolver>`); `CredentialResolver` trait. | worker (CLI uses its own `ExecutionContext` in `playbook_runner.rs`) |
| `events` | `ExecutorEvent` (mirrors the Python `noetl.runtime.events.report_event` envelope), `EventSink` trait, `NoopSink`, `EventEmitter`. | CLI + worker |
| `tools_bridge` | Scaffold for replacing the CLI's inline tool implementations with calls into the `noetl-tools` registry. Filled in incrementally per Strategy B (one tool kind per sub-PR). | CLI (worker already uses `noetl-tools` directly) |
| `worker::source` | `Command` envelope + `CommandSource` trait. Worker-only: the CLI's tree walker doesn't consume this. | worker (R-1.3) |
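
The "return the report rather than `bail!`" choice in the `capabilities` row can be illustrated with a small stand-alone sketch. All names here (`Report`, `CapabilityError`, `check_capabilities`, `cli_lines`, `worker_lines`) are hypothetical stand-ins; the crate's real `validate_capabilities` / `ValidationReport` shapes are not shown in this page and may differ.

```rust
#[derive(Debug, PartialEq)]
struct CapabilityError {
    capability: String,
    message: String,
}

#[derive(Debug, Default)]
struct Report {
    errors: Vec<CapabilityError>,
}

impl Report {
    fn is_ok(&self) -> bool {
        self.errors.is_empty()
    }
}

/// Pure check: collects every problem instead of failing on the first one.
fn check_capabilities(required: &[&str], available: &[&str]) -> Report {
    let mut report = Report::default();
    for cap in required {
        if !available.contains(cap) {
            report.errors.push(CapabilityError {
                capability: cap.to_string(),
                message: format!("capability `{cap}` is not available"),
            });
        }
    }
    report
}

/// CLI-style formatting: errors are keyed by the playbook path.
fn cli_lines(report: &Report, playbook_path: &str) -> Vec<String> {
    report
        .errors
        .iter()
        .map(|e| format!("{playbook_path}: {}", e.message))
        .collect()
}

/// Worker-style formatting: errors are keyed by the execution id.
fn worker_lines(report: &Report, execution_id: i64) -> Vec<String> {
    report
        .errors
        .iter()
        .map(|e| format!("execution_id={execution_id} capability={} {}", e.capability, e.message))
        .collect()
}
```

Because the check itself never formats or aborts, each binary can attach the context it has on hand without the shared crate knowing about either.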
| What | Where it lives | Why |
|---|---|---|
| The CLI's recursive tree walker (`run`, `execute_step`, `execute_next_steps`, `execute_router_arcs`) | `repos/cli/src/playbook_runner.rs` | Natural fit for local YAML execution; flattening into a pull-model iterator would lose local-debug clarity |
| The CLI's inline tool implementations (`execute_tool`, `execute_shell_command`, `execute_http_request`, `execute_duckdb_query`, etc.) | `repos/cli/src/playbook_runner.rs` (today) | Migrated incrementally to `noetl-tools` registry via `tools_bridge` per Strategy B; ~870 LoC of inline tool dispatch |
| The worker's NATS pull loop | `repos/worker/src/` | Different shape than tree walker; tied to NATS durable-consumer semantics |
| `RunOutcome` (the CLI's JSON output envelope) | `repos/cli/src/playbook_runner.rs` | Not a YAML input type; worker has a different output envelope (event-log writes) |
Mid-implementation discovery, documented in § H.10 of the global hybrid cloud blueprint:
- The CLI is a recursive tree walker. It loads the YAML, walks the workflow, evaluates `next` arcs / `case` conditions / `then` blocks in place, dispatches each step inline. Control flow is the call stack.
- The worker is a pull-model consumer. It subscribes to a NATS durable consumer, pulls one command at a time, executes it, emits events, repeats. No tree. No recursion.
- The original design ("unified `CommandSource` trait, both binaries plug in their own impl") was the wrong abstraction for the CLI. Flattening the tree walker into a pull-model iterator loses local-debug clarity, complicates `case`/`then` state management, and breaks integration tests written against the tree shape.
noetl-executor was re-scoped from a control-loop crate to a utilities-and-types crate. The CLI's tree walker stays. The worker's pull loop stays. Both call into the executor for the same template rendering, condition evaluation, credential resolution, capability validation, and event shape.
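
The difference between the two control-loop shapes can be made concrete with a toy sketch. None of these types are the real ones: `Step`, `walk`, and `pull_loop` are hypothetical, and an in-memory queue stands in for the NATS durable consumer.

```rust
use std::collections::VecDeque;

/// Toy workflow node: a name plus the steps its `next` arcs point at.
struct Step {
    name: String,
    next: Vec<Step>,
}

/// Small constructor to keep examples readable.
fn step(name: &str, next: Vec<Step>) -> Step {
    Step { name: name.to_string(), next }
}

/// CLI shape: a recursive tree walker. Control flow is the call stack.
fn walk(step: &Step, trace: &mut Vec<String>) {
    trace.push(step.name.clone()); // "execute" the step inline
    for child in &step.next {
        walk(child, trace);
    }
}

/// Worker shape: a flat pull loop. One command at a time, no tree, no recursion.
fn pull_loop(queue: &mut VecDeque<String>, trace: &mut Vec<String>) {
    while let Some(command) = queue.pop_front() {
        trace.push(command); // "execute" the command, then pull the next one
    }
}
```

In the walker, the position in the workflow is implicit in the stack frames; in the pull loop, it has to arrive with each command. That difference is why forcing both through one iterator-style trait was a poor fit for the CLI.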
| Topic | Where |
|---|---|
| The migration roadmap | Appendix H of the global hybrid cloud blueprint |
| The architectural finding | § H.10 of the same doc |
| The Polars-pattern endpoint (`pip install noetl` ships Rust runtime + Python wrapper) | § H.9 |
| Apache Arrow data plane | § H.4 + § H.11 (local-mode Feather buffer) |
| Tracking issues | noetl/cli#19 (this CLI's R-1.1 sub-issue) · noetl/ai-meta#30 (umbrella) |
| Sub-PR | Scope | PR |
|---|---|---|
| R-1.1 PR-1 | Crate skeleton (`lib.rs`, `runtime`, `events`, `source`/`dispatch` placeholders) | #20 |
| R-1.1 PR-2a | YAML types → `noetl-executor::playbook` | #21 |
| R-1.1 PR-2b | Utilities (`template` + `condition` + `capabilities`); § H.10 restructure (placeholder `LocalPlaybookSource` removed; `CommandSource` → `worker::source`) | #22 |
| R-1.1 PR-2c-1 | `noetl-tools = "2.8.7"` dep + `tools_bridge` scaffold | #23 |
| R-1.1 PR-2c-2 | `tools_bridge` adapters: `BridgeContext`, `to_tools_context`, `to_tools_config` (all 8 `Tool` variants), `from_tools_result`; `dispatch_via_registry` becomes async with per-tool-kind match arms | #24 |
| R-1.1 PR-2c-3 | `Tool::Rhai` arm wired through `noetl-tools::RhaiTool`; new `to_tools_context_for_rhai` helper groups CLI flat variables into nested Maps for Rhai field access; CLI's inline `execute_rhai_script` + `rhai_http_request` deleted (~220 LoC) | #25 |
| R-1.1 PR-2c-4 | `Tool::Shell` arm wired through `noetl-tools::ShellTool`; per-command dispatch loop preserves CLI's "fresh bash invocation per command" semantics; new `shell_command_config(&str)` helper; CLI's inline `execute_shell_command` deleted (~79 LoC) | #26 |
| R-1.1 PR-2c-5 | `Tool::Http` arm wired through `noetl-tools::HttpTool`; new `resolve_auth_to_bearer` helper (CLI `AuthConfig` → Bearer token via `noetl-tools::auth::GcpAuth`); new `http_tool_config` helper that injects the Bearer header into request headers; new `reshape_http_result` helper that maps noetl-tools' `{status_code, headers, body}` envelope back to the CLI's pre-existing `{status, body}` shape so playbook steps keep branching on `<step>.body.status`. `Tool::Auth` arm also routes through `resolve_auth_to_bearer` so both paths share the GCP ADC code path. CLI's inline `execute_http_request` + `get_auth_token` deleted (~148 LoC) | #27 |
| R-1.1 PR-2c-6 | `Tool::DuckDb` arm wired through `noetl-tools::DuckdbTool`; new `duckdb_tool_config` helper (translates CLI's `Vec<String>` params to noetl-tools' `Vec<serde_json::Value>` and maps `db` → `db_path`); new `reshape_duckdb_result` helper unwraps noetl-tools' `{columns, rows, row_count}` envelope back to the CLI's pre-existing rows-array shape (and `{affected_rows}` back to `{"status": "ok"}` for non-SELECT). Empty / whitespace-only query short-circuits to an empty outcome, mirroring the CLI's existing guard. Path resolution + `mkdir -p` stay at the CLI call site since the bridge has no knowledge of the playbook directory. CLI's inline `execute_duckdb_query` deleted (~55 LoC). Feature gain: params that were silently ignored (`_params: &[String]`) are now bound at `?` placeholders | #28 |
| R-1.1 PR-2c-7 | Codifies the § H.10 finding for `Tool::Playbook`: sub-playbook execution stays in the CLI's tree walker (`PlaybookRunner::new(path).run()` is the recursion case; the bridge cannot replace it without re-opening § H.10). Replaces the silent `BridgeOutcome::empty()` stub with `anyhow::bail!` naming § H.10 so accidental misuse is loud rather than silent. Variable preparation (parent merge + DSL v2 `input:` / DSL v1 `args:` rendering + `workload.` prefix) DID move into a new `noetl_executor::tools_bridge::prepare_sub_playbook_vars` helper (pure, reusable, testable). CLI's call site uses the helper but keeps its `PlaybookRunner` recursion inline. No semantic divergences (no execution path changed) | #29 |
| R-1.1 PR-2c-8 | Final substantive sub-PR. Codifies that `Tool::Auth` and `Tool::Sink` stay inline by design but extracts the shareable pure logic into bridge helpers: `auth_context_updates(provider, token, project)` (replaces inline `set_variable` calls; preserves CLI's pre-PR-2c-8 ordering), `format_sink_payload(format, raw)` (json passthrough / yaml dump / csv conversion), and `json_to_csv(json_str)` (lifted verbatim from CLI). Dispatch arms for both kinds bail loudly with helper-pointing messages. CLI's inline `json_to_csv` deleted (~42 LoC). Removes the `dispatch_via_registry_returns_empty_for_unwired_kind` test: every `Tool` variant now either dispatches through the registry, bails with a § H.10 finding, or bails as unsupported. GCS → `object_store` migration tracked as a separate follow-up (R-2.x scope, not R-1.x). No semantic divergences | #30 |
| R-1.1 PR-2d | Wrap-up PR. Adds `executor/tests/dispatch.rs` with 12 end-to-end integration tests covering `dispatch_via_registry` for Rhai / Shell / DuckDb (full registry round-trip) plus the bail-loudly paths for Playbook / Auth / Sink / Unsupported. Adds `executor/README.md` summarising the crate's scope, module layout, and § H.10 rationale. Updates the crate's `lib.rs` top-level docs to include `tools_bridge` in the module-layout list with its full surface (helpers + which kinds bail vs dispatch). Closes noetl/cli#19 | #31 |
R-1.1 is complete after PR-2d ships. See § H of the global hybrid cloud blueprint for the wider R-1.x → R-5 roadmap. Next phases: R-1.2 (worker NATS pull loop adopts noetl-executor), R-1.3 (worker depends on the crate end-to-end), R-2 (Apache Arrow data plane), R-3 (object_store integration including the GCS sink migration deferred from PR-2c-8).
Each tool replacement that crosses a behaviour line documents the deltas in the PR body and here.
| PR | Surface | CLI behaviour (pre-replacement) | noetl-tools behaviour | User-visible impact |
|---|---|---|---|---|
| #25 | rhai `timestamp()` | `chrono::Local::now().format("%H:%M:%S")` → `"14:23:45"` | `chrono::Utc::now().timestamp().to_string()` → `"1716847425"` | Scripts that displayed `timestamp()` for human reading need updating |
| #25 | rhai HTTP helpers (`http_get`, `http_post`, `http_delete`, `*_auth`) | `curl` subprocess | Direct `reqwest` calls | Same surface; different error shape on network failures |
| #25 | rhai `get_gcp_token` | `gcloud auth print-access-token` shellout | `gcp_auth` crate (workload-identity aware) | Better on GKE pods; equivalent on hosts with `gcloud` configured |
| #26 | shell stdout streaming | Line-by-line to terminal as command runs | Collected; returned at end | Breaks real-time output UX for long-running shell steps: users see nothing until command completes |
| #26 | shell result envelope | Captured stdout string | `data: {exit_code, stdout, stderr}` JSON | Transparent: bridge unwraps `data["stdout"]` and trims trailing `\n` |
| #27 | http transport | `curl` subprocess | `reqwest` direct | Same envelope on success; different error shape on transport failure (anyhow message vs curl exit code) |
| #27 | http auth (GCP ADC) | `gcloud auth print-access-token` shellout | `gcp_auth` crate | Better on GKE pods (workload-identity aware); equivalent on hosts with `gcloud` configured |
| #27 | http JSON body | Sent verbatim via `curl -d`; caller had to set `Content-Type` | Auto-detected as JSON; `reqwest` sets `Content-Type` | Most callers were already sending JSON with `Content-Type: application/json`; transparent |
| #27 | http result envelope | `{"status": int, "body": <json-or-string>}` | `data: {"status_code": int, "headers": {...}, "body": <json>}` | Transparent: bridge's `reshape_http_result` maps back to the CLI's `{status, body}` shape so 4xx/5xx come back inside the envelope (not as anyhow errors) |
| #27 | `Tool::Auth` GCP token | `gcloud auth print-access-token` shellout (separate code path from rhai `get_gcp_token`) | `gcp_auth` crate via shared `resolve_auth_to_bearer` helper | Both `Tool::Http` and `Tool::Auth` now share the GCP ADC code path, which eliminates the divergence between the two |
| #28 | duckdb SELECT result envelope | JSON array of row objects (pretty-printed) | `data: {"columns": [...], "rows": [...], "row_count": N}` | Transparent: bridge's `reshape_duckdb_result` maps back to the CLI's rows-array shape |
| #28 | duckdb non-SELECT result envelope | Literal string `{"status": "ok"}` | `data: {"affected_rows": N}` | Transparent: bridge maps back; `affected_rows` dropped (CLI never exposed it) |
| #28 | duckdb params binding | `_params: &[String]` accepted but silently ignored | Bound as JSON values at `?` placeholders | Feature gain: playbooks that intended their `params:` field would now see them applied (no breakage for playbooks with stale params + no `?` placeholders since DuckDB ignores extra params) |
| #28 | duckdb path resolution + mkdir | `resolve_duckdb_path` + `fs::create_dir_all(parent)` in `execute_duckdb_query` | Open as-given, no mkdir | CLI keeps owning these at the call site before dispatch; bridge has no knowledge of the playbook directory |
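
Two of the "transparent" rows above come down to small reshaping steps, sketched here with plain structs. The types and function names are hypothetical stand-ins (the real bridge helpers work on JSON values and their signatures are not shown in this page), and trimming every trailing newline rather than just one is an assumption.

```rust
use std::collections::HashMap;

/// Stand-in for the noetl-tools HTTP envelope `{status_code, headers, body}`.
struct ToolsHttpResult {
    status_code: u16,
    headers: HashMap<String, String>,
    body: String,
}

/// Stand-in for the CLI's pre-existing `{status, body}` shape.
#[derive(Debug, PartialEq)]
struct CliHttpResult {
    status: u16,
    body: String,
}

/// Map the richer envelope back to the CLI shape. A 4xx/5xx status stays
/// inside the envelope as data instead of becoming an error.
fn reshape_http(result: ToolsHttpResult) -> CliHttpResult {
    // Headers are dropped: the CLI shape has no slot for them.
    let ToolsHttpResult { status_code, headers: _, body } = result;
    CliHttpResult { status: status_code, body }
}

/// Recover the "captured stdout string" the CLI used to return,
/// with trailing newlines trimmed.
fn unwrap_shell_stdout(stdout: &str) -> &str {
    stdout.trim_end_matches('\n')
}
```

Keeping the reshaping in the bridge means existing playbooks keep branching on the old field names while the tool implementations underneath change.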
- `playbook_runner.rs`: 2,688 → 1,606 lines (-1,082 net across PR-2a + PR-2b + PR-2c-3 + PR-2c-4 + PR-2c-5 + PR-2c-6 + PR-2c-7 + PR-2c-8)
- New code in `noetl-executor`: 7 modules + 1 worker submodule + integration tests, ~3,500 LoC
- `noetl-executor` unit tests: 0 → 80 across PR-1 + PR-2a + PR-2b + PR-2c-1 → PR-2c-8
- `noetl-executor` integration tests: 0 → 12 (PR-2d)
- Workspace-wide tests: 174 passing (80 noetl-executor unit + 12 noetl-executor integration + 41 noetl + 41 ntl)
- Tool kinds wired through the registry: 4 (Rhai, Shell, Http, DuckDb query)
- Tool kinds staying inline per § H.10 with helpers extracted: 3 (Playbook, Auth, Sink)
After R-1.1 wrapped, the crate was published to crates.io and noetl-worker began adopting it. The CLI's release pipeline had broken silently during R-1.1 (the bin's path-only dep on the unpublished noetl-executor failed cargo publish validation); R-1.2 PR-1 fixed that as the opening move.
| Sub-PR | Scope | crates.io | PR |
|---|---|---|---|
| R-1.2 PR-1 | `noetl-executor` published with `publish = true` + crates.io metadata; bin's dep becomes `{ path = "executor", version = "0.1" }`; release workflow publishes `noetl-executor` before the bin. Unblocks the noetl release pipeline that had been failing for 3 consecutive runs (bin was stuck at 2.17.1 while local `Cargo.toml` was at 2.24.0). | `noetl-executor` 0.1.0 (2026-05-30T06:04Z); `noetl` 2.24.0 (2026-05-30T06:14Z, first publish since 2.17.1) | #32 |
| R-1.2 PR-2a | Align all `execution_id` representations to `i64` across `events::ExecutorEvent`, `events::EventEmitter`, `runtime::ExecutionId` (was `pub struct ExecutionId(pub String)`, now `pub type ExecutionId = i64`), and `worker::source::Command`. Required because the worker's `CommandNotification.execution_id` + `WorkerEvent.execution_id` + Python's `noetl.event.execution_id` / `noetl.command.execution_id` bigint columns all use `i64`; the executor's `String` shapes were 0.1.x placeholders that became load-bearing once the worker started adopting. Breaking change → minor bump 0.1.0 → 0.2.0. Semantic-release picked up the "Breaking change" language and bumped the bin to noetl 3.0.0. | `noetl-executor` 0.2.0 (2026-05-30T06:34Z); `noetl` 3.0.0 (2026-05-30T06:37Z) | #33 |
| R-1.2 PR-2b | Add structured `Condition` + 12-variant `Operator` (`Eq` / `Ne` / `Gt` / `Lt` / `Gte` / `Lte` / `Contains` / `Matches` (regex) / `Truthy` / `Falsy` / `In` / `NotIn`) + `evaluate_structured_condition(cond, ctx, result)` to `executor::condition` alongside the existing template-style helpers. The worker (R-1.2 PR-2c) carries structured `case` / `when` blocks on its NATS command envelopes; this PR moves the operator semantics into the shared crate so the CLI's tree walker and the worker's pull loop agree. Adds public types → patch bump 0.2.0 → 0.2.1. Bin auto-bumped to noetl 3.1.0. | `noetl-executor` 0.2.1 (2026-05-30T06:56Z); `noetl` 3.1.0 (2026-05-30T06:58Z) | #34 |
| R-1.2 PR-2d-1 | Redesign `worker::source::CommandSource` trait with ack lifecycle + richer `Command` to fit what the worker's real NATS pull loop needs (vs the placeholder 0.2.x shape). New types: `ClaimOutcome` enum (`Claimed(Command)` / `AlreadyClaimed` / `RetryLater(String)` / `Failed(String)`), `Pulled<H>` struct (`{ outcome, ack }`), associated `AckHandle` type, `ack(h)` / `nack(h)` methods. `Command` enriched with `render_context: HashMap<String, Value>` (`#[serde(default)]`) + `attempts: u32` (`#[serde(default)]`). In-crate `MockSource` test helper records ack/nack lifecycle for downstream worker tests. Breaking → minor bump 0.2.1 → 0.3.0 (the breaking slot for a 0.x crate). Semantic-release picked up the `feat!:` prefix; bin auto-bumped to noetl 4.0.0. | `noetl-executor` 0.3.0; `noetl` 4.0.0 | #35 |
| R-1.2 PR-EE-1 | First of a four-PR cross-repo event envelope reconciliation series (alignment between noetl-executor `ExecutorEvent`, noetl-server Rust `EventRequest`, Python `EventEmitRequest`, and worker `WorkerEvent`). Enriches `ExecutorEvent` with three optional fields the server schemas already expect: `event_id: Option<i64>` (app-side snowflake per `observability.md` Principle 3), `worker_id: Option<String>` (emitting pod id), `meta: Option<serde_json::Value>` (free-form: retries, parent_event_id, catalog_id). All `#[serde(default, skip_serializing_if = "Option::is_none")]`: older wire-format consumers see no new keys; producers that don't populate them omit them entirely. Plus `#[serde(alias = "payload")]` on `context` so the worker's pre-EE `WorkerEvent.payload` deserializes cleanly. Non-breaking → patch bump 0.3.0 → 0.3.1. Bin auto-bumped to noetl 4.1.0. | `noetl-executor` 0.3.1; `noetl` 4.1.0 | #37 |
The worker is now a consumer of noetl-executor as a crates.io dep. Each adoption PR slims the worker by reusing the shared implementation.
| Sub-PR | Scope | noetl-worker version | PR |
|---|---|---|---|
| R-1.2 PR-2c | First worker PR depending on noetl-executor. Adds `noetl-executor = "0.2"` dep; replaces inline `Operator` + `Condition` + the per-condition resolution helpers (`resolve_value`, `resolve_json_value`, `json_path`, `compare_numeric`, `is_truthy`, `value_to_f64`) with re-exports of `noetl_executor::condition::{Operator, Condition}` and delegation to `evaluate_structured_condition` from `CaseEvaluator::evaluate_conditions`. Keeps `Case`, `CaseAction`, `CaseResult`, `CaseEvaluator` (pull-loop control flow specific to the worker per § H.10). `case_evaluator.rs` 437 → 344 LoC (-93 net); 3 new tests lock in worker contract (`first_match_wins`, `no_match_returns_none`, `and_semantics`). Auto-bumped to noetl-worker 1.1.0. | noetl-worker 1.1.2 (catch-up published 2026-05-30 after the worker pipeline-trigger fix in noetl/worker#4 + #5) | noetl/worker#2 |
| R-1.2 PR-2d-2 | Final worker-side R-1.2 sub-PR. Bumps `noetl-executor = "0.2"` → `"0.3"`; adds new `src/nats/source.rs` with `NatsCommandSource { subscriber, client, worker_id }` implementing the 0.3.0 trait. `AckHandle = NatsAckHandle { message, notification }` embeds the notification metadata alongside the NATS message so `Worker::process_commands` can correlate `execution_id` / `command_id` / `step` on every `ClaimOutcome` variant (per `observability.md` Principle 4). `CommandExecutor::execute` refactored to take `&noetl_executor::worker::source::Command`. `Worker::process_commands` driven through `source.next()` + `source.ack/nack()` instead of inline subscriber + claim calls. Lossless `WorkerCommand` → `ExecutorCommand` translation with 8 dedicated tests including `translate_carries_full_context_as_input_including_cases` (locks in that `tool_config` + `cases` + `args` + `render_context` all ride through the `input` field). Observability addendum (commit e2b6d57): `nats.pull` + `command.execute` spans per `observability.md` Principle 1. Auto-bumped to noetl-worker 2.0.0 from the `feat!:` prefix. | noetl-worker 2.0.0 (released via the now-self-healing pipeline) | noetl/worker#6 |
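
The contract those three worker tests lock in (first match wins, no match returns none, AND semantics within a case) can be sketched with a much smaller operator set. Everything here is hypothetical: the real `Operator` has twelve variants and works on JSON values, while this sketch uses four variants over `i64` context values, and the constructor helpers exist only to keep the example short.

```rust
use std::collections::HashMap;

/// A small subset of comparison operators, for illustration only.
enum Operator {
    Eq,
    Gt,
    Lt,
    Truthy,
}

struct Condition {
    field: String,
    op: Operator,
    value: i64,
}

struct Case {
    name: String,
    conditions: Vec<Condition>,
}

fn cond(field: &str, op: Operator, value: i64) -> Condition {
    Condition { field: field.to_string(), op, value }
}

fn case(name: &str, conditions: Vec<Condition>) -> Case {
    Case { name: name.to_string(), conditions }
}

/// Evaluate one condition; a missing field counts as "does not hold".
fn holds(c: &Condition, ctx: &HashMap<String, i64>) -> bool {
    match ctx.get(&c.field) {
        None => false,
        Some(actual) => match c.op {
            Operator::Eq => *actual == c.value,
            Operator::Gt => *actual > c.value,
            Operator::Lt => *actual < c.value,
            Operator::Truthy => *actual != 0,
        },
    }
}

/// First case whose conditions ALL hold wins; `None` when nothing matches.
fn first_match<'a>(cases: &'a [Case], ctx: &HashMap<String, i64>) -> Option<&'a str> {
    cases
        .iter()
        .find(|c| c.conditions.iter().all(|cnd| holds(cnd, ctx)))
        .map(|c| c.name.as_str())
}
```

In this sketch a case with an empty condition list matches everything, because `all` over an empty iterator is true; whether the worker treats empty cases that way is not stated here.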
The 0.3.0 redesign codifies four API choices that surfaced when the worker's actual NATS pull loop was mapped against the placeholder 0.2.x shape. Each one trades simplicity for fidelity to the worker's real semantics.
| Decision | Choice | Rationale |
|---|---|---|
| `Pulled<H>` shape | `struct Pulled<H> { outcome, ack }` (not a tuple) | Tuples force positional access (`pulled.0`, `pulled.1`) and are awkward to extend. A struct lets future additions (e.g. `pull_time`, `redelivery_count`) land as new fields without breaking callers. |
| `AckHandle` typing | Associated type on the trait (`type AckHandle: Send + Sync`), not a trait-level generic parameter | Keeps `dyn CommandSource` possible for callers that want trait-object dispatch (e.g. test fixtures that hold a `Box<dyn CommandSource<AckHandle = ()>>`). A generic on the trait itself would have made it `!dyn`-compatible. |
| Error carrier in `ClaimOutcome::RetryLater(String)` / `Failed(String)` | Plain `String`, not `anyhow::Error` or a custom error enum | Keeps `ClaimOutcome` `Clone + Send + Sync + 'static` without lifetime gymnastics, so it can sit in a `Vec` for tests / be cloned across tasks / serialize for diagnostics. Callers that need structured error info attach it to the outcome's diagnostic event. |
| `Command.render_context` + `attempts` | Both `#[serde(default)]` | Older wire formats (worker 1.1.x) that don't carry these fields still deserialize cleanly; new callers populate them explicitly. Forward compat for the inevitable event-envelope reconciliation. |
| Lifecycle invariant | "Each `Pulled.ack` handle must be consumed exactly once via `ack(h)` or `nack(h)` before the next `next()` call in the same task" (documented in the trait docstring) | Forces explicit lifecycle accounting without requiring the trait to track in-flight handles itself. Source impls that need stronger guarantees (idempotent ack, double-ack panics) own that themselves. |
The MockSource impl shipped in executor::worker::source::tests is a reusable test fixture — it records every ack/nack call in an Arc<Mutex<Vec<MockAck>>> so downstream consumers (worker, future local-mode adapters) can lift it for dispatcher-level unit tests.
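
The shape described in the table can be sketched with the standard library alone. This is not the crate's code: the trait below is synchronous to stay dependency-free (the real one may well be async), `Command` is cut down to two fields, the mock's `usize` handle and `MockAck` variants are invented for the example, and which outcomes get acked versus nacked in `drain` is an assumption.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, PartialEq)]
struct Command {
    execution_id: i64,
    step: String,
}

#[derive(Debug, Clone, PartialEq)]
enum ClaimOutcome {
    Claimed(Command),
    AlreadyClaimed,
    RetryLater(String),
    Failed(String),
}

/// A struct rather than a tuple, so fields can be added later.
struct Pulled<H> {
    outcome: ClaimOutcome,
    ack: H,
}

trait CommandSource {
    /// Associated type: each source picks exactly one handle type.
    type AckHandle: Send + Sync;
    fn next(&mut self) -> Option<Pulled<Self::AckHandle>>;
    fn ack(&mut self, handle: Self::AckHandle);
    fn nack(&mut self, handle: Self::AckHandle);
}

#[derive(Debug, Clone, PartialEq)]
enum MockAck {
    Ack(usize),
    Nack(usize),
}

/// Test fixture: replays canned outcomes and records every ack/nack.
struct MockSource {
    queue: VecDeque<ClaimOutcome>,
    pulled: usize,
    log: Arc<Mutex<Vec<MockAck>>>,
}

impl MockSource {
    fn new(outcomes: Vec<ClaimOutcome>) -> (Self, Arc<Mutex<Vec<MockAck>>>) {
        let log = Arc::new(Mutex::new(Vec::new()));
        let source = MockSource { queue: outcomes.into(), pulled: 0, log: Arc::clone(&log) };
        (source, log)
    }
}

impl CommandSource for MockSource {
    type AckHandle = usize;

    fn next(&mut self) -> Option<Pulled<usize>> {
        let outcome = self.queue.pop_front()?;
        let id = self.pulled;
        self.pulled += 1;
        Some(Pulled { outcome, ack: id })
    }

    fn ack(&mut self, handle: usize) {
        self.log.lock().unwrap().push(MockAck::Ack(handle));
    }

    fn nack(&mut self, handle: usize) {
        self.log.lock().unwrap().push(MockAck::Nack(handle));
    }
}

/// Dispatcher loop: every handle is consumed exactly once before the next pull.
fn drain<S: CommandSource>(source: &mut S) -> usize {
    let mut executed = 0;
    while let Some(Pulled { outcome, ack }) = source.next() {
        match outcome {
            ClaimOutcome::Claimed(_) => {
                executed += 1; // the command would run here
                source.ack(ack);
            }
            ClaimOutcome::AlreadyClaimed => source.ack(ack),
            ClaimOutcome::RetryLater(_) | ClaimOutcome::Failed(_) => source.nack(ack),
        }
    }
    executed
}
```

Since the log lives behind an `Arc<Mutex<...>>`, a test can keep its own handle to it and assert on the recorded lifecycle after the dispatcher has consumed the source.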
- Event envelope reconciliation: ✅ series complete, all 4 PRs landed. All four producers / consumers now emit + accept the same wire format on `/api/events`. Tracked on noetl/ai-meta#30.
  - PR-EE-1 (the R-1.2 PR-EE-1 row above): `ExecutorEvent` now carries optional `event_id`, `worker_id`, `meta` fields plus a `payload` serde alias, so the executor's envelope is a superset of what either server expects.
  - PR-EE-2 (noetl/server#6, noetl-server 2.0.0; pipeline-fix #7 auto-republished as 2.0.1): Rust `EventRequest` renamed `name` → `event_type` with `#[serde(alias = "name")]`, added `#[serde(alias = "context")]` on `payload`, added optional `event_id` / `status` / `created_at` fields (server-side defaults preserve back-compat per server-wiki event-envelope).
  - PR-EE-4 (noetl/noetl#639, noetl 3.0.0): Python `EventEmitRequest` now accepts `name` / `step` / `payload` as validation aliases for `event_type` / `node_name` / `context`; `worker_id` lifted to top-level; `EventType: Literal[...]` → `str` (the Literal was already dead code; production Python uses dot-notation event types throughout).
  - PR-EE-3 (noetl/worker#11, noetl-worker 3.0.0): replaces `WorkerEvent` with `ExecutorEvent` re-export from `noetl_executor::events`; `EventEmitter` carries `worker_id` so every envelope is stamped at the source per `observability.md` Principle 4.
- GCS sink → `object_store`: deferred from R-1.1 PR-2c-8; R-3 scope (data plane).
- `noetl-worker` release pipeline gap: worker's `semantic-release.yml` initially didn't trigger `release-worker` on tag creation (same bug noetl/cli had pre-R-1.2 PR-1). Fixed in noetl/worker#4 (added `gh workflow run` step) + noetl/worker#5 (added missing `actions: write` permission). Pipeline now self-healing as of noetl-worker 1.1.2.