Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions extensions/copilot/src/platform/endpoint/node/messagesApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { ContentBlockParam, DocumentBlockParam, ImageBlockParam, MessageParam, RedactedThinkingBlockParam, TextBlockParam, ThinkingBlockParam, ToolReferenceBlockParam, ToolResultBlockParam } from '@anthropic-ai/sdk/resources';
import { Raw } from '@vscode/prompt-tsx';
import { Response, withStreamIdleTimeout } from '../../../platform/networking/common/fetcherService';
import { Response } from '../../../platform/networking/common/fetcherService';
import { AsyncIterableObject } from '../../../util/vs/base/common/async';
import { SSEParser } from '../../../util/vs/base/common/sseParser';
import { generateUuid } from '../../../util/vs/base/common/uuid';
Expand Down Expand Up @@ -593,7 +593,7 @@ export async function processResponseFromMessagesEndpoint(
}
});

for await (const chunk of withStreamIdleTimeout(response.body)) {
for await (const chunk of response.body) {
parser.feed(chunk);
}
}, async () => {
Expand Down
4 changes: 2 additions & 2 deletions extensions/copilot/src/platform/endpoint/node/responsesApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import { Raw } from '@vscode/prompt-tsx';
import type { OpenAI } from 'openai';
import { Response, withStreamIdleTimeout } from '../../../platform/networking/common/fetcherService';
import { Response } from '../../../platform/networking/common/fetcherService';
import { coalesce } from '../../../util/vs/base/common/arrays';
import { AsyncIterableObject } from '../../../util/vs/base/common/async';
import { binaryIndexOf } from '../../../util/vs/base/common/buffer';
Expand Down Expand Up @@ -534,7 +534,7 @@ export async function processResponseFromChatEndpoint(instantiationService: IIns
}
});

for await (const chunk of withStreamIdleTimeout(response.body)) {
for await (const chunk of response.body) {
parser.feed(chunk);
}
}, async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,84 +296,6 @@ export class DestroyableStream<T> implements AsyncIterable<T> {
}
}

/**
 * How long to wait (ms) for the first SSE chunk: 2 minutes (120 000 ms).
 * The model's TTFT is often longer than the gap between subsequent chunks,
 * so the first chunk gets slightly more time than {@link SSE_IDLE_TIMEOUT_MS}.
 */
export const SSE_FIRST_CHUNK_TIMEOUT_MS = 2 * 60 * 1000;

/**
 * How long to wait (ms) between subsequent SSE chunks once streaming has
 * started: 60 seconds. Once the model is actively streaming, gaps this long
 * indicate a hung connection.
 */
export const SSE_IDLE_TIMEOUT_MS = 60 * 1000;

/**
 * Thrown when an SSE stream produces no data within the active watchdog
 * timeout. `isFirstChunk` selects the phase-specific message: waiting for
 * the first chunk vs. idle mid-stream.
 */
export class StreamIdleTimeoutError extends Error {
	constructor(timeoutMs: number, isFirstChunk: boolean) {
		// Build the phase-specific message before calling super().
		const message = isFirstChunk
			? `SSE stream timed out waiting ${timeoutMs}ms for the first chunk`
			: `SSE stream timed out after ${timeoutMs}ms of inactivity`;
		super(message);
		this.name = 'StreamIdleTimeoutError';
	}
}

/**
 * Wraps a {@link DestroyableStream} with an idle watchdog. Uses a longer
 * timeout ({@link SSE_FIRST_CHUNK_TIMEOUT_MS}) while waiting for the first
 * chunk, then switches to {@link SSE_IDLE_TIMEOUT_MS} for subsequent chunks.
 * If no chunk arrives within the active timeout, the stream is destroyed
 * and a {@link StreamIdleTimeoutError} is thrown.
 *
 * The timer runs only while awaiting the next chunk from the network;
 * consumer processing time is never counted against the timeout.
 *
 * @param stream the stream to guard; destroyed when the watchdog fires
 * @throws StreamIdleTimeoutError when the active timeout elapses with no chunk
 */
