cursor loop mode

Kadyapam edited this page Jun 15, 2026 · 1 revision

Cursor / claim loop mode (mode: cursor)

Added in v3.8.0 (noetl/server#196). The cursor loop mode lets a playbook step repeatedly claim and process a frame of work rows from a database table until the claim returns nothing — draining the work queue entirely without holding a worker slot open between frames.

Playbook shape

- name: process_records
  loop:
    cursor:
      kind: postgres       # the claim tool (postgres is the default)
      auth: <alias>        # keychain alias for the claim DB
      claim: |             # atomic lease SQL (FOR UPDATE SKIP LOCKED … RETURNING …)
        UPDATE work_queue
        SET status = 'claimed'   -- lease marker; the value is illustrative
        WHERE patient_id IN (
          SELECT patient_id
          FROM work_queue
          WHERE status = 'pending'
          LIMIT {{ __frame_max_rows | default(1) }}
          FOR UPDATE SKIP LOCKED
        )
        RETURNING patient_id, name;
    iterator: patient      # row columns exposed as iter.patient.<col>
    spec:
      mode: cursor
      max_in_flight: 1
      frame:
        max_rows: 50       # injected as __frame_max_rows into the claim SQL
        row_concurrency: 1 # rows within one frame dispatched concurrently
        # lease_seconds / heartbeat_seconds / max_seconds / max_bytes / process:
        # all accepted (advisory) — the playbook's own claim SQL manages leases
  tool:
    - name: fetch_record
      kind: http
      spec:
        policy:
          rules:
            - do: retry
      # … http config …
    - name: save_record
      kind: postgres
      # … postgres config …

Key fields

| Field | Meaning |
| --- | --- |
| loop.cursor.kind | Tool kind for the claim step. postgres is the only validated kind today. |
| loop.cursor.auth | Keychain alias resolved to DB connection credentials. |
| loop.cursor.claim | SQL that atomically leases a batch of rows and returns them (FOR UPDATE SKIP LOCKED … RETURNING …). {{ __frame_max_rows }} is injected from loop.spec.frame.max_rows. |
| loop.iterator | Name for the iteration variable. A claimed row's columns become iter.<name>.<column>. |
| loop.spec.mode | Must be cursor. Other values: sequential (default), parallel. |
| loop.spec.frame.max_rows | How many rows the claim SQL may return per frame. Injected as __frame_max_rows. |
| loop.spec.frame.row_concurrency | How many rows of one frame are dispatched concurrently through the tool: body. |
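
The two injection points in the table can be pictured as plain data shapes. The following is a minimal Python sketch, not NoETL code: the helper names claim_template_vars and body_context are hypothetical, and only the __frame_max_rows variable and the iter.<name>.<column> layout are taken from this page.

```python
def claim_template_vars(frame_spec: dict) -> dict:
    """Variables made available when the claim SQL template is rendered."""
    # frame.max_rows is surfaced to the template as __frame_max_rows.
    # When it is absent, the template's own default(1) filter would apply.
    template_vars = {}
    if "max_rows" in frame_spec:
        template_vars["__frame_max_rows"] = frame_spec["max_rows"]
    return template_vars


def body_context(iterator: str, row: dict) -> dict:
    """Expose one claimed row's columns as iter.<iterator>.<column>."""
    return {"iter": {iterator: dict(row)}}


# With iterator: patient, a body tool would read iter.patient.name.
ctx = body_context("patient", {"patient_id": 7, "name": "Ada"})
```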

Orchestrator-driven semantics

The cursor loop is orchestrator-driven, not worker-driven. No worker holds a slot while waiting for the next claim; the orchestrator issues a claim command to a worker when a new frame is needed and fans out the body once the claim result arrives.

Entry
  │
  ├── emit step.enter
  └── issue claim command (frame 0)
        │
        ▼
Claim result arrives (worker executes claim SQL, returns rows)
  │
  ├── 0 rows  → DRAIN: complete the step, route arcs with event.name = loop.done
  │
  └── K rows  → fan out the body per claimed row (bounded by frame.row_concurrency)
                    │
                    ▼
              All body rows complete
                    │
                    └── issue next claim command (frame 1, 2, …)
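
The flow above can be simulated in a few lines of Python. This is a sketch under stated assumptions: an in-memory deque stands in for the work table, run_cursor_step is a hypothetical name, and the rows of a batch run one after another for determinism where a real dispatcher would run them concurrently.

```python
from collections import deque


def run_cursor_step(queue: deque, max_rows: int, row_concurrency: int, body):
    """Simulate the claim -> fan-out -> re-claim cycle until the claim drains."""
    events = ["step.enter"]
    frames_with_rows = 0
    while True:
        # Claim command: lease up to max_rows rows for this frame.
        rows = [queue.popleft() for _ in range(min(max_rows, len(queue)))]
        if not rows:
            # DRAIN: an empty claim is the only thing that completes the step.
            events.append("loop.done")
            return events, frames_with_rows
        # Fan out the body in batches of at most row_concurrency rows.
        for start in range(0, len(rows), row_concurrency):
            for row in rows[start:start + row_concurrency]:
                body(row)
        # Every body row of this frame is done, so the next claim is issued.
        frames_with_rows += 1


processed = []
events, frames = run_cursor_step(deque(range(120)), 50, 1, processed.append)
```

With 120 rows and max_rows of 50, the simulation claims three non-empty frames (50, 50, 20) and drains on the fourth claim.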

Claim and body command tagging

Claim commands carry a __cursor_frame counter in the command context so the orchestrator can match incoming completions to the right frame. Body commands carry the claimed row data as iter.<iterator> context.

StepInfo.is_cursor is set on the step so that individual claim or body command completions do not mark the step complete — only the DRAIN condition (claim returning 0 rows) does.
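
A minimal event-driven model of this bookkeeping, written in Python for illustration only. The class CursorStep and its method names are hypothetical, and the sketch assumes a single frame in flight (max_in_flight: 1); the real state lives in the Rust types listed under "Relevant types and files".

```python
class CursorStep:
    """Tracks which claim frame is awaited and how many body rows are open."""

    def __init__(self):
        self.is_cursor = True    # individual completions must not finish the step
        self.expected_frame = 0  # __cursor_frame value of the claim being awaited
        self.pending_rows = 0    # body commands of the current frame still running
        self.complete = False

    def on_claim_result(self, cursor_frame: int, rows: list) -> str:
        if cursor_frame != self.expected_frame:
            return "ignore"      # completion tagged with a different frame
        if not rows:
            self.complete = True  # DRAIN is the only completion path
            return "drain"
        self.pending_rows = len(rows)
        return "fan_out"

    def on_body_done(self) -> str:
        self.pending_rows -= 1
        if self.pending_rows > 0:
            return "wait"
        self.expected_frame += 1
        return "issue_next_claim"


step = CursorStep()
actions = [
    step.on_claim_result(0, ["a", "b"]),
    step.on_body_done(),
    step.on_body_done(),
    step.on_claim_result(0, ["x"]),  # stale tag: frame 1 is now expected
    step.on_claim_result(1, []),
]
```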

Drain and routing

When a claim returns 0 rows the orchestrator sets __cursor_drained = true on the step context and completes the step with event.name = loop.done. Downstream arcs whose when: guard tests event.name == "loop.done" then activate.
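
As an illustration of the routing rule, the Python sketch below models arcs as guard functions over the completion event. The route helper, the arc targets, and the step.failed event name are hypothetical; only loop.done and __cursor_drained come from this page.

```python
def route(arcs: list, event: dict) -> list:
    """Return the targets of every arc whose guard accepts the event."""
    return [arc["to"] for arc in arcs if arc["when"](event)]


arcs = [
    # Equivalent of: when: event.name == "loop.done"
    {"to": "report", "when": lambda e: e["name"] == "loop.done"},
    # A guard on some other event stays inactive when the cursor drains.
    {"to": "alert", "when": lambda e: e["name"] == "step.failed"},
]

drained_event = {"name": "loop.done", "context": {"__cursor_drained": True}}
targets = route(arcs, drained_event)
```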

The output namespace

Arc when: expressions and step set: blocks may reference the just-completed step's result as {{ output.<field> }}. Here output is an alias for the result that is otherwise addressed by the step's name. It was added alongside cursor mode to unblock output-gated arcs in playbooks that check the claim result count (e.g. {{ output.data.row_count }}).

Previously playbooks had to use the step name directly; output matches the Python orchestrator convention and lets a playbook use the same expression regardless of the step name.
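
The aliasing can be sketched as an evaluation context that holds the same result under two keys. This is an illustrative Python model, not the orchestrator's code: eval_context, queue_empty, and the second step name process_claims are hypothetical.

```python
def eval_context(step_name: str, result: dict) -> dict:
    """Context visible to when: and set: after a step completes."""
    # The same result object is reachable by step name and as "output".
    return {step_name: result, "output": result}


def queue_empty(ctx: dict) -> bool:
    """A guard written once against output, independent of the step name."""
    return ctx["output"]["data"]["row_count"] == 0


a = eval_context("process_records", {"data": {"row_count": 0}})
b = eval_context("process_claims", {"data": {"row_count": 3}})
```

The same guard evaluates against both contexts even though the steps are named differently.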

Loop-back re-entry

A playbook may loop back to a cursor step from a downstream arc (for example, to re-run it for a different data type). When a cursor step re-enters, the orchestrator resets frame tracking for that run so frame 0 of the re-entry does not merge with the prior drained run's frame history.
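
One way to picture the reset is a per-run frame history that is cleared on every entry. The Python sketch below is an assumption about the shape of that bookkeeping, with a hypothetical FrameHistory class; it does not mirror the actual Rust state.

```python
class FrameHistory:
    """Frame bookkeeping for one cursor step, scoped to its current run."""

    def __init__(self):
        self.run = 0
        self.frames = {}  # frame number -> claimed row count, current run only

    def enter(self):
        """Called on every step entry, including loop-back re-entry."""
        self.run += 1
        self.frames = {}  # discard the previous run's frames

    def record(self, frame: int, row_count: int):
        self.frames[frame] = row_count


history = FrameHistory()
history.enter()
history.record(0, 50)
history.record(1, 0)   # first run drains
history.enter()        # loop-back re-entry
history.record(0, 10)  # frame 0 of the new run starts from a clean slate
```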

Difference from sequential / parallel loops

| | sequential | parallel | cursor |
| --- | --- | --- | --- |
| Iterations known at entry? | Yes, from loop.items | Yes | No, discovered per claim |
| Who decides "more work"? | Orchestrator at entry | Orchestrator at entry | Orchestrator after each claim result |
| Worker slot between items? | No, orchestrator re-issues | No | No, orchestrator re-issues |
| Claim SQL required? | No | No | Yes |

Difference from the Python frame-leasing model

The Python implementation ran a long-lived, self-looping worker that held a thread between frames. The Rust implementation honors the orchestrator-driven execution model: no worker holds a slot between frames. Each claim executes as a normal postgres tool command on a worker, and the orchestrator drives the loop. Re-claiming stale rows is the playbook's own responsibility, handled in its claim SQL (for example, a reclaim-stale CTE that re-selects rows whose lease timestamp has expired).
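
The stale-row rule can be modelled in memory to show what such claim SQL is expected to do. This Python sketch is only an analogy for the SQL: the claim function, the status values, and the lease_until column are hypothetical names chosen for illustration.

```python
def claim(rows: list, now: int, max_rows: int, lease_seconds: int) -> list:
    """Lease up to max_rows rows that are pending or whose lease has expired."""
    claimed = []
    for row in rows:
        if len(claimed) == max_rows:
            break
        expired = row["status"] == "leased" and row["lease_until"] <= now
        if row["status"] == "pending" or expired:
            row["status"] = "leased"
            row["lease_until"] = now + lease_seconds
            claimed.append(row["id"])
    return claimed


rows = [
    {"id": 1, "status": "leased", "lease_until": 100},  # stale at now=200
    {"id": 2, "status": "leased", "lease_until": 500},  # still held elsewhere
    {"id": 3, "status": "pending", "lease_until": None},
    {"id": 4, "status": "done", "lease_until": None},
]
first = claim(rows, now=200, max_rows=50, lease_seconds=60)
second = claim(rows, now=200, max_rows=50, lease_seconds=60)
```

The first claim picks up the stale row and the pending row; the second claim at the same instant returns nothing, which is the condition the orchestrator treats as a drain.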

Relevant types and files

| Symbol | File |
| --- | --- |
| LoopMode::Cursor | src/playbook/types.rs |
| CursorClaim | src/playbook/types.rs |
| FrameSpec | src/playbook/types.rs |
| StepInfo.is_cursor | src/engine/state.rs |
| Orchestrator entry hook | src/engine/orchestrator.rs (handle_step_enter) |
| Cursor-drive block | src/engine/orchestrator.rs (advance_cursor) |
| reconstruct_cursor_frames | src/engine/commands.rs |
| __cursor_drained drain flag | src/engine/state.rs |

Validation

Validated end-to-end via test_pft_flow_v2 — the patient-fetch flow that mirrors the production state_report_generation playbook — against a throttling/error-injecting paginated-api test server on the local kind cluster (noetl/ai-meta#100). Result: all_passed: true, 5/5 per data type (assessments, conditions, medications, vital_signs, demographics).
