[core] Move stream reconnect logic to getReadable level #1847
VaguelySerious wants to merge 9 commits into stable
Conversation
Signed-off-by: Peter Wielander <mittgfu@gmail.com>
🦋 Changeset detected

Latest commit: e58a449

The changes in this PR will be included in the next version bump. This PR includes changesets to release 18 packages.
🧪 E2E Test Results

❌ Some tests failed

Summary

❌ Failed Tests
- ▲ Vercel Production (1 failed): nextjs-turbopack (1 failed)

Details by Category
- ❌ ▲ Vercel Production
- ✅ 🪟 Windows

❌ Some E2E test jobs failed: check the workflow run for details.
TooTallNate left a comment
Review
The architectural shift makes sense: client-side frame counting is a cleaner abstraction than wire-level control frames, and moving it to core means it works for any world that returns a ReadableStream from readFromStream, not just world-vercel. The reconnect math, frame-counting, and partial-frame discard are all correct.
But there are two significant concerns I think need addressing before merge.
1. Byte streams lose auto-reconnect entirely
The PR explicitly opts byte streams out of reconnect:
```ts
if (value.type === 'bytes') {
  // No auto-reconnect here: raw byte streams have no wire framing
  const readable = new WorkflowServerReadableStream(value.name, value.startIndex);
  // ...
} else {
  const readable = createReconnectingFramedStream(value.name, value.startIndex);
  // ...
}
```

The reason given is technically correct (no wire framing → no chunk boundary detection client-side), but this is a regression vs. the reverted #1790, which handled byte streams just fine because the server sent the resume hint via control frame.
The use cases that lose auto-reconnect:
- AI streaming responses (text/SSE) piped from `getWritable()`
- Any HTTP route doing `return new Response(run.getReadable())` for raw bytes
- Any streaming workflow output that goes more than 2 minutes (the prior server-side timeout window) and uses the byte type
The docs callout added by this PR points users to WorkflowChatTransport and supportsCancellation, but those address a different problem (cancellation, not reconnect). Pushing reconnect to the application layer — where every consumer has to reimplement it — is a step backward in usability.
Possible directions:
1. Frame byte streams on the writable side too (4 bytes per chunk overhead) so `createReconnectingFramedStream` works for them. The user-facing surface stays raw bytes; only the wire format changes.
2. Keep the control-frame approach for byte streams only as a hybrid — frame counting for non-byte streams, server-side hint for byte streams.
3. Document this as an explicit limitation and update the docs callout to specifically warn about byte streams losing reconnect, not just talk about `supportsCancellation` (separate issue).

(1) seems best to me — it removes the asymmetry entirely and keeps the cleaner architecture.
2. The "clean EOF means done" assumption needs verification
```ts
if (result.done || !result.value) {
  // Clean EOF — stream is truly complete...
  controller.close();
  return;
}
```

This assumes the workflow-server signals "done" and "timeout/aborted" differently at the network level — clean done = FIN, timeout = error/reset. The deleted control-frame logic disambiguated these because both manifested as clean closes from a TCP perspective; the magic-footer frame was the disambiguator.
Without the control frame, the new code can't tell them apart. If the workflow-server's 2-minute timeout sends a clean FIN (rather than a TCP reset or stream error), this PR will appear to "complete" any stream that hits 2 minutes.
Is that assumption verified against the actual server behavior? The new test simulates max-duration as controller.error(...), which is fine for the unit test, but I'd want to see either:
- An e2e test confirming a real long-lived stream against workflow-server triggers reconnect (not premature close)
- A statement in the PR description / commit explaining why the server-side timeout is now an error not a clean close (was the workflow-server changed? was the timeout removed?)
The supportsCancellation callout suggests the architecture has shifted such that streams now run for the full function maxDuration rather than the old 2-minute server timeout — but if so, that's a precondition for this PR and worth calling out explicitly.
Minor
See inline comments.
What looks good
- Frame-counting math is correct: `currentStartIndex += consumedFrames` resumes at the right place, the partial-frame buffer is correctly discarded, and the math is symmetric for a non-zero initial `startIndex` (see the sketch after this list).
- Negative `startIndex` correctly bypasses reconnect with a clear reason (can't compute an absolute resume index without a tail-index lookup) — and there's a test for it.
- AbortController plumbing in world-vercel `readFromStream` is the right primitive. Cancel propagation through `cancel(reason) { abortController.abort(reason) }` correctly tears down the fetch.
- Test coverage for `createReconnectingFramedStream` is good — frames split across reads, partial frame at error, clean EOF, non-zero initial startIndex, negative startIndex bypass, cancel propagation. Six tests, all targeted.
- Two changesets correctly scoped: `@workflow/core` for the new wrapper, `@workflow/world-vercel` for the cancel propagation.
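To make that resume math concrete, here is a minimal sketch of the frame-counting reconnect idea. It is illustrative only — not the PR's `createReconnectingFramedStream`, and the names `reconnectingFrames`, `open`, and `concat` are hypothetical. It assumes 4-byte length-prefixed frames, treats a clean EOF as completion, reopens at `startIndex + consumed` on upstream error while discarding any buffered partial frame, and ignores backpressure for brevity:

```ts
// Sketch only — illustrative names, not the PR's actual implementation.
type OpenStream = (startIndex: number) => ReadableStream<Uint8Array>;

function reconnectingFrames(
  open: OpenStream,
  startIndex: number,
  maxReconnects = 10
): ReadableStream<Uint8Array> {
  let consumed = 0; // completed frames emitted so far
  let attempts = 0;
  return new ReadableStream<Uint8Array>({
    async start(controller) {
      for (;;) {
        const reader = open(startIndex + consumed).getReader();
        let buffer = new Uint8Array(0); // may end up holding a partial frame
        try {
          for (;;) {
            const { done, value } = await reader.read();
            if (done || !value) {
              controller.close(); // clean EOF => stream is truly complete
              return;
            }
            buffer = concat(buffer, value);
            // Emit every complete [4-byte length][payload] frame in the buffer.
            while (buffer.byteLength >= 4) {
              const len = new DataView(buffer.buffer, buffer.byteOffset, 4).getUint32(0);
              if (buffer.byteLength < 4 + len) break; // partial frame — keep buffering
              controller.enqueue(buffer.slice(4, 4 + len));
              buffer = buffer.slice(4 + len);
              consumed++; // resume index advances one frame at a time
            }
          }
        } catch (err) {
          // Upstream error: drop the partial frame (the server resends it)
          // and reconnect at the first frame we have not yet emitted.
          if (++attempts > maxReconnects) {
            controller.error(err);
            return;
          }
        }
      }
    },
  });
}

function concat(a: Uint8Array, b: Uint8Array): Uint8Array {
  const out = new Uint8Array(a.byteLength + b.byteLength);
  out.set(a, 0);
  out.set(b, a.byteLength);
  return out;
}
```

The key invariant is that `consumed` only advances on fully emitted frames, so a reconnect can never skip or duplicate a frame.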
```ts
  value.name,
  value.startIndex
);
if (value.type === 'bytes') {
```
Byte streams are intentionally opted out of auto-reconnect here. This is a behavioral regression vs. the reverted #1790, which handled byte streams via server-sent control frames.
The comment correctly identifies why this is hard (no wire framing → no chunk boundary detection client-side), but pushing reconnect to the application layer means:
- Every consumer of `run.getReadable()` for byte streams (AI text streaming, raw HTTP responses, etc.) has to implement its own reconnect logic.
- The docs callout added by this PR (about `supportsCancellation`) doesn't actually help — that's a cancellation fix, not a reconnect fix.
I think the right move is to frame byte streams on the writable side too (4 bytes per chunk overhead), so `createReconnectingFramedStream` can be used uniformly. The user-facing API stays raw bytes; only the wire format gets the length prefix. That removes the asymmetry and keeps the cleaner architecture this PR is trying to achieve.
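For a sense of scale, the writable-side framing being suggested is small. A rough sketch, assuming a plain 4-byte big-endian length prefix per chunk — the name `frameBytes` is hypothetical, not an API from this PR:

```ts
// Rough sketch, not this PR's implementation: prefix each chunk with a
// 4-byte big-endian length so the reader can rediscover chunk boundaries.
function frameBytes(): TransformStream<Uint8Array, Uint8Array> {
  return new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      const framed = new Uint8Array(4 + chunk.byteLength);
      new DataView(framed.buffer).setUint32(0, chunk.byteLength); // length prefix
      framed.set(chunk, 4); // then the raw bytes
      controller.enqueue(framed);
    },
  });
}

// The writer side would then pipe through it before the workflow writable:
// value.pipeThrough(frameBytes()).pipeTo(writable)
```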
```ts
 * the writable buffers one frame per chunk when multi-writing). The wrapper
 * counts completed frames and, on upstream error, reopens the connection
 * with `startIndex = resolvedStartIndex + consumedFrames`. Partial-frame
 * bytes buffered before the cut are discarded — the server will resend the
```
The comment says: "On serverfull backends, reconnects should only happen during transient errors. For serverless backends, we set this constant so that we cover at least 10 minutes even if the server would be limited to e.g. 1 minute per session."
10 reconnects × 1-minute-per-session = 10 minutes covered. That's tighter than the deleted constant in world-vercel (`MAX_RECONNECTS = 50`, ~100 minutes coverage at 2-min server timeouts). If the underlying assumption is that streams now run for full function `maxDuration` (which on Pro/Enterprise can exceed 10 minutes), this cap may be too low.
Worth either:
- Bumping the constant to match the longest realistic `maxDuration` (~15 min Pro), so something like 30, or
- Making it configurable per-call (or via the world)
```ts
    console.warn("Error closing ReadableStream reader:", err)
  });
  reader = undefined;
}
```
Nit: cancel() here only cancels the active reader. There's a small race window: if cancel fires while connect() is in flight (between reader = undefined after a reconnect-triggering error and the new reader being assigned), there's nothing to cancel — the new connection completes and the loop continues reading.
A cancelled flag checked at the top of the pull loop and inside connect() would close this. Same race existed in the deleted world-vercel cancel handler, so it's not a regression — just worth tightening if you're touching this code.
```ts
let cancelled = false;
// ... in pull loop, top of for(;;):
if (cancelled) { controller.close(); return; }
// ... in cancel:
cancelled = true;
```

```ts
const { world } = makeWorldWithScriptedStreams({
  0: () =>
    scriptedStream([
      // Split frame into 3 byte-level reads to prove boundary-aware
```
Test simulates max-duration abort as controller.error(...) — which is correct for what the wrapper sees on a network reset, but doesn't verify the actual workflow-server behavior matches.
If workflow-server's stream timeout sends a clean FIN (i.e., calls controller.close() on its end) instead of an error, this code path will treat it as EOF and not reconnect. The control-frame logic that this PR removes was specifically designed to disambiguate these two cases.
Could you confirm in the PR description whether:
- workflow-server's stream timeout has been removed entirely (streams now run for the full function `maxDuration`), OR
- the timeout still exists but now manifests as a network error / TCP reset rather than a clean FIN?
This is the load-bearing assumption of the whole design.
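For illustration, these are the two upstream shapes the wrapper has to tell apart — plain ReadableStreams standing in for the real transport, not the PR's test helpers. Only the second triggers a reconnect; if the server's timeout ever surfaces as the first, the wrapper closes and the output silently truncates:

```ts
// Shape 1: clean close — the wrapper treats this as "stream complete".
const cleanClose = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('...final complete frame...'));
    controller.close(); // clean FIN equivalent
  },
});

// Shape 2: mid-stream error — the wrapper discards the partial frame and reconnects.
const midStreamCut = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('...partial fra')); // cut mid-frame
    controller.error(new Error('server session timed out')); // reset/abort equivalent
  },
});
```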
Following up after the discussion thread — consolidating the recommended direction so it's all in one place.

Recommended direction

Move byte-stream framing into core, gated on a per-run feature flag, with the resolved choice baked into the serialized stream ref. The PR's instinct (move reconnect to core) is right. The concrete change to make it work uniformly for byte streams:

1. Frame byte streams on the writer side

Currently:

```ts
ops.push(value.pipeTo(writable));
```

It would become:

```ts
ops.push(
  value
    .pipeThrough(getByteFramingStream()) // wrap each chunk in [4-byte len][bytes]
    .pipeTo(writable)
);
```

Cost: 4 bytes per server-side chunk. For typical streaming workloads (AI text chunks of dozens of bytes, structured byte payloads in the KB+ range) this is well under 5% overhead.

2. Use
TooTallNate left a comment
Approving — withdrawing my prior request-for-changes.
Context: my earlier blocker was that this PR opts byte streams out of auto-reconnect, which I called a regression vs. the now-reverted #1790. Since then we discussed it and settled on a different plan: this PR lands on stable as-is (object-stream reconnect only), and byte-stream support gets added on main/v5 via wire-level framing in a follow-up. The framing work is now in PRs #1854 (workflowCoreVersion on HealthCheckResult) and #1853 (the framing itself), which together let createReconnectingFramedStream be applied uniformly to byte streams on main once they land.
So for stable, this PR is the right scope:
- Object-stream reconnect via `createReconnectingFramedStream` is correct.
- Byte streams legitimately can't be auto-reconnected with the legacy unframed wire format that `stable` ships, so opting them out is the right call there.
- Frame-counting math, AbortController plumbing, and world-vercel simplification all look good.
The earlier non-blocking concerns I raised still apply — would be nice to address them but I'm not gating on them:
- The "clean EOF means done" assumption. Worth a sentence in the commit/PR description confirming whether workflow-server's stream timeout now manifests as a network error rather than a clean FIN, since the deleted control-frame logic was specifically there to disambiguate them.
- `FRAMED_STREAM_MAX_RECONNECTS = 10` is tighter than the deleted `MAX_RECONNECTS = 50`. Probably fine, but worth a sanity check against the longest realistic Pro/Enterprise `maxDuration`.
- Cancel race during reconnect — pre-existing, not a regression here.
Reverts #1790 and instead moves stream reconnects to the `getReadable` level, where we already do chunk framing.

Closes #1801
Closes #1802
After shipping this
- Needs to later be forward-ported to `main`