From d0ddbfaa6f254802fda2123c38f5fbea349afd3f Mon Sep 17 00:00:00 2001 From: unnoq Date: Mon, 8 Sep 2025 15:32:54 +0700 Subject: [PATCH 1/2] fix(standard-server): throw error for waiting operation if event iterator cancelled --- .../src/event-iterator.test.ts | 24 +++++++++++++++++++ .../src/event-iterator.ts | 18 ++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/packages/standard-server-fetch/src/event-iterator.test.ts b/packages/standard-server-fetch/src/event-iterator.test.ts index 3982ea0ce..33a5df4c5 100644 --- a/packages/standard-server-fetch/src/event-iterator.test.ts +++ b/packages/standard-server-fetch/src/event-iterator.test.ts @@ -180,6 +180,30 @@ describe('toEventIterator', () => { expect(runInSpanContextSpy).toHaveBeenCalledTimes(2) }) + it('should throw if .return() while waiting for .next()', async () => { + const stream = new ReadableStream({ + async pull(controller) { + await new Promise(resolve => setTimeout(resolve, 1000)) + controller.enqueue('event: message\ndata: {"order": 1}\nid: id-1\nretry: 10000\n\n') + controller.close() + }, + }).pipeThrough(new TextEncoderStream()) + + const generator = toEventIterator(stream) + expect(generator).toSatisfy(isAsyncIteratorObject) + + const promise = expect(generator.next()).rejects.toThrow('Stream was cancelled') + + await new Promise(resolve => setTimeout(resolve, 1)) + await generator.return(undefined) + await promise + + await vi.waitFor(() => expect(stream.getReader().closed).resolves.toBe(undefined)) + + expect(startSpanSpy).toHaveBeenCalledTimes(1) + expect(runInSpanContextSpy).toHaveBeenCalledTimes(2) + }) + it('error while reading', async () => { const stream = new ReadableStream({ async pull(controller) { diff --git a/packages/standard-server-fetch/src/event-iterator.ts b/packages/standard-server-fetch/src/event-iterator.ts index 1c9d1dbd4..27524f8d4 100644 --- a/packages/standard-server-fetch/src/event-iterator.ts +++ b/packages/standard-server-fetch/src/event-iterator.ts @@ -14,6 +14,7 @@ export function toEventIterator( const reader = eventStream?.getReader() let span: ReturnType | undefined + let isCancelled = false return new AsyncIteratorClass(async () => { span ??= startSpan('consume_event_iterator_stream') @@ -26,7 +27,23 @@ export function toEventIterator( const { done, value } = await runInSpanContext(span, () => reader.read()) + /** + * Handle stream completion scenarios: + * + * 1. If the reader is cancelled while waiting for the next value, + * reader.read() will resolve as { done: true, value: undefined }. + * However, this behavior is unreliable and we should only resolve + * a value when the sender explicitly indicates completion. + * + * 2. The only implicit behavior we allow is when the sender successfully + * closes the stream without sending a 'done' event - in this case, + * we resolve with { done: true, value: undefined }. + */ if (done) { + if (isCancelled) { + throw new Error('Stream was cancelled') + } + return { done: true, value: undefined } } @@ -82,6 +99,7 @@ export function toEventIterator( }, async (reason) => { try { if (reason !== 'next') { + isCancelled = true span?.addEvent('cancelled') } From 18e587b35977330637ce9815f2904ce7a2fee3a7 Mon Sep 17 00:00:00 2001 From: unnoq Date: Mon, 8 Sep 2025 15:41:47 +0700 Subject: [PATCH 2/2] improve --- packages/standard-server-fetch/src/event-iterator.test.ts | 6 +++--- packages/standard-server-fetch/src/event-iterator.ts | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/standard-server-fetch/src/event-iterator.test.ts b/packages/standard-server-fetch/src/event-iterator.test.ts index 33a5df4c5..94ba5f3fc 100644 --- a/packages/standard-server-fetch/src/event-iterator.test.ts +++ b/packages/standard-server-fetch/src/event-iterator.test.ts @@ -183,7 +183,7 @@ describe('toEventIterator', () => { it('should throw if .return() while waiting for .next()', async () => { const stream = new ReadableStream({ async pull(controller) { - await new Promise(resolve => setTimeout(resolve, 1000)) + await new Promise(resolve => setTimeout(resolve, 25)) controller.enqueue('event: message\ndata: {"order": 1}\nid: id-1\nretry: 10000\n\n') controller.close() }, @@ -192,9 +192,9 @@ describe('toEventIterator', () => { const generator = toEventIterator(stream) expect(generator).toSatisfy(isAsyncIteratorObject) - const promise = expect(generator.next()).rejects.toThrow('Stream was cancelled') + const promise = expect(generator.next()).rejects.toBeInstanceOf(shared.AbortError) - await new Promise(resolve => setTimeout(resolve, 1)) + await new Promise(resolve => setTimeout(resolve, 0)) await generator.return(undefined) await promise diff --git a/packages/standard-server-fetch/src/event-iterator.ts b/packages/standard-server-fetch/src/event-iterator.ts index 27524f8d4..d9194989d 100644 --- a/packages/standard-server-fetch/src/event-iterator.ts +++ b/packages/standard-server-fetch/src/event-iterator.ts @@ -1,5 +1,5 @@ import type { SetSpanErrorOptions } from '@orpc/shared' -import { AsyncIteratorClass, isTypescriptObject, parseEmptyableJSON, runInSpanContext, setSpanError, startSpan, stringifyJSON } from '@orpc/shared' +import { AbortError, AsyncIteratorClass, isTypescriptObject, parseEmptyableJSON, runInSpanContext, setSpanError, startSpan, stringifyJSON } from '@orpc/shared' import { encodeEventMessage, ErrorEvent, EventDecoderStream, getEventMeta, withEventMeta } from '@orpc/standard-server' export interface ToEventIteratorOptions extends SetSpanErrorOptions {} @@ -41,7 +41,7 @@ export function toEventIterator( */ if (done) { if (isCancelled) { - throw new Error('Stream was cancelled') + throw new AbortError('Stream was cancelled') } return { done: true, value: undefined }