From 079802d357e8f697dff406d9c9e6d799e8c33600 Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 1 Jun 2026 20:46:53 +0000 Subject: [PATCH] feat(automation): structured parallel block (ADR-0031, task 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add engine execution for the `parallel` block — a structured construct with an implicit join (ADR-0031 §Decision 2), building on the loop container PR. - new builtin/parallel-node.ts: runs config.branches[] regions concurrently in the enclosing variable scope via engine.runRegion() and continues once when all complete — no author-visible split/join gateway. Branch failure fails the block (→ fault edge / error handling); durable pause inside a branch is a clear error. - registered as a built-in node executor (builtin/index.ts). - well-formedness (>=2 branches, single-entry/single-exit regions) is already enforced at registerFlow() by validateControlFlow (shipped with the loop PR). - showcase FanOutNotifyFlow demonstrates the parallel block; changeset added. https://claude.ai/code/session_012ti8cx3TkdiQdjCnZXZg2Q --- .changeset/structured-parallel-block.md | 21 +++ examples/app-showcase/src/flows/index.ts | 73 +++++++++ .../service-automation/src/builtin/index.ts | 4 + .../src/builtin/parallel-node.test.ts | 149 ++++++++++++++++++ .../src/builtin/parallel-node.ts | 93 +++++++++++ 5 files changed, 340 insertions(+) create mode 100644 .changeset/structured-parallel-block.md create mode 100644 packages/services/service-automation/src/builtin/parallel-node.test.ts create mode 100644 packages/services/service-automation/src/builtin/parallel-node.ts diff --git a/.changeset/structured-parallel-block.md b/.changeset/structured-parallel-block.md new file mode 100644 index 000000000..947965db3 --- /dev/null +++ b/.changeset/structured-parallel-block.md @@ -0,0 +1,21 @@ +--- +"@objectstack/service-automation": minor +--- + +feat(automation): structured parallel block (ADR-0031, task 3) + +Implement engine execution for the `parallel` block — a structured construct +with an **implicit join** (ADR-0031 §Decision 2). The `parallel` node declares N +branch regions in `config.branches[]`; the executor runs them concurrently in +the enclosing variable scope (via `AutomationEngine.runRegion`) and continues +once when all branches complete — no author-visible split/join gateway. + +- New `builtin/parallel-node.ts` executor (registered as a built-in). +- Branch failure fails the block (surfaced as a node failure → fault edge/error + handling); durable pause inside a branch is a clear error. +- Well-formedness (≥2 branches, single-entry/single-exit regions) is already + enforced at `registerFlow()` by `validateControlFlow` (shipped with the loop + container). + +Showcase `FanOutNotifyFlow` demonstrates the parallel block. Try/catch execution +and BPMN interop mapping remain follow-ups (#1479 tasks 4–5). diff --git a/examples/app-showcase/src/flows/index.ts b/examples/app-showcase/src/flows/index.ts index 80ff2e02e..a7e32d1bc 100644 --- a/examples/app-showcase/src/flows/index.ts +++ b/examples/app-showcase/src/flows/index.ts @@ -561,6 +561,78 @@ export const BatchRemindersFlow = defineFlow({ ], }); +/** + * Fan-out Notify — demonstrates the ADR-0031 **structured parallel block**. + * + * The `parallel` node declares two branch regions in `config.branches[]`; both + * run concurrently in the enclosing variable scope and **join implicitly** at + * block end (the engine continues once both complete). There is no + * author-visible split/join gateway. The node's ordinary out-edge (`→ end`) is + * the after-block continuation. + */ +export const FanOutNotifyFlow = defineFlow({ + name: 'showcase_fan_out_notify', + label: 'Fan-out Notify (Parallel)', + description: 'Notifies owner and watchers concurrently via a parallel block, joining before completion (ADR-0031).', + type: 'autolaunched', + nodes: [ + { + id: 'start', + type: 'start', + label: 'On Task Completed', + config: { + objectName: 'showcase_task', + triggerType: 'record-after-update', + condition: 'status == "done" && previous.status != "done"', + }, + }, + { + id: 'fan_out', + type: 'parallel', + label: 'Notify in parallel', + config: { + branches: [ + { + name: 'Email the owner', + nodes: [ + { + id: 'email_owner', + type: 'script', + label: 'Email Owner', + config: { + actionType: 'email', + inputs: { to: '{record.project.owner}', subject: '✅ Done: {record.title}' }, + }, + }, + ], + edges: [], + }, + { + name: 'Post to Slack', + nodes: [ + { + id: 'slack_post', + type: 'script', + label: 'Slack Notify', + config: { + actionType: 'slack', + inputs: { channel: '#tasks', text: 'Task done: {record.title}' }, + }, + }, + ], + edges: [], + }, + ], + }, + }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'fan_out' }, + { id: 'e2', source: 'fan_out', target: 'end' }, + ], +}); + export const allFlows = [ TaskCompletedFlow, ReassignWizardFlow, @@ -573,4 +645,5 @@ export const allFlows = [ NotifyOwnerSubflow, TaskDoneNotifyOwnerFlow, BatchRemindersFlow, + FanOutNotifyFlow, ]; diff --git a/packages/services/service-automation/src/builtin/index.ts b/packages/services/service-automation/src/builtin/index.ts index 3d5479599..c44ab3935 100644 --- a/packages/services/service-automation/src/builtin/index.ts +++ b/packages/services/service-automation/src/builtin/index.ts @@ -12,6 +12,7 @@ * Scope (built-in baseline): * - logic — decision / assignment (engine core) * - logic — loop (structured iteration container, ADR-0031) + * - logic — parallel (structured parallel block, implicit join, ADR-0031) * - data — get/create/update/delete_record (platform CRUD baseline) * - human — screen / script (core flow capability) * - io — http_request (foundational outbound I/O) @@ -30,6 +31,7 @@ import type { PluginContext } from '@objectstack/core'; import type { AutomationEngine } from '../engine.js'; import { registerLogicNodes } from './logic-nodes.js'; import { registerLoopNode } from './loop-node.js'; +import { registerParallelNode } from './parallel-node.js'; import { registerCrudNodes } from './crud-nodes.js'; import { registerScreenNodes } from './screen-nodes.js'; import { registerHttpNodes } from './http-nodes.js'; @@ -40,6 +42,7 @@ import { registerSubflowNode } from './subflow-node.js'; export { registerLogicNodes } from './logic-nodes.js'; export { registerLoopNode } from './loop-node.js'; +export { registerParallelNode } from './parallel-node.js'; export { registerCrudNodes } from './crud-nodes.js'; export { registerScreenNodes } from './screen-nodes.js'; export { registerHttpNodes } from './http-nodes.js'; @@ -56,6 +59,7 @@ export { registerSubflowNode } from './subflow-node.js'; export function installBuiltinNodes(engine: AutomationEngine, ctx: PluginContext): void { registerLogicNodes(engine, ctx); registerLoopNode(engine, ctx); + registerParallelNode(engine, ctx); registerCrudNodes(engine, ctx); registerScreenNodes(engine, ctx); registerHttpNodes(engine, ctx); diff --git a/packages/services/service-automation/src/builtin/parallel-node.test.ts b/packages/services/service-automation/src/builtin/parallel-node.test.ts new file mode 100644 index 000000000..b2e53663e --- /dev/null +++ b/packages/services/service-automation/src/builtin/parallel-node.test.ts @@ -0,0 +1,149 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import { describe, it, expect, beforeEach } from 'vitest'; +import { AutomationEngine } from '../engine.js'; +import type { NodeExecutor } from '../engine.js'; +import { registerParallelNode } from './parallel-node.js'; + +function silentLogger() { + return { info() {}, warn() {}, error() {}, debug() {}, child() { return silentLogger(); } } as any; +} +function ctx() { + return { logger: silentLogger(), getService() { throw new Error('none'); } } as any; +} + +describe('parallel block executor (ADR-0031)', () => { + let engine: AutomationEngine; + let order: string[]; + + beforeEach(() => { + engine = new AutomationEngine(silentLogger()); + order = []; + registerParallelNode(engine, ctx()); + + // A node that writes a variable named by config.key with value config.value, + // optionally after an awaited microtask delay (config.delay ticks). + engine.registerNodeExecutor({ + type: 'setvar', + async execute(node, variables) { + const cfg = (node.config ?? {}) as Record; + const ticks = Number(cfg.delay ?? 0); + for (let i = 0; i < ticks; i++) await Promise.resolve(); + variables.set(cfg.key as string, cfg.value); + order.push(cfg.key as string); + return { success: true }; + }, + } as NodeExecutor); + + engine.registerNodeExecutor({ + type: 'boom', + async execute() { return { success: false, error: 'branch kaboom' }; }, + } as NodeExecutor); + + // Marks that the after-block node ran (proves the implicit join continued). + engine.registerNodeExecutor({ + type: 'after', + async execute(_node, variables) { + order.push('after'); + variables.set('joined', true); + return { success: true }; + }, + } as NodeExecutor); + }); + + const parallelFlow = (branches: unknown) => ({ + name: 'par_flow', + label: 'Parallel Flow', + type: 'autolaunched' as const, + nodes: [ + { id: 'start', type: 'start', label: 'Start' }, + { id: 'par', type: 'parallel', label: 'Fan out', config: { branches } }, + { id: 'join', type: 'after', label: 'After' }, + { id: 'end', type: 'end', label: 'End' }, + ], + edges: [ + { id: 'e1', source: 'start', target: 'par' }, + { id: 'e2', source: 'par', target: 'join' }, + { id: 'e3', source: 'join', target: 'end' }, + ], + }); + + it('runs all branches and joins implicitly before continuing', async () => { + engine.registerFlow('par_flow', parallelFlow([ + { name: 'A', nodes: [{ id: 'a', type: 'setvar', label: 'A', config: { key: 'a', value: 1 } }], edges: [] }, + { name: 'B', nodes: [{ id: 'b', type: 'setvar', label: 'B', config: { key: 'b', value: 2 } }], edges: [] }, + ])); + + const result = await engine.execute('par_flow'); + + expect(result.success).toBe(true); + // Both branches ran, and the after-block node ran exactly once, AFTER both. + expect(order).toContain('a'); + expect(order).toContain('b'); + expect(order[order.length - 1]).toBe('after'); + expect(order.filter(o => o === 'after')).toHaveLength(1); + }); + + it('joins only after the slowest branch completes', async () => { + // Branch "slow" awaits several microtasks; "fast" resolves immediately. + // The join ('after') must still be last. + engine.registerFlow('par_flow', parallelFlow([ + { name: 'fast', nodes: [{ id: 'f', type: 'setvar', label: 'F', config: { key: 'fast', value: 1, delay: 0 } }], edges: [] }, + { name: 'slow', nodes: [{ id: 's', type: 'setvar', label: 'S', config: { key: 'slow', value: 1, delay: 5 } }], edges: [] }, + ])); + + await engine.execute('par_flow'); + + expect(order.indexOf('after')).toBeGreaterThan(order.indexOf('slow')); + expect(order.indexOf('after')).toBeGreaterThan(order.indexOf('fast')); + }); + + it('runs multi-node branch regions in order', async () => { + engine.registerFlow('par_flow', parallelFlow([ + { + name: 'chain', + nodes: [ + { id: 'c1', type: 'setvar', label: 'C1', config: { key: 'c1', value: 1 } }, + { id: 'c2', type: 'setvar', label: 'C2', config: { key: 'c2', value: 2 } }, + ], + edges: [{ id: 'ce', source: 'c1', target: 'c2' }], + }, + { name: 'solo', nodes: [{ id: 'd', type: 'setvar', label: 'D', config: { key: 'd', value: 3 } }], edges: [] }, + ])); + + const result = await engine.execute('par_flow'); + expect(result.success).toBe(true); + expect(order.indexOf('c1')).toBeLessThan(order.indexOf('c2')); + expect(order).toContain('d'); + }); + + it('fails the block when a branch fails', async () => { + engine.registerFlow('par_flow', parallelFlow([ + { name: 'ok', nodes: [{ id: 'a', type: 'setvar', label: 'A', config: { key: 'a', value: 1 } }], edges: [] }, + { name: 'bad', nodes: [{ id: 'x', type: 'boom', label: 'X' }], edges: [] }, + ])); + + const result = await engine.execute('par_flow'); + expect(result.success).toBe(false); + expect(result.error).toMatch(/branch/i); + expect(order).not.toContain('after'); // join did not continue + }); + + it('rejects a parallel block with fewer than two branches at registerFlow', () => { + expect(() => + engine.registerFlow('bad_par', parallelFlow([ + { name: 'only', nodes: [{ id: 'a', type: 'setvar', label: 'A' }], edges: [] }, + ])), + ).toThrow(/at least 2 branches/); + }); + + it('rejects a malformed branch region at registerFlow', () => { + expect(() => + engine.registerFlow('bad_par', parallelFlow([ + { name: 'good', nodes: [{ id: 'a', type: 'setvar', label: 'A' }], edges: [] }, + // two entry/exit nodes, no edges → not single-entry/single-exit + { name: 'bad', nodes: [{ id: 'b', type: 'setvar', label: 'B' }, { id: 'c', type: 'setvar', label: 'C' }], edges: [] }, + ])), + ).toThrow(/parallel 'par' branch 1/); + }); +}); diff --git a/packages/services/service-automation/src/builtin/parallel-node.ts b/packages/services/service-automation/src/builtin/parallel-node.ts new file mode 100644 index 000000000..d4af5a075 --- /dev/null +++ b/packages/services/service-automation/src/builtin/parallel-node.ts @@ -0,0 +1,93 @@ +// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license. + +import type { PluginContext } from '@objectstack/core'; +import { defineActionDescriptor } from '@objectstack/spec/automation'; +import type { FlowRegionParsed } from '@objectstack/spec/automation'; +import type { AutomationContext } from '@objectstack/spec/contracts'; +import type { AutomationEngine } from '../engine.js'; + +/** One branch of a parallel block — a region plus an optional label. */ +interface ParallelBranch extends FlowRegionParsed { + name?: string; +} + +/** + * `parallel` built-in node — a **structured parallel block** with an + * **implicit join** (ADR-0031 §Decision 2). + * + * The node declares N branch regions in `config.branches[]`; each branch is a + * self-contained single-entry/single-exit sub-graph (validated at + * `registerFlow()`). The executor runs every branch concurrently + * (`Promise.all`) in the **enclosing variable scope** and continues **once when + * all branches complete** — the join is implicit at block end, engine + * synchronized. There is no author-visible split/join gateway to mis-wire or + * deadlock; the node's ordinary out-edges remain the after-block continuation. + * + * Concurrency model: JavaScript is single-threaded, so branches interleave only + * at `await` points and the shared `variables` map is never torn. Branches + * SHOULD write distinct variables; on a key collision the last writer to settle + * wins (same semantics as the engine's existing unconditional-edge fan-out). + * + * If any branch fails (a node returns `success: false` or throws), the block + * fails — surfaced as a node failure so the flow's fault edge / error handling + * applies. Durable pause inside a branch is unsupported (a clear error), mirror- + * ing the loop container. + */ +export function registerParallelNode(engine: AutomationEngine, ctx: PluginContext): void { + engine.registerNodeExecutor({ + type: 'parallel', + descriptor: defineActionDescriptor({ + type: 'parallel', + version: '1.0.0', + name: 'Parallel', + description: 'Run N branch regions concurrently and join implicitly when all complete.', + icon: 'git-fork', + category: 'logic', + source: 'builtin', + configSchema: { + type: 'object', + properties: { + branches: { + type: 'array', + minItems: 2, + description: 'Branch regions executed concurrently; implicit join at block end', + items: { + type: 'object', + properties: { + name: { type: 'string' }, + nodes: { type: 'array' }, + edges: { type: 'array' }, + }, + }, + }, + }, + required: ['branches'], + }, + }), + async execute(node, variables, context) { + const cfg = (node.config ?? {}) as Record; + const branches = cfg.branches as ParallelBranch[] | undefined; + + if (!Array.isArray(branches) || branches.length < 2) { + return { + success: false, + error: `parallel '${node.id}': config.branches must declare at least 2 branch regions`, + }; + } + + try { + // Implicit join: continue once when ALL branches have completed. + await Promise.all( + branches.map(branch => engine.runRegion(branch, variables, context ?? ({} as AutomationContext))), + ); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + return { success: false, error: `parallel '${node.id}': branch failed — ${message}` }; + } + + return { success: true, output: { branches: branches.length } }; + }, + }); + + ctx.logger.info('[Parallel Node] 1 built-in node executor registered'); +}