[core] Add wire-level framing for byte streams#1853
[core] Add wire-level framing for byte streams#1853TooTallNate wants to merge 1 commit intonate/healthcheck-coreversionfrom
Conversation
🦋 Changeset detectedLatest commit: b3a7f7f The changes in this PR will be included in the next version bump. This PR includes changesets to release 17 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
📊 Benchmark Results
workflow with no steps💻 Local Development
workflow with 1 step💻 Local Development
workflow with 10 sequential steps💻 Local Development
workflow with 25 sequential steps💻 Local Development
workflow with 50 sequential steps💻 Local Development
Promise.all with 10 concurrent steps💻 Local Development
Promise.all with 25 concurrent steps💻 Local Development
Promise.all with 50 concurrent steps💻 Local Development
Promise.race with 10 concurrent steps💻 Local Development
Promise.race with 25 concurrent steps💻 Local Development
Promise.race with 50 concurrent steps💻 Local Development
workflow with 10 sequential data payload steps (10KB)💻 Local Development
workflow with 25 sequential data payload steps (10KB)💻 Local Development
workflow with 50 sequential data payload steps (10KB)💻 Local Development
workflow with 10 concurrent data payload steps (10KB)💻 Local Development
workflow with 25 concurrent data payload steps (10KB)💻 Local Development
workflow with 50 concurrent data payload steps (10KB)💻 Local Development
Stream Benchmarks (includes TTFB metrics)workflow with stream💻 Local Development
stream pipeline with 5 transform steps (1MB)💻 Local Development
10 parallel streams (1MB each)💻 Local Development
fan-out fan-in 10 streams (1MB each)💻 Local Development
SummaryFastest Framework by WorldWinner determined by most benchmark wins
Fastest World by FrameworkWinner determined by most benchmark wins
Column Definitions
Worlds:
❌ Some benchmark jobs failed:
Check the workflow run for details. |
🧪 E2E Test Results❌ Some tests failed Summary
❌ Failed Tests💻 Local Development (2 failed)vite-stable (2 failed):
Details by Category❌ 💻 Local Development
✅ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
✅ 📋 Other
❌ Some E2E test jobs failed:
Check the workflow run for details. |
There was a problem hiding this comment.
Pull request overview
Adds opt-in wire-level length-prefix framing for type: 'bytes' ReadableStreams, gated by target run capabilities, to preserve chunk boundaries on the wire and enable future transparent auto-reconnect.
Changes:
- Introduces byte-stream framing/unframing transforms and carries a per-stream
framingfield through serialization refs and VM symbols. - Adds
framedByteStreamscapability detection (viaworkflowCoreVersion) and uses a cross-deploymenthealthCheckprobe instart()to decide framing. - Updates producer/consumer sites (step return values,
getWritable,resumeHook, workflow argument dehydration) and adds dedicated tests + changeset.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/core/src/symbols.ts | Adds STREAM_FRAMING_SYMBOL to persist framing choice through the VM boundary. |
| packages/core/src/step/writable-stream.ts | Forces framed byte-stream support when serializing values written from steps (same-deployment assumption). |
| packages/core/src/serialization.ts | Implements byte framing/unframing, adds framing to serialized stream refs, and threads framedByteStreams through reducers/revivers. |
| packages/core/src/runtime/step-handler.ts | Always enables framed byte streams for step return value dehydration (same-deployment assumption). |
| packages/core/src/runtime/start.ts | Adds capability probing for cross-deployment start() to decide whether to frame byte streams. |
| packages/core/src/runtime/resume-hook.ts | Uses run capabilities to decide whether to emit framed byte streams in hook resume payloads. |
| packages/core/src/runtime/helpers.ts | Parses and surfaces workflowCoreVersion from health check responses. |
| packages/core/src/capabilities.ts | Adds framedByteStreams capability with semver cutoff logic. |
| packages/core/src/capabilities.test.ts | Adds coverage for framedByteStreams cutoff/invalid-version behavior. |
| packages/core/src/byte-stream-framing.test.ts | Adds unit + e2e tests for framing/unframing and ref behavior. |
| .changeset/byte-stream-wire-framing.md | Declares a minor release for the new wire framing capability and behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| function appendToBuffer(data: Uint8Array) { | ||
| const next = new Uint8Array(buffer.length + data.length); | ||
| next.set(buffer, 0); | ||
| next.set(data, buffer.length); | ||
| buffer = next; | ||
| } |
There was a problem hiding this comment.
appendToBuffer() in getByteUnframingStream() reallocates and copies the entire buffered data on every incoming transport chunk. With many small chunks this becomes O(n²) copying and can cause significant GC/memory churn during long byte streams. Consider a chunk-list + offset approach (similar to a rope), or a growable buffer strategy (amortized linear) to avoid repeated full copies.
| // Sanity cap: 100MB per chunk. Workflow byte chunks are typically far | ||
| // smaller; anything bigger almost certainly means we got a non-framed | ||
| // wire fed through this transform by mistake (e.g. legacy raw bytes | ||
| // routed to a framed reader). | ||
| const MAX_FRAME_SIZE = 100_000_000; | ||
| let buffer = new Uint8Array(0); |
There was a problem hiding this comment.
MAX_FRAME_SIZE is hard-coded to 100MB in getByteUnframingStream(). This introduces a new per-chunk size limit for framed byte streams (previously raw byte streams had no SDK-level cap), and will cause valid >100MB chunks to fail even when framing is correct. If a cap is required, it should be aligned with documented/transport limits and ideally enforced (or at least surfaced) consistently on the producer side too; otherwise consider making it configurable or raising/removing it.
| it('hydrate of a framed-v1 ref unframes; absent ref reads raw', async () => { | ||
| // Direct exercise of the reviver dispatch: write framed bytes to a | ||
| // mock world under a known name, then construct the stream ref two | ||
| // different ways (with framing and without) to verify the consumer | ||
| // dispatches correctly. | ||
| setWorld(makeMockWorld()); | ||
| const world = await (await import('./runtime/world.js')).getWorld(); | ||
|
|
||
| // Frame three user chunks into the wire format and stash them. | ||
| const chunks = [ | ||
| new Uint8Array([1, 2]), | ||
| new Uint8Array([3, 4, 5]), | ||
| new Uint8Array([6]), | ||
| ]; | ||
| const reader = new ReadableStream<Uint8Array>({ | ||
| pull(c) { | ||
| for (const ch of chunks) c.enqueue(ch); | ||
| c.close(); | ||
| }, | ||
| }) | ||
| .pipeThrough(getByteFramingStream()) | ||
| .getReader(); | ||
|
|
||
| const wireBytes: Uint8Array[] = []; | ||
| for (;;) { | ||
| const r = await reader.read(); | ||
| if (r.done) break; | ||
| wireBytes.push(r.value); | ||
| } | ||
| for (const b of wireBytes) { | ||
| await world.streams.write('wrun_test', 'strm_known', b); | ||
| } | ||
|
|
||
| // Now read back via wire stream + unframer — should produce original chunks. | ||
| const wireStream = await world.streams.get('wrun_test', 'strm_known'); | ||
| const got = await readBytes( | ||
| wireStream.pipeThrough(getByteUnframingStream()) | ||
| ); | ||
| expect(got).toEqual(chunks); | ||
| }); |
There was a problem hiding this comment.
The test name/description doesn't match what the test actually exercises: "hydrate of a framed-v1 ref unframes; absent ref reads raw" only writes framed bytes and then manually pipes through getByteUnframingStream(); it doesn't construct/read a serialized ref with/without framing to verify the reviver dispatch. Either rename this test to match its behavior, or extend it to actually cover the value.framing branching in getExternalRevivers/getStepRevivers.
| return new TransformStream<Uint8Array, Uint8Array>({ | ||
| transform(chunk, controller) { | ||
| if (chunk.length === 0) return; | ||
| const frame = new Uint8Array(FRAME_HEADER_SIZE + chunk.length); | ||
| new DataView(frame.buffer).setUint32(0, chunk.length, false); | ||
| frame.set(chunk, FRAME_HEADER_SIZE); | ||
| controller.enqueue(frame); | ||
| }, |
There was a problem hiding this comment.
getByteFramingStream() drops zero-length Uint8Array chunks (if (chunk.length === 0) return;). That is an observable semantic change for type: 'bytes' streams when framing is enabled (a producer can enqueue empty chunks today and the consumer would receive them). Either preserve empty chunks by emitting a 0-length frame (the unframer already supports it), or explicitly document/justify that empty chunks are intentionally lossy despite the PR description claiming the user-facing API is unchanged.
cc06349 to
90d9e65
Compare
VaguelySerious
left a comment
There was a problem hiding this comment.
AI review: no blocking issues
| framedByteStreams = getRunCapabilities( | ||
| probe?.workflowCoreVersion | ||
| ).framedByteStreams; | ||
| } |
There was a problem hiding this comment.
AI Review: Note
This probe is synchronous on every cross-deployment start() call and gates the dehydrate. With a 2s timeout, repeated cross-deployment starts to a deployment that doesn't recognise the health check (or where workflowCoreVersion is absent) each pay up to 2s of latency, even when the workflow args contain no byte streams.
Worth caching the probe result per deploymentId (or per (deploymentId, workflowCoreVersion) once known) — the target's capabilities don't change between calls within a process. Without caching, a workflow that fans out N runs across deployments via start({ deploymentId }) would re-probe N times.
A softer alternative: only probe lazily when a byte stream is actually encountered in the args. Today the probe runs unconditionally.
| // the wire format without runtime negotiation. Producers that target a | ||
| // run whose deployment doesn't support framing (see `getRunCapabilities` | ||
| // in capabilities.ts) emit raw bytes and a ref without the field — which | ||
| // the reader treats as legacy raw bytes for backwards compatibility. |
There was a problem hiding this comment.
AI Review: Note
The unframer relies on an implicit invariant that's not stated here: each server-side stored chunk corresponds to exactly one frame, so startIndex (used by WorkflowServerReadableStream for resume) aligns to frame boundaries. If the server ever coalesces or splits stored chunks (or WorkflowServerWritableStream ever switches to writing partial frames per write() call), the unframer's first read would land mid-frame, the bogus length header would trip the MAX_FRAME_SIZE guard, and the stream would error.
Worth a sentence in this block (or near startIndex in WorkflowServerReadableStream) calling out that the framer must enqueue one whole frame per chunk to the underlying writable, and that any future change to the writable's chunk batching needs to honour that.
| next.set(buffer, 0); | ||
| next.set(data, buffer.length); | ||
| buffer = next; | ||
| } |
There was a problem hiding this comment.
AI Review: Nit
appendToBuffer allocates a fresh Uint8Array and copies the entire pending buffer + new chunk on every transport read — O(n²) when frames are split across many small reads.
This mirrors the pre-existing pattern in getDeserializeStream, so it's not new debt introduced by this PR. But byte streams can plausibly carry far larger volumes per chunk than object streams (the PR's MAX_FRAME_SIZE is 100MB). Worth a follow-up to switch both to a chunk-list accumulator with concat-on-extract.
| frame.set(chunk, FRAME_HEADER_SIZE); | ||
| controller.enqueue(frame); | ||
| }, | ||
| }); |
There was a problem hiding this comment.
AI Review: Nit
The framer drops empty chunks (good — empty frames are useless and the [0,0,0,0] shape collides with the looks-framed sniff in getDeserializeStream). But the unframer happily decodes a 4-byte zero-length header into an empty Uint8Array enqueue — no symmetry check.
In practice this only matters if a non-canonical writer ever produced a length-0 frame. Either drop empty enqueues on the unframer side too, or add a comment that the unframer tolerates them for forward-compat.
| const CAPABILITY_VERSION_TABLE: ReadonlyArray<{ | ||
| capability: keyof Omit<RunCapabilities, 'supportedFormats'>; | ||
| minVersion: string; | ||
| }> = [{ capability: 'framedByteStreams', minVersion: '5.0.0-beta.3' }]; |
There was a problem hiding this comment.
AI Review: Nit
The 5.0.0-beta.3 cutoff is a placeholder per the PR description ("Update this if the actual ship version differs"). Easy to forget — consider a // TODO(release): marker right on this line so it surfaces in a grep before cutting the beta.
| ); | ||
| expect(got).toEqual(chunks); | ||
| }); | ||
| }); |
There was a problem hiding this comment.
AI Review: Note
Two test gaps worth filling here:
- Cross-deployment probe in
start.ts— none of the new behaviour instart()(probe success → framed, probe miss → raw, probe timeout → raw, world withoutstreams.get→ raw) is exercised. This is the most user-visible new path and the one that adds latency on cross-deployment starts. - Workflow-VM round-trip propagation — the invariant that
getWorkflowRevivers→getWorkflowReducers/getStepReducerspreservesframingacross the VM boundary, even when the boundary is toldframedByteStreams=false(because flipping mid-stream would corrupt already-written bytes). I confirmed this works locally by hand-constructing a ref withframing: 'framed-v1', reviving viagetWorkflowRevivers, then re-serializing viadehydrateStepArguments— the field survives. Worth a regression test in this file so that invariant doesn't quietly drift in a future refactor.
Not blocking, but the second one in particular guards a subtle correctness property the rest of the framing story depends on.
Wraps each chunk of `type: 'bytes'` ReadableStreams in a 4-byte
big-endian length prefix on the wire, so consumers can identify chunk
boundaries and (in a follow-up) transparently reconnect on transient
stream errors. The user-facing API is unchanged — `getReader()` still
yields raw `Uint8Array` chunks; only the wire envelope is new.
The framing decision is made per-stream at serialization time and
recorded in the stream ref (`framing: 'framed-v1'`). Readers dispatch
on the field, falling back to raw bytes when absent so existing runs
keep working unchanged.
The choice is gated on a new `framedByteStreams` capability (in
`getRunCapabilities`) keyed on the target run's `workflowCoreVersion`.
Producers consult the capability of the run they're writing to:
- Same-deployment writes (the common case for `start()`,
`dehydrateStepReturnValue`, `getWritable()`): always frame —
version skew protection means the consumer is on this same SDK.
- `resumeHook` payloads: check the hook's owning run's
`workflowCoreVersion` (already loaded for encryption capability).
- Cross-deployment `start()` (explicit `deploymentId` or `'latest'`
resolving to a different deployment): probe the target via
`healthCheck` with a tight 2s timeout; fall back to raw on
timeout. Surfaces `workflowCoreVersion` from the existing health
check wire payload, which was already advertised but dropped on
the read side.
This is a prerequisite for transparent byte-stream auto-reconnect
(the goal of #1847 on stable). With frames on the wire, a future
reconnecting reader can count completed frames and resume from
`startIndex + consumed` after a transient error — just like
`createReconnectingFramedStream` already does for object streams.
Tests cover the framing/unframing transforms (split frames, coalesced
frames, truncation errors, oversized-frame guard), the capability
table for byte-stream support across version cutoffs, and end-to-end
round-trips through `dehydrateStepReturnValue` in both modes.
547dc27 to
b3a7f7f
Compare
VaguelySerious
left a comment
There was a problem hiding this comment.
One blocking question about version compat, otherwise LGTM
| } else if (typeof world.streams?.get !== 'function') { | ||
| framedByteStreams = false; | ||
| } else { |
There was a problem hiding this comment.
Wouldn't this fail if you create a newer run from an older deployment? E.g. infinite chess
Summary
Wraps each chunk of
type: 'bytes'ReadableStreams in a 4-byte big-endian length prefix on the wire, so consumers can identify chunk boundaries. The user-facing API is unchanged —getReader()still yields rawUint8Arraychunks; only the wire envelope is new.This enables byte-stream auto-reconnect to be added in a follow-up. With frames on the wire, a reconnecting reader can count completed frames and resume from
startIndex + consumedafter a transient error, which is the same trick thatcreateReconnectingFramedStreamuses today for object streams.Relationship to #1847
Independent. #1847 (targeting
stable) moves stream reconnect into core viacreateReconnectingFramedStream, applying it to object streams only — it explicitly opts byte streams out, with this comment:That decision is correct on
stablebecause the legacy wire format for byte streams is unframed. This PR (targetingmain/ v5) introduces the framed byte-stream wire format, opt-in per stream and gated on a capability flag. Once both this PR and #1847 land, a follow-up onmaincan applycreateReconnectingFramedStreamto byte streams whose ref carriesframing: 'framed-v1', completing the reconnect story for byte streams without touching the legacy raw path.#1847 is unaffected by this PR and can land on
stableas-is.Mechanism
The framing decision is made per-stream at serialization time and recorded in the stream ref:
Readers dispatch on the field. Absent or
'raw'→ existing legacy behavior (no unframing).'framed-v1'→ pipe throughgetByteUnframingStream()to strip the length prefix before handing chunks to the user.Capability gating
Producers consult a new
framedByteStreamscapability ingetRunCapabilities()(incapabilities.ts), keyed on the target run'sworkflowCoreVersion. The choice is then baked into the ref so consumers know what to do without re-doing the lookup.dehydrateStepReturnValue(step → workflow)getWritable()dehydrateWorkflowArguments(start()same-deployment)dehydrateWorkflowArguments(start()cross-deployment)healthCheck(2s timeout); fall back to raw on miss/timeoutdehydrateStepReturnValue(resumeHookpayload)workflowRun.executionContext.workflowCoreVersion(already loaded for encryption capability check)The cross-deployment probe relies on
HealthCheckResult.workflowCoreVersionbeing surfaced byhealthCheck()— that's PR #1854.Backwards compatibility
framingfield are read as raw bytes. ✓framing→ raw path. ✓resumeHookagainst an old run: target'sworkflowCoreVersionis below cutoff → producer emits raw bytes → old reader sees what it expects. ✓start({ deploymentId: 'old-deployment' }): probe times out or returns an oldworkflowCoreVersion→ producer emits raw bytes. ✓The framing format identifier is opaque (
'framed-v1') so future framing variants ('framed-v2', etc.) can be added without breaking existing consumers.Tests
byte-stream-framing.test.ts(new, 16 tests):getByteFramingStream: prefix shape, empty-chunk drop, large chunks, EOFgetByteUnframingStream: round-trip, split frames across reads, coalesced frames in one read, mid-frame truncation error, oversized-frame guardframedByteStreams=falseemits noframingfield (back-compat);framedByteStreams=trueemits'framed-v1'; both modes round-trip user bytes unchangedcapabilities.test.ts(extended):framedByteStreamsisfalsefor invalid/old versions,truefrom the cutoff (5.0.0-beta.3) onwardAll 624 core tests pass on the rebased branch.
Notes
minVersionforframedByteStreamsis set to'5.0.0-beta.3'based on the next beta release. Update this if the actual ship version differs.world.streams?.getbeing a function — minimal test mocks that don't implement it skip the probe (avoiding the 2s timeout in test runs).encrformat prefix system could wrap each frame's payload). Out of scope here.🤖 Generated with Claude Code