diff --git a/.changeset/structured-loop-container.md b/.changeset/structured-loop-container.md new file mode 100644 index 000000000..c52e729dc --- /dev/null +++ b/.changeset/structured-loop-container.md @@ -0,0 +1,26 @@ +--- +"@objectstack/spec": minor +"@objectstack/service-automation": minor +--- + +feat(automation): structured control-flow constructs (ADR-0031) — loop container + +Adopt structured control-flow as the native, AI-authored flow model (ADR-0031), +choosing representation **(B) nested sub-structure**: containers carry their body +as a self-contained single-entry/single-exit region in `config`. + +- **spec**: new `automation/control-flow.zod.ts` defining the `loop` container + (`config.body`), `parallel` block (`config.branches[]`, implicit join), and + `try/catch/retry` (`config.try`/`config.catch`/`config.retry`) configs, plus + region well-formedness analysis (`analyzeRegion`, `findRegionEntry`) and + `validateControlFlow` (single-entry/single-exit, acyclic; bounded loop). +- **engine**: `registerFlow()` now rejects malformed control-flow regions before + a flow can run; new `AutomationEngine.runRegion()` executes a body region in + the enclosing variable scope without touching the shared DAG traversal. +- **loop executor**: replaces the no-op `loop` stub with a real iteration + container — binds the iterator/index variables and runs the body once per item + under a hard max-iteration guard. Legacy flat-graph loops (no `config.body`) + keep working — the construct is additive. + +Parallel-block and try/catch *engine execution* and BPMN interop mapping remain +follow-ups (issue #1479, tasks 3–5). 
diff --git a/docs/adr/0031-advanced-flow-node-executors-and-dag.md b/docs/adr/0031-advanced-flow-node-executors-and-dag.md index 091676765..f70f5cc6f 100644 --- a/docs/adr/0031-advanced-flow-node-executors-and-dag.md +++ b/docs/adr/0031-advanced-flow-node-executors-and-dag.md @@ -129,6 +129,29 @@ structured containers; **to be decided in the implementation ADR/PR**: The choice, plus designer rendering of containers and migration of existing flows, is the first task below. +### Representation decision (#1479) — **(B) nested sub-structure** + +The implementation adopts **(B)**: each container carries its body as a +self-contained region in `config` — `config.body` for `loop`, +`config.branches[]` for `parallel`, `config.try`/`config.catch` for `try_catch` +(see `@objectstack/spec` `automation/control-flow.zod.ts`). Rationale: + +1. **Well-formed by construction.** A nested region is its *own* graph, so + single-entry is intrinsic and there are no scope markers to balance or leak + across — validation (`analyzeRegion`/`validateControlFlow`) is local. +2. **The shared `engine.ts` traversal stays untouched.** The container executor + runs its body via a scoped `AutomationEngine.runRegion()`; the main DAG + `traverseNext` never learns about scope markers (deliberate, given the + multi-agent discipline around `engine.ts`). The container's ordinary + out-edges remain the after-loop/after-block continuation, so the DAG + invariant for ordinary edges holds. +3. **Cleaner AST for AI** — the design center (ADR-0010/0011). + +Existing flat-graph loops (a `loop` node with no `config.body`) keep their legacy +behavior — the constructs are **additive**, activated only when the nested +structure is present. Migrating legacy flat loops and designer rendering of +nested containers (in `../objectui`) are deferred follow-ups. 
+ ## Consequences - **Positive**: AI (and humans) author from a small set of constructs that are diff --git a/examples/app-showcase/src/flows/index.ts b/examples/app-showcase/src/flows/index.ts index 4d014b7b6..80ff2e02e 100644 --- a/examples/app-showcase/src/flows/index.ts +++ b/examples/app-showcase/src/flows/index.ts @@ -504,6 +504,63 @@ export const TaskDoneNotifyOwnerFlow = defineFlow({ ], }); +/** + * Batch Reminders — demonstrates the ADR-0031 **structured loop container**. + * + * The `loop` node owns a bounded **body region** (`config.body`, a + * single-entry/single-exit sub-graph) and iterates it over a collection: each + * task is bound to `task` (and its index to `taskIndex`) in the enclosing + * variable scope, and the body sends a reminder. A hard `maxIterations` guard + * keeps iteration bounded. The loop node's ordinary out-edge (`→ end`) is the + * after-loop continuation — the DAG invariant for ordinary edges is preserved. + */ +export const BatchRemindersFlow = defineFlow({ + name: 'showcase_batch_reminders', + label: 'Batch Task Reminders (Loop)', + description: 'Iterates a collection of tasks and sends a reminder for each (structured loop container, ADR-0031).', + type: 'autolaunched', + variables: [ + { name: 'tasks', type: 'list', isInput: true, isOutput: false }, + ], + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { + id: 'loop_tasks', + type: 'loop', + label: 'For each task', + config: { + collection: '{tasks}', + iteratorVariable: 'task', + indexVariable: 'taskIndex', + maxIterations: 500, + body: { + nodes: [ + { + id: 'send_reminder', + type: 'script', + label: 'Send Reminder', + config: { + actionType: 'email', + inputs: { + to: '{task.owner.email}', + subject: 'Reminder ({taskIndex}): {task.title}', + template: 'showcase_task_reminder_email', + }, + }, + }, + ], + edges: [], + }, + }, + }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'loop_tasks' }, + { id: 'e2', 
source: 'loop_tasks', target: 'end' }, + ], +}); + export const allFlows = [ TaskCompletedFlow, ReassignWizardFlow, @@ -515,4 +572,5 @@ export const allFlows = [ TaskFollowUpFlow, NotifyOwnerSubflow, TaskDoneNotifyOwnerFlow, + BatchRemindersFlow, ]; diff --git a/packages/services/service-automation/src/builtin/index.ts b/packages/services/service-automation/src/builtin/index.ts index 1de9da440..3d5479599 100644 --- a/packages/services/service-automation/src/builtin/index.ts +++ b/packages/services/service-automation/src/builtin/index.ts @@ -10,7 +10,8 @@ * descriptors with `source: 'builtin'`. * * Scope (built-in baseline): - * - logic — decision / assignment / loop (engine core) + * - logic — decision / assignment (engine core) + * - logic — loop (structured iteration container, ADR-0031) * - data — get/create/update/delete_record (platform CRUD baseline) * - human — screen / script (core flow capability) * - io — http_request (foundational outbound I/O) @@ -28,6 +29,7 @@ import type { PluginContext } from '@objectstack/core'; import type { AutomationEngine } from '../engine.js'; import { registerLogicNodes } from './logic-nodes.js'; +import { registerLoopNode } from './loop-node.js'; import { registerCrudNodes } from './crud-nodes.js'; import { registerScreenNodes } from './screen-nodes.js'; import { registerHttpNodes } from './http-nodes.js'; @@ -37,6 +39,7 @@ import { registerWaitNode } from './wait-node.js'; import { registerSubflowNode } from './subflow-node.js'; export { registerLogicNodes } from './logic-nodes.js'; +export { registerLoopNode } from './loop-node.js'; export { registerCrudNodes } from './crud-nodes.js'; export { registerScreenNodes } from './screen-nodes.js'; export { registerHttpNodes } from './http-nodes.js'; @@ -52,6 +55,7 @@ export { registerSubflowNode } from './subflow-node.js'; */ export function installBuiltinNodes(engine: AutomationEngine, ctx: PluginContext): void { registerLogicNodes(engine, ctx); + registerLoopNode(engine, ctx); 
registerCrudNodes(engine, ctx); registerScreenNodes(engine, ctx); registerHttpNodes(engine, ctx); diff --git a/packages/services/service-automation/src/builtin/logic-nodes.ts b/packages/services/service-automation/src/builtin/logic-nodes.ts index 6c685cf86..647d00f9d 100644 --- a/packages/services/service-automation/src/builtin/logic-nodes.ts +++ b/packages/services/service-automation/src/builtin/logic-nodes.ts @@ -5,7 +5,10 @@ import { defineActionDescriptor } from '@objectstack/spec/automation'; import type { AutomationEngine } from '../engine.js'; /** - * Logic built-in nodes — decision / assignment / loop. + * Logic built-in nodes — decision / assignment. + * + * (The `loop` container is registered separately — see `loop-node.ts` — as a + * structured iteration construct per ADR-0031.) * * Part of the automation engine's foundational vocabulary, so the core * {@link AutomationServicePlugin} seeds them directly (ADR-0018). These are NOT @@ -52,27 +55,5 @@ export function registerLogicNodes(engine: AutomationEngine, ctx: PluginContext) }, }); - // loop node — iterate over a collection - engine.registerNodeExecutor({ - type: 'loop', - descriptor: defineActionDescriptor({ - type: 'loop', version: '1.0.0', name: 'Loop', - description: 'Iterate over a collection.', - icon: 'repeat', category: 'logic', source: 'builtin', - }), - async execute(node, variables, _context) { - const config = node.config as Record | undefined; - const collectionName = config?.collection as string | undefined; - if (collectionName) { - const collection = variables.get(collectionName); - if (Array.isArray(collection)) { - variables.set('$loopItems', collection); - variables.set('$loopIndex', 0); - } - } - return { success: true }; - }, - }); - - ctx.logger.info('[Logic Nodes] 3 built-in node executors registered'); + ctx.logger.info('[Logic Nodes] 2 built-in node executors registered'); } diff --git a/packages/services/service-automation/src/builtin/loop-node.test.ts 
b/packages/services/service-automation/src/builtin/loop-node.test.ts new file mode 100644 index 000000000..9ab4dc4e2 --- /dev/null +++ b/packages/services/service-automation/src/builtin/loop-node.test.ts @@ -0,0 +1,199 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, beforeEach } from 'vitest'; +import { AutomationEngine } from '../engine.js'; +import type { NodeExecutor } from '../engine.js'; +import { registerLoopNode } from './loop-node.js'; + +function silentLogger() { + return { info() {}, warn() {}, error() {}, debug() {}, child() { return silentLogger(); } } as any; +} +function ctx() { + return { logger: silentLogger(), getService() { throw new Error('none'); } } as any; +} + +describe('loop container executor (ADR-0031)', () => { + let engine: AutomationEngine; + let visited: unknown[]; + + beforeEach(() => { + engine = new AutomationEngine(silentLogger()); + visited = []; + registerLoopNode(engine, ctx()); + + // A body node that records the current iterator value + index. + engine.registerNodeExecutor({ + type: 'collect', + async execute(node, variables) { + const cfg = (node.config ?? {}) as Record; + const itemVar = (cfg.itemVar as string) ?? 'item'; + const idxVar = (cfg.idxVar as string) ?? 'i'; + visited.push({ item: variables.get(itemVar), index: variables.get(idxVar) }); + return { success: true }; + }, + } as NodeExecutor); + + // A body node that always fails (to exercise failure propagation). + engine.registerNodeExecutor({ + type: 'boom', + async execute() { return { success: false, error: 'kaboom' }; }, + } as NodeExecutor); + + // Seed the collection into a flow variable via assignment. + engine.registerNodeExecutor({ + type: 'seed', + async execute(node, variables) { + const cfg = (node.config ?? 
{}) as Record; + for (const [k, v] of Object.entries(cfg)) variables.set(k, v); + return { success: true }; + }, + } as NodeExecutor); + }); + + const loopFlow = (loopConfig: Record, seed: Record) => ({ + name: 'loop_flow', + label: 'Loop Flow', + type: 'autolaunched' as const, + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { id: 'seed', type: 'seed', label: 'Seed', config: seed }, + { id: 'loop', type: 'loop', label: 'Loop', config: loopConfig }, + { id: 'after', type: 'collect', label: 'After', config: { itemVar: 'sentinel', idxVar: 'sentinelIdx' } }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'seed' }, + { id: 'e2', source: 'seed', target: 'loop' }, + { id: 'e3', source: 'loop', target: 'after' }, + { id: 'e4', source: 'after', target: 'end' }, + ], + }); + + it('iterates the body region once per item, binding iterator + index', async () => { + engine.registerFlow('loop_flow', loopFlow( + { + collection: '{items}', + iteratorVariable: 'item', + indexVariable: 'i', + body: { nodes: [{ id: 'b', type: 'collect', label: 'Body', config: { itemVar: 'item', idxVar: 'i' } }], edges: [] }, + }, + { items: ['a', 'b', 'c'] }, + )); + + const result = await engine.execute('loop_flow'); + + expect(result.success).toBe(true); + // Three body iterations, then the after-loop node ran exactly once. 
+ expect(visited).toEqual([ + { item: 'a', index: 0 }, + { item: 'b', index: 1 }, + { item: 'c', index: 2 }, + { item: undefined, index: undefined }, // the 'after' node (sentinel var unset) + ]); + }); + + it('runs a multi-node body region in order each iteration', async () => { + engine.registerFlow('loop_flow', loopFlow( + { + collection: '{items}', + iteratorVariable: 'item', + body: { + nodes: [ + { id: 'b1', type: 'collect', label: 'First', config: { itemVar: 'item' } }, + { id: 'b2', type: 'collect', label: 'Second', config: { itemVar: 'item' } }, + ], + edges: [{ id: 'be', source: 'b1', target: 'b2' }], + }, + }, + { items: ['x', 'y'] }, + )); + + const result = await engine.execute('loop_flow'); + expect(result.success).toBe(true); + // Two nodes × two items = four body visits (+1 after node). + expect(visited.filter((v: any) => v.item === 'x' || v.item === 'y')).toHaveLength(4); + }); + + it('handles an empty collection (zero iterations) and still continues', async () => { + engine.registerFlow('loop_flow', loopFlow( + { + collection: '{items}', + iteratorVariable: 'item', + body: { nodes: [{ id: 'b', type: 'collect', label: 'Body', config: { itemVar: 'item' } }], edges: [] }, + }, + { items: [] }, + )); + + const result = await engine.execute('loop_flow'); + expect(result.success).toBe(true); + expect(visited).toEqual([{ item: undefined, index: undefined }]); // only the after node + }); + + it('fails when the collection does not resolve to an array', async () => { + engine.registerFlow('loop_flow', loopFlow( + { + collection: '{notAList}', + body: { nodes: [{ id: 'b', type: 'collect', label: 'Body' }], edges: [] }, + }, + { notAList: 'hello' }, + )); + + const result = await engine.execute('loop_flow'); + expect(result.success).toBe(false); + expect(result.error).toMatch(/did not resolve to an array/); + }); + + it('enforces the max-iteration guard', async () => { + engine.registerFlow('loop_flow', loopFlow( + { + collection: '{items}', + 
iteratorVariable: 'item', + maxIterations: 2, + body: { nodes: [{ id: 'b', type: 'collect', label: 'Body', config: { itemVar: 'item' } }], edges: [] }, + }, + { items: [1, 2, 3, 4] }, + )); + + const result = await engine.execute('loop_flow'); + expect(result.success).toBe(false); + expect(result.error).toMatch(/exceeds maxIterations 2/); + }); + + it('propagates a body failure as a flow failure', async () => { + engine.registerFlow('loop_flow', loopFlow( + { + collection: '{items}', + iteratorVariable: 'item', + body: { nodes: [{ id: 'b', type: 'boom', label: 'Body' }], edges: [] }, + }, + { items: [1] }, + )); + + const result = await engine.execute('loop_flow'); + expect(result.success).toBe(false); + expect(result.error).toMatch(/kaboom/); + }); + + it('rejects a malformed loop body at registerFlow (well-formedness)', () => { + expect(() => + engine.registerFlow('bad_loop', loopFlow( + { + collection: '{items}', + // two entry/exit nodes, no edges → not single-entry/single-exit + body: { nodes: [{ id: 'a', type: 'collect', label: 'A' }, { id: 'b', type: 'collect', label: 'B' }], edges: [] }, + }, + { items: [1] }, + )), + ).toThrow(/loop 'loop' body/); + }); + + it('keeps legacy flat-graph loop behavior when no body is given', async () => { + engine.registerFlow('loop_flow', loopFlow( + { collection: 'items' }, // bare var name, legacy stub path + { items: [1, 2] }, + )); + const result = await engine.execute('loop_flow'); + // Legacy loop just falls through to the after node — no error. + expect(result.success).toBe(true); + }); +}); diff --git a/packages/services/service-automation/src/builtin/loop-node.ts b/packages/services/service-automation/src/builtin/loop-node.ts new file mode 100644 index 000000000..69fd4f075 --- /dev/null +++ b/packages/services/service-automation/src/builtin/loop-node.ts @@ -0,0 +1,126 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. 
+
+import type { PluginContext } from '@objectstack/core';
+import { defineActionDescriptor, LOOP_MAX_ITERATIONS_CEILING } from '@objectstack/spec/automation';
+import type { FlowRegionParsed } from '@objectstack/spec/automation';
+import type { AutomationContext } from '@objectstack/spec/contracts';
+import type { AutomationEngine } from '../engine.js';
+import { interpolate } from './template.js';
+
+/**
+ * `loop` built-in node — a **structured iteration container** (ADR-0031).
+ *
+ * Replaces the previous no-op `loop` stub. The node owns a bounded **body
+ * region** (`config.body`, a single-entry/single-exit sub-graph) and drives
+ * iteration over a collection: for each item it binds the current value to
+ * `config.iteratorVariable` (and the zero-based index to `config.indexVariable`,
+ * when given) in the **enclosing variable scope** and runs the body region. A
+ * **hard max-iteration guard** (`config.maxIterations`, clamped to
+ * {@link LOOP_MAX_ITERATIONS_CEILING}) bounds the run over a snapshot of the items.
+ *
+ * The body region runs as a unit via {@link AutomationEngine.runRegion}; the
+ * loop node's *ordinary* out-edges in the main graph remain the "after-loop"
+ * continuation, so the DAG invariant for ordinary edges is preserved.
+ *
+ * **Back-compat:** a `loop` node with no `config.body` keeps the legacy
+ * flat-graph behavior (sets `$loopItems`/`$loopIndex` and falls through) — the
+ * container construct is additive.
+ */
+export function registerLoopNode(engine: AutomationEngine, ctx: PluginContext): void {
+  engine.registerNodeExecutor({
+    type: 'loop',
+    descriptor: defineActionDescriptor({
+      type: 'loop',
+      version: '2.0.0',
+      name: 'Loop',
+      description: 'Iterate a body region over a collection (bounded, structured container).',
+      icon: 'repeat',
+      category: 'logic',
+      source: 'builtin',
+      configSchema: {
+        type: 'object',
+        properties: {
+          collection: { type: 'string', description: 'Template/variable resolving to the array to iterate' },
+          iteratorVariable: { type: 'string', description: 'Loop variable holding the current item' },
+          indexVariable: { type: 'string', description: 'Optional loop variable holding the current index' },
+          maxIterations: { type: 'integer', minimum: 1, maximum: LOOP_MAX_ITERATIONS_CEILING },
+          body: {
+            type: 'object',
+            description: 'Loop body region (single-entry/single-exit sub-graph)',
+            properties: { nodes: { type: 'array' }, edges: { type: 'array' } },
+          },
+        },
+        required: ['collection'],
+      },
+    }),
+    async execute(node, variables, context) {
+      const cfg = (node.config ?? {}) as Record<string, unknown>;
+      const body = cfg.body as FlowRegionParsed | undefined;
+
+      // ── Legacy flat-graph loop (no body) — preserve prior stub behavior. ──
+      if (body == null) {
+        const collectionName = typeof cfg.collection === 'string' ? cfg.collection : undefined;
+        if (collectionName) {
+          const legacy = variables.get(collectionName);
+          if (Array.isArray(legacy)) {
+            variables.set('$loopItems', legacy);
+            variables.set('$loopIndex', 0);
+          }
+        }
+        return { success: true };
+      }
+
+      // ── Structured loop container. ──
+      const iteratorVariable = typeof cfg.iteratorVariable === 'string' && cfg.iteratorVariable
+        ? cfg.iteratorVariable
+        : 'item';
+      const indexVariable = typeof cfg.indexVariable === 'string' && cfg.indexVariable
+        ? cfg.indexVariable
+        : undefined;
+
+      // Resolve the collection: a `{token}` template, a bare variable name, or
+      // (defensively) an already-resolved array.
+      const runContext = context ?? ({} as AutomationContext);
+      const rawCollection = cfg.collection;
+      let collection: unknown = rawCollection;
+      if (typeof rawCollection === 'string') {
+        collection = interpolate(rawCollection, variables, runContext);
+        // Bare-name fallback whenever the template yields no array (not just null).
+        if (!Array.isArray(collection) && variables.has(rawCollection)) {
+          collection = variables.get(rawCollection);
+        }
+      }
+
+      if (!Array.isArray(collection)) {
+        return {
+          success: false,
+          error: `loop '${node.id}': collection '${String(rawCollection)}' did not resolve to an array`,
+        };
+      }
+
+      // Hard iteration guard. A NaN `maxIterations` would make the comparison
+      // below always false, so non-finite values fall back to the ceiling.
+      const requested = Number.isFinite(cfg.maxIterations) ? (cfg.maxIterations as number) : LOOP_MAX_ITERATIONS_CEILING;
+      const maxIterations = Math.min(requested, LOOP_MAX_ITERATIONS_CEILING);
+      if (collection.length > maxIterations) {
+        return {
+          success: false,
+          error:
+            `loop '${node.id}': collection length ${collection.length} exceeds maxIterations ${maxIterations}`,
+        };
+      }
+
+      const items = [...collection]; // snapshot: body mutations of the list cannot outrun the guard
+      for (let i = 0; i < items.length; i++) {
+        variables.set(iteratorVariable, items[i]);
+        if (indexVariable) variables.set(indexVariable, i);
+        // Body runs in the shared scope; iterator var + mutations are visible.
+        await engine.runRegion(body, variables, runContext);
+      }
+
+      return { success: true, output: { iterations: items.length } };
+    },
+  });
+
+  ctx.logger.info('[Loop Node] 1 built-in node executor registered');
+}
diff --git a/packages/services/service-automation/src/engine.ts b/packages/services/service-automation/src/engine.ts index 278674f3f..3146997ff 100644 --- a/packages/services/service-automation/src/engine.ts +++ b/packages/services/service-automation/src/engine.ts @@ -4,7 +4,8 @@ import type { FlowParsed, FlowNodeParsed, FlowEdgeParsed } from '@objectstack/sp import type { ExecutionLog, ActionDescriptor } from '@objectstack/spec/automation'; import type { AutomationContext, AutomationResult, ResumeSignal, IAutomationService, ScreenSpec } from '@objectstack/spec/contracts'; import type { Logger } from '@objectstack/spec/contracts'; -import { FlowSchema, FLOW_STRUCTURAL_NODE_TYPES, defineActionDescriptor } from '@objectstack/spec/automation'; +import { FlowSchema, FLOW_STRUCTURAL_NODE_TYPES, validateControlFlow, findRegionEntry, defineActionDescriptor } from '@objectstack/spec/automation'; +import type { FlowRegionParsed } from '@objectstack/spec/automation'; import type { Connector } from '@objectstack/spec/integration'; import { ConnectorSchema } from '@objectstack/spec/integration'; // Static import (not a lazy `require`): the engine ships as ESM ("type":"module"), @@ -595,6 +596,11 @@ export class AutomationEngine implements IAutomationService { // DAG cycle detection this.detectCycles(parsed); + // ADR-0031 — validate structured control-flow constructs (loop bodies, + // parallel branches, try/catch regions) are well-formed (single-entry/ + // single-exit, acyclic). Reject the malformed before it can run. + validateControlFlow(parsed); + // ADR-0018 §M1 — validate node types against the live action registry. // The protocol no longer gates `type` with a closed enum; membership is // checked here instead.
Soft-fail (warn, don't throw): a flow authored @@ -1340,6 +1346,53 @@ export class AutomationEngine implements IAutomationService { } }
+  /**
+   * Execute a structured control-flow **region** (ADR-0031) — the nested
+   * body of a `loop` container (or, later, a `parallel` branch / `try_catch`
+   * region). The region is a self-contained single-entry/single-exit
+   * sub-graph carried in the container's `config`; it runs in the **enclosing
+   * variable scope** (the caller's `variables` map), so the iterator variable
+   * and any body mutations are visible to the surrounding flow — a region is
+   * NOT a separate `subflow` invocation.
+   *
+   * The region executes against a synthetic flow view of its own
+   * nodes/edges, so the main DAG traversal (`traverseNext`) is never aware of
+   * scope markers — keeping the shared traversal untouched.
+   *
+   * Body step logs are kept in a region-local array (not yet merged into the
+   * parent run log); surfacing per-iteration steps is a follow-up.
+   *
+   * Durable pause (`suspend`) inside a region is not supported in this
+   * iteration — it is converted into a clear error (mirrors the `subflow`
+   * nested-pause guard).
+   */
+  async runRegion(
+    region: FlowRegionParsed,
+    variables: Map<string, unknown>,
+    context: AutomationContext,
+  ): Promise<void> {
+    const entryId = findRegionEntry(region);
+    const entry = region.nodes.find(n => n.id === entryId);
+    if (!entry) {
+      throw new Error(`region entry node '${entryId}' not found`);
+    }
+    // A synthetic flow view — executeNode/traverseNext only read `nodes`/`edges`.
+    const subFlow = { nodes: region.nodes, edges: region.edges ?? [] } as unknown as FlowParsed;
+    // TODO(#1479): merge region step logs into the parent run log so
+    // per-iteration body steps surface in run observability.
+    const regionSteps: StepLogEntry[] = [];
+    try {
+      await this.executeNode(entry, subFlow, variables, context, regionSteps);
+    } catch (err) {
+      if (isSuspendSignal(err)) {
+        throw new Error(
+          `durable pause inside a structured region (node '${err.nodeId}') is not supported`,
+        );
+      }
+      throw err;
+    }
+  }
+
/** * Execute a promise with timeout using Promise.race. */ diff --git a/packages/spec/src/automation/control-flow.test.ts b/packages/spec/src/automation/control-flow.test.ts new file mode 100644 index 000000000..224d252e8 --- /dev/null +++ b/packages/spec/src/automation/control-flow.test.ts @@ -0,0 +1,216 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect } from 'vitest'; +import { + LoopConfigSchema, + ParallelConfigSchema, + TryCatchConfigSchema, + FlowRegionSchema, + analyzeRegion, + findRegionEntry, + validateControlFlow, + LOOP_MAX_ITERATIONS_CEILING, + LOOP_NODE_TYPE, + PARALLEL_NODE_TYPE, + TRY_CATCH_NODE_TYPE, +} from './control-flow.zod'; + +const node = (id: string, type = 'assignment') => ({ id, type, label: id }); +const edge = (id: string, source: string, target: string) => ({ id, source, target }); + +describe('canonical construct type ids', () => { + it('are distinct from BPMN interop node types', () => { + expect(LOOP_NODE_TYPE).toBe('loop'); + expect(PARALLEL_NODE_TYPE).toBe('parallel'); + expect(TRY_CATCH_NODE_TYPE).toBe('try_catch'); + }); +}); + +describe('LoopConfigSchema', () => { + it('accepts a structured loop container with a body region', () => { + const parsed = LoopConfigSchema.parse({ + collection: '{items}', + iteratorVariable: 'item', + indexVariable: 'i', + body: { + nodes: [node('w', 'noop')], + edges: [], + }, + }); + expect(parsed.iteratorVariable).toBe('item'); + expect(parsed.body?.nodes).toHaveLength(1); + }); + + it('defaults iteratorVariable to "item"', () => { + const parsed = LoopConfigSchema.parse({ collection: '{items}' });
expect(parsed.iteratorVariable).toBe('item'); + }); + + it('accepts a legacy flat-graph loop (no body)', () => { + const parsed = LoopConfigSchema.parse({ collection: '{tasks}', iteratorVariable: 'currentTask' }); + expect(parsed.body).toBeUndefined(); + }); + + it('rejects maxIterations above the engine ceiling', () => { + expect(() => + LoopConfigSchema.parse({ collection: '{x}', maxIterations: LOOP_MAX_ITERATIONS_CEILING + 1 }), + ).toThrow(); + }); +}); + +describe('ParallelConfigSchema', () => { + it('requires at least two branches', () => { + expect(() => ParallelConfigSchema.parse({ branches: [{ nodes: [node('a')] }] })).toThrow(); + }); + + it('accepts two branch regions', () => { + const parsed = ParallelConfigSchema.parse({ + branches: [ + { name: 'A', nodes: [node('a')] }, + { name: 'B', nodes: [node('b')] }, + ], + }); + expect(parsed.branches).toHaveLength(2); + }); +}); + +describe('TryCatchConfigSchema', () => { + it('accepts a try region with catch + retry', () => { + const parsed = TryCatchConfigSchema.parse({ + try: { nodes: [node('t')] }, + catch: { nodes: [node('c')] }, + retry: { maxRetries: 3, retryDelayMs: 500 }, + }); + expect(parsed.errorVariable).toBe('$error'); + expect(parsed.retry?.maxRetries).toBe(3); + }); +}); + +describe('analyzeRegion — well-formedness', () => { + it('accepts a single-node region (entry == exit)', () => { + const a = analyzeRegion(FlowRegionSchema.parse({ nodes: [node('only')] })); + expect(a.errors).toEqual([]); + expect(a.entryId).toBe('only'); + expect(a.exitId).toBe('only'); + }); + + it('accepts a linear single-entry/single-exit chain', () => { + const a = analyzeRegion( + FlowRegionSchema.parse({ + nodes: [node('a'), node('b'), node('c')], + edges: [edge('e1', 'a', 'b'), edge('e2', 'b', 'c')], + }), + ); + expect(a.errors).toEqual([]); + expect(a.entryId).toBe('a'); + expect(a.exitId).toBe('c'); + }); + + it('rejects multiple entry nodes', () => { + const a = analyzeRegion( + FlowRegionSchema.parse({ + nodes: 
[node('a'), node('b'), node('c')], + edges: [edge('e1', 'a', 'c'), edge('e2', 'b', 'c')], + }), + ); + expect(a.errors.join(' ')).toMatch(/single-entry/); + }); + + it('rejects multiple exit nodes', () => { + const a = analyzeRegion( + FlowRegionSchema.parse({ + nodes: [node('a'), node('b'), node('c')], + edges: [edge('e1', 'a', 'b'), edge('e2', 'a', 'c')], + }), + ); + expect(a.errors.join(' ')).toMatch(/single-exit/); + }); + + it('rejects a cyclic region', () => { + const a = analyzeRegion({ + nodes: [node('a'), node('b')], + edges: [edge('e1', 'a', 'b'), edge('e2', 'b', 'a')], + }); + // Cycle => no entry/exit and a cycle error. + expect(a.errors.join(' ')).toMatch(/cycle|entry|exit/); + }); + + it('rejects an edge that references a non-region node', () => { + const a = analyzeRegion({ + nodes: [node('a')], + edges: [edge('e1', 'a', 'ghost')], + }); + expect(a.errors.join(' ')).toMatch(/not a region node/); + }); +}); + +describe('findRegionEntry', () => { + it('returns the single entry id', () => { + expect(findRegionEntry({ nodes: [node('a'), node('b')], edges: [edge('e', 'a', 'b')] })).toBe('a'); + }); + + it('throws on a malformed region', () => { + expect(() => findRegionEntry({ nodes: [node('a'), node('b')], edges: [] })).toThrow(/malformed/); + }); +}); + +describe('validateControlFlow', () => { + it('passes a flow with a well-formed loop body', () => { + expect(() => + validateControlFlow({ + nodes: [ + { ...node('start', 'start') }, + { + ...node('loop1', LOOP_NODE_TYPE), + config: { collection: '{items}', iteratorVariable: 'item', body: { nodes: [node('w', 'noop')], edges: [] } }, + }, + ] as never, + }), + ).not.toThrow(); + }); + + it('throws on a malformed loop body', () => { + expect(() => + validateControlFlow({ + nodes: [ + { + ...node('loop1', LOOP_NODE_TYPE), + config: { + collection: '{items}', + body: { nodes: [node('a'), node('b')], edges: [] }, // two entries, two exits + }, + }, + ] as never, + }), + ).toThrow(/loop 'loop1' body/); + }); 
+ + it('ignores legacy flat-graph loops (no body)', () => { + expect(() => + validateControlFlow({ + nodes: [{ ...node('loop1', LOOP_NODE_TYPE), config: { collection: '{items}', iteratorVariable: 'x' } }] as never, + }), + ).not.toThrow(); + }); + + it('throws when a parallel block has fewer than two branches', () => { + expect(() => + validateControlFlow({ + nodes: [{ ...node('p', PARALLEL_NODE_TYPE), config: { branches: [{ nodes: [node('a')] }] } }] as never, + }), + ).toThrow(/at least 2 branches/); + }); + + it('validates try_catch try/catch regions', () => { + expect(() => + validateControlFlow({ + nodes: [ + { + ...node('tc', TRY_CATCH_NODE_TYPE), + config: { try: { nodes: [node('a'), node('b')], edges: [] } }, // two entries + }, + ] as never, + }), + ).toThrow(/try_catch 'tc' try/); + }); +}); diff --git a/packages/spec/src/automation/control-flow.zod.ts b/packages/spec/src/automation/control-flow.zod.ts new file mode 100644 index 000000000..ed9cbe737 --- /dev/null +++ b/packages/spec/src/automation/control-flow.zod.ts @@ -0,0 +1,328 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +/** + * @module automation/control-flow + * + * Structured control-flow constructs (ADR-0031) — the **native + AI-authored** + * flow model: a `loop` **container**, a `parallel` **block**, and structured + * `try/catch/retry`. Unlike BPMN's gateway/boundary/token graph (kept in the + * protocol for *interop* only), these constructs are **well-formed by + * construction**, locally composable, and statically analyzable — the right + * substrate for LLM authoring (ADR-0010/0011). 
+ * + * ## Representation — decision: **(B) nested sub-structure** + * + * ADR-0031 flagged two ways to carry structured containers in the flat + * `nodes[]`+`edges[]` model: + * + * - **(A)** marker-delimited scoped regions (a container node + a scope-end + * marker; the body is the edges *between* them in the main graph), or + * - **(B)** the container node carries a **nested mini-flow** in its `config`. + * + * We adopt **(B)**. Each container holds its body as a self-contained + * {@link FlowRegionSchema} (`config.body` for `loop`, `config.branches[]` for + * `parallel`, `config.try`/`config.catch` for `try_catch`). The reasons: + * + * 1. **Well-formed by construction** — a nested region is its *own* graph, so + * single-entry is intrinsic; there are no scope markers to balance and no + * way to "leak" an edge across a boundary. Validation is local. + * 2. **The shared engine traversal stays untouched** — the container executor + * runs its own body via a scoped helper; the main DAG `traverseNext` never + * learns about scope markers (important under the multi-agent discipline + * around `engine.ts`). The container's *ordinary* out-edges remain the + * "after-loop / after-block" continuation. + * 3. **Cleaner AST for AI** — ADR-0031 calls (B) "the cleaner long-term AST," + * and AI authoring is the design center. + * + * Existing flat-graph loops (a `loop` node with no `config.body`) keep their + * legacy behavior — the constructs are **additive**, activated only when the + * nested structure is present. + * + * The canonical construct type ids are {@link LOOP_NODE_TYPE} (`loop`, + * pre-existing), {@link PARALLEL_NODE_TYPE} (`parallel`), and + * {@link TRY_CATCH_NODE_TYPE} (`try_catch`). These are distinct from the BPMN + * interop node types (`parallel_gateway` / `join_gateway` / `boundary_event`), + * which remain author-invisible interchange representations. 
+ */
+
+import { z } from 'zod';
+import { lazySchema } from '../shared/lazy-schema';
+import { FlowNodeSchema, FlowEdgeSchema } from './flow.zod';
+import type { FlowNodeParsed, FlowEdgeParsed } from './flow.zod';
+
+// ─── Canonical construct type ids ────────────────────────────────────
+
+/** The structured iteration container (pre-existing built-in id). */
+export const LOOP_NODE_TYPE = 'loop' as const;
+/** The structured parallel block (implicit join at block end). */
+export const PARALLEL_NODE_TYPE = 'parallel' as const;
+/** The structured try/catch/retry construct. */
+export const TRY_CATCH_NODE_TYPE = 'try_catch' as const;
+
+/**
+ * Hard ceiling on loop iterations — the engine refuses to iterate beyond this
+ * regardless of `maxIterations`, so a runaway collection can never spin the
+ * runtime. ADR-0031 §Decision 1 ("a **hard max-iteration guard**").
+ *
+ * It is also the largest value {@link LoopConfigSchema} accepts for
+ * `maxIterations`.
+ */
+export const LOOP_MAX_ITERATIONS_CEILING = 100_000;
+
+// ─── Region — a nested single-entry/single-exit sub-graph ────────────
+
+/**
+ * A **region** is a self-contained sub-graph (nodes + edges) executed as the
+ * body of a container. It must be **single-entry / single-exit** and acyclic —
+ * exactly the well-formedness {@link analyzeRegion} enforces. Region nodes
+ * execute in the **enclosing variable scope** (the iterator variable and any
+ * body mutations are visible to the surrounding flow), so a region is *not* a
+ * separate `subflow` invocation.
+ *
+ * The schema itself only checks the *shape* (at least one node, edges default
+ * to an empty list); the structural rules above are checked separately by
+ * {@link analyzeRegion}.
+ */
+export const FlowRegionSchema = lazySchema(() => z.object({
+  /** Body nodes (must not include `start`/`end` trigger sentinels). */
+  nodes: z.array(FlowNodeSchema).min(1).describe('Region body nodes (single-entry/single-exit sub-graph)'),
+  /** Body edges connecting the region nodes. */
+  edges: z.array(FlowEdgeSchema).default([]).describe('Region body edges'),
+}));
+
+/** Authoring (pre-parse) shape of a region — `edges` may be omitted. */
+export type FlowRegion = z.input<typeof FlowRegionSchema>;
+/** Parsed shape of a region — defaults (e.g. `edges: []`) have been applied. */
+export type FlowRegionParsed = z.infer<typeof FlowRegionSchema>;
+
+// ─── Loop container ──────────────────────────────────────────────────
+
+/**
+ * `loop` container config — bounded iteration over a collection. The `body`
+ * region runs once per item in the enclosing variable scope, with the current
+ * item bound to `iteratorVariable` (and the zero-based index to `indexVariable`,
+ * when given). Iteration is hard-capped by `maxIterations` (which may not
+ * exceed {@link LOOP_MAX_ITERATIONS_CEILING}) so termination stays analyzable.
+ *
+ * `body` is **optional** for back-compat: a `loop` node with no `body` keeps the
+ * legacy flat-graph behavior (the constructs are additive).
+ */
+export const LoopConfigSchema = lazySchema(() => z.object({
+  /**
+   * The collection to iterate. A `{token}` template or bare variable name that
+   * resolves (at run time) to an array in the flow's variable scope.
+   */
+  collection: z.string().min(1).describe('Template/variable resolving to the array to iterate'),
+  /** Variable name the current item is bound to inside the body (default `item`). */
+  iteratorVariable: z.string().min(1).default('item').describe('Loop variable holding the current item'),
+  /** Optional variable name the zero-based index is bound to inside the body. */
+  indexVariable: z.string().optional().describe('Optional loop variable holding the current index'),
+  /**
+   * Maximum iterations to run — a guard against runaway collections. Must be an
+   * integer in `[1, LOOP_MAX_ITERATIONS_CEILING]`: this schema *rejects* larger
+   * values, it does not clamp them. A collection longer than the cap fails the
+   * node rather than truncating silently.
+   */
+  maxIterations: z.number().int().min(1).max(LOOP_MAX_ITERATIONS_CEILING).optional()
+    .describe('Hard cap on iterations (clamped to the engine ceiling)'),
+  /** The body region executed once per item (single-entry/single-exit). */
+  body: FlowRegionSchema.optional().describe('Loop body region (omit for legacy flat-graph loops)'),
+}));
+
+/** Authoring (pre-parse) shape of a `loop` config — defaulted fields are optional. */
+export type LoopConfig = z.input<typeof LoopConfigSchema>;
+/** Parsed shape of a `loop` config — defaults have been applied. */
+export type LoopConfigParsed = z.infer<typeof LoopConfigSchema>;
+
+// ─── Parallel block ──────────────────────────────────────────────────
+
+/**
+ * One named branch of a {@link ParallelConfigSchema} parallel block. Carries the
+ * same `nodes`/`edges` fields as {@link FlowRegionSchema}, plus an optional label.
+ */
+export const ParallelBranchSchema = lazySchema(() => z.object({
+  /** Optional human label for the branch (designer + logs). */
+  name: z.string().optional().describe('Branch label'),
+  /** Branch body nodes (at least one). */
+  nodes: z.array(FlowNodeSchema).min(1).describe('Branch body nodes'),
+  /** Branch body edges; defaults to an empty list. */
+  edges: z.array(FlowEdgeSchema).default([]).describe('Branch body edges'),
+}));
+
+/** Authoring (pre-parse) shape of a parallel branch. */
+export type ParallelBranch = z.input<typeof ParallelBranchSchema>;
+
+/**
+ * `parallel` block config — N branch regions that run concurrently and **join
+ * implicitly at block end** (the engine continues once when all branches
+ * complete). There is no author-visible split/join gateway to mis-wire. The
+ * branches run in the enclosing variable scope.
+ */
+export const ParallelConfigSchema = lazySchema(() => z.object({
+  /** The branch regions; a parallel block needs at least two. */
+  branches: z.array(ParallelBranchSchema).min(2)
+    .describe('Branch regions executed concurrently; implicit join at block end'),
+}));
+
+/** Authoring (pre-parse) shape of a `parallel` config. */
+export type ParallelConfig = z.input<typeof ParallelConfigSchema>;
+/** Parsed shape of a `parallel` config — defaults have been applied. */
+export type ParallelConfigParsed = z.infer<typeof ParallelConfigSchema>;
+
+// ─── Try / catch / retry ─────────────────────────────────────────────
+
+/**
+ * Structured retry policy — surfaces the engine's existing exponential-backoff
+ * retry (`FlowSchema.errorHandling`) as a per-construct policy. Mirrors that
+ * shape so the engine can reuse one backoff implementation.
+ */
+export const RetryPolicySchema = lazySchema(() => z.object({
+  /** Retry attempts before giving up; integer in `[0, 10]`, default `0`. */
+  maxRetries: z.number().int().min(0).max(10).default(0).describe('Retry attempts before giving up'),
+  /** Base delay between retries in milliseconds; default `1000`. */
+  retryDelayMs: z.number().int().min(0).default(1000).describe('Base delay between retries (ms)'),
+  /** Backoff multiplier, at least `1`; default `1`. */
+  backoffMultiplier: z.number().min(1).default(1).describe('Exponential backoff multiplier'),
+  /** Upper bound on the delay between retries in milliseconds; default `30000`. */
+  maxRetryDelayMs: z.number().int().min(0).default(30000).describe('Maximum delay between retries (ms)'),
+  /** Whether random jitter is added to the retry delay; default `false`. */
+  jitter: z.boolean().default(false).describe('Add random jitter to retry delay'),
+}));
+
+/** Authoring (pre-parse) shape of a retry policy — every field is optional. */
+export type RetryPolicy = z.input<typeof RetryPolicySchema>;
+
+/**
+ * `try_catch` config — structured error handling. The `try` region runs; if it
+ * throws, the `catch` region runs (with the caught error bound to
+ * `errorVariable`). `retry`, when present, re-runs the `try` region with
+ * exponential backoff before falling through to `catch`. This is the low-code
+ * native error model — the same `fault` + retry semantics already in the engine,
+ * surfaced as a construct rather than BPMN boundary events (ADR-0031 §Decision 3).
+ */
+export const TryCatchConfigSchema = lazySchema(() => z.object({
+  /** The protected region (required). */
+  try: FlowRegionSchema.describe('Protected region'),
+  /** Optional handler region run when the try region fails. */
+  catch: FlowRegionSchema.optional().describe('Handler region run when the try region fails'),
+  /** Variable the caught error is bound to inside the catch region (default `$error`). */
+  errorVariable: z.string().default('$error').describe('Variable holding the caught error in the catch region'),
+  /** Optional retry policy applied to the try region. */
+  retry: RetryPolicySchema.optional().describe('Optional retry policy for the try region'),
+}));
+
+/** Authoring (pre-parse) shape of a `try_catch` config. */
+export type TryCatchConfig = z.input<typeof TryCatchConfigSchema>;
+/** Parsed shape of a `try_catch` config — defaults have been applied. */
+export type TryCatchConfigParsed = z.infer<typeof TryCatchConfigSchema>;
+
+// ─── Well-formedness analysis ────────────────────────────────────────
+
+/** The result of analyzing a region for structural well-formedness. */
+export interface RegionAnalysis {
+  /** The single entry node id (node with no in-edges); set only when exactly one exists. */
+  entryId?: string;
+  /** The single exit node id (node with no out-edges); set only when exactly one exists. */
+  exitId?: string;
+  /** Well-formedness problems; empty when the region is valid. */
+  errors: string[];
+}
+
+/**
+ * Analyze a region's structural well-formedness (ADR-0031 §Sequencing 1):
+ *
+ * - every edge references nodes that exist in the region,
+ * - node ids are unique,
+ * - exactly **one entry** (a node with no incoming edge) — execution needs a
+ *   unique place to start,
+ * - exactly **one exit** (a node with no outgoing edge),
+ * - the region is **acyclic** (loops/iteration are the *container's* job; a
+ *   region body is a plain DAG).
+ *
+ * Never throws: every problem found is collected into `errors`. `entryId` and
+ * `exitId` are filled in independently of each other, so either may be present
+ * even when `errors` is non-empty.
+ *
+ * A malformed region is rejected at `registerFlow()` so the broken flow never runs.
+ *
+ * @param region - The region to inspect; a missing `edges` list is treated as empty.
+ * @returns The entry/exit ids (when unique) and the list of problems.
+ */
+export function analyzeRegion(region: { nodes: FlowNodeParsed[]; edges?: FlowEdgeParsed[] }): RegionAnalysis {
+  const errors: string[] = [];
+  const nodes = region.nodes ?? [];
+  const edges = region.edges ?? [];
+
+  if (nodes.length === 0) {
+    return { errors: ['region has no nodes'] };
+  }
+
+  // Unique ids.
+  const ids = new Set<string>();
+  for (const n of nodes) {
+    if (ids.has(n.id)) errors.push(`duplicate node id '${n.id}'`);
+    ids.add(n.id);
+  }
+
+  // Edge integrity + in/out degree. Edges with a dangling endpoint are reported
+  // and then left out of the degree/adjacency bookkeeping.
+  const hasIncoming = new Set<string>();
+  const hasOutgoing = new Set<string>();
+  const adj = new Map<string, string[]>();
+  for (const id of ids) adj.set(id, []);
+  for (const e of edges) {
+    if (!ids.has(e.source)) errors.push(`edge '${e.id}' source '${e.source}' is not a region node`);
+    if (!ids.has(e.target)) errors.push(`edge '${e.id}' target '${e.target}' is not a region node`);
+    if (ids.has(e.source) && ids.has(e.target)) {
+      hasOutgoing.add(e.source);
+      hasIncoming.add(e.target);
+      // Every id was seeded into `adj` above, so the lookup always succeeds here.
+      adj.get(e.source)?.push(e.target);
+    }
+  }
+
+  const entries = [...ids].filter(id => !hasIncoming.has(id));
+  const exits = [...ids].filter(id => !hasOutgoing.has(id));
+
+  if (entries.length === 0) errors.push('region has no entry node (every node has an incoming edge — cyclic?)');
+  else if (entries.length > 1) errors.push(`region must be single-entry but has ${entries.length}: ${entries.join(', ')}`);
+
+  if (exits.length === 0) errors.push('region has no exit node (every node has an outgoing edge — cyclic?)');
+  else if (exits.length > 1) errors.push(`region must be single-exit but has ${exits.length}: ${exits.join(', ')}`);
+
+  // Acyclicity (DFS coloring) — a region body must be a DAG.
+  // WHITE = unvisited, GRAY = on the current DFS path, BLACK = fully explored;
+  // reaching a GRAY node again means a back edge, i.e. a cycle.
+  const WHITE = 0, GRAY = 1, BLACK = 2;
+  const color = new Map<string, number>();
+  for (const id of ids) color.set(id, WHITE);
+  let cyclic = false;
+  const dfs = (id: string): void => {
+    color.set(id, GRAY);
+    for (const next of adj.get(id) ?? []) {
+      if (color.get(next) === GRAY) { cyclic = true; return; }
+      if (color.get(next) === WHITE) { dfs(next); if (cyclic) return; }
+    }
+    color.set(id, BLACK);
+  };
+  for (const id of ids) {
+    if (color.get(id) === WHITE) { dfs(id); if (cyclic) break; }
+  }
+  if (cyclic) errors.push('region contains a cycle (region bodies must be acyclic)');
+
+  return {
+    entryId: entries.length === 1 ? entries[0] : undefined,
+    exitId: exits.length === 1 ? exits[0] : undefined,
+    errors,
+  };
+}
+
+/**
+ * The single entry node id of a region. Used by the engine's loop/parallel
+ * executors to know where to begin executing a body region.
+ *
+ * Only the entry is checked here: the call throws when {@link analyzeRegion}
+ * finds no unique entry node, and the message lists every problem it reported.
+ * A region with a unique entry but other defects (several exits, a cycle
+ * elsewhere) still returns its entry id — {@link validateControlFlow} is the
+ * function that rejects those.
+ *
+ * @param region - The region whose entry node is wanted.
+ * @returns The id of the only node without an incoming edge.
+ * @throws Error when the region has no nodes, no entry, or more than one entry.
+ */
+export function findRegionEntry(region: { nodes: FlowNodeParsed[]; edges?: FlowEdgeParsed[] }): string {
+  const analysis = analyzeRegion(region);
+  if (!analysis.entryId) {
+    throw new Error(`malformed control-flow region: ${analysis.errors.join('; ')}`);
+  }
+  return analysis.entryId;
+}
+
+/**
+ * Validate the structured constructs among `nodes`, then descend into every
+ * region that passed so nested containers (a loop inside a loop body, a
+ * try_catch inside a parallel branch, …) are checked as well.
+ *
+ * `scope` is the location prefix put in front of error messages: empty at the
+ * top level, `"<enclosing region> > "` inside a nested region.
+ */
+function validateNodes(nodes: FlowNodeParsed[], scope: string): void {
+  const assertRegion = (raw: unknown, where: string): void => {
+    const parsed = FlowRegionSchema.safeParse(raw);
+    if (!parsed.success) {
+      throw new Error(`${where}: invalid region — ${parsed.error.issues.map(i => i.message).join('; ')}`);
+    }
+    const analysis = analyzeRegion(parsed.data);
+    if (analysis.errors.length > 0) {
+      throw new Error(`${where}: ${analysis.errors.join('; ')}`);
+    }
+    // The region itself is sound — now check any containers nested inside it.
+    validateNodes(parsed.data.nodes, `${where} > `);
+  };
+
+  for (const node of nodes) {
+    const cfg = node.config as Record<string, unknown> | undefined;
+    if (!cfg) continue;
+
+    if (node.type === LOOP_NODE_TYPE && cfg.body != null) {
+      assertRegion(cfg.body, `${scope}loop '${node.id}' body`);
+    } else if (node.type === PARALLEL_NODE_TYPE && Array.isArray(cfg.branches)) {
+      if (cfg.branches.length < 2) {
+        throw new Error(`${scope}parallel '${node.id}': a parallel block needs at least 2 branches`);
+      }
+      cfg.branches.forEach((branch, i) => assertRegion(branch, `${scope}parallel '${node.id}' branch ${i}`));
+    } else if (node.type === TRY_CATCH_NODE_TYPE) {
+      if (cfg.try != null) assertRegion(cfg.try, `${scope}try_catch '${node.id}' try`);
+      if (cfg.catch != null) assertRegion(cfg.catch, `${scope}try_catch '${node.id}' catch`);
+    }
+  }
+}
+
+/**
+ * Validate every structured control-flow construct in a flow, throwing on the
+ * first malformed region (ADR-0031 — "reject the malformed before run"). Covers
+ * `loop` bodies, `parallel` branches, and `try_catch` try/catch regions,
+ * including constructs nested inside those regions. Only validates the *nested
+ * structure* when present, so legacy flat-graph `loop` nodes (no `config.body`)
+ * are untouched — the constructs are additive.
+ *
+ * Intended to be called from `registerFlow()` after DAG cycle detection.
+ *
+ * @param flow - The flow whose nodes are inspected; only `nodes` is read.
+ * @throws Error naming the offending construct (e.g. `loop 'id' body: …`) when a
+ *   region fails schema parsing or {@link analyzeRegion}, or when a `parallel`
+ *   node lists fewer than two branches.
+ */
+export function validateControlFlow(flow: { nodes: FlowNodeParsed[] }): void {
+  validateNodes(flow.nodes, '');
+}
diff --git a/packages/spec/src/automation/index.ts b/packages/spec/src/automation/index.ts
index bd4330647..718ca142d 100644
--- a/packages/spec/src/automation/index.ts
+++ b/packages/spec/src/automation/index.ts
@@ -2,6 +2,7 @@
 export * from './flow.zod';
+export * from './control-flow.zod';
 export { flowForm } from './flow.form';
 export * from './execution.zod';
 export * from './webhook.zod';