diff --git a/packages/standard-server-fetch/src/event-iterator.test.ts b/packages/standard-server-fetch/src/event-iterator.test.ts index 3982ea0ce..94ba5f3fc 100644 --- a/packages/standard-server-fetch/src/event-iterator.test.ts +++ b/packages/standard-server-fetch/src/event-iterator.test.ts @@ -180,6 +180,30 @@ describe('toEventIterator', () => { expect(runInSpanContextSpy).toHaveBeenCalledTimes(2) }) + it('should throw if .return() while waiting for .next()', async () => { + const stream = new ReadableStream({ + async pull(controller) { + await new Promise(resolve => setTimeout(resolve, 25)) + controller.enqueue('event: message\ndata: {"order": 1}\nid: id-1\nretry: 10000\n\n') + controller.close() + }, + }).pipeThrough(new TextEncoderStream()) + + const generator = toEventIterator(stream) + expect(generator).toSatisfy(isAsyncIteratorObject) + + const promise = expect(generator.next()).rejects.toBeInstanceOf(shared.AbortError) + + await new Promise(resolve => setTimeout(resolve, 0)) + await generator.return(undefined) + await promise + + await vi.waitFor(() => expect(stream.getReader().closed).resolves.toBe(undefined)) + + expect(startSpanSpy).toHaveBeenCalledTimes(1) + expect(runInSpanContextSpy).toHaveBeenCalledTimes(2) + }) + it('error while reading', async () => { const stream = new ReadableStream({ async pull(controller) { diff --git a/packages/standard-server-fetch/src/event-iterator.ts b/packages/standard-server-fetch/src/event-iterator.ts index 1c9d1dbd4..d9194989d 100644 --- a/packages/standard-server-fetch/src/event-iterator.ts +++ b/packages/standard-server-fetch/src/event-iterator.ts @@ -1,5 +1,5 @@ import type { SetSpanErrorOptions } from '@orpc/shared' -import { AsyncIteratorClass, isTypescriptObject, parseEmptyableJSON, runInSpanContext, setSpanError, startSpan, stringifyJSON } from '@orpc/shared' +import { AbortError, AsyncIteratorClass, isTypescriptObject, parseEmptyableJSON, runInSpanContext, setSpanError, startSpan, stringifyJSON } from '@orpc/shared' import { encodeEventMessage, ErrorEvent, EventDecoderStream, getEventMeta, withEventMeta } from '@orpc/standard-server' export interface ToEventIteratorOptions extends SetSpanErrorOptions {} @@ -14,6 +14,7 @@ export function toEventIterator( const reader = eventStream?.getReader() let span: ReturnType | undefined + let isCancelled = false return new AsyncIteratorClass(async () => { span ??= startSpan('consume_event_iterator_stream') @@ -26,7 +27,23 @@ export function toEventIterator( const { done, value } = await runInSpanContext(span, () => reader.read()) + /** + * Handle stream completion scenarios: + * + * 1. If the reader is cancelled while waiting for the next value, + * reader.read() will resolve as { done: true, value: undefined }. + * However, this behavior is unreliable and we should only resolve + * a value when the sender explicitly indicates completion. + * + * 2. The only implicit behavior we allow is when the sender successfully + * closes the stream without sending a 'done' event - in this case, + * we resolve with { done: true, value: undefined }. + */ if (done) { + if (isCancelled) { + throw new AbortError('Stream was cancelled') + } + return { done: true, value: undefined } } @@ -82,6 +99,7 @@ export function toEventIterator( }, async (reason) => { try { if (reason !== 'next') { + isCancelled = true span?.addEvent('cancelled') }