cursor loop mode
Added in v3.8.0 (noetl/server#196). The cursor loop mode lets a playbook step repeatedly claim and process a frame of work rows from a database table until the claim returns nothing — draining the work queue entirely without holding a worker slot open between frames.

    - name: process_records
      loop:
        cursor:
          kind: postgres          # the claim tool (postgres is the default)
          auth: <alias>           # keychain alias for the claim DB
          claim: |                # atomic lease SQL (FOR UPDATE SKIP LOCKED … RETURNING …)
            -- Lease pending rows and return them in one statement.
            -- 'claimed' is an illustrative lease status, not a required value.
            UPDATE work_queue
            SET status = 'claimed'
            WHERE patient_id IN (
              SELECT patient_id
              FROM work_queue
              WHERE status = 'pending'
              LIMIT {{ __frame_max_rows | default(1) }}
              FOR UPDATE SKIP LOCKED
            )
            RETURNING patient_id, name;
        iterator: patient         # row columns exposed as iter.patient.<col>
        spec:
          mode: cursor
          max_in_flight: 1
          frame:
            max_rows: 50          # injected as __frame_max_rows into the claim SQL
            row_concurrency: 1    # rows within one frame dispatched concurrently
            # lease_seconds / heartbeat_seconds / max_seconds / max_bytes / process:
            # all accepted (advisory); the playbook's own claim SQL manages leases
      tool:
        - name: fetch_record
          kind: http
          spec:
            policy:
              rules:
                - do: retry
          # … http config …
        - name: save_record
          kind: postgres
          # … postgres config …

| Field | Meaning |
|---|---|
| `loop.cursor.kind` | Tool kind for the claim step. `postgres` is the only validated kind today. |
| `loop.cursor.auth` | Keychain alias resolved to DB connection credentials. |
| `loop.cursor.claim` | SQL that atomically leases a batch of rows and returns them (`FOR UPDATE SKIP LOCKED … RETURNING …`). `{{ __frame_max_rows }}` is injected from `loop.spec.frame.max_rows`. |
| `loop.iterator` | Name for the iteration variable. A claimed row's columns become `iter.<name>.<column>`. |
| `loop.spec.mode` | Must be `cursor`. Other values: `sequential` (default), `parallel`. |
| `loop.spec.frame.max_rows` | How many rows the claim SQL may return per frame. Injected as `__frame_max_rows`. |
| `loop.spec.frame.row_concurrency` | How many rows of one frame are dispatched concurrently through the `tool:` body. |
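
To make the hand-off from `loop.spec.frame.max_rows` to `__frame_max_rows` concrete, here is a minimal Python sketch. It is not NoETL's renderer: `render_claim` and `frame_context` are hypothetical helpers, and the regex handles only the single `{{ name | default(n) }}` form shown in the claim SQL, standing in for a real template engine.

```python
import re

# Matches "{{ name }}" or "{{ name | default(value) }}". This is a tiny
# stand-in for a template engine, covering only the form used in the claim SQL.
_PLACEHOLDER = re.compile(
    r"\{\{\s*(\w+)\s*(?:\|\s*default\(\s*([^)]*?)\s*\)\s*)?\}\}"
)


def render_claim(template: str, context: dict) -> str:
    """Substitute placeholders from context, falling back to default(...)."""

    def substitute(match: re.Match) -> str:
        name, default = match.group(1), match.group(2)
        if name in context:
            return str(context[name])
        if default is not None:
            return default
        raise KeyError(f"no value or default for {name!r}")

    return _PLACEHOLDER.sub(substitute, template)


def frame_context(spec: dict) -> dict:
    """Expose loop.spec.frame.max_rows to the claim SQL as __frame_max_rows."""
    max_rows = spec.get("frame", {}).get("max_rows")
    # int() keeps anything but a number out of the rendered LIMIT clause.
    return {} if max_rows is None else {"__frame_max_rows": int(max_rows)}


CLAIM_LIMIT = "LIMIT {{ __frame_max_rows | default(1) }}"
```

With `max_rows: 50` the limit line renders as `LIMIT 50`. With no `frame.max_rows` set, the `default(1)` in the template takes over and each frame claims a single row.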
The cursor loop is orchestrator-driven, not worker-driven. No worker holds a slot while waiting for the next claim; the orchestrator issues a claim command to a worker when a new frame is needed and fans out the body once the claim result arrives.

    Entry
      │
      ├── emit step.enter
      └── issue claim command (frame 0)
            │
            ▼
    Claim result arrives (worker executes claim SQL, returns rows)
      │
      ├── 0 rows → DRAIN: complete the step, route arcs with event.name = loop.done
      │
      └── K rows → fan out the body per claimed row (bounded by frame.row_concurrency)
            │
            ▼
          All body rows complete
            │
            └── issue next claim command (frame 1, 2, …)
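
The flow above can be modelled in a few lines of Python. This is a simplified simulation, not the orchestrator's code: `run_cursor_step` is a hypothetical function, an in-memory `deque` stands in for the work table, and an `asyncio.Semaphore` stands in for the `frame.row_concurrency` bound.

```python
import asyncio
from collections import deque


async def run_cursor_step(queue, process_row, max_rows=50, row_concurrency=1):
    """Drive claim -> fan-out -> next claim until a claim returns 0 rows.

    `queue` is an in-memory deque standing in for the work table, and this
    function stands in for the orchestrator; none of it is NoETL API.
    """
    events = ["step.enter"]
    frame = 0
    while True:
        # Claim: take up to max_rows rows (the real claim is SQL run on a worker).
        rows = [queue.popleft() for _ in range(min(max_rows, len(queue)))]
        if not rows:
            events.append("loop.done")  # DRAIN: the only way the step completes
            return events, frame

        # Fan out the body per claimed row, bounded by row_concurrency.
        gate = asyncio.Semaphore(row_concurrency)

        async def run_body(row):
            async with gate:
                await process_row(row)

        await asyncio.gather(*(run_body(row) for row in rows))
        frame += 1  # all body rows are done, so issue the next claim
```

Five queued rows with `max_rows=2` produce three non-empty frames followed by a fourth, empty claim that drains the step. Nothing in the loop waits between frames except on the claim itself, which is the property the orchestrator-driven design is after.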
Claim commands carry a __cursor_frame counter in the command context so the orchestrator can match incoming completions to the right frame. Body commands carry the claimed row data as iter.<iterator> context.
StepInfo.is_cursor is set on the step so that individual claim or body command completions do not mark the step complete — only the DRAIN condition (claim returning 0 rows) does.
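
A small state machine shows why the frame counter and the cursor flag matter. The sketch below is an assumption-laden model, not the contents of `src/engine/state.rs`: `CursorStep` and its method names are hypothetical, and the string return values are only labels for what the orchestrator would do next.

```python
from dataclasses import dataclass, field


@dataclass
class CursorStep:
    """Hypothetical stand-in for the per-step cursor state the text describes."""

    frame: int = 0                             # value sent as __cursor_frame
    pending: set = field(default_factory=set)  # body commands still in flight
    complete: bool = False

    def on_claim_result(self, frame: int, row_ids: list) -> str:
        if frame != self.frame:
            return "ignore"        # completion belongs to some other frame
        if not row_ids:
            self.complete = True   # DRAIN is the only completion path
            return "drain"
        self.pending = set(row_ids)
        return "fan_out"

    def on_body_done(self, frame: int, row_id) -> str:
        if frame != self.frame or row_id not in self.pending:
            return "ignore"
        self.pending.discard(row_id)
        if self.pending:
            return "wait"          # other rows of this frame are still running
        self.frame += 1            # frame finished, so claim the next one
        return "claim"
```

Note that finishing the last body command of a frame returns `"claim"` and leaves `complete` false. Only an empty claim result flips it.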
When a claim returns 0 rows the orchestrator sets __cursor_drained = true on the step context and completes the step with event.name = loop.done. Downstream arcs whose when: guard tests event.name == "loop.done" then activate.
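
As a rough illustration of the drain hand-off, the sketch below sets the flag and routes arcs. Everything apart from `__cursor_drained` and `loop.done` is assumed: the function names, the arc dictionaries, and the use of plain Python callables in place of rendered `when:` expressions.

```python
def complete_on_drain(step_context: dict, claimed_rows: list):
    """Return the event to route on, or None if the loop should continue."""
    if claimed_rows:
        return None
    step_context["__cursor_drained"] = True
    return {"name": "loop.done"}


def active_arcs(arcs: list, event: dict) -> list:
    """Pick the target steps of arcs whose guard passes for this event.

    Guards are plain callables here, standing in for `when:` expressions.
    """
    return [arc["step"] for arc in arcs if arc["when"](event)]
```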
Arc when: expressions and step set: blocks may reference the just-completed step's result as {{ output.<field> }}. This is an alias for the step name and was added alongside cursor mode to unblock output-gated arcs in playbooks that check the claim result count (e.g. {{ output.data.row_count }}).
Previously playbooks had to use the step name directly; output matches the Python orchestrator convention and lets a playbook use the same expression regardless of the step name.
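
One way to picture the alias: the completed step's result is bound under two names in the namespace that guards are evaluated against. The helper below is a hypothetical sketch of that idea, not the engine's actual context builder.

```python
def arc_context(step_name: str, result: dict, prior: dict) -> dict:
    """Build the namespace an arc guard sees after `step_name` completes.

    The result is bound twice: under the step's own name and under the
    generic `output` alias, so both spellings resolve to the same object.
    """
    context = dict(prior)
    context[step_name] = result
    context["output"] = result
    return context
```

A guard written against `output.data.row_count` then keeps working if the step is renamed, whereas one written against the step name does not.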
A playbook may loop back to a cursor step from a downstream arc (for example, to re-run it for a different data type). When a cursor step re-enters, the orchestrator resets frame tracking for that run so frame 0 of the re-entry does not merge with the prior drained run's frame history.
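
The sketch below shows one possible way to keep re-entries separate: key the frame log by a run counter that is bumped on each re-entry. `FrameHistory` and its fields are hypothetical, and the real orchestrator may reset its tracking differently.

```python
class FrameHistory:
    """Hypothetical per-step frame log, keyed by (run, frame)."""

    def __init__(self):
        self.run = 0        # bumped on every re-entry of the cursor step
        self.frames = {}    # (run, frame) -> number of rows claimed
        self.drained = False

    def enter(self):
        """Called on step entry; a re-entry starts a fresh frame sequence."""
        if self.frames or self.drained:
            self.run += 1
        self.drained = False

    def record(self, frame: int, row_count: int):
        self.frames[(self.run, frame)] = row_count
        if row_count == 0:
            self.drained = True

    def rows_in_current_run(self) -> int:
        return sum(n for (run, _), n in self.frames.items() if run == self.run)
```

Because the key includes the run, frame 0 of the second pass is stored next to frame 0 of the first rather than overwriting or extending it.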
| | sequential | parallel | cursor |
|---|---|---|---|
| Iterations known at entry? | Yes, from `loop.items` | Yes | No, discovered per claim |
| Who decides "more work"? | Orchestrator at entry | Orchestrator at entry | Orchestrator after each claim result |
| Worker slot between items? | No, orchestrator re-issues | No | No, orchestrator re-issues |
| Claim SQL required? | No | No | Yes |
The Python implementation ran a long-lived, self-looping worker that held a thread between frames. The Rust implementation honors the execution model: no worker holds a slot between frames. Each claim executes as a normal postgres tool command on a worker, and the orchestrator drives the loop. Re-claiming stale rows is the playbook's own responsibility, handled in its claim SQL (a reclaim-stale CTE that re-selects rows whose lease timestamp has expired).
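
The reclaim-stale rule is easiest to see in an in-memory model. The Python sketch below mirrors the selection logic such a claim would express in SQL. The `leased` status, the `lease_until` field, and the `claim` function are all illustrative assumptions, not names from NoETL or the production playbook.

```python
from datetime import datetime, timedelta


def claim(rows: list, now: datetime, max_rows: int, lease: timedelta) -> list:
    """Lease up to max_rows rows that are pending or whose lease has expired.

    Expired leases become claimable again, so rows held by a worker that
    died mid-frame are picked up by a later claim instead of being stranded.
    """
    claimed = []
    for row in rows:
        if len(claimed) == max_rows:
            break
        stale = row["status"] == "leased" and row["lease_until"] <= now
        if row["status"] == "pending" or stale:
            row["status"] = "leased"
            row["lease_until"] = now + lease
            claimed.append(row)
    return claimed
```

In this model a row leaves the claimable set only when the body marks it done. The lease length is then a trade-off: too short and a slow body gets its row re-claimed while still running, too long and rows from a crashed worker sit idle.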
| Symbol | File |
|---|---|
| `LoopMode::Cursor` | `src/playbook/types.rs` |
| `CursorClaim` | `src/playbook/types.rs` |
| `FrameSpec` | `src/playbook/types.rs` |
| `StepInfo.is_cursor` | `src/engine/state.rs` |
| Orchestrator entry hook | `src/engine/orchestrator.rs` (`handle_step_enter`) |
| Cursor-drive block | `src/engine/orchestrator.rs` (`advance_cursor`) |
| `reconstruct_cursor_frames` | `src/engine/commands.rs` |
| `__cursor_drained` drain flag | `src/engine/state.rs` |
Validated end-to-end via test_pft_flow_v2 — the patient-fetch flow that mirrors the production state_report_generation playbook — against a throttling/error-injecting paginated-api test server on the local kind cluster (noetl/ai-meta#100). Result: all_passed: true, 5/5 per data type (assessments, conditions, medications, vital_signs, demographics).
- Home — server wiki overview
- Event-sourced execution — how events drive orchestrator state transitions
- noetl/tools wiki — postgres tool — the claim tool that executes the SQL
- noetl/ai-meta#100 — umbrella tracking the implementation + PFT validation
- Execution model — why the orchestrator drives the loop, not a long-lived worker