Implement per-port backpressure gating for no-accumulation passthrough edges#607
Merged
sroussey merged 4 commits intoJul 2, 2026
Conversation
…h edge Pace a streaming producer to its passthrough consumer's read rate so peak buffered bytes stay bounded for a fast-producer / slow-consumer pair: - streamEventCost(event): per-event byte cost (UTF-8 for text deltas, JSON length for object deltas, byte length for binary) shared by charge/credit. - StreamPump builds one BackpressureGate per passthrough source port (high-water mark = streamHighWaterBytes, default 8 MiB). The edge stream charges the gate as events are enqueued; a credit-on-read wrapper around the consumer's stream credits it back, and end/abort/error/consumer termination all close the gate so a parked producer is never orphaned. - StreamProcessor awaits the graph-installed edgeBackpressure thunk after each delta (per-port), keeping the task layer edge-agnostic; the cooperative ctx hook is generalized to backpressure() (awaits cache-sink routers AND edge gates) with binaryBackpressure kept as a deprecated alias. - prepareStreamingInputs no longer tees a passthrough edge: its materialize copy is never drained, so the tee branch would silently retain every event. - Gates engage only on isNoAccumulationPassthroughEdge; fan-out (2+ consumers of one port) keeps the tee'd drain — in-order delivery to every consumer, best-effort pacing by design. Flag off = unchanged behavior. Tests: StreamBackpressureEngaged (peak producer lead bounded by the mark, off-path runs free), StreamMixedModeFanout (append+object ports pace independently per-port; fan-out delivers all events in order to both consumers). EXECUTION_MODEL.md documents per-port sinks, cache-as-tee, skippable materialization, all-mode backpressure, the noAccumulation + streamHighWaterBytes knobs, x-validate-stream, the single-consumer scope, and inline-only backings. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01T89ffUgYbtp6UztpEXWyVW
…struction TS 5.7+ types Uint8Array over ArrayBufferLike, which no longer satisfies BlobPart; the in-memory repo doubles never hold SharedArrayBuffer-backed views, so the assertion is safe. Fixes @workglow/test build-types. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01T89ffUgYbtp6UztpEXWyVW
Liveness and cache-correctness fixes on the streaming/cache paths:
- Cache policy: a run consuming a live stream at an unsettled input port
now runs uncached (kind none) — the streamed content cannot contribute
to the cache key, so two runs differing only in stream payload would
collide on one entry (stale hits, poisoned rows).
- Edge gate liveness: a passthrough gate is built only when the consumer
can reach its stream reads while the producer parks; any other edge into
the consumer sourced from the producer or its descendants (drained,
mode-mismatched, or static edges) falls back to ungated passthrough
instead of deadlocking the pair. Credit reuses the charged cost via a
FIFO (no double JSON.stringify), and a read failure also closes the gate.
- Passthrough predicate: the target must itself be a streamable,
non-subgraph task — only streamable tasks receive ctx.inputStreams, and
subgraph hosts need the drain for their inner tasks' settled inputs.
- canStreamBinaryToCache now requires task.cacheable (matching the sink
builder); a non-cacheable binary streamer no longer silently drops its
output to {}.
- Per-port edge filtering treats binary-delta as a port delta, so one
port's bytes can no longer leak into another port's edge stream.
- Cache-hit replay honors the consumer-edge gate per delta and decodes
append/object logs in a single pass (emit + fold) instead of buffering
the whole byte log and decoding twice; dangling-ref misses release
already-opened streams; lookup/save/key use the instance schemas so
dynamic-schema tasks replay the ports they wrote.
- Producer failure enqueues an in-stream error event before closing edge
streams, so an early-dispatched consumer fails instead of completing
(and caching) on truncated input; abort keeps the graceful close.
- StreamProcessor fails (not ends) its cache-sink routers when the stream
ends without a finish event, so an aborted/truncated run discards the
partial blob instead of publishing it.
- Input hydration decodes append/object refs through their stream codec
(string / folded object) instead of handing byte Blobs to string ports,
resolves ports concurrently, and inline string/object job outputs adapt
to streams like their surviving-ref forms.
- Orphan-blob cleanup covers all delta modes (was binary-only) and works
on the private tier (RunPrivateCacheRepo now forwards deleteOutputByRef).
- FsFolder: stream reads open the fd eagerly (ENOENT → clean miss instead
of mid-iteration throw; no prune TOCTOU) and short writes are looped to
completion so ref.size never overstates the file.
- resolveOutput resolves shared subtrees once via promise memoization
(second occurrence previously got the original, unresolved object);
cyclic values keep the conservative visited-set behavior.
- TaskRegistry validates binary port formats on input schemas too;
AiTask/ImageEditTask/ImageGenerateTask forward the validateInput
skip-ports parameter.
- Cleanups: born-deprecated binaryBackpressure alias, dead
CacheCoordinator.saveStream, anyConsumerAcceptsBinaryStream, and
getBinaryPortId removed; isDeltaStreamMode/portForcesStreamValidation
helpers replace six hand-expanded copies; shared TextEncoder and
chunked browser base64 on hot paths; spec references removed from test
comments per repo rule.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01T89ffUgYbtp6UztpEXWyVW
Coverage Report
File Coverage
|
||||||||||||||||||||||||||||||||||||||
…eaming phase-2 branch Conflict resolutions combine both sides' intents: - CacheCoordinator.lookup: keep the parent's degrade-to-miss try/catch around getOutput/deserialize AND this branch's instance output schema (dynamic-schema tasks replay the ports they wrote). - TaskRunner imports: parent's specific-module cache imports plus this branch's streamRefViaBacking / getStreamPortCodec. - StreamPump.createStreamFromTaskEvents: parent's hoisted idempotent cleanup + terminal-status listener + reader-cancel teardown, with this branch's passthrough-gate close folded into cleanup (every teardown path wakes a parked producer) and the in-stream error event emitted on FAILED before cleanup so drained consumers fail instead of settling on truncated data; abort stays a graceful close. - RunPrivateCacheRepo: take the parent's first-class run-scoped design (saveOutputForRun/getOutputForRun/deleteRun); the wrapper no longer forwards streaming sinks, so the private tier deliberately falls back to accumulation and this branch's deleteOutputByRef forward is moot. - Test doubles: parent's BlobPart cast form. Verified post-merge: graph 962 green, task 1119 green (24 skipped), tsc --noEmit clean, build:types clean (36/36). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01T89ffUgYbtp6UztpEXWyVW
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds per-port backpressure gating to the no-accumulation passthrough streaming path, enabling fine-grained pacing of producers to match consumer read rates on individual ports. Previously, passthrough edges had no pacing mechanism; producers could race ahead unbounded. This change introduces
BackpressureGatecoordination between producer and consumer streams, with separate gates per source port feeding independent consumers.Key Changes
Per-port edge gates (
EdgeGateState):StreamPumpnow builds aBackpressureGatefor each source port that feeds a no-accumulation passthrough edge. Gates track pending event costs in a FIFO and coordinate producer/consumer pacing.Producer-side pacing:
StreamProcessorreceives anedgeBackpressurecallback from the graph runner. After emitting each delta, the processor awaits the gate for that port, parking until the consumer has read enough to bring buffered bytes below the high-water mark.Consumer-side credit: The passthrough stream's reader charges the gate as events are enqueued and credits it as the consumer reads, using
streamEventCostto measure bytes per event.Safety cleanup: If a passthrough consumer reaches a terminal state (COMPLETED, FAILED, DISABLED, ABORTING) without fully reading its stream, the gate is closed to release any parked producer; every edge-stream teardown path (stream end, terminal producer status, reader cancel) also closes the gate.
Deadlock liveness guard: a gate is built only when the consumer can make read progress while the producer is parked — if any other edge into the consumer is sourced from the producer or one of its DAG descendants (drained, mode-mismatched, or static edges), that edge settles only after the producer finishes, so gating would deadlock the pair. Such consumers keep the ungated passthrough (correct, just unpaced).
Stream-fed consumers run uncached: a run consuming a live stream at an unsettled input port computes its cache key without the streamed content, so caching is disabled (
kind: "none") for that run — preventing stale hits and poisoned rows when two runs differ only in stream payload. Drained edges keep caching.Producer failure propagates in-stream: a producer failure enqueues an in-stream error event before edge streams close, so an early-dispatched consumer fails instead of completing (and caching) on truncated input; abort keeps the graceful close.
Mixed-mode and fan-out test coverage: Two new comprehensive test suites validate the gating behavior:
StreamMixedModeFanout.test.ts: One source with separateappendandobjectports, each feeding its own passthrough consumer with independent gates; also tests fan-out (one port → two consumers) which correctly falls back to tee'd drain without a gate.StreamBackpressureEngaged.test.ts: Fast append producer feeding a slow consumer over a small high-water mark; verifies producer is parked and peak lead stays bounded.Cache hit replay pacing:
CacheCoordinatornow acceptsedgeBackpressureinCacheReplayContext, allowing cache-hit replay to respect the same per-port gates as fresh runs so slow consumers pace both paths identically; append/object logs decode in a single pass instead of buffering the whole byte log.Validation and schema improvements:
isDeltaStreamMode()helper to identify modes that can be encoded to byte streams (append,object,binary).portForcesStreamValidation()to read thex-validate-streamopt-in, allowing ports to opt out of both validation exemption and passthrough pacing.TaskRegistryto validate binary format on both input and output ports.AiTask/ImageEditTask/ImageGenerateTaskforward thevalidateInputskip-ports parameter.Instance schema support:
CacheCoordinatorandTaskRunnernow use instance schemas (viatask.inputSchema()/task.outputSchema()) instead of static schemas, enabling dynamic-schema tasks to add ports at runtime without breaking cache key normalization, sink setup, or cache-hit replay.Streaming output hydration:
resolveJobOutputnow adapts inline append/object values to byte streams (UTF-8 text, NDJSON) for consistent caller behavior whether the value came from a ref or inline; input-ref hydration decodes append/object refs through their stream codecs (string / folded object) instead of handing byte Blobs to string ports.Cycle detection in ref resolution:
resolveOutputnow detects cyclic values and uses memoization for acyclic ones, ensuring shared subtrees resolve once and both slots receive the same resolved copy.File write robustness:
FsFolderTaskOutputRepositorynow loops on partial writes to ensure the full chunk reaches disk before returning the size stamped into theCacheRef, and opens stream reads eagerly so a pruned blob reports a clean cache miss instead of throwing ENOENT mid-iteration.Orphan-blob cleanup: covers all delta modes (was binary-only); a stream ending without a finish event fails its cache sinks so truncated blobs are discarded, not published. (The run-private tier does not build streaming sinks after the run-scoping reconciliation from main — it falls back to accumulation — so no orphan refs can arise there.)
Notable Implementation Details
isNoAccumulationPassthroughEdge(): single consumer, same mode on both ends, consumer is a streamable non-subgraph task, no validation opt-in — plus the liveness guard above.edgeBackpressurecallback is threaded throughIRunConfig→StreamProcessor→ task'sIExecuteContextbackpressure()hook, allowing both direct emitters andStreamProcessor-routed deltas to respect pacing.*edges, non-streamable or subgraph targets) fall back to the existing drain path — correct, just without the memory and pacing win. WithnoAccumulationoff, behavior is unchanged.claude/relaxed-faraday-ql3olrafter its reconciliation withmain(run-scoped private cache, degrade-to-miss cache decode, edge-stream reader-cancel teardown); conflict resolutions combine both sides — see the merge commit message for details.Verification (post-merge head)
bun scripts/test.ts graph vitestgreen (962 tests);bun scripts/test.ts task vitestgreen (1119, 24 skipped); job-queue + FsFolder storage spot suites greennpx tsc -p packages/task-graph/tsconfig.json --noEmitclean;bun run build:typesclean across all 36 packageshttps://claude.ai/code/session_01T89ffUgYbtp6UztpEXWyVW