Skip to content
Merged
122 changes: 115 additions & 7 deletions src/vs/platform/agentHost/browser/remoteAgentHostProtocolClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// Wraps WebSocketClientTransport and SessionClientState to provide a
// higher-level API matching IAgentService.

import { DeferredPromise } from '../../../base/common/async.js';
import { DeferredPromise, IntervalTimer } from '../../../base/common/async.js';
import { CancellationError } from '../../../base/common/errors.js';
import { Emitter } from '../../../base/common/event.js';
import { Disposable, IReference } from '../../../base/common/lifecycle.js';
Expand All @@ -33,6 +33,36 @@ import { decodeBase64, encodeBase64, VSBuffer } from '../../../base/common/buffe

const AHP_CLIENT_CONNECTION_CLOSED = -32000;

/**
* How often the connection liveness watchdog runs.
*
* Mirrors {@link ProtocolConstants.KeepAliveSendTime} from the regular
* remote extension host stack. Cheap because the check is just a couple
* of timestamp comparisons.
*/
const WATCHDOG_CHECK_INTERVAL_MS = 5_000;

/**
* If a request has been outstanding for this long AND no message of any
* kind has been received in the same window, declare the transport dead
* and force-close it so the renderer's reconnect logic kicks in.
*
* Matches {@link ProtocolConstants.TimeoutTime} from the regular remote
* extension host stack.
*
* Idle connections are not probed — no ping traffic is sent. The first
* user-driven request after the transport goes silent will surface the
* timeout within {@link WATCHDOG_TIMEOUT_MS}ms.
*/
const WATCHDOG_TIMEOUT_MS = 20_000;

function connectionTimeoutError(address: string, sinceLastReadMs: number, oldestRequestAgeMs: number): ProtocolError {
return new ProtocolError(
AHP_CLIENT_CONNECTION_CLOSED,
`Connection appears dead: ${address}; no message received for ${sinceLastReadMs}ms, oldest pending request is ${oldestRequestAgeMs}ms old.`,
);
}

function connectionClosedError(address: string): ProtocolError {
return new ProtocolError(AHP_CLIENT_CONNECTION_CLOSED, `Connection closed: ${address}`);
}
Expand Down Expand Up @@ -77,11 +107,27 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC
readonly onDidClose = this._onDidClose.event;

/** Pending JSON-RPC requests keyed by request id. */
private readonly _pendingRequests = new Map<number, DeferredPromise<unknown>>();
private readonly _pendingRequests = new Map<number, { deferred: DeferredPromise<unknown>; sentAt: number }>();
private _nextRequestId = 1;
private _isClosed = false;
private _closeError: ProtocolError | undefined;

/**
* Timestamp of the most recent message of any kind received from the
* server. Updated in {@link _handleMessage}. Used by the watchdog to
* decide if the transport has gone silent.
*/
private _lastReadTime = Date.now();

/**
* Periodic check that fires {@link _handleClose} when there are
* outstanding requests *and* nothing has been received for
* {@link WATCHDOG_TIMEOUT_MS}ms. Detects silently-dead transports
* (e.g. SSH/tunnel after laptop sleep + network change) that don't
* produce a socket close event of their own. See {@link _watchdogTick}.
*/
private readonly _watchdog = this._register(new IntervalTimer());

