diff --git a/.changeset/stream-failure-diagnostics.md b/.changeset/stream-failure-diagnostics.md new file mode 100644 index 0000000000..ac1c86082c --- /dev/null +++ b/.changeset/stream-failure-diagnostics.md @@ -0,0 +1,6 @@ +--- +'@workflow/core': patch +'@workflow/world-vercel': patch +--- + +Prevent failed stream writes from surfacing as unhandled rejections and include request correlation details in stream errors. diff --git a/packages/core/src/flushable-stream.test.ts b/packages/core/src/flushable-stream.test.ts index 60d0b4dfc9..423e2f08bd 100644 --- a/packages/core/src/flushable-stream.test.ts +++ b/packages/core/src/flushable-stream.test.ts @@ -8,6 +8,26 @@ import { } from './flushable-stream.js'; describe('flushable stream behavior', () => { + it('does not emit an unhandled rejection before the runtime awaits a failed operation', async () => { + const unhandledRejections: unknown[] = []; + const onUnhandledRejection = (reason: unknown) => { + unhandledRejections.push(reason); + }; + process.on('unhandledRejection', onUnhandledRejection); + + try { + const state = createFlushableState(); + state.reject(new Error('Stream write failed')); + + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(unhandledRejections).toEqual([]); + await expect(state.promise).rejects.toThrow('Stream write failed'); + } finally { + process.off('unhandledRejection', onUnhandledRejection); + } + }); + it('promise should resolve when writable stream lock is released (polling)', async () => { // Test the pattern: user writes, releases lock, polling detects it, promise resolves const chunks: string[] = []; diff --git a/packages/core/src/flushable-stream.ts b/packages/core/src/flushable-stream.ts index 22751c8020..bbf199881c 100644 --- a/packages/core/src/flushable-stream.ts +++ b/packages/core/src/flushable-stream.ts @@ -48,12 +48,18 @@ export interface FlushableStreamState extends PromiseWithResolvers { } export function createFlushableState(): FlushableStreamState { - return { + const state: FlushableStreamState = { ...withResolvers(), pendingOps: 0, doneResolved: false, streamEnded: false, }; + + // The runtime awaits this promise after user code returns. Observe early + // stream failures now so they do not become unhandled rejections first. + state.promise.catch(() => {}); + + return state; } /** diff --git a/packages/world-vercel/src/streamer.test.ts b/packages/world-vercel/src/streamer.test.ts index 9cb8f0c95a..f8b561c68d 100644 --- a/packages/world-vercel/src/streamer.test.ts +++ b/packages/world-vercel/src/streamer.test.ts @@ -180,6 +180,38 @@ vi.mock('./utils.js', () => ({ }), })); +describe('writeToStream error diagnostics', () => { + async function getStreamer() { + const { createStreamer } = await import('./streamer.js'); + return createStreamer(); + } + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('includes endpoint and Vercel correlation headers in failed writes', async () => { + vi.spyOn(globalThis, 'fetch').mockImplementation( + async () => + new Response('Internal Server Error\nrequest-token', { + status: 500, + headers: { + 'x-vercel-id': 'sfo1::abc', + 'x-vercel-error': 'FUNCTION_INVOCATION_FAILED', + }, + }) + ); + + const streamer = await getStreamer(); + + await expect( + streamer.writeToStream('user', 'wrun_test', 'chunk') + ).rejects.toThrow( + 'Stream write failed: HTTP 500 (PUT https://test.example.com/v2/runs/wrun_test/stream/user; x-vercel-id=sfo1::abc; x-vercel-error=FUNCTION_INVOCATION_FAILED): Internal Server Error\nrequest-token' + ); + }); +}); + describe('writeToStreamMulti pagination', () => { /** * Decode length-prefixed multi-chunk body to count chunks per request. diff --git a/packages/world-vercel/src/streamer.ts b/packages/world-vercel/src/streamer.ts index bc9baa6c90..1b560b4037 100644 --- a/packages/world-vercel/src/streamer.ts +++ b/packages/world-vercel/src/streamer.ts @@ -72,6 +72,25 @@ async function fetchStreamMutation( } } +function createStreamRequestError( + operation: 'write' | 'close', + url: URL, + response: Response, + text: string +): Error { + const context = [`PUT ${url.origin}${url.pathname}`]; + for (const header of ['x-vercel-id', 'x-vercel-error']) { + const value = response.headers.get(header); + if (value) { + context.push(`${header}=${value}`); + } + } + + return new Error( + `Stream ${operation} failed: HTTP ${response.status} (${context.join('; ')}): ${text}` + ); +} + /** * Encode multiple chunks into a length-prefixed binary format. * Format: [4 bytes big-endian length][chunk bytes][4 bytes length][chunk bytes]... @@ -143,8 +162,9 @@ export function createStreamer(config?: APIConfig): Streamer { const resolvedRunId = await runId; const httpConfig = await getHttpConfig(config); + const url = getStreamUrl(name, resolvedRunId, httpConfig); const response = await fetchStreamMutation( - getStreamUrl(name, resolvedRunId, httpConfig), + url, { method: 'PUT', body: chunk, @@ -154,9 +174,7 @@ export function createStreamer(config?: APIConfig): Streamer { ); const text = await response.text(); if (!response.ok) { - throw new Error( - `Stream write failed: HTTP ${response.status}: ${text}` - ); + throw createStreamRequestError('write', url, response, text); } }, @@ -186,8 +204,9 @@ export function createStreamer(config?: APIConfig): Streamer { for (let i = 0; i < chunks.length; i += MAX_CHUNKS_PER_REQUEST) { const batch = chunks.slice(i, i + MAX_CHUNKS_PER_REQUEST); const body = encodeMultiChunks(batch); + const url = getStreamUrl(name, resolvedRunId, httpConfig); const response = await fetchStreamMutation( - getStreamUrl(name, resolvedRunId, httpConfig), + url, { method: 'PUT', body, @@ -197,9 +216,7 @@ export function createStreamer(config?: APIConfig): Streamer { ); const text = await response.text(); if (!response.ok) { - throw new Error( - `Stream write failed: HTTP ${response.status}: ${text}` - ); + throw createStreamRequestError('write', url, response, text); } } }, @@ -210,8 +227,9 @@ export function createStreamer(config?: APIConfig): Streamer { const httpConfig = await getHttpConfig(config); httpConfig.headers.set('X-Stream-Done', 'true'); + const url = getStreamUrl(name, resolvedRunId, httpConfig); const response = await fetchStreamMutation( - getStreamUrl(name, resolvedRunId, httpConfig), + url, { method: 'PUT', headers: httpConfig.headers, @@ -220,9 +238,7 @@ export function createStreamer(config?: APIConfig): Streamer { ); const text = await response.text(); if (!response.ok) { - throw new Error( - `Stream close failed: HTTP ${response.status}: ${text}` - ); + throw createStreamRequestError('close', url, response, text); } },