diff --git a/extensions/copilot/src/platform/endpoint/node/messagesApi.ts b/extensions/copilot/src/platform/endpoint/node/messagesApi.ts index 20199c059db38..7e0ac181c2d5c 100644 --- a/extensions/copilot/src/platform/endpoint/node/messagesApi.ts +++ b/extensions/copilot/src/platform/endpoint/node/messagesApi.ts @@ -5,7 +5,7 @@ import { ContentBlockParam, DocumentBlockParam, ImageBlockParam, MessageParam, RedactedThinkingBlockParam, TextBlockParam, ThinkingBlockParam, ToolReferenceBlockParam, ToolResultBlockParam } from '@anthropic-ai/sdk/resources'; import { Raw } from '@vscode/prompt-tsx'; -import { Response, withStreamIdleTimeout } from '../../../platform/networking/common/fetcherService'; +import { Response } from '../../../platform/networking/common/fetcherService'; import { AsyncIterableObject } from '../../../util/vs/base/common/async'; import { SSEParser } from '../../../util/vs/base/common/sseParser'; import { generateUuid } from '../../../util/vs/base/common/uuid'; @@ -593,7 +593,7 @@ export async function processResponseFromMessagesEndpoint( } }); - for await (const chunk of withStreamIdleTimeout(response.body)) { + for await (const chunk of response.body) { parser.feed(chunk); } }, async () => { diff --git a/extensions/copilot/src/platform/endpoint/node/responsesApi.ts b/extensions/copilot/src/platform/endpoint/node/responsesApi.ts index 660c524447c6e..f34e6f805ca7f 100644 --- a/extensions/copilot/src/platform/endpoint/node/responsesApi.ts +++ b/extensions/copilot/src/platform/endpoint/node/responsesApi.ts @@ -5,7 +5,7 @@ import { Raw } from '@vscode/prompt-tsx'; import type { OpenAI } from 'openai'; -import { Response, withStreamIdleTimeout } from '../../../platform/networking/common/fetcherService'; +import { Response } from '../../../platform/networking/common/fetcherService'; import { coalesce } from '../../../util/vs/base/common/arrays'; import { AsyncIterableObject } from '../../../util/vs/base/common/async'; import { binaryIndexOf } from '../../../util/vs/base/common/buffer'; @@ -534,7 +534,7 @@ export async function processResponseFromChatEndpoint(instantiationService: IIns } }); - for await (const chunk of withStreamIdleTimeout(response.body)) { + for await (const chunk of response.body) { parser.feed(chunk); } }, async () => { diff --git a/extensions/copilot/src/platform/networking/common/fetcherService.ts b/extensions/copilot/src/platform/networking/common/fetcherService.ts index 14b02fda31a7f..ae22b33ad8d60 100644 --- a/extensions/copilot/src/platform/networking/common/fetcherService.ts +++ b/extensions/copilot/src/platform/networking/common/fetcherService.ts @@ -296,84 +296,6 @@ export class DestroyableStream implements AsyncIterable { } } -/** - * How long to wait (ms) for the first SSE chunk. The model's TTFT is often longer - * than the subsequent chunks so we give it slightly more time. - */ -export const SSE_FIRST_CHUNK_TIMEOUT_MS = 2 * 60 * 1000; - -/** - * How long to wait (ms) between subsequent SSE chunks once streaming has - * started. Once the model is actively streaming, gaps this long indicate - * a hung connection. - */ -export const SSE_IDLE_TIMEOUT_MS = 60 * 1000; - -export class StreamIdleTimeoutError extends Error { - constructor(timeoutMs: number, isFirstChunk: boolean) { - super(isFirstChunk - ? `SSE stream timed out waiting ${timeoutMs}ms for the first chunk` - : `SSE stream timed out after ${timeoutMs}ms of inactivity`); - this.name = 'StreamIdleTimeoutError'; - } -} - -/** - * Wraps a {@link DestroyableStream} with an idle watchdog. Uses a longer - * timeout ({@link SSE_FIRST_CHUNK_TIMEOUT_MS}) while waiting for the first - * chunk, then switches to {@link SSE_IDLE_TIMEOUT_MS} for subsequent chunks. - * If no chunk arrives within the active timeout, the stream is destroyed - * and a {@link StreamIdleTimeoutError} is thrown. - */ -export async function* withStreamIdleTimeout( - stream: DestroyableStream, -): AsyncGenerator { - let timedOut = false; - let isFirstChunk = true; - let timer: ReturnType | undefined; - - const clearTimer = () => { - if (timer !== undefined) { - clearTimeout(timer); - timer = undefined; - } - }; - - const startTimer = (timeoutMs: number) => { - clearTimer(); - timer = setTimeout(() => { - timedOut = true; - void stream.destroy().catch(() => { }); - }, timeoutMs); - }; - - const iterator = stream[Symbol.asyncIterator](); - try { - while (true) { - // Timer runs only while awaiting the next chunk from the network - startTimer(isFirstChunk ? SSE_FIRST_CHUNK_TIMEOUT_MS : SSE_IDLE_TIMEOUT_MS); - const result = await iterator.next(); - clearTimer(); - - if (result.done) { - break; - } - - isFirstChunk = false; - // Consumer processing time is NOT timed — the timer is cleared above - yield result.value; - } - } finally { - clearTimer(); - await iterator.return?.(); - } - - if (timedOut) { - const timeoutMs = isFirstChunk ? SSE_FIRST_CHUNK_TIMEOUT_MS : SSE_IDLE_TIMEOUT_MS; - throw new StreamIdleTimeoutError(timeoutMs, isFirstChunk); - } -} - export async function jsonVerboseError(resp: Response) { const text = await resp.text(); try { diff --git a/extensions/copilot/src/platform/networking/node/stream.ts b/extensions/copilot/src/platform/networking/node/stream.ts index c6322df57301f..db143af2acd7a 100644 --- a/extensions/copilot/src/platform/networking/node/stream.ts +++ b/extensions/copilot/src/platform/networking/node/stream.ts @@ -10,7 +10,7 @@ import { TelemetryData } from '../../telemetry/common/telemetryData'; import { RawThinkingDelta, ThinkingDelta } from '../../thinking/common/thinking'; import { extractThinkingDeltaFromChoice, } from '../../thinking/common/thinkingUtils'; import { FinishedCallback, getRequestId, ICodeVulnerabilityAnnotation, ICopilotBeginToolCall, ICopilotConfirmation, ICopilotError, ICopilotFunctionCall, ICopilotReference, ICopilotToolCall, ICopilotToolCallStreamUpdate, IIPCodeCitation, isCodeCitationAnnotation, isCopilotAnnotation, RequestId } from '../common/fetch'; -import { DestroyableStream, Response, withStreamIdleTimeout } from '../common/fetcherService'; +import { DestroyableStream, Response } from '../common/fetcherService'; import { APIErrorResponse, APIJsonData, APIUsage, ChoiceLogProbs, FilterReason, FinishedCompletionReason, isApiUsage, IToolCall } from '../common/openai'; /** Gathers together many chunks of a single completion choice. */ @@ -320,7 +320,7 @@ export class SSEProcessor { let thinkingFound = false; // Iterate over arbitrarily sized chunks coming in from the network. - for await (const chunk of withStreamIdleTimeout(this.body)) { + for await (const chunk of this.body) { if (await this.maybeCancel('after awaiting body chunk')) { return; } diff --git a/extensions/copilot/src/platform/networking/test/node/streamIdleTimeout.spec.ts b/extensions/copilot/src/platform/networking/test/node/streamIdleTimeout.spec.ts deleted file mode 100644 index 82581ae592c8d..0000000000000 --- a/extensions/copilot/src/platform/networking/test/node/streamIdleTimeout.spec.ts +++ /dev/null @@ -1,186 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) Microsoft Corporation. All rights reserved. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -import assert from 'assert'; -import { afterEach, beforeEach, suite, test, vi } from 'vitest'; -import { DestroyableStream, StreamIdleTimeoutError, SSE_FIRST_CHUNK_TIMEOUT_MS, SSE_IDLE_TIMEOUT_MS, withStreamIdleTimeout } from '../../common/fetcherService'; - -/** - * Creates a DestroyableStream backed by a ReadableStream whose enqueue/close - * are exposed so the test can push chunks on demand. - */ -interface ControllableStream { - stream: DestroyableStream; - push: (value: T) => void; - close: () => void; -} - -function createControllableStream(): ControllableStream { - let ctrl!: ReadableStreamDefaultController; - const readable = new ReadableStream({ start(c) { ctrl = c; } }); - return { - stream: new DestroyableStream(readable), - push: (value: T) => ctrl.enqueue(value), - close: () => ctrl.close(), - }; -} - -suite('withStreamIdleTimeout', () => { - - beforeEach(() => { - vi.useFakeTimers(); - }); - - afterEach(() => { - vi.useRealTimers(); - }); - - test('yields all chunks from a fast stream without delay', async () => { - const { stream, push, close } = createControllableStream(); - push('a'); - push('b'); - push('c'); - close(); - - const result: string[] = []; - for await (const chunk of withStreamIdleTimeout(stream)) { - result.push(chunk); - } - - assert.deepStrictEqual(result, ['a', 'b', 'c']); - }); - - test('throws StreamIdleTimeoutError when first chunk never arrives', async () => { - const { stream } = createControllableStream(); - - await assert.rejects( - async () => { - const iter = withStreamIdleTimeout(stream); - const nextPromise = iter.next(); - nextPromise.catch(() => { }); // prevent unhandled rejection during timer advancement - await vi.advanceTimersByTimeAsync(SSE_FIRST_CHUNK_TIMEOUT_MS + 1); - await nextPromise; - }, - (err: StreamIdleTimeoutError) => { - assert.strictEqual(err.name, 'StreamIdleTimeoutError'); - assert.ok(err.message.includes('first chunk')); - return true; - } - ); - }); - - test('throws StreamIdleTimeoutError when a subsequent chunk stalls', async () => { - const { stream, push } = createControllableStream(); - - const iter = withStreamIdleTimeout(stream); - - // First chunk arrives immediately - push('first'); - const first = await iter.next(); - assert.deepStrictEqual(first, { value: 'first', done: false }); - - await assert.rejects( - async () => { - const nextPromise = iter.next(); - nextPromise.catch(() => { }); // prevent unhandled rejection during timer advancement - await vi.advanceTimersByTimeAsync(SSE_IDLE_TIMEOUT_MS + 1); - await nextPromise; - }, - (err: StreamIdleTimeoutError) => { - assert.strictEqual(err.name, 'StreamIdleTimeoutError'); - assert.ok(err.message.includes('inactivity')); - return true; - } - ); - }); - - test('does not time out when chunks arrive within the deadline', async () => { - const { stream, push, close } = createControllableStream(); - - const collected: string[] = []; - const done = (async () => { - for await (const chunk of withStreamIdleTimeout(stream)) { - collected.push(chunk); - } - })(); - - // Deliver chunks just inside the timeout windows - push('a'); - await vi.advanceTimersByTimeAsync(SSE_IDLE_TIMEOUT_MS - 100); - push('b'); - await vi.advanceTimersByTimeAsync(SSE_IDLE_TIMEOUT_MS - 100); - push('c'); - await vi.advanceTimersByTimeAsync(1); - close(); - - await done; - assert.deepStrictEqual(collected, ['a', 'b', 'c']); - }); - - test('uses the longer first-chunk timeout before first chunk', async () => { - const { stream, push, close } = createControllableStream(); - - const collected: string[] = []; - const done = (async () => { - for await (const chunk of withStreamIdleTimeout(stream)) { - collected.push(chunk); - } - })(); - - // Advance past the idle timeout but before the first-chunk timeout — should NOT throw - await vi.advanceTimersByTimeAsync(SSE_IDLE_TIMEOUT_MS + 100); - push('delayed-first'); - await vi.advanceTimersByTimeAsync(1); - close(); - - await done; - assert.deepStrictEqual(collected, ['delayed-first']); - }); - - test('consumer break releases the underlying reader lock', async () => { - const { stream, push } = createControllableStream(); - push('a'); - - const collected: string[] = []; - for await (const chunk of withStreamIdleTimeout(stream)) { - collected.push(chunk); - if (chunk === 'b') { - break; - } - push('b'); - } - - assert.deepStrictEqual(collected, ['a', 'b']); - // After breaking, destroy() should still work without a dangling reader lock - await stream.destroy(); - }); - - test('consumer processing time longer than idle timeout does not cause false timeout', async () => { - const { stream, push, close } = createControllableStream(); - - const collected: string[] = []; - const done = (async () => { - for await (const chunk of withStreamIdleTimeout(stream)) { - collected.push(chunk); - // Simulate slow consumer: advance time well past the idle timeout - // while processing each chunk. This must NOT trigger a timeout - // because the timer should only run during iterator.next(), not - // while the consumer holds the yielded value. - await vi.advanceTimersByTimeAsync(SSE_IDLE_TIMEOUT_MS * 3); - } - })(); - - push('a'); - await vi.advanceTimersByTimeAsync(1); - push('b'); - await vi.advanceTimersByTimeAsync(1); - push('c'); - await vi.advanceTimersByTimeAsync(1); - close(); - - await done; - assert.deepStrictEqual(collected, ['a', 'b', 'c']); - }); -});