Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 8 additions & 32 deletions ts/docs/design/workflowSystem/dsl/dsl-v0.1-gap.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,27 @@ not yet fully wired end-to-end.
Address the gaps in dependency order, with correctness and validation before
new surface area:

1. **G17: Cancel in-flight fork/forkMap branches on failure.** Align fork
execution with the spec's failure and cleanup semantics.
2. **G3: Add TypeScript-style named type aliases.** Once structural
1. **G3: Add TypeScript-style named type aliases.** Once structural
assignability is sound, add named type declarations and a type environment.
3. **G18: Add union/literal types.** This is the broadest type-system expansion
2. **G18: Add union/literal types.** This is the broadest type-system expansion
and should come after type soundness and named types.
4. **G11: Decide/document bind stripping for explicit user names.** This is
3. **G11: Decide/document bind stripping for explicit user names.** This is
primarily debuggability and spec clarity.
5. **G9: Decide whether bare task calls need `ExpressionStatement`.** This is
4. **G9: Decide whether bare task calls need `ExpressionStatement`.** This is
AST honesty and visual-editor clarity, but current behavior works.
6. **G12: Decide `list.append` naming/semantics.** This is naming/API
5. **G12: Decide `list.append` naming/semantics.** This is naming/API
consistency with coordinated emitter, engine, and snapshot churn.
7. **G20: Audit remaining `identity` / `noop` usage in the emitter.**
6. **G20: Audit remaining `identity` / `noop` usage in the emitter.**
Decision 0010 removed `identity` / `noop` as load-bearing at branch
convergence, but the emitter still synthesizes them in several other
places. Classify each remaining usage as (a) reducible after 0010,
(b) forced by an IR shape that could be relaxed additively, or (c)
inherent to decision 0006 (no expressions). Pure audit; only
schedules follow-up work.
8. **G7: Revisit composition patterns only when concrete workflow needs appear.**
7. **G7: Revisit composition patterns only when concrete workflow needs appear.**
These patterns push against the visual-node discipline and should stay out
of scope until justified.
9. **G29 (open part): Decide whether to deprecate value-producing
8. **G29 (open part): Decide whether to deprecate value-producing
`if`/`switch` in favour of ternary.** The arm-type checking part of
G29 (same-type enforcement, `_resolvedSchemas` storage, partial-return
as a type error) is resolved; see decision 0011 §6 and the
Expand Down Expand Up @@ -187,28 +185,6 @@ suggests mutation in many languages (Python `list.append`, JS
be made explicit in the task name (e.g., `array.appended` or
`array.concat`) to avoid confusion with mutable append/push.

## G17: Fork/forkMap does not cancel in-flight branches on failure

**Spec:** ir-v0.2.md §2.1 rule 5 and §2.2 rule 5. "If any branch fails,
remaining in-flight branches are cancelled and the error propagates
immediately."

**Current state:** The engine's `executeFork` and `executeForkMap` use
`Promise.race` for concurrency limiting but do not cancel in-flight
branches/iterations when one fails. Errors from `Promise.race` propagate,
but other running branches continue executing in the background. This
wastes resources and may cause side effects from branches that should have
been cancelled.

**What needs to happen:**

1. When any branch/iteration rejects, signal cancellation to all other
in-flight branches via `AbortController`.
2. `await` all in-flight promises before propagating the error (to avoid
unhandled rejection warnings and ensure cleanup).
3. Add tests verifying that in-flight branches are cancelled on first
failure.

## G18: No union types in the DSL type system

**Spec/intent:** The DSL currently has no union types. This was an
Expand Down
140 changes: 105 additions & 35 deletions ts/examples/workflow/engine/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,92 @@ interface RunCtx {
wfCallStack: readonly string[];
}

/**
 * Runs `runItem` once per entry of `items`, keeping at most `concurrency`
 * invocations in flight, and cancels the rest as soon as one of them fails.
 *
 * Every invocation receives a signal owned by this call. That signal is
 * aborted when either `parentSignal` aborts (forwarding its reason) or any
 * invocation fails (using the failure as the abort reason). After the first
 * failure no further items are started, all invocations still in flight are
 * awaited, and only then is the first failure rethrown.
 *
 * @param items Work items, started in array order. The array is not mutated.
 * @param concurrency Maximum number of invocations in flight. Values below 1
 *   (and NaN) are treated as 1 so that a non-empty `items` is never silently
 *   skipped.
 * @param parentSignal Caller's signal; its abort is forwarded to every item.
 * @param runItem Callback executed for each item with the shared child signal.
 *   A synchronous throw is handled the same way as a rejected promise.
 * @returns A promise that resolves once every item has completed successfully.
 * @throws The first error raised by any `runItem` invocation.
 */
async function runCancellableConcurrent<T>(
    items: T[],
    concurrency: number,
    parentSignal: AbortSignal,
    runItem: (item: T, signal: AbortSignal) => Promise<void>,
): Promise<void> {
    // With a limit below 1 (or NaN) the scheduling loop could never start an
    // item and would return as if all work had succeeded; fall back to
    // sequential execution instead.
    const limit = concurrency >= 1 ? concurrency : 1;

    const abortController = new AbortController();
    const abortSignal = abortController.signal;
    const propagateParentAbort = () =>
        abortController.abort(parentSignal.reason);

    if (parentSignal.aborted) {
        propagateParentAbort();
    } else {
        parentSignal.addEventListener("abort", propagateParentAbort, {
            once: true,
        });
    }

    try {
        const queue = [...items];
        const executing = new Set<Promise<void>>();
        let failed = false;
        let firstError: unknown;

        // Only the first failure is kept: it stops scheduling and tells the
        // in-flight siblings to cancel.
        const recordFailure = (err: unknown): void => {
            if (failed) {
                return;
            }
            failed = true;
            firstError = err;
            queue.length = 0;
            if (!abortSignal.aborted) {
                abortController.abort(err);
            }
        };

        // Start the item synchronously, but turn a synchronous throw into a
        // rejection so it goes through the same cancel-and-drain path instead
        // of escaping the loop with siblings still running.
        const startItem = (item: T): Promise<void> => {
            try {
                return runItem(item, abortSignal);
            } catch (err) {
                return Promise.reject(err);
            }
        };

        while (queue.length > 0 || executing.size > 0) {
            while (!failed && queue.length > 0 && executing.size < limit) {
                const item = queue.shift() as T;
                // Tracked promises never reject: failures are recorded and
                // surfaced through `failed` / `firstError`.
                const tracked: Promise<void> = startItem(item)
                    .then(undefined, recordFailure)
                    .finally(() => {
                        executing.delete(tracked);
                    });
                executing.add(tracked);
            }

            if (failed) {
                // Let cancelled siblings finish before reporting the error.
                await Promise.allSettled(executing);
                throw firstError;
            }

            if (executing.size === 0) {
                // Racing an empty set would never settle.
                break;
            }

            await Promise.race(executing);
        }

        // The last in-flight item may have failed and removed itself from
        // `executing` before the loop condition was re-evaluated.
        if (failed) {
            throw firstError;
        }
    } finally {
        parentSignal.removeEventListener("abort", propagateParentAbort);
    }
}

// ---- Template resolution ----
// Note: the error throws below (unknown namespace, unresolved reference,
// path projection failures) should never fire when static validation is
Expand Down Expand Up @@ -1389,11 +1475,10 @@ export class WorkflowEngine {
const concurrency = node.maxConcurrency ?? branchNames.length;
const results: Record<string, unknown> = {};

// Execute branches with concurrency limiting
const executing = new Set<Promise<void>>();
const branchQueue = [...branchNames];

const runBranch = async (bName: string) => {
const runBranch = async (
bName: string,
branchSignal: AbortSignal,
) => {
const branch = node.branches[bName];
const branchInput = resolveTemplate(
branch.inputs,
Expand All @@ -1411,7 +1496,7 @@ export class WorkflowEngine {
branchScope,
branchScopePath,
runId,
signal,
branchSignal,
ctx,
policy,
approve,
Expand All @@ -1424,18 +1509,12 @@ export class WorkflowEngine {
);
};

while (branchQueue.length > 0 || executing.size > 0) {
while (branchQueue.length > 0 && executing.size < concurrency) {
const bName = branchQueue.shift()!;
const p = runBranch(bName).then(() => {
executing.delete(p);
});
executing.add(p);
}
if (executing.size > 0) {
await Promise.race(executing);
}
}
await runCancellableConcurrent(
branchNames,
concurrency,
signal,
runBranch,
);

if (node.bind) {
outerScope.bindings.set(node.bind, results);
Expand Down Expand Up @@ -1538,10 +1617,7 @@ export class WorkflowEngine {
const concurrency = node.maxConcurrency ?? items.length;
const results: unknown[] = new Array(items.length).fill(null);

const executing = new Set<Promise<void>>();
const indexQueue = items.map((_, i) => i);

const runItem = async (index: number) => {
const runItem = async (index: number, itemSignal: AbortSignal) => {
this.emit({
type: "forkMapIterationStarted",
runId,
Expand Down Expand Up @@ -1573,7 +1649,7 @@ export class WorkflowEngine {
itemScope,
itemScopePath,
runId,
signal,
itemSignal,
ctx,
policy,
approve,
Expand All @@ -1594,18 +1670,12 @@ export class WorkflowEngine {
});
};

while (indexQueue.length > 0 || executing.size > 0) {
while (indexQueue.length > 0 && executing.size < concurrency) {
const idx = indexQueue.shift()!;
const p = runItem(idx).then(() => {
executing.delete(p);
});
executing.add(p);
}
if (executing.size > 0) {
await Promise.race(executing);
}
}
await runCancellableConcurrent(
items.map((_, i) => i),
concurrency,
signal,
runItem,
);

if (node.bind) {
outerScope.bindings.set(node.bind, results);
Expand Down
Loading
Loading