From 56ef84a7082bc57ef9c00cd96fac135e0f9239de Mon Sep 17 00:00:00 2001 From: Lars Grammel Date: Fri, 26 Apr 2024 09:49:52 +0200 Subject: [PATCH] ai/core: fix uncaught abort handling exception (#1451) --- .changeset/quiet-wasps-tell.md | 6 +++ .../ai-core/src/stream-text/openai-abort.ts | 19 +++----- .../generate-text/run-tools-transformation.ts | 45 ++++++++++--------- .../core/core/generate-text/stream-text.ts | 3 +- .../util/retry-with-exponential-backoff.ts | 4 +- packages/provider-utils/src/index.ts | 1 + packages/provider-utils/src/is-abort-error.ts | 6 +++ packages/provider-utils/src/post-to-api.ts | 18 +++----- 8 files changed, 51 insertions(+), 51 deletions(-) create mode 100644 .changeset/quiet-wasps-tell.md create mode 100644 packages/provider-utils/src/is-abort-error.ts diff --git a/.changeset/quiet-wasps-tell.md b/.changeset/quiet-wasps-tell.md new file mode 100644 index 0000000000..45fd55b86f --- /dev/null +++ b/.changeset/quiet-wasps-tell.md @@ -0,0 +1,6 @@ +--- +'@ai-sdk/provider-utils': patch +'ai': patch +--- + +ai/core: fix abort handling in transformation stream diff --git a/examples/ai-core/src/stream-text/openai-abort.ts b/examples/ai-core/src/stream-text/openai-abort.ts index d26742800d..ed85489fef 100644 --- a/examples/ai-core/src/stream-text/openai-abort.ts +++ b/examples/ai-core/src/stream-text/openai-abort.ts @@ -5,33 +5,24 @@ import dotenv from 'dotenv'; dotenv.config(); async function main() { - const abortController = new AbortController(); - - // run async: - (async () => { - await delay(1500); // wait 1.5 seconds - abortController.abort(); // aborts the streaming - })(); - try { const { textStream } = await experimental_streamText({ model: openai('gpt-3.5-turbo'), prompt: 'Write a short story about a robot learning to love:\n\n', - abortSignal: abortController.signal, + abortSignal: AbortSignal.timeout(3000), }); for await (const textPart of textStream) { process.stdout.write(textPart); } } catch (error) { - if (error instanceof Error && error.name === 'AbortError') { + if ( + error instanceof Error && + (error.name === 'AbortError' || error.name === 'TimeoutError') + ) { console.log('\n\nAbortError: The run was aborted.'); } } } main().catch(console.error); - -async function delay(delayInMs: number): Promise { - return new Promise(resolve => setTimeout(resolve, delayInMs)); -} diff --git a/packages/core/core/generate-text/run-tools-transformation.ts b/packages/core/core/generate-text/run-tools-transformation.ts index f91ea1d3a0..e27dc671c1 100644 --- a/packages/core/core/generate-text/run-tools-transformation.ts +++ b/packages/core/core/generate-text/run-tools-transformation.ts @@ -162,27 +162,30 @@ export function runToolsTransformation< // combine the generator stream and the tool results stream return new ReadableStream>({ async start(controller) { - generatorStream.pipeThrough(forwardStream).pipeTo( - new WritableStream({ - write(chunk) { - controller.enqueue(chunk); - }, - close() { - // the generator stream controller is automatically closed when it's consumed - }, - }), - ); - - toolResultsStream.pipeTo( - new WritableStream({ - write(chunk) { - controller.enqueue(chunk); - }, - close() { - controller.close(); - }, - }), - ); + // need to wait for both pipes so there are no dangling promises that + // can cause uncaught promise rejections when the stream is aborted + return Promise.all([ + generatorStream.pipeThrough(forwardStream).pipeTo( + new WritableStream({ + write(chunk) { + controller.enqueue(chunk); + }, + close() { + // the generator stream controller is automatically closed when it's consumed + }, + }), + ), + toolResultsStream.pipeTo( + new WritableStream({ + write(chunk) { + controller.enqueue(chunk); + }, + close() { + controller.close(); + }, + }), + ), + ]); }, }); } diff --git a/packages/core/core/generate-text/stream-text.ts b/packages/core/core/generate-text/stream-text.ts index be9d25caed..e82b95f890 100644 --- a/packages/core/core/generate-text/stream-text.ts +++ b/packages/core/core/generate-text/stream-text.ts @@ -4,9 +4,9 @@ import { LanguageModelV1FinishReason, LanguageModelV1LogProbs, } from '@ai-sdk/provider'; +import { ServerResponse } from 'node:http'; import { AIStreamCallbacksAndOptions, - StreamData, StreamingTextResponse, createCallbacksTransformer, createStreamDataTransformer, @@ -26,7 +26,6 @@ import { retryWithExponentialBackoff } from '../util/retry-with-exponential-back import { runToolsTransformation } from './run-tools-transformation'; import { ToToolCall } from './tool-call'; import { ToToolResult } from './tool-result'; -import { ServerResponse } from 'node:http'; /** Generate a text and call tools for a given prompt using a language model. diff --git a/packages/core/core/util/retry-with-exponential-backoff.ts b/packages/core/core/util/retry-with-exponential-backoff.ts index f712f5265c..99874c741d 100644 --- a/packages/core/core/util/retry-with-exponential-backoff.ts +++ b/packages/core/core/util/retry-with-exponential-backoff.ts @@ -1,5 +1,5 @@ import { APICallError, RetryError } from '@ai-sdk/provider'; -import { getErrorMessage } from '@ai-sdk/provider-utils'; +import { getErrorMessage, isAbortError } from '@ai-sdk/provider-utils'; import { delay } from './delay'; export type RetryFunction = ( @@ -35,7 +35,7 @@ async function _retryWithExponentialBackoff( try { return await f(); } catch (error) { - if (error instanceof Error && error.name === 'AbortError') { + if (isAbortError(error)) { throw error; // don't retry when the request was aborted } diff --git a/packages/provider-utils/src/index.ts b/packages/provider-utils/src/index.ts index a9a78c74ce..646efa28b1 100644 --- a/packages/provider-utils/src/index.ts +++ b/packages/provider-utils/src/index.ts @@ -1,6 +1,7 @@ export * from './extract-response-headers'; export * from './generate-id'; export * from './get-error-message'; +export * from './is-abort-error'; export * from './load-api-key'; export * from './parse-json'; export * from './post-to-api'; diff --git a/packages/provider-utils/src/is-abort-error.ts b/packages/provider-utils/src/is-abort-error.ts new file mode 100644 index 0000000000..029b1e2865 --- /dev/null +++ b/packages/provider-utils/src/is-abort-error.ts @@ -0,0 +1,6 @@ +export function isAbortError(error: unknown): error is DOMException { + return ( + error instanceof DOMException && + (error.name === 'AbortError' || error.name === 'TimeoutError') + ); +} diff --git a/packages/provider-utils/src/post-to-api.ts b/packages/provider-utils/src/post-to-api.ts index b1ef6cad5c..88095b5adf 100644 --- a/packages/provider-utils/src/post-to-api.ts +++ b/packages/provider-utils/src/post-to-api.ts @@ -1,4 +1,5 @@ import { APICallError } from '@ai-sdk/provider'; +import { isAbortError } from './is-abort-error'; import { ResponseHandler } from './response-handler'; export const postJsonToApi = async ({ @@ -70,13 +71,8 @@ export const postToApi = async ({ requestBodyValues: body.values, }); } catch (error) { - if (error instanceof Error) { - if ( - error.name === 'AbortError' || - APICallError.isAPICallError(error) - ) { - throw error; - } + if (isAbortError(error) || APICallError.isAPICallError(error)) { + throw error; } throw new APICallError({ @@ -97,7 +93,7 @@ export const postToApi = async ({ }); } catch (error) { if (error instanceof Error) { - if (error.name === 'AbortError' || APICallError.isAPICallError(error)) { + if (isAbortError(error) || APICallError.isAPICallError(error)) { throw error; } } @@ -111,10 +107,8 @@ export const postToApi = async ({ }); } } catch (error) { - if (error instanceof Error) { - if (error.name === 'AbortError') { - throw error; - } + if (isAbortError(error)) { + throw error; } // unwrap original error when fetch failed (for easier debugging):