feat(workflows): configurable concurrency limits for fan-out (swamp-club#260) #1323
…lub#260)

Add an optional `concurrency` field at the workflow, job, and step levels to cap how many parallel units execute simultaneously. This prevents downstream API rate-limit failures and local resource exhaustion on large forEach fan-outs.

A semaphore-gated `mergeWithConcurrency()` wraps the existing `merge()` stream combinator. When the limit is unset or exceeds the stream count, the unbounded `merge()` path is used with zero overhead.

Resolution order: step > job > workflow > unbounded. A global `SWAMP_MAX_CONCURRENT_STEPS` env var provides a host-level ceiling.

Closes swamp-club#260

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CLI UX Review
Blocking
None.
Suggestions
`SWAMP_MAX_CONCURRENT_STEPS` not mentioned in `workflow run` help text. The existing convention for env vars that back CLI options is to document them inline (e.g. `"Repository directory (env: SWAMP_REPO_DIR)"`). `SWAMP_MAX_CONCURRENT_STEPS` has no corresponding CLI flag, so the inline pattern doesn't quite apply, and `SWAMP_DATASTORE_SYNC_TIMEOUT_MS` (another operator-scoped knob) is similarly undocumented in help text. Still, a `--help` mention or a description addendum on `workflow run` like `"Concurrency capped by SWAMP_MAX_CONCURRENT_STEPS if set"` would help operators discover it without reading design docs. Low priority given the precedent.
Verdict
PASS — no commands, flags, renderers, or error messages changed. The concurrency field is optional and exposed automatically via swamp workflow schema get. Backward compatibility is intact (absent/0 = unbounded). Ready to merge.
Code Review
Clean, well-structured PR. The semaphore primitive is correct, the merge combinator properly falls through to the unbounded path, and the domain model changes are minimal and additive. DDD layer placement is sound — infrastructure primitives in infrastructure/stream/, configuration fields on domain entities.
Blocking Issues
None.
Suggestions
- `src/libswamp/mod.ts` doesn't re-export `mergeWithConcurrency`. Line 39 re-exports `merge` from `./stream/merge.ts`, but the new `mergeWithConcurrency` (added to `src/libswamp/stream/merge.ts`) isn't re-exported from the barrel. Nothing currently needs it from the barrel, but for consistency with the existing `merge` export, consider adding it.
- `resolveEffectiveConcurrency` test coverage. The infrastructure layer (semaphore, `mergeWithConcurrency`) is well-tested, but the resolution logic in `execution_service.ts`, particularly the step > job > workflow > global cascade and the `min()` across step concurrencies in a level, encodes important business rules that would benefit from targeted unit tests. Even a few cases for `resolveEffectiveConcurrency(local, global)` as a standalone function (or exposed via a test helper) would lock down the edge cases (e.g., both set, one zero, both undefined).
- Minor: `readGlobalConcurrencyLimit()` called per-execution. This reads `Deno.env.get()` directly in domain code. The codebase has this pattern elsewhere (`SWAMP_DEBUG`, `SYNC_TIMEOUT_ENV_VAR`), so it's consistent. Just noting that in strict DDD this would be injected configuration rather than a direct env read. No action needed.
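The edge cases named in the test-coverage suggestion can be pinned down with a small standalone sketch. The helper names `normalize` and `resolveLocalConcurrency`, the two-argument shape of `resolveEffectiveConcurrency`, and the treatment of negative values are assumptions for illustration; the actual code in `execution_service.ts` may be structured differently.

```typescript
// Hypothetical standalone helpers, not the actual execution_service.ts code.

/** Treats undefined and 0 as "unbounded". Negative values are also treated
 *  as unbounded here, which is an assumption rather than documented behaviour. */
function normalize(limit: number | undefined): number | undefined {
  return limit !== undefined && limit > 0 ? limit : undefined;
}

/** Most-local non-zero value wins: step > job > workflow > unbounded. */
function resolveLocalConcurrency(
  step?: number,
  job?: number,
  workflow?: number,
): number | undefined {
  return normalize(step) ?? normalize(job) ?? normalize(workflow);
}

/** The host-level ceiling caps the local value: min(local, global).
 *  Applying the ceiling when no local value is set is an assumption. */
function resolveEffectiveConcurrency(
  local: number | undefined,
  global: number | undefined,
): number | undefined {
  const l = normalize(local);
  const g = normalize(global);
  if (l === undefined) return g;
  if (g === undefined) return l;
  return Math.min(l, g);
}

// Cases from the review: both set, one zero, both undefined.
console.log(resolveEffectiveConcurrency(3, 8)); // 3
console.log(resolveEffectiveConcurrency(0, 4)); // 4
console.log(resolveEffectiveConcurrency(undefined, undefined)); // undefined
console.log(resolveLocalConcurrency(0, 5, 10)); // 5
```

Returning `undefined` for "unbounded" keeps the fall-through to the plain `merge()` path a simple presence check at the call site.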
Adversarial Review
Critical / High
None found. The Semaphore, mergeWithConcurrency, and resolution logic are well-constructed. Abort paths, resource cleanup, and counter management all hold up under trace analysis.
Medium
- Step concurrency "leaks" to unrelated steps in the same topological level (`execution_service.ts:1489-1496`)

  When a topological level contains steps from multiple original step definitions with different `concurrency` values, the code takes `Math.min(...stepConcurrencies)` and applies that single limit to the entire level's merged stream. This means a step with `concurrency: 10` gets throttled to 2 if another unrelated step in the same level has `concurrency: 2`.

  Example: Step A (`forEach` over 20 items, `concurrency: 2`) and Step B (`forEach` over 5 items, `concurrency: 10`) are independent and land in the same topological level. The effective limit is `min(2, 10) = 2`, so Step B is throttled far below its configured limit. The user set `concurrency: 10` on Step B but gets 2.

  Mitigation: In practice, forEach expansions from different steps rarely share a topological level (they'd need zero inter-dependencies). The conservative `min` is safe: it over-restricts rather than under-restricts. But the documented "most-local non-zero value wins" doesn't capture this cross-step interaction. Worth documenting or addressing in a follow-up.
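The cross-step interaction described above can be reproduced in isolation. This is a minimal sketch: the `LevelStep` shape and the `levelLimit` function are hypothetical stand-ins for whatever `execution_service.ts` does at the cited lines, kept only to show the `Math.min` behaviour.

```typescript
// Hypothetical shape for illustration; not the project's actual types.
interface LevelStep {
  name: string;
  concurrency?: number;
}

/** One limit for the whole topological level: the minimum of all set values.
 *  Returns undefined (unbounded) when no step in the level sets a limit. */
function levelLimit(steps: LevelStep[]): number | undefined {
  const configured = steps
    .map((s) => s.concurrency)
    .filter((c): c is number => c !== undefined && c > 0);
  return configured.length > 0 ? Math.min(...configured) : undefined;
}

const level: LevelStep[] = [
  { name: "A", concurrency: 2 }, // forEach over 20 items
  { name: "B", concurrency: 10 }, // forEach over 5 items
];

console.log(levelLimit(level)); // 2, so Step B runs at 2 rather than 10
```

A follow-up that wanted per-step limits would need one gate per original step definition instead of a single gate on the level's merged stream.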
Low
- Semaphore has no over-release guard (`semaphore.ts:75-82`). Calling `release()` without a matching `acquire()` would push `available` above `limit`, silently expanding capacity. Not triggerable by the current `mergeWithConcurrency` call site (release is only in the finally block after a successful acquire), but a latent footgun for future callers.
- `mergeWithConcurrency` abort path leaves `remaining > 0` (`merge.ts:112-131`). Tasks that fail `sem.acquire(signal)` during abort return early without decrementing `remaining`, so `remaining` never reaches 0. This is benign because the abort handler has already called `queue.close()` via `queue.abort()`, so the consumer has exited and `Promise.allSettled(tasks)` correctly waits for all tasks to settle. No leak or deadlock, just worth noting the invariant difference from the unbounded `merge()`.
- `SWAMP_MAX_CONCURRENT_STEPS` parsing is lenient (`execution_service.ts:2007`). `parseInt("3abc", 10)` returns `3`. Acceptable for an env var, but a typo like `"3 0"` (intended `30`) would silently become `3`. No action needed: consistent with how most CLI tools parse env vars.
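Two of the low-severity notes lend themselves to a short sketch, should a follow-up want to tighten them. `GuardedSemaphore` and `parseConcurrencyLimit` are hypothetical names, not the project's `Semaphore` or its env-var reader, and the abort-signal support of the real `acquire(signal)` is omitted for brevity. Returning `undefined` for unparseable input, rather than throwing, is also an assumption.

```typescript
/** Counting semaphore that refuses to grow past its configured limit. */
class GuardedSemaphore {
  private readonly limit: number;
  private available: number;
  private readonly waiters: Array<() => void> = [];

  constructor(limit: number) {
    if (!Number.isInteger(limit) || limit < 1) {
      throw new RangeError("limit must be a positive integer");
    }
    this.limit = limit;
    this.available = limit;
  }

  acquire(): Promise<void> {
    if (this.available > 0) {
      this.available--;
      return Promise.resolve();
    }
    // No permit free: park the caller until release() hands one over.
    return new Promise<void>((resolve) => this.waiters.push(resolve));
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) {
      next(); // Hand the permit straight to the oldest waiter.
      return;
    }
    // Guard: with every permit already available, this release has no
    // matching acquire. Spurious releases while permits are still held
    // are not detectable without per-permit tokens.
    if (this.available >= this.limit) {
      throw new Error("release() called without a matching acquire()");
    }
    this.available++;
  }
}

/** Strict alternative to parseInt: accepts only a run of decimal digits. */
function parseConcurrencyLimit(raw: string | undefined): number | undefined {
  if (raw === undefined) return undefined;
  const trimmed = raw.trim();
  if (!/^\d+$/.test(trimmed)) return undefined; // rejects "3abc" and "3 0"
  const value = Number(trimmed);
  return value > 0 ? value : undefined; // 0 means unbounded
}

console.log(parseConcurrencyLimit("30")); // 30
console.log(parseConcurrencyLimit("3abc")); // undefined
console.log(parseConcurrencyLimit("3 0")); // undefined
```

Whether a malformed value should be ignored, warned about, or rejected outright is a policy choice; the review itself considers the lenient behaviour acceptable.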
Verdict
PASS — The semaphore and concurrency-limited merge are correctly implemented with proper abort handling, resource cleanup, and backward compatibility. The schema changes are additive and optional. The resolution logic matches the documented semantics. The medium finding is a design nuance worth tracking but not blocking.
Summary
- `concurrency` field at workflow, job, and step levels to cap parallel execution in fan-out scenarios (forEach, parallel jobs/steps)
- `Semaphore` primitive and `mergeWithConcurrency()` stream combinator that wraps existing `merge()` with zero overhead on the unbounded path
- `SWAMP_MAX_CONCURRENT_STEPS` env var as a host-level ceiling (`min(local, global)`)

Details

Resolution order: step → job → workflow → unbounded. The most-local non-zero value wins. `0` or absent means unbounded (full backward compatibility).

Verified end-to-end: a 10-item forEach with `concurrency: 3` correctly batches execution into groups of 3 (max concurrent = 3, ~8s total vs ~2s unbounded).

Files changed

- `semaphore.ts` (new), `merge.ts` (+`mergeWithConcurrency`), libswamp re-export
- `workflow.ts`, `job.ts`, `step.ts`: optional `concurrency` field
- `execution_service.ts`: concurrency resolution + gated merge at both job and step levels
- `design/workflow.md`: concurrency semantics section
- `swamp-workflow` SKILL.md + forEach reference

Follow-up issues
Closes swamp-club#260
Test plan
- `deno check`: zero type errors
- `deno lint`: clean
- `deno fmt`: clean
- `deno run test`: 5541 passed, 0 failed
- `deno run compile`: binary compiled
- `concurrency: 3` caps 10-item fan-out (max concurrent = 3, batched in ~2s intervals)
- `concurrency` field behave identically

🤖 Generated with Claude Code
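The reported timing (about 8s with `concurrency: 3` versus about 2s unbounded for 10 items) is consistent with each item taking roughly 2s and running in `ceil(10 / 3) = 4` waves. The sketch below reproduces that shape with a simple worker pool, which is a stand-in and not the PR's semaphore-gated `mergeWithConcurrency()`. The names `runFanOut` and `expectedWaves` are hypothetical.

```typescript
/** Number of sequential waves needed when at most `limit` items run at once. */
function expectedWaves(items: number, limit: number): number {
  return Math.ceil(items / limit);
}

/** Runs `items` simulated tasks of `taskMs` each, at most `limit` at a time,
 *  and reports the peak concurrency observed and the wall-clock time. */
async function runFanOut(
  items: number,
  limit: number,
  taskMs: number,
): Promise<{ maxConcurrent: number; elapsedMs: number }> {
  let next = 0; // index of the next unclaimed item
  let running = 0;
  let maxConcurrent = 0;
  const start = Date.now();

  async function worker(): Promise<void> {
    while (next < items) {
      next++; // claim an item; safe because there is no await before this
      running++;
      maxConcurrent = Math.max(maxConcurrent, running);
      await new Promise<void>((resolve) => setTimeout(resolve, taskMs));
      running--;
    }
  }

  // One worker per permit; each worker pulls items until none remain.
  const workers = Array.from({ length: Math.min(limit, items) }, () => worker());
  await Promise.all(workers);
  return { maxConcurrent, elapsedMs: Date.now() - start };
}

console.log(expectedWaves(10, 3)); // 4
```

Calling `runFanOut(10, 3, 2000)` should report a peak of 3 and an elapsed time near four task durations, while `runFanOut(10, 10, 2000)` should finish in about one.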