export async function* withStreamIdleTimeout<T>(
	stream: DestroyableStream<T>,
): AsyncGenerator<T> {
	let timedOut = false;
	// Captured at the moment the timer fires so the error reports the
	// timeout that was actually active, even if state changes afterwards.
	let firedOnFirstChunk = false;
	let firedAfterMs = 0;
	let isFirstChunk = true;
	let timer: ReturnType<typeof setTimeout> | undefined;

	const clearTimer = () => {
		if (timer !== undefined) {
			clearTimeout(timer);
			timer = undefined;
		}
	};

	const startTimer = (timeoutMs: number) => {
		clearTimer();
		timer = setTimeout(() => {
			timedOut = true;
			firedOnFirstChunk = isFirstChunk;
			firedAfterMs = timeoutMs;
			// Best-effort teardown; rejection is translated below.
			void stream.destroy().catch(() => { });
		}, timeoutMs);
	};

	const iterator = stream[Symbol.asyncIterator]();
	try {
		while (true) {
			// Timer runs only while awaiting the next chunk from the network
			startTimer(isFirstChunk ? SSE_FIRST_CHUNK_TIMEOUT_MS : SSE_IDLE_TIMEOUT_MS);
			let result: IteratorResult<T>;
			try {
				result = await iterator.next();
			} catch (err) {
				// destroy() may make next() reject; report the timeout rather
				// than letting the teardown error mask it.
				if (timedOut) {
					throw new StreamIdleTimeoutError(firedAfterMs, firedOnFirstChunk);
				}
				throw err;
			} finally {
				clearTimer();
			}

			// If the watchdog fired, stop even when a late chunk raced in —
			// otherwise the loop would keep yielding with timedOut stuck true.
			if (timedOut) {
				throw new StreamIdleTimeoutError(firedAfterMs, firedOnFirstChunk);
			}

			if (result.done) {
				break;
			}

			isFirstChunk = false;
			// Consumer processing time is NOT timed — the timer is cleared above
			yield result.value;
		}
	} finally {
		clearTimer();
		await iterator.return?.();
	}
}

export async function jsonVerboseError(resp: Response) {
const text = await resp.text();
try {
Expand Down
4 changes: 2 additions & 2 deletions extensions/copilot/src/platform/networking/node/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { TelemetryData } from '../../telemetry/common/telemetryData';
import { RawThinkingDelta, ThinkingDelta } from '../../thinking/common/thinking';
import { extractThinkingDeltaFromChoice, } from '../../thinking/common/thinkingUtils';
import { FinishedCallback, getRequestId, ICodeVulnerabilityAnnotation, ICopilotBeginToolCall, ICopilotConfirmation, ICopilotError, ICopilotFunctionCall, ICopilotReference, ICopilotToolCall, ICopilotToolCallStreamUpdate, IIPCodeCitation, isCodeCitationAnnotation, isCopilotAnnotation, RequestId } from '../common/fetch';
import { DestroyableStream, Response, withStreamIdleTimeout } from '../common/fetcherService';
import { DestroyableStream, Response } from '../common/fetcherService';
import { APIErrorResponse, APIJsonData, APIUsage, ChoiceLogProbs, FilterReason, FinishedCompletionReason, isApiUsage, IToolCall } from '../common/openai';

/** Gathers together many chunks of a single completion choice. */
Expand Down Expand Up @@ -320,7 +320,7 @@ export class SSEProcessor {
let thinkingFound = false;

// Iterate over arbitrarily sized chunks coming in from the network.
for await (const chunk of withStreamIdleTimeout(this.body)) {
for await (const chunk of this.body) {
if (await this.maybeCancel('after awaiting body chunk')) {
return;
}
Expand Down

This file was deleted.

Loading