Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/stream-failure-diagnostics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@workflow/core': patch
'@workflow/world-vercel': patch
---

Prevent failed stream writes from surfacing as unhandled rejections and include request correlation details in stream errors.
20 changes: 20 additions & 0 deletions packages/core/src/flushable-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@ import {
} from './flushable-stream.js';

describe('flushable stream behavior', () => {
it('does not emit an unhandled rejection before the runtime awaits a failed operation', async () => {
const unhandledRejections: unknown[] = [];
const onUnhandledRejection = (reason: unknown) => {
unhandledRejections.push(reason);
};
process.on('unhandledRejection', onUnhandledRejection);

try {
const state = createFlushableState();
state.reject(new Error('Stream write failed'));

await new Promise((resolve) => setTimeout(resolve, 0));

expect(unhandledRejections).toEqual([]);
await expect(state.promise).rejects.toThrow('Stream write failed');
} finally {
process.off('unhandledRejection', onUnhandledRejection);
}
});

it('promise should resolve when writable stream lock is released (polling)', async () => {
// Test the pattern: user writes, releases lock, polling detects it, promise resolves
const chunks: string[] = [];
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/flushable-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ export interface FlushableStreamState extends PromiseWithResolvers<void> {
}

export function createFlushableState(): FlushableStreamState {
return {
const state: FlushableStreamState = {
...withResolvers<void>(),
pendingOps: 0,
doneResolved: false,
streamEnded: false,
};

// The runtime awaits this promise after user code returns. Observe early
// stream failures now so they do not become unhandled rejections first.
state.promise.catch(() => {});

return state;
}

/**
Expand Down
32 changes: 32 additions & 0 deletions packages/world-vercel/src/streamer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,38 @@ vi.mock('./utils.js', () => ({
}),
}));

describe('writeToStream error diagnostics', () => {
async function getStreamer() {
const { createStreamer } = await import('./streamer.js');
return createStreamer();
}

afterEach(() => {
vi.restoreAllMocks();
});

it('includes endpoint and Vercel correlation headers in failed writes', async () => {
vi.spyOn(globalThis, 'fetch').mockImplementation(
async () =>
new Response('Internal Server Error\nrequest-token', {
status: 500,
headers: {
'x-vercel-id': 'sfo1::abc',
'x-vercel-error': 'FUNCTION_INVOCATION_FAILED',
},
})
);

const streamer = await getStreamer();

await expect(
streamer.writeToStream('user', 'wrun_test', 'chunk')
).rejects.toThrow(
'Stream write failed: HTTP 500 (PUT https://test.example.com/v2/runs/wrun_test/stream/user; x-vercel-id=sfo1::abc; x-vercel-error=FUNCTION_INVOCATION_FAILED): Internal Server Error\nrequest-token'
);
});
});

describe('writeToStreamMulti pagination', () => {
/**
* Decode length-prefixed multi-chunk body to count chunks per request.
Expand Down
40 changes: 28 additions & 12 deletions packages/world-vercel/src/streamer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,25 @@ async function fetchStreamMutation(
}
}

function createStreamRequestError(
operation: 'write' | 'close',
url: URL,
response: Response,
text: string
): Error {
const context = [`PUT ${url.origin}${url.pathname}`];
for (const header of ['x-vercel-id', 'x-vercel-error']) {
const value = response.headers.get(header);
if (value) {
context.push(`${header}=${value}`);
}
}

return new Error(
`Stream ${operation} failed: HTTP ${response.status} (${context.join('; ')}): ${text}`
);
}

/**
* Encode multiple chunks into a length-prefixed binary format.
* Format: [4 bytes big-endian length][chunk bytes][4 bytes length][chunk bytes]...
Expand Down Expand Up @@ -143,8 +162,9 @@ export function createStreamer(config?: APIConfig): Streamer {
const resolvedRunId = await runId;

const httpConfig = await getHttpConfig(config);
const url = getStreamUrl(name, resolvedRunId, httpConfig);
const response = await fetchStreamMutation(
getStreamUrl(name, resolvedRunId, httpConfig),
url,
{
method: 'PUT',
body: chunk,
Expand All @@ -154,9 +174,7 @@ export function createStreamer(config?: APIConfig): Streamer {
);
const text = await response.text();
if (!response.ok) {
throw new Error(
`Stream write failed: HTTP ${response.status}: ${text}`
);
throw createStreamRequestError('write', url, response, text);
}
},

Expand Down Expand Up @@ -186,8 +204,9 @@ export function createStreamer(config?: APIConfig): Streamer {
for (let i = 0; i < chunks.length; i += MAX_CHUNKS_PER_REQUEST) {
const batch = chunks.slice(i, i + MAX_CHUNKS_PER_REQUEST);
const body = encodeMultiChunks(batch);
const url = getStreamUrl(name, resolvedRunId, httpConfig);
const response = await fetchStreamMutation(
getStreamUrl(name, resolvedRunId, httpConfig),
url,
{
method: 'PUT',
body,
Expand All @@ -197,9 +216,7 @@ export function createStreamer(config?: APIConfig): Streamer {
);
const text = await response.text();
if (!response.ok) {
throw new Error(
`Stream write failed: HTTP ${response.status}: ${text}`
);
throw createStreamRequestError('write', url, response, text);
}
}
},
Expand All @@ -210,8 +227,9 @@ export function createStreamer(config?: APIConfig): Streamer {

const httpConfig = await getHttpConfig(config);
httpConfig.headers.set('X-Stream-Done', 'true');
const url = getStreamUrl(name, resolvedRunId, httpConfig);
const response = await fetchStreamMutation(
getStreamUrl(name, resolvedRunId, httpConfig),
url,
{
method: 'PUT',
headers: httpConfig.headers,
Expand All @@ -220,9 +238,7 @@ export function createStreamer(config?: APIConfig): Streamer {
);
const text = await response.text();
if (!response.ok) {
throw new Error(
`Stream close failed: HTTP ${response.status}: ${text}`
);
throw createStreamRequestError('close', url, response, text);
}
},

Expand Down