fix(core): propagate stream cancellation on disconnect#1618
fix(core): propagate stream cancellation on disconnect#1618VaguelySerious merged 1 commit intomainfrom
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
🦋 Changeset detectedLatest commit: 43f1952 The changes in this PR will be included in the next version bump. This PR includes changesets to release 16 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
📊 Benchmark Results
workflow with no steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Express | Next.js (Turbopack) | Nitro workflow with 1 step💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) | Express workflow with 10 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) | Express workflow with 25 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Express | Next.js (Turbopack) | Nitro workflow with 50 sequential steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) Promise.all with 10 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) Promise.all with 25 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) Promise.all with 50 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) Promise.race with 10 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Express | Nitro Promise.race with 25 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Express | Next.js (Turbopack) | Nitro Promise.race with 50 concurrent steps💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) workflow with 10 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Nitro | Express workflow with 25 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) workflow with 50 sequential data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Next.js (Turbopack) | Express workflow with 10 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) workflow with 25 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) workflow with 50 concurrent data payload steps (10KB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Express | Nitro | Next.js (Turbopack) Stream Benchmarks (includes TTFB metrics)workflow with stream💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) stream pipeline with 5 transform steps (1MB)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Next.js (Turbopack) | Express | Nitro 10 parallel streams (1MB each)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Express | Nitro | Next.js (Turbopack) fan-out fan-in 10 streams (1MB each)💻 Local Development
▲ Production (Vercel)
🔍 Observability: Nitro | Express | Next.js (Turbopack) SummaryFastest Framework by WorldWinner determined by most benchmark wins
Fastest World by FrameworkWinner determined by most benchmark wins
Column Definitions
Worlds:
|
🧪 E2E Test Results❌ Some tests failed Summary
❌ Failed Tests🌍 Community Worlds (64 failed)mongodb (4 failed):
redis (3 failed):
turso (57 failed):
Details by Category✅ ▲ Vercel Production
✅ 💻 Local Development
✅ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
❌ 🌍 Community Worlds
✅ 📋 Other
|
Based on #1354 — adds reader.cancel() in flushablePipe's finally block so the source stream is notified on disconnect, and adds a cancel() method to WorkflowServerReadableStream. Addresses all review feedback: retains original tests, avoids double cancel, uses .catch(() => {}) instead of console.warn in library code. Fixes #1349 Signed-off-by: Peter Wielander <mittgfu@gmail.com> Co-Authored-By: Sigmabrogz <bnb1000bnb@gmail.com> Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
18320f6 to
43f1952
Compare
TooTallNate
left a comment
There was a problem hiding this comment.
Clean fix for #1349. The two changes work together well: the Promise.race detects premature sink closure, and reader.cancel(reason) propagates the cancellation upstream.
What I verified:
flushablePipe — Promise.race on reader.read() vs writer.closed:
- On normal completion:
reader.read()returns{ done: true }, exits the loop beforewriter.closedresolves, so the race never triggers the error. Correct. - On premature close (e.g. client disconnect, clean
close()from external code):writer.closedresolves, throws'Writable stream closed prematurely', caught by thecatchblock, setscancelReason, rejectsstate.promise. Correct. - On writable error (e.g. write failure):
writer.closedrejects (not resolves), soPromise.racepropagates the rejection directly (bypasses the.then()). Also caught by thecatchblock. Correct. - The
writer.closedpromise is a persistent getter — no re-allocation per iteration, so the race adds negligible overhead.
flushablePipe — reader.cancel(cancelReason) in finally:
- On error:
cancelReasonis set to the error incatch,reader.cancel(err)propagates context upstream. Correct. - On normal completion:
cancelReasonisundefined,reader.cancel(undefined)is a no-op on an already-done reader per the streams spec. Correct. .catch(() => {})swallows cancel errors, which is appropriate — cancellation is best-effort.- Placed before
reader.releaseLock(), which is the right order — cancel before release.
WorkflowServerReadableStream — cancel(reason) method:
- Propagates cancellation to the inner reader and clears the reference. Correct.
.catch(() => {})on the inner cancel — same best-effort pattern. Correct.
Test coverage:
- Tests
controller.error()propagation through the pipe, verifyingstate.promiserejects and prior chunks were written. Good coverage for the core scenario.
Changeset: patch for @workflow/core. Correct.
LGTM.
| reader.read(), | ||
| writer.closed.then(() => { | ||
| throw new Error('Writable stream closed prematurely'); | ||
| }), |
There was a problem hiding this comment.
Non-blocking observation: writer.closed is a promise that settles at most once. Since the Promise.race is called on every loop iteration, each iteration creates a new .then() handler on the same writer.closed promise. These handlers accumulate but are harmless — once writer.closed settles, they all fire but only the first race winner matters (subsequent iterations would have already exited). In practice, stream iterations are bounded, so this is fine. Just noting for awareness.
Summary
reader.cancel()influshablePipe's finally block so the source stream is notified on disconnect, with error reason passed via a local variable to avoid double-cancelreader.read()againstwriter.closedto detect premature sink closure fastercancel(reason)method toWorkflowServerReadableStreamso cancellation propagates to the inner readerBased on #1354, with all review feedback addressed.
Closes #1354
Fixes #1349
Test plan
flushable-stream.test.ts(8 original + 1 new cancellation test)🤖 Generated with Claude Code