Skip to content

Commit 06aad3a

Browse files
authored
fix(standard-server): throw error for waiting operation if event iterator cancelled (#974)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - Bug Fixes - Cancelling an ongoing iteration now reliably stops the stream and surfaces a clear cancellation error instead of appearing to complete normally. - Distinguishes explicit cancellation from natural completion to prevent delivering a final value after cancellation and ensures the stream closes promptly and instrumentation remains consistent. - Tests - Added tests verifying that cancelling while awaiting the next item rejects with the cancellation error, closes the stream, and updates instrumentation as expected. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 80c9415 commit 06aad3a

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

packages/standard-server-fetch/src/event-iterator.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,30 @@ describe('toEventIterator', () => {
180180
expect(runInSpanContextSpy).toHaveBeenCalledTimes(2)
181181
})
182182

183+
it('should throw if .return() while waiting for .next()', async () => {
184+
const stream = new ReadableStream<string>({
185+
async pull(controller) {
186+
await new Promise(resolve => setTimeout(resolve, 25))
187+
controller.enqueue('event: message\ndata: {"order": 1}\nid: id-1\nretry: 10000\n\n')
188+
controller.close()
189+
},
190+
}).pipeThrough(new TextEncoderStream())
191+
192+
const generator = toEventIterator(stream)
193+
expect(generator).toSatisfy(isAsyncIteratorObject)
194+
195+
const promise = expect(generator.next()).rejects.toBeInstanceOf(shared.AbortError)
196+
197+
await new Promise(resolve => setTimeout(resolve, 0))
198+
await generator.return(undefined)
199+
await promise
200+
201+
await vi.waitFor(() => expect(stream.getReader().closed).resolves.toBe(undefined))
202+
203+
expect(startSpanSpy).toHaveBeenCalledTimes(1)
204+
expect(runInSpanContextSpy).toHaveBeenCalledTimes(2)
205+
})
206+
183207
it('error while reading', async () => {
184208
const stream = new ReadableStream<string>({
185209
async pull(controller) {

packages/standard-server-fetch/src/event-iterator.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { SetSpanErrorOptions } from '@orpc/shared'
2-
import { AsyncIteratorClass, isTypescriptObject, parseEmptyableJSON, runInSpanContext, setSpanError, startSpan, stringifyJSON } from '@orpc/shared'
2+
import { AbortError, AsyncIteratorClass, isTypescriptObject, parseEmptyableJSON, runInSpanContext, setSpanError, startSpan, stringifyJSON } from '@orpc/shared'
33
import { encodeEventMessage, ErrorEvent, EventDecoderStream, getEventMeta, withEventMeta } from '@orpc/standard-server'
44

55
export interface ToEventIteratorOptions extends SetSpanErrorOptions {}
@@ -14,6 +14,7 @@ export function toEventIterator(
1414

1515
const reader = eventStream?.getReader()
1616
let span: ReturnType<typeof startSpan> | undefined
17+
let isCancelled = false
1718

1819
return new AsyncIteratorClass(async () => {
1920
span ??= startSpan('consume_event_iterator_stream')
@@ -26,7 +27,23 @@ export function toEventIterator(
2627

2728
const { done, value } = await runInSpanContext(span, () => reader.read())
2829

30+
/**
31+
* Handle stream completion scenarios:
32+
*
33+
* 1. If the reader is cancelled while waiting for the next value,
34+
* reader.read() will resolve as { done: true, value: undefined }.
35+
* However, this behavior is unreliable and we should only resolve
36+
* a value when the sender explicitly indicates completion.
37+
*
38+
* 2. The only implicit behavior we allow is when the sender successfully
39+
* closes the stream without sending a 'done' event - in this case,
40+
* we resolve with { done: true, value: undefined }.
41+
*/
2942
if (done) {
43+
if (isCancelled) {
44+
throw new AbortError('Stream was cancelled')
45+
}
46+
3047
return { done: true, value: undefined }
3148
}
3249

@@ -82,6 +99,7 @@ export function toEventIterator(
8299
}, async (reason) => {
83100
try {
84101
if (reason !== 'next') {
102+
isCancelled = true
85103
span?.addEvent('cancelled')
86104
}
87105

0 commit comments

Comments
 (0)