Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
*--------------------------------------------------------------------------------------------*/

import { Throttler } from '../../../../../../base/common/async.js';
import { CancellationToken } from '../../../../../../base/common/cancellation.js';
import { CancellationToken, CancellationTokenSource } from '../../../../../../base/common/cancellation.js';
import { Emitter } from '../../../../../../base/common/event.js';
import { MarkdownString } from '../../../../../../base/common/htmlContent.js';
import { Disposable, DisposableMap, DisposableStore, MutableDisposable, toDisposable } from '../../../../../../base/common/lifecycle.js';
import { Disposable, DisposableMap, DisposableStore, MutableDisposable, type IDisposable, toDisposable } from '../../../../../../base/common/lifecycle.js';
import { observableValue } from '../../../../../../base/common/observable.js';
import { URI } from '../../../../../../base/common/uri.js';
import { generateUuid } from '../../../../../../base/common/uuid.js';
Expand All @@ -18,7 +18,7 @@ import { ActionType, isSessionAction, type ISessionAction } from '../../../../..
import { SessionClientState } from '../../../../../../platform/agentHost/common/state/sessionClientState.js';
import { AHP_AUTH_REQUIRED, ProtocolError } from '../../../../../../platform/agentHost/common/state/sessionProtocol.js';
import { getToolKind, getToolLanguage } from '../../../../../../platform/agentHost/common/state/sessionReducers.js';
import { AttachmentType, PendingMessageKind, ResponsePartKind, ToolCallCancellationReason, ToolCallConfirmationReason, ToolCallStatus, TurnState, type IMessageAttachment } from '../../../../../../platform/agentHost/common/state/sessionState.js';
import { AttachmentType, PendingMessageKind, ResponsePartKind, ToolCallCancellationReason, ToolCallConfirmationReason, ToolCallStatus, TurnState, type IMessageAttachment, type ISessionState } from '../../../../../../platform/agentHost/common/state/sessionState.js';
import { ExtensionIdentifier } from '../../../../../../platform/extensions/common/extensions.js';
import { IInstantiationService } from '../../../../../../platform/instantiation/common/instantiation.js';
import { ILogService } from '../../../../../../platform/log/common/log.js';
Expand All @@ -30,10 +30,10 @@ import { ChatAgentLocation, ChatModeKind } from '../../../common/constants.js';
import { ChatToolInvocation } from '../../../common/model/chatProgressTypes/chatToolInvocation.js';
import { IChatAgentData, IChatAgentImplementation, IChatAgentRequest, IChatAgentResult, IChatAgentService } from '../../../common/participants/chatAgents.js';
import { getAgentHostIcon } from '../agentSessions.js';
import { finalizeToolInvocation, toolCallStateToInvocation, turnsToHistory, type IToolCallFileEdit } from './stateToProgressAdapter.js';
import { activeTurnToProgress, finalizeToolInvocation, toolCallStateToInvocation, turnsToHistory, type IToolCallFileEdit } from './stateToProgressAdapter.js';

// =============================================================================
// AgentHostSessionHandler renderer-side handler for a single agent host
// AgentHostSessionHandler - renderer-side handler for a single agent host
// chat session type. Bridges the protocol state layer with the chat UI:
// subscribes to session state, derives IChatProgress[] from immutable state
// changes, and dispatches client actions (turnStarted, toolCallConfirmed,
Expand All @@ -55,17 +55,24 @@ class AgentHostChatSession extends Disposable implements IChatSession {
readonly onDidStartServerRequest = this._onDidStartServerRequest.event;

readonly requestHandler: IChatSession['requestHandler'];
readonly interruptActiveResponseCallback: IChatSession['interruptActiveResponseCallback'];
interruptActiveResponseCallback: IChatSession['interruptActiveResponseCallback'];

constructor(
readonly sessionResource: URI,
readonly history: readonly IChatSessionHistoryItem[],
private readonly _sendRequest: (request: IChatAgentRequest, progress: (parts: IChatProgress[]) => void, token: CancellationToken) => Promise<void>,
initialProgress: IChatProgress[] | undefined,
onDispose: () => void,
@ILogService private readonly _logService: ILogService,
) {
super();

const hasActiveTurn = initialProgress !== undefined;
if (hasActiveTurn) {
this.isCompleteObs.set(false, undefined);
this.progressObs.set(initialProgress, undefined);
}

this._register(toDisposable(() => this._onWillDispose.fire()));
this._register(toDisposable(onDispose));

Expand All @@ -76,9 +83,34 @@ class AgentHostChatSession extends Disposable implements IChatSession {
this.isCompleteObs.set(true, undefined);
};

this.interruptActiveResponseCallback = history.length > 0 ? undefined : async () => {
// Provide interrupt callback when reconnecting to an active turn or
// when this is a brand-new session (no history yet).
this.interruptActiveResponseCallback = (hasActiveTurn || history.length === 0) ? async () => {
return true;
};
} : undefined;
}

/**
* Registers a disposable to be cleaned up when this session is disposed.
*/
registerDisposable<T extends IDisposable>(disposable: T): T {
return this._register(disposable);
}

/**
* Appends new progress items to the observable. Used by the reconnection
* flow to stream ongoing state changes into the chat UI.
*/
appendProgress(items: IChatProgress[]): void {
const current = this.progressObs.get();
this.progressObs.set([...current, ...items], undefined);
}

/**
* Marks the active turn as complete.
*/
complete(): void {
this.isCompleteObs.set(true, undefined);
}

/**
Expand Down Expand Up @@ -175,6 +207,8 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
let resolvedSession: URI | undefined;
const isUntitled = resourceKey.startsWith('untitled-');
const history: IChatSessionHistoryItem[] = [];
let initialProgress: IChatProgress[] | undefined;
let activeTurnId: string | undefined;
if (!isUntitled) {
resolvedSession = this._resolveSessionUri(sessionResource);
this._sessionToBackend.set(resourceKey, resolvedSession);
Expand All @@ -185,6 +219,26 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
const sessionState = this._clientState.getSessionState(resolvedSession.toString());
if (sessionState) {
history.push(...turnsToHistory(sessionState.turns, this._config.agentId));

// If there's an active turn, include its request in history
// with an empty response so the chat service creates a
// pending request, then provide accumulated progress via
// progressObs for live streaming.
if (sessionState.activeTurn) {
activeTurnId = sessionState.activeTurn.id;
history.push({
type: 'request',
prompt: sessionState.activeTurn.userMessage.text,
participant: this._config.agentId,
});
history.push({
type: 'response',
parts: [],
participant: this._config.agentId,
});
initialProgress = activeTurnToProgress(sessionState.activeTurn);
this._logService.info(`[AgentHost] Reconnecting to active turn ${activeTurnId} for session ${resolvedSession.toString()}`);
}
}
}
} catch (err) {
Expand All @@ -206,6 +260,7 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
this._ensurePendingMessageSubscription(resourceKey, sessionResource, backendSession);
return this._handleTurn(backendSession, request, progress, token);
},
initialProgress,
() => {
this._activeSessions.delete(resourceKey);
this._sessionToBackend.delete(resourceKey);
Expand All @@ -220,9 +275,15 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
);
this._activeSessions.set(resourceKey, session);

// For existing (non-untitled) sessions, start watching for server-initiated turns
// immediately. For untitled sessions, this is deferred to _createAndSubscribe.
if (resolvedSession) {
// If reconnecting to an active turn, wire up an ongoing state listener
// to stream new progress into the session's progressObs.
if (activeTurnId && initialProgress !== undefined) {
this._reconnectToActiveTurn(resolvedSession, activeTurnId, session, initialProgress);
}

// For existing (non-untitled) sessions, start watching for server-initiated turns
// immediately. For untitled sessions, this is deferred to _createAndSubscribe.
this._watchForServerInitiatedTurns(resolvedSession, sessionResource);
}

Expand Down Expand Up @@ -395,7 +456,11 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
private _watchForServerInitiatedTurns(backendSession: URI, sessionResource: URI): void {
const resourceKey = sessionResource.path.substring(1);
const sessionStr = backendSession.toString();
let lastSeenTurnId: string | undefined;

// Seed from the current state so we don't treat any pre-existing active
// turn (e.g. one being handled by _reconnectToActiveTurn) as new.
const currentState = this._clientState.getSessionState(sessionStr);
let lastSeenTurnId: string | undefined = currentState?.activeTurn?.id;
let previousQueuedIds: Set<string> | undefined;

const disposables = new DisposableStore();
Expand Down Expand Up @@ -827,6 +892,179 @@ export class AgentHostSessionHandler extends Disposable implements IChatSessionC
});
}

// ---- Reconnection to active turn ----------------------------------------

/**
* Wires up an ongoing state listener that streams incremental progress
* from an already-running turn into the chat session's progressObs.
* This is the reconnection counterpart of {@link _handleTurn}, which
* handles newly-initiated turns.
*/
private _reconnectToActiveTurn(
backendSession: URI,
turnId: string,
chatSession: AgentHostChatSession,
initialProgress: IChatProgress[],
): void {
const sessionKey = backendSession.toString();

// Extract live ChatToolInvocation objects from the initial progress
// array so we can update/finalize the same instances the chat UI holds.
const activeToolInvocations = new Map<string, ChatToolInvocation>();
for (const item of initialProgress) {
if (item instanceof ChatToolInvocation) {
activeToolInvocations.set(item.toolCallId, item);
}
}

// Track last-emitted content lengths per response part to compute deltas.
// Seed from the current state so we only emit new content beyond what
// activeTurnToProgress already captured.
const lastEmittedLengths = new Map<string, number>();
const currentState = this._clientState.getSessionState(sessionKey);
if (currentState?.activeTurn) {
for (const rp of currentState.activeTurn.responseParts) {
if (rp.kind === ResponsePartKind.Markdown || rp.kind === ResponsePartKind.Reasoning) {
lastEmittedLengths.set(rp.id, rp.content.length);
}
}
}

const reconnectDisposables = chatSession.registerDisposable(new DisposableStore());
const throttler = new Throttler();
reconnectDisposables.add(throttler);

// Set up the interrupt callback so the user can actually cancel the
// remote turn. This dispatches session/turnCancelled to the server.
chatSession.interruptActiveResponseCallback = async () => {
this._logService.info(`[AgentHost] Reconnect cancellation requested for ${sessionKey}, dispatching turnCancelled`);
const cancelAction = {
type: ActionType.SessionTurnCancelled as const,
session: sessionKey,
turnId,
};
const seq = this._clientState.applyOptimistic(cancelAction);
this._config.connection.dispatchAction(cancelAction, this._clientState.clientId, seq);
return true;
};

// Wire up awaitConfirmation for tool calls that were already pending
// confirmation at snapshot time so the user can approve/deny them.
const cts = new CancellationTokenSource();
reconnectDisposables.add(toDisposable(() => cts.dispose(true)));
for (const [toolCallId, invocation] of activeToolInvocations) {
if (!IChatToolInvocation.isComplete(invocation)) {
this._awaitToolConfirmation(invocation, toolCallId, backendSession, turnId, cts.token);
}
}

// Process state changes from the protocol layer.
const processStateChange = (sessionState: ISessionState) => {
const activeTurn = sessionState.activeTurn;
const isActive = activeTurn?.id === turnId;
const responseParts = isActive
? activeTurn.responseParts
: sessionState.turns.find(t => t.id === turnId)?.responseParts;

if (responseParts) {
for (const rp of responseParts) {
switch (rp.kind) {
case ResponsePartKind.Markdown: {
const lastLen = lastEmittedLengths.get(rp.id) ?? 0;
if (rp.content.length > lastLen) {
const delta = rp.content.substring(lastLen);
lastEmittedLengths.set(rp.id, rp.content.length);
chatSession.appendProgress([{ kind: 'markdownContent', content: new MarkdownString(delta, { supportHtml: true }) }]);
}
break;
}
case ResponsePartKind.Reasoning: {
const lastLen = lastEmittedLengths.get(rp.id) ?? 0;
if (rp.content.length > lastLen) {
const delta = rp.content.substring(lastLen);
lastEmittedLengths.set(rp.id, rp.content.length);
chatSession.appendProgress([{ kind: 'thinking', value: delta }]);
}
break;
}
case ResponsePartKind.ToolCall: {
const tc = rp.toolCall;
const toolCallId = tc.toolCallId;
let existing = activeToolInvocations.get(toolCallId);

if (!existing) {
existing = toolCallStateToInvocation(tc);
activeToolInvocations.set(toolCallId, existing);
chatSession.appendProgress([existing]);

if (tc.status === ToolCallStatus.PendingConfirmation) {
this._awaitToolConfirmation(existing, toolCallId, backendSession, turnId, cts.token);
}
} else if (tc.status === ToolCallStatus.PendingConfirmation) {
// Running -> PendingConfirmation (re-confirmation).
existing.didExecuteTool(undefined);
const confirmInvocation = toolCallStateToInvocation(tc);
activeToolInvocations.set(toolCallId, confirmInvocation);
chatSession.appendProgress([confirmInvocation]);
this._awaitToolConfirmation(confirmInvocation, toolCallId, backendSession, turnId, cts.token);
} else if (tc.status === ToolCallStatus.Running) {
existing.invocationMessage = typeof tc.invocationMessage === 'string'
? tc.invocationMessage
: new MarkdownString(tc.invocationMessage.markdown);
if (getToolKind(tc) === 'terminal' && tc.toolInput) {
existing.toolSpecificData = {
kind: 'terminal',
commandLine: { original: tc.toolInput },
language: getToolLanguage(tc) ?? 'shellscript',
};
}
}

// Finalize terminal-state tools
if (existing && (tc.status === ToolCallStatus.Completed || tc.status === ToolCallStatus.Cancelled) && !IChatToolInvocation.isComplete(existing)) {
activeToolInvocations.delete(toolCallId);
finalizeToolInvocation(existing, tc);
// Note: file edits from reconnection are not routed through
// the editing session pipeline as there is no active request
// context. The edits already happened on the remote.
}
break;
}
}
}
}

// If the turn is no longer active, emit any error and finish.
if (!isActive) {
const lastTurn = sessionState.turns.find(t => t.id === turnId);
if (lastTurn?.state === TurnState.Error && lastTurn.error) {
chatSession.appendProgress([{
kind: 'markdownContent',
content: new MarkdownString(`\n\nError: (${lastTurn.error.errorType}) ${lastTurn.error.message}`),
}]);
}
chatSession.complete();
reconnectDisposables.dispose();
}
};

// Attach the ongoing state listener
reconnectDisposables.add(this._clientState.onDidChangeSessionState(e => {
if (e.session !== sessionKey) {
return;
}
throttler.queue(async () => processStateChange(e.state));
}));

// Immediately reconcile against the current state to close any gap
// between snapshot time and listener registration. If the turn already
// completed in the interim, this will mark the session complete.
const latestState = this._clientState.getSessionState(sessionKey);
if (latestState) {
processStateChange(latestState);
}
}

// ---- File edit routing ---------------------------------------------------

/**
Expand Down
Loading
Loading