/**
* Comparison keys of customization URIs we have already granted implicit
* read access for on this connection. Dedupes repeat sends so we don't
Expand Down Expand Up @@ -128,6 +174,9 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC
this._register(this.onDidAction(envelope => {
this._subscriptionManager.receiveEnvelope(envelope);
}));

// Detect silently-dead transports — see {@link _watchdogTick}.
this._watchdog.cancelAndSet(() => this._watchdogTick(), WATCHDOG_CHECK_INTERVAL_MS);
}

override dispose(): void {
Expand Down Expand Up @@ -397,6 +446,19 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC
}

private _handleMessage(msg: ProtocolMessage): void {
if (this._isClosed) {
// After close, the transport may still emit late messages (e.g.
// because the same shared event source is also feeding a newer
// transport for the same connectionId). Drop them so they can't
// trigger any side effects.
return;
}

// Any inbound traffic — including this message — is evidence the
// transport is still alive. Update before dispatch so the watchdog
// is consistent even if a handler synchronously schedules work.
this._lastReadTime = Date.now();

if (isJsonRpcRequest(msg)) {
this._handleReverseRequest(msg.id, msg.method, msg.params);
} else if (isJsonRpcResponse(msg)) {
Expand All @@ -405,9 +467,9 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC
this._pendingRequests.delete(msg.id);
if (hasKey(msg, { error: true })) {
this._logService.warn(`[RemoteAgentHostProtocol] Request ${msg.id} failed:`, msg.error);
pending.error(this._toProtocolError(msg.error));
pending.deferred.error(this._toProtocolError(msg.error));
} else {
pending.complete(msg.result);
pending.deferred.complete(msg.result);
}
} else {
this._logService.warn(`[RemoteAgentHostProtocol] Received response for unknown request id ${msg.id}`);
Expand Down Expand Up @@ -443,6 +505,9 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC

this._isClosed = true;
this._closeError = error;
// Stop the watchdog so it doesn't keep ticking on a dead connection
// (the client may outlive the close, waiting to be replaced).
this._watchdog.cancel();
this._rejectPendingRequests(error);
this._permissionService.connectionClosed(this._address);
this._grantedCustomizationUris.clear();
Expand Down Expand Up @@ -625,7 +690,7 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC

const id = this._nextRequestId++;
const deferred = new DeferredPromise<unknown>();
this._pendingRequests.set(id, deferred);
this._pendingRequests.set(id, { deferred, sentAt: Date.now() });
// Generic M can't satisfy the distributive AhpRequest union directly
// eslint-disable-next-line local/code-no-dangerous-type-assertions
this._transport.send({ jsonrpc: '2.0' as const, id, method, params } as ProtocolMessage);
Expand All @@ -640,7 +705,7 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC

const id = this._nextRequestId++;
const deferred = new DeferredPromise<unknown>();
this._pendingRequests.set(id, deferred);
this._pendingRequests.set(id, { deferred, sentAt: Date.now() });
const request: JsonRpcRequest = { jsonrpc: '2.0', id, method, params };
this._transport.send(request);
return deferred.p as Promise<IRemoteAgentHostExtensionCommandMap[M]['result']>;
Expand All @@ -652,11 +717,54 @@ export class RemoteAgentHostProtocolClient extends Disposable implements IAgentC

private _rejectPendingRequests(error: ProtocolError): void {
for (const pending of this._pendingRequests.values()) {
pending.error(error);
pending.deferred.error(error);
}
this._pendingRequests.clear();
}

/**
* Fired on a {@link WATCHDOG_CHECK_INTERVAL_MS} interval. If the
* transport has at least one outstanding request that's been waiting
* for more than {@link WATCHDOG_TIMEOUT_MS} and no inbound message has
* arrived in the same window, declare the transport dead and trigger
* the renderer's reconnect path.
*
* After laptop sleep + wake the JS event loop is paused, so the
* interval fires only once after wake. The lookback comparison still
* works — we're comparing wall-clock {@link Date.now()} values, not
* counting ticks.
*/
private _watchdogTick(): void {
if (this._isClosed || this._pendingRequests.size === 0) {
return;
}
const now = Date.now();
const sinceLastRead = now - this._lastReadTime;
if (sinceLastRead < WATCHDOG_TIMEOUT_MS) {
return;
}
// Find the oldest outstanding request; treat it as a proxy for
// "how long have we been waiting for *any* response from the
// remote". Iterating is cheap — _pendingRequests is at most a
// few dozen entries in practice.
let oldestSentAt = now;
for (const pending of this._pendingRequests.values()) {
if (pending.sentAt < oldestSentAt) {
oldestSentAt = pending.sentAt;
}
}
const oldestAge = now - oldestSentAt;
if (oldestAge < WATCHDOG_TIMEOUT_MS) {
return;
}
this._logService.info(
`[RemoteAgentHostProtocol] Watchdog: connection to ${this._address} appears dead — `
+ `${this._pendingRequests.size} request(s) outstanding, no message received for ${sinceLastRead}ms, `
+ `oldest request ${oldestAge}ms old. Forcing close to trigger reconnect.`,
);
this._handleClose(connectionTimeoutError(this._address, sinceLastRead, oldestAge));
Comment thread
roblourens marked this conversation as resolved.
}

/**
* Get the next client sequence number for optimistic dispatch.
*/
Expand Down
32 changes: 23 additions & 9 deletions src/vs/platform/agentHost/browser/remoteAgentHostServiceImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,24 @@ import { PROTOCOL_VERSION } from '../common/state/protocol/version/registry.js';
interface IConnectionEntry {
readonly store: DisposableStore;
readonly client: RemoteAgentHostProtocolClient;
/**
* Optional teardown for the shared-process tunnel that this entry's
* transport is using (SSH or dev-tunnels). Tracked separately from
* {@link store} because on reconnect the new entry takes ownership of
* the same underlying connectionId — running the old teardown would
* disconnect the freshly-established tunnel as a side effect.
*/
readonly transportDisposable?: IDisposable;
connected: boolean;
/** Current connection status for UI display. */
status: RemoteAgentHostConnectionStatus;
}

function disposeEntry(entry: IConnectionEntry): void {
entry.store.dispose();
entry.transportDisposable?.dispose();
}

export class RemoteAgentHostService extends Disposable implements IRemoteAgentHostService {
private static readonly ConnectionWaitTimeout = 10000;
/** Initial reconnect delay in milliseconds. */
Expand Down Expand Up @@ -223,6 +236,13 @@ export class RemoteAgentHostService extends Disposable implements IRemoteAgentHo

// Dispose any existing entry for this address to avoid leaking
// old protocol clients and relay transports on reconnect.
//
// CRITICAL: we deliberately do NOT run the existing entry's
// transportDisposable. On a reconnect to the same address, the
// shared-process tunnel keyed by connectionId is already owned by
// the new connection we just established. Running the old teardown
// would call _mainService.disconnect(connectionId) and immediately
// kill the brand-new tunnel.
const existingEntry = this._entries.get(address);
if (existingEntry) {
this._entries.delete(address);
Expand All @@ -234,13 +254,7 @@ export class RemoteAgentHostService extends Disposable implements IRemoteAgentHo
// Create a connection entry wrapping the pre-connected client
const protocolClient = connection as RemoteAgentHostProtocolClient;
store.add(protocolClient);
// Tear the underlying transport (e.g. SSH/tunnel relay) down with
// the entry. This is what makes "Remove Remote" actually close the
// shared-process tunnel and stop the remote agent host process.
if (transportDisposable) {
store.add(transportDisposable);
}
const connEntry: IConnectionEntry = { store, client: protocolClient, connected: true, status: RemoteAgentHostConnectionStatus.connected };
const connEntry: IConnectionEntry = { store, client: protocolClient, transportDisposable, connected: true, status: RemoteAgentHostConnectionStatus.connected };
this._entries.set(address, connEntry);
this._names.set(address, entry.name);
this._registeredEntries.set(address, entry);
Expand Down Expand Up @@ -297,7 +311,7 @@ export class RemoteAgentHostService extends Disposable implements IRemoteAgentHo
const entry = this._entries.get(address);
if (entry) {
this._entries.delete(address);
entry.store.dispose();
disposeEntry(entry);
this._rejectPendingConnectionWait(address, new Error(`Connection closed: ${address}`));
this._onDidChangeConnections.fire();
}
Expand Down Expand Up @@ -643,7 +657,7 @@ export class RemoteAgentHostService extends Disposable implements IRemoteAgentHo
}
this._pendingConnectionWaits.clear();
for (const entry of this._entries.values()) {
entry.store.dispose();
disposeEntry(entry);
}
this._entries.clear();
for (const handle of this._labelFormatters.values()) {
Expand Down
55 changes: 49 additions & 6 deletions src/vs/platform/agentHost/node/sshRemoteAgentHostService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import * as cp from 'child_process';
import { dirname, join, isAbsolute, basename } from '../../../base/common/path.js';
import { Emitter, Event } from '../../../base/common/event.js';
import { Disposable, DisposableMap, toDisposable } from '../../../base/common/lifecycle.js';
import { raceTimeout } from '../../../base/common/async.js';
import { URI } from '../../../base/common/uri.js';
import { localize } from '../../../nls.js';
import { ILogService } from '../../log/common/log.js';
Expand Down Expand Up @@ -63,6 +64,22 @@ interface SSHClient {

const LOG_PREFIX = '[SSHRemoteAgentHost]';

/**
* Maximum time to wait for {@link SSHRemoteAgentHostMainService._createWebSocketRelay}
* to settle on the `replaceRelay` reconnect path before giving up. A silently
* dead SSH client (TCP half-open, ssh2 keepalive hasn't fired yet) can leave
* `forwardOut`'s callback unfired, hanging the whole `connect()` call. Bounding
* this surfaces a clean failure so the renderer can clear its pending-reconnect
* flag and retry, and so the dead SSH client gets ended (purging it from the
* shared-process `_connections` map).
*
* The value is just slightly larger than ssh2's default keepalive failure
* window (`keepaliveInterval * keepaliveCountMax` ~= 15s * 3 = 45s) so that in
* practice the SSH client itself will surface its own `'close'` first when
* the network is hard-down. Tests override this to a much smaller value.
*/
const RECONNECT_RELAY_TIMEOUT_MS = 60_000;

/**
* One entry in the queue of authentication attempts handed to ssh2's
* `authHandler`. Each attempt corresponds to one of the auth method shapes
Expand Down Expand Up @@ -330,8 +347,14 @@ class SSHConnection extends Disposable {
readonly config: ISSHAgentHostConfigSanitized;
private _closed = false;
private _sshClientDetached = false;
private readonly _sshCloseListener = () => { this.dispose(); };
private readonly _sshErrorListener = () => { this.dispose(); };
private readonly _sshCloseListener = () => {
this._logService.info(`${LOG_PREFIX} SSH client closed for connection ${this.connectionId} (address ${this.address}); disposing connection`);
this.dispose();
};
private readonly _sshErrorListener = (err?: Error) => {
this._logService.info(`${LOG_PREFIX} SSH client error for connection ${this.connectionId} (address ${this.address}): ${err instanceof Error ? err.message : String(err)}; disposing connection`);
this.dispose();
};

constructor(
fullConfig: ISSHAgentHostConfig,
Expand All @@ -343,6 +366,7 @@ class SSHConnection extends Disposable {
readonly sshClient: SSHClient,
private readonly _relay: { send: (data: string) => void; close: () => void },
private readonly _remoteStream: SSHChannel | undefined,
private readonly _logService: ILogService,
) {
super();

Expand Down Expand Up @@ -407,6 +431,12 @@ export class SSHRemoteAgentHostMainService extends Disposable implements ISSHRem

private _nativeRequire: NodeJS.Require | undefined;

/**
* Override hook for tests to shorten the relay-creation timeout used on
* the `replaceRelay` reconnect path. See {@link RECONNECT_RELAY_TIMEOUT_MS}.
*/
protected relayCreationTimeoutMs: number = RECONNECT_RELAY_TIMEOUT_MS;

constructor(
@ILogService private readonly _logService: ILogService,
@IProductService private readonly _productService: IProductService,
Expand Down Expand Up @@ -455,15 +485,27 @@ export class SSHRemoteAgentHostMainService extends Disposable implements ISSHRem
const connectionId = connectionKey;
try {
let conn: SSHConnection | undefined; // eslint-disable-line prefer-const
const relay = await this._createWebSocketRelay(
sshClient, '127.0.0.1', remotePort, connectionToken,
(data: string) => this._onDidRelayMessage.fire({ connectionId, data }),
() => { conn?.dispose(); },
// Bound the relay creation: a silently dead SSH client
// (TCP half-open, ssh2 keepalive hasn't fired yet) can
// leave forwardOut's callback unfired, hanging the whole
// promise chain. raceTimeout returns undefined on timeout.
const timeoutMs = this.relayCreationTimeoutMs;
const relay = await raceTimeout(
this._createWebSocketRelay(
sshClient, '127.0.0.1', remotePort, connectionToken,
(data: string) => this._onDidRelayMessage.fire({ connectionId, data }),
() => { conn?.dispose(); },
),
timeoutMs,
);
if (!relay) {
throw new Error(`SSH relay creation timed out after ${timeoutMs}ms (SSH client appears unresponsive)`);
}

conn = new SSHConnection(
config, connectionId, connectionKey, config.name,
connectionToken, remotePort, sshClient, relay, undefined,
this._logService,
);

Event.once(conn.onDidClose)(() => {
Expand Down Expand Up @@ -609,6 +651,7 @@ export class SSHRemoteAgentHostMainService extends Disposable implements ISSHRem
sshClient,
relay,
agentStream,
this._logService,
);

Event.once(conn.onDidClose)(() => {
Expand Down
Loading
Loading