Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .changeset/structured-parallel-block.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"@objectstack/service-automation": minor
---

feat(automation): structured parallel block (ADR-0031, task 3)

Implement engine execution for the `parallel` block — a structured construct
with an **implicit join** (ADR-0031 §Decision 2). The `parallel` node declares N
branch regions in `config.branches[]`; the executor runs them concurrently in
the enclosing variable scope (via `AutomationEngine.runRegion`) and continues
once when all branches complete — no author-visible split/join gateway.

- New `builtin/parallel-node.ts` executor (registered as a built-in).
- Branch failure fails the block (surfaced as a node failure → fault edge/error
handling); durable pause inside a branch is a clear error.
- Well-formedness (≥2 branches, single-entry/single-exit regions) is already
enforced at `registerFlow()` by `validateControlFlow` (shipped with the loop
container).

Showcase `FanOutNotifyFlow` demonstrates the parallel block. Try/catch execution
and BPMN interop mapping remain follow-ups (#1479 tasks 4–5).
73 changes: 73 additions & 0 deletions examples/app-showcase/src/flows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,78 @@ export const BatchRemindersFlow = defineFlow({
],
});

/**
* Fan-out Notify — demonstrates the ADR-0031 **structured parallel block**.
*
* The `parallel` node declares two branch regions in `config.branches[]`; both
* run concurrently in the enclosing variable scope and **join implicitly** at
* block end (the engine continues once both complete). There is no
* author-visible split/join gateway. The node's ordinary out-edge (`→ end`) is
* the after-block continuation.
*/
export const FanOutNotifyFlow = defineFlow({
name: 'showcase_fan_out_notify',
label: 'Fan-out Notify (Parallel)',
description: 'Notifies owner and watchers concurrently via a parallel block, joining before completion (ADR-0031).',
type: 'autolaunched',
nodes: [
{
id: 'start',
type: 'start',
label: 'On Task Completed',
config: {
objectName: 'showcase_task',
triggerType: 'record-after-update',
condition: 'status == "done" && previous.status != "done"',
},
},
{
id: 'fan_out',
type: 'parallel',
label: 'Notify in parallel',
config: {
branches: [
{
name: 'Email the owner',
nodes: [
{
id: 'email_owner',
type: 'script',
label: 'Email Owner',
config: {
actionType: 'email',
inputs: { to: '{record.project.owner}', subject: '✅ Done: {record.title}' },
},
},
],
edges: [],
},
{
name: 'Post to Slack',
nodes: [
{
id: 'slack_post',
type: 'script',
label: 'Slack Notify',
config: {
actionType: 'slack',
inputs: { channel: '#tasks', text: 'Task done: {record.title}' },
},
},
],
edges: [],
},
],
},
},
{ id: 'end', type: 'end', label: 'End' },
],
edges: [
{ id: 'e1', source: 'start', target: 'fan_out' },
{ id: 'e2', source: 'fan_out', target: 'end' },
],
});

export const allFlows = [
TaskCompletedFlow,
ReassignWizardFlow,
Expand All @@ -573,4 +645,5 @@ export const allFlows = [
NotifyOwnerSubflow,
TaskDoneNotifyOwnerFlow,
BatchRemindersFlow,
FanOutNotifyFlow,
];
4 changes: 4 additions & 0 deletions packages/services/service-automation/src/builtin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Scope (built-in baseline):
* - logic — decision / assignment (engine core)
* - logic — loop (structured iteration container, ADR-0031)
* - logic — parallel (structured parallel block, implicit join, ADR-0031)
* - data — get/create/update/delete_record (platform CRUD baseline)
* - human — screen / script (core flow capability)
* - io — http_request (foundational outbound I/O)
Expand All @@ -30,6 +31,7 @@ import type { PluginContext } from '@objectstack/core';
import type { AutomationEngine } from '../engine.js';
import { registerLogicNodes } from './logic-nodes.js';
import { registerLoopNode } from './loop-node.js';
import { registerParallelNode } from './parallel-node.js';
import { registerCrudNodes } from './crud-nodes.js';
import { registerScreenNodes } from './screen-nodes.js';
import { registerHttpNodes } from './http-nodes.js';
Expand All @@ -40,6 +42,7 @@ import { registerSubflowNode } from './subflow-node.js';

export { registerLogicNodes } from './logic-nodes.js';
export { registerLoopNode } from './loop-node.js';
export { registerParallelNode } from './parallel-node.js';
export { registerCrudNodes } from './crud-nodes.js';
export { registerScreenNodes } from './screen-nodes.js';
export { registerHttpNodes } from './http-nodes.js';
Expand All @@ -56,6 +59,7 @@ export { registerSubflowNode } from './subflow-node.js';
export function installBuiltinNodes(engine: AutomationEngine, ctx: PluginContext): void {
registerLogicNodes(engine, ctx);
registerLoopNode(engine, ctx);
registerParallelNode(engine, ctx);
registerCrudNodes(engine, ctx);
registerScreenNodes(engine, ctx);
registerHttpNodes(engine, ctx);
Expand Down
149 changes: 149 additions & 0 deletions packages/services/service-automation/src/builtin/parallel-node.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import { describe, it, expect, beforeEach } from 'vitest';
import { AutomationEngine } from '../engine.js';
import type { NodeExecutor } from '../engine.js';
import { registerParallelNode } from './parallel-node.js';

function silentLogger() {
return { info() {}, warn() {}, error() {}, debug() {}, child() { return silentLogger(); } } as any;
}
function ctx() {
return { logger: silentLogger(), getService() { throw new Error('none'); } } as any;
}

describe('parallel block executor (ADR-0031)', () => {
let engine: AutomationEngine;
let order: string[];

beforeEach(() => {
engine = new AutomationEngine(silentLogger());
order = [];
registerParallelNode(engine, ctx());

// A node that writes a variable named by config.key with value config.value,
// optionally after an awaited microtask delay (config.delay ticks).
engine.registerNodeExecutor({
type: 'setvar',
async execute(node, variables) {
const cfg = (node.config ?? {}) as Record<string, unknown>;
const ticks = Number(cfg.delay ?? 0);
for (let i = 0; i < ticks; i++) await Promise.resolve();
variables.set(cfg.key as string, cfg.value);
order.push(cfg.key as string);
return { success: true };
},
} as NodeExecutor);

engine.registerNodeExecutor({
type: 'boom',
async execute() { return { success: false, error: 'branch kaboom' }; },
} as NodeExecutor);

// Marks that the after-block node ran (proves the implicit join continued).
engine.registerNodeExecutor({
type: 'after',
async execute(_node, variables) {
order.push('after');
variables.set('joined', true);
return { success: true };
},
} as NodeExecutor);
});

const parallelFlow = (branches: unknown) => ({
name: 'par_flow',
label: 'Parallel Flow',
type: 'autolaunched' as const,
nodes: [
{ id: 'start', type: 'start', label: 'Start' },
{ id: 'par', type: 'parallel', label: 'Fan out', config: { branches } },
{ id: 'join', type: 'after', label: 'After' },
{ id: 'end', type: 'end', label: 'End' },
],
edges: [
{ id: 'e1', source: 'start', target: 'par' },
{ id: 'e2', source: 'par', target: 'join' },
{ id: 'e3', source: 'join', target: 'end' },
],
});

it('runs all branches and joins implicitly before continuing', async () => {
engine.registerFlow('par_flow', parallelFlow([
{ name: 'A', nodes: [{ id: 'a', type: 'setvar', label: 'A', config: { key: 'a', value: 1 } }], edges: [] },
{ name: 'B', nodes: [{ id: 'b', type: 'setvar', label: 'B', config: { key: 'b', value: 2 } }], edges: [] },
]));

const result = await engine.execute('par_flow');

expect(result.success).toBe(true);
// Both branches ran, and the after-block node ran exactly once, AFTER both.
expect(order).toContain('a');
expect(order).toContain('b');
expect(order[order.length - 1]).toBe('after');
expect(order.filter(o => o === 'after')).toHaveLength(1);
});

it('joins only after the slowest branch completes', async () => {
// Branch "slow" awaits several microtasks; "fast" resolves immediately.
// The join ('after') must still be last.
engine.registerFlow('par_flow', parallelFlow([
{ name: 'fast', nodes: [{ id: 'f', type: 'setvar', label: 'F', config: { key: 'fast', value: 1, delay: 0 } }], edges: [] },
{ name: 'slow', nodes: [{ id: 's', type: 'setvar', label: 'S', config: { key: 'slow', value: 1, delay: 5 } }], edges: [] },
]));

await engine.execute('par_flow');

expect(order.indexOf('after')).toBeGreaterThan(order.indexOf('slow'));
expect(order.indexOf('after')).toBeGreaterThan(order.indexOf('fast'));
});

it('runs multi-node branch regions in order', async () => {
engine.registerFlow('par_flow', parallelFlow([
{
name: 'chain',
nodes: [
{ id: 'c1', type: 'setvar', label: 'C1', config: { key: 'c1', value: 1 } },
{ id: 'c2', type: 'setvar', label: 'C2', config: { key: 'c2', value: 2 } },
],
edges: [{ id: 'ce', source: 'c1', target: 'c2' }],
},
{ name: 'solo', nodes: [{ id: 'd', type: 'setvar', label: 'D', config: { key: 'd', value: 3 } }], edges: [] },
]));

const result = await engine.execute('par_flow');
expect(result.success).toBe(true);
expect(order.indexOf('c1')).toBeLessThan(order.indexOf('c2'));
expect(order).toContain('d');
});

it('fails the block when a branch fails', async () => {
engine.registerFlow('par_flow', parallelFlow([
{ name: 'ok', nodes: [{ id: 'a', type: 'setvar', label: 'A', config: { key: 'a', value: 1 } }], edges: [] },
{ name: 'bad', nodes: [{ id: 'x', type: 'boom', label: 'X' }], edges: [] },
]));

const result = await engine.execute('par_flow');
expect(result.success).toBe(false);
expect(result.error).toMatch(/branch/i);
expect(order).not.toContain('after'); // join did not continue
});

it('rejects a parallel block with fewer than two branches at registerFlow', () => {
expect(() =>
engine.registerFlow('bad_par', parallelFlow([
{ name: 'only', nodes: [{ id: 'a', type: 'setvar', label: 'A' }], edges: [] },
])),
).toThrow(/at least 2 branches/);
});

it('rejects a malformed branch region at registerFlow', () => {
expect(() =>
engine.registerFlow('bad_par', parallelFlow([
{ name: 'good', nodes: [{ id: 'a', type: 'setvar', label: 'A' }], edges: [] },
// two entry/exit nodes, no edges → not single-entry/single-exit
{ name: 'bad', nodes: [{ id: 'b', type: 'setvar', label: 'B' }, { id: 'c', type: 'setvar', label: 'C' }], edges: [] },
])),
).toThrow(/parallel 'par' branch 1/);
});
});
93 changes: 93 additions & 0 deletions packages/services/service-automation/src/builtin/parallel-node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2025 ObjectStack. Licensed under the Apache-2.0 license.

import type { PluginContext } from '@objectstack/core';
import { defineActionDescriptor } from '@objectstack/spec/automation';
import type { FlowRegionParsed } from '@objectstack/spec/automation';
import type { AutomationContext } from '@objectstack/spec/contracts';
import type { AutomationEngine } from '../engine.js';

/** One branch of a parallel block — a region plus an optional label. */
interface ParallelBranch extends FlowRegionParsed {
name?: string;
}

/**
* `parallel` built-in node — a **structured parallel block** with an
* **implicit join** (ADR-0031 §Decision 2).
*
* The node declares N branch regions in `config.branches[]`; each branch is a
* self-contained single-entry/single-exit sub-graph (validated at
* `registerFlow()`). The executor runs every branch concurrently
* (`Promise.all`) in the **enclosing variable scope** and continues **once when
* all branches complete** — the join is implicit at block end, engine
* synchronized. There is no author-visible split/join gateway to mis-wire or
* deadlock; the node's ordinary out-edges remain the after-block continuation.
*
* Concurrency model: JavaScript is single-threaded, so branches interleave only
* at `await` points and the shared `variables` map is never torn. Branches
* SHOULD write distinct variables; on a key collision the last writer to settle
* wins (same semantics as the engine's existing unconditional-edge fan-out).
*
* If any branch fails (a node returns `success: false` or throws), the block
* fails — surfaced as a node failure so the flow's fault edge / error handling
* applies. Durable pause inside a branch is unsupported (a clear error), mirror-
* ing the loop container.
*/
export function registerParallelNode(engine: AutomationEngine, ctx: PluginContext): void {
engine.registerNodeExecutor({
type: 'parallel',
descriptor: defineActionDescriptor({
type: 'parallel',
version: '1.0.0',
name: 'Parallel',
description: 'Run N branch regions concurrently and join implicitly when all complete.',
icon: 'git-fork',
category: 'logic',
source: 'builtin',
configSchema: {
type: 'object',
properties: {
branches: {
type: 'array',
minItems: 2,
description: 'Branch regions executed concurrently; implicit join at block end',
items: {
type: 'object',
properties: {
name: { type: 'string' },
nodes: { type: 'array' },
edges: { type: 'array' },
},
},
},
},
required: ['branches'],
},
}),
async execute(node, variables, context) {
const cfg = (node.config ?? {}) as Record<string, unknown>;
const branches = cfg.branches as ParallelBranch[] | undefined;

if (!Array.isArray(branches) || branches.length < 2) {
return {
success: false,
error: `parallel '${node.id}': config.branches must declare at least 2 branch regions`,
};
}

try {
// Implicit join: continue once when ALL branches have completed.
await Promise.all(
branches.map(branch => engine.runRegion(branch, variables, context ?? ({} as AutomationContext))),
);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { success: false, error: `parallel '${node.id}': branch failed — ${message}` };
}

return { success: true, output: { branches: branches.length } };
},
});

ctx.logger.info('[Parallel Node] 1 built-in node executor registered');
}