Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remote - merge two latency measurements into one #184566

Merged
merged 2 commits into from Jun 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
191 changes: 1 addition & 190 deletions src/vs/base/parts/ipc/common/ipc.net.ts
Expand Up @@ -3,7 +3,6 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

import { IntervalTimer } from 'vs/base/common/async';
import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter, Event } from 'vs/base/common/event';
import { Disposable, DisposableStore, IDisposable } from 'vs/base/common/lifecycle';
Expand Down Expand Up @@ -264,9 +263,7 @@ const enum ProtocolMessageType {
ReplayRequest = 6,
Pause = 7,
Resume = 8,
KeepAlive = 9,
LatencyMeasurementRequest = 10,
LatencyMeasurementResponse = 11,
KeepAlive = 9
}

function protocolMessageTypeToString(messageType: ProtocolMessageType) {
Expand All @@ -280,8 +277,6 @@ function protocolMessageTypeToString(messageType: ProtocolMessageType) {
case ProtocolMessageType.Pause: return 'PauseWriting';
case ProtocolMessageType.Resume: return 'ResumeWriting';
case ProtocolMessageType.KeepAlive: return 'KeepAlive';
case ProtocolMessageType.LatencyMeasurementRequest: return 'LatencyMeasurementRequest';
case ProtocolMessageType.LatencyMeasurementResponse: return 'LatencyMeasurementResponse';
}
}

Expand Down Expand Up @@ -309,22 +304,6 @@ export const enum ProtocolConstants {
* Send a message every 5 seconds to avoid that the connection is closed by the OS.
*/
KeepAliveSendTime = 5000, // 5 seconds
/**
* Measure the latency every 1 minute.
*/
LatencySampleTime = 1 * 60 * 1000, // 1 minute
/**
* Keep the last 5 samples for latency measurement.
*/
LatencySampleCount = 5,
/**
* A latency over 1s will be considered high.
*/
HighLatencyTimeThreshold = 1000,
/**
* Having 3 or more samples with high latency will trigger a high latency event.
*/
HighLatencySampleThreshold = 3,
}

class ProtocolMessage {
Expand Down Expand Up @@ -803,52 +782,6 @@ export interface ILoadEstimator {
hasHighLoad(): boolean;
}

export const enum ConnectionHealth {
/**
* The connection health is considered good when a certain number of recent round trip time measurements are below a certain threshold.
* @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
*/
Good,
/**
* The connection health is considered poor when a certain number of recent round trip time measurements are above a certain threshold.
* @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
*/
Poor
}

export function connectionHealthToString(connectionHealth: ConnectionHealth): 'good' | 'poor' {
switch (connectionHealth) {
case ConnectionHealth.Good: return 'good';
case ConnectionHealth.Poor: return 'poor';
}
}

/**
* An event describing that the connection health has changed.
*/
export class ConnectionHealthChangedEvent {
constructor(
public readonly connectionHealth: ConnectionHealth
) { }
}

/**
* An event describing that a round trip time measurement was above a certain threshold.
*/
export class HighRoundTripTimeEvent {
constructor(
/**
* The round trip time in milliseconds.
*/
public readonly roundTripTime: number,
/**
* The number of recent round trip time measurements that were above the threshold.
* @see ProtocolConstants.HighLatencyTimeThreshold @see ProtocolConstants.HighLatencySampleThreshold
*/
public readonly recentHighRoundTripCount: number
) { }
}

export interface PersistentProtocolOptions {
/**
* The socket to use.
Expand All @@ -862,10 +795,6 @@ export interface PersistentProtocolOptions {
* The CPU load estimator to use.
*/
loadEstimator?: ILoadEstimator;
/**
* Whether to measure round trip time. Defaults to false.
*/
measureRoundTripTime?: boolean;
/**
* Whether to send keep alive messages. Defaults to true.
*/
Expand Down Expand Up @@ -898,11 +827,9 @@ export class PersistentProtocol implements IMessagePassingProtocol {
private _socket: ISocket;
private _socketWriter: ProtocolWriter;
private _socketReader: ProtocolReader;
private _socketLatencyMonitor: LatencyMonitor;
private _socketDisposables: DisposableStore;

private readonly _loadEstimator: ILoadEstimator;
private readonly _measureRoundTripTime: boolean;
private readonly _shouldSendKeepAlive: boolean;

private readonly _onControlMessage = new BufferedEmitter<VSBuffer>();
Expand All @@ -920,19 +847,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
private readonly _onSocketTimeout = new BufferedEmitter<SocketTimeoutEvent>();
readonly onSocketTimeout: Event<SocketTimeoutEvent> = this._onSocketTimeout.event;

private readonly _onHighRoundTripTime = new BufferedEmitter<HighRoundTripTimeEvent>();
readonly onHighRoundTripTime = this._onHighRoundTripTime.event;

private readonly _onDidChangeConnectionHealth = new BufferedEmitter<ConnectionHealth>();
readonly onDidChangeConnectionHealth = this._onDidChangeConnectionHealth.event;

public get unacknowledgedCount(): number {
return this._outgoingMsgId - this._outgoingAckId;
}

constructor(opts: PersistentProtocolOptions) {
this._loadEstimator = opts.loadEstimator ?? LoadEstimator.getInstance();
this._measureRoundTripTime = opts.measureRoundTripTime ?? false;
this._shouldSendKeepAlive = opts.sendKeepAlive ?? true;
this._isReconnecting = false;
this._outgoingUnackMsg = new Queue<ProtocolMessage>();
Expand All @@ -954,13 +874,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));
this._socketLatencyMonitor = this._socketDisposables.add(new LatencyMonitor()); // is started immediately
this._socketDisposables.add(this._socketLatencyMonitor.onSendLatencyRequest(buffer => this._sendLatencyMeasurementRequest(buffer)));
this._socketDisposables.add(this._socketLatencyMonitor.onHighRoundTripTime(e => this._onHighRoundTripTime.fire(e)));
this._socketDisposables.add(this._socketLatencyMonitor.onDidChangeConnectionHealth(e => this._onDidChangeConnectionHealth.fire(e)));
if (this._measureRoundTripTime) {
this._socketLatencyMonitor.start();
}

if (opts.initialChunk) {
this._socketReader.acceptChunk(opts.initialChunk);
Expand Down Expand Up @@ -1041,19 +954,12 @@ export class PersistentProtocol implements IMessagePassingProtocol {
this._socketReader = this._socketDisposables.add(new ProtocolReader(this._socket));
this._socketDisposables.add(this._socketReader.onMessage(msg => this._receiveMessage(msg)));
this._socketDisposables.add(this._socket.onClose(e => this._onSocketClose.fire(e)));
this._socketLatencyMonitor = this._socketDisposables.add(new LatencyMonitor()); // will be started later
this._socketDisposables.add(this._socketLatencyMonitor.onSendLatencyRequest(buffer => this._sendLatencyMeasurementRequest(buffer)));
this._socketDisposables.add(this._socketLatencyMonitor.onHighRoundTripTime(e => this._onHighRoundTripTime.fire(e)));
this._socketDisposables.add(this._socketLatencyMonitor.onDidChangeConnectionHealth(e => this._onDidChangeConnectionHealth.fire(e)));

this._socketReader.acceptChunk(initialDataChunk);
}

public endAcceptReconnection(): void {
this._isReconnecting = false;
if (this._measureRoundTripTime) {
this._socketLatencyMonitor.start();
}

// After a reconnection, let the other party know (again) which messages have been received.
// (perhaps the other party didn't receive a previous ACK)
Expand Down Expand Up @@ -1144,15 +1050,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
// nothing to do
break;
}
case ProtocolMessageType.LatencyMeasurementRequest: {
// we just send the data back
this._sendLatencyMeasurementResponse(msg.data);
break;
}
case ProtocolMessageType.LatencyMeasurementResponse: {
this._socketLatencyMonitor.handleResponse(msg.data);
break;
}
}
}

Expand Down Expand Up @@ -1282,92 +1179,6 @@ export class PersistentProtocol implements IMessagePassingProtocol {
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, this._incomingAckId, getEmptyBuffer());
this._socketWriter.write(msg);
}

private _sendLatencyMeasurementRequest(buffer: VSBuffer): void {
this._incomingAckId = this._incomingMsgId;
const msg = new ProtocolMessage(ProtocolMessageType.LatencyMeasurementRequest, 0, this._incomingAckId, buffer);
this._socketWriter.write(msg);
}

private _sendLatencyMeasurementResponse(buffer: VSBuffer): void {
this._incomingAckId = this._incomingMsgId;
const msg = new ProtocolMessage(ProtocolMessageType.LatencyMeasurementResponse, 0, this._incomingAckId, buffer);
this._socketWriter.write(msg);
}
}

class LatencyMonitor extends Disposable {

private readonly _onSendLatencyRequest = this._register(new Emitter<VSBuffer>());
readonly onSendLatencyRequest: Event<VSBuffer> = this._onSendLatencyRequest.event;

private readonly _onHighRoundTripTime = this._register(new Emitter<HighRoundTripTimeEvent>());
public readonly onHighRoundTripTime = this._onHighRoundTripTime.event;

private readonly _onDidChangeConnectionHealth = this._register(new Emitter<ConnectionHealth>());
public readonly onDidChangeConnectionHealth = this._onDidChangeConnectionHealth.event;

private readonly _measureLatencyTimer = this._register(new IntervalTimer());

/**
* Timestamp of our last latency request message sent to the other host.
*/
private _lastLatencyMeasurementSent: number = -1;

/**
* ID separate from the regular message IDs. Used to match up latency
* requests with responses so we know we're timing the right message
* even if a reconnection occurs.
*/
private _lastLatencyMeasurementId: number = 0;

/**
* Circular buffer of latency measurements
*/
private _latencySamples: number[] = Array.from({ length: ProtocolConstants.LatencySampleCount }, (_) => 0);
private _latencySampleIndex: number = 0;
private _connectionHealth = ConnectionHealth.Good;

constructor() {
super();
}

public start(): void {
this._measureLatencyTimer.cancelAndSet(() => {
this._lastLatencyMeasurementSent = Date.now();
const measurementId = ++this._lastLatencyMeasurementId;
const buffer = VSBuffer.alloc(4);
buffer.writeUInt32BE(measurementId, 0);
this._onSendLatencyRequest.fire(buffer);
}, ProtocolConstants.LatencySampleTime);
}

public handleResponse(buffer: VSBuffer): void {
if (buffer.byteLength !== 4) {
// invalid measurementId
return;
}
const measurementId = buffer.readUInt32BE(0);
if (this._lastLatencyMeasurementSent <= 0 || measurementId !== this._lastLatencyMeasurementId) {
// invalid measurementId
return;
}

const roundtripTime = Date.now() - this._lastLatencyMeasurementSent;
const sampleIndex = this._latencySampleIndex++;
this._latencySamples[sampleIndex % this._latencySamples.length] = roundtripTime;

const previousConnectionHealth = this._connectionHealth;
const highLatencySampleCount = this._latencySamples.filter(s => s >= ProtocolConstants.HighLatencyTimeThreshold).length;
this._connectionHealth = (highLatencySampleCount >= ProtocolConstants.HighLatencySampleThreshold ? ConnectionHealth.Poor : ConnectionHealth.Good);

if (roundtripTime > ProtocolConstants.HighLatencyTimeThreshold) {
this._onHighRoundTripTime.fire(new HighRoundTripTimeEvent(roundtripTime, highLatencySampleCount));
}
if (previousConnectionHealth !== this._connectionHealth) {
this._onDidChangeConnectionHealth.fire(this._connectionHealth);
}
}
}

// (() => {
Expand Down
23 changes: 4 additions & 19 deletions src/vs/platform/remote/common/remoteAgentConnection.ts
Expand Up @@ -13,7 +13,7 @@ import * as performance from 'vs/base/common/performance';
import { StopWatch } from 'vs/base/common/stopwatch';
import { generateUuid } from 'vs/base/common/uuid';
import { IIPCLogger } from 'vs/base/parts/ipc/common/ipc';
import { Client, ConnectionHealth, ISocket, PersistentProtocol, ProtocolConstants, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { Client, ISocket, PersistentProtocol, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { ILogService } from 'vs/platform/log/common/log';
import { RemoteAgentConnectionContext } from 'vs/platform/remote/common/remoteAgentEnvironment';
import { RemoteAuthorityResolverError, RemoteConnection } from 'vs/platform/remote/common/remoteAuthorityResolver';
Expand Down Expand Up @@ -250,7 +250,7 @@ async function connectToRemoteExtensionHostAgent<T extends RemoteConnection>(opt
protocol = options.reconnectionProtocol;
ownsProtocol = false;
} else {
protocol = new PersistentProtocol({ socket, measureRoundTripTime: true });
protocol = new PersistentProtocol({ socket });
ownsProtocol = true;
}

Expand Down Expand Up @@ -482,8 +482,7 @@ export const enum PersistentConnectionEventType {
ReconnectionWait,
ReconnectionRunning,
ReconnectionPermanentFailure,
ConnectionGain,
ConnectionHealthChanged
ConnectionGain
}
export class ConnectionLostEvent {
public readonly type = PersistentConnectionEventType.ConnectionLost;
Expand Down Expand Up @@ -521,13 +520,6 @@ export class ConnectionGainEvent {
public readonly attempt: number
) { }
}
export class ConnectionHealthChangedEvent {
public readonly type = PersistentConnectionEventType.ConnectionHealthChanged;
constructor(
public readonly reconnectionToken: string,
public readonly connectionHealth: ConnectionHealth
) { }
}
export class ReconnectionPermanentFailureEvent {
public readonly type = PersistentConnectionEventType.ReconnectionPermanentFailure;
constructor(
Expand All @@ -537,7 +529,7 @@ export class ReconnectionPermanentFailureEvent {
public readonly handled: boolean
) { }
}
export type PersistentConnectionEvent = ConnectionGainEvent | ConnectionHealthChangedEvent | ConnectionLostEvent | ReconnectionWaitEvent | ReconnectionRunningEvent | ReconnectionPermanentFailureEvent;
export type PersistentConnectionEvent = ConnectionGainEvent | ConnectionLostEvent | ReconnectionWaitEvent | ReconnectionRunningEvent | ReconnectionPermanentFailureEvent;

export abstract class PersistentConnection extends Disposable {

Expand Down Expand Up @@ -607,13 +599,6 @@ export abstract class PersistentConnection extends Disposable {
this._options.logService.info(`${logPrefix} received socket timeout event (unacknowledgedMsgCount: ${e.unacknowledgedMsgCount}, timeSinceOldestUnacknowledgedMsg: ${e.timeSinceOldestUnacknowledgedMsg}, timeSinceLastReceivedSomeData: ${e.timeSinceLastReceivedSomeData}).`);
this._beginReconnecting();
}));
this._register(protocol.onHighRoundTripTime((e) => {
const logPrefix = _commonLogPrefix(this._connectionType, this.reconnectionToken);
this._options.logService.info(`${logPrefix} high roundtrip time: ${e.roundTripTime}ms (${e.recentHighRoundTripCount} of ${ProtocolConstants.LatencySampleCount} recent samples)`);
}));
this._register(protocol.onDidChangeConnectionHealth((connectionHealth) => {
this._onDidStateChange.fire(new ConnectionHealthChangedEvent(this.reconnectionToken, connectionHealth));
}));

PersistentConnection._instances.push(this);
this._register(toDisposable(() => {
Expand Down
21 changes: 0 additions & 21 deletions src/vs/workbench/contrib/remote/browser/remote.ts
Expand Up @@ -53,7 +53,6 @@ import { ILogService } from 'vs/platform/log/common/log';
import { ITimerService } from 'vs/workbench/services/timer/browser/timerService';
import { getRemoteName } from 'vs/platform/remote/common/remoteHosts';
import { IActionViewItem } from 'vs/base/browser/ui/actionbar/actionbar';
import { connectionHealthToString } from 'vs/base/parts/ipc/common/ipc.net';
import { getVirtualWorkspaceLocation } from 'vs/platform/workspace/common/virtualWorkspace';
import { IJSONSchema } from 'vs/base/common/jsonSchema';
import { IWalkthroughsService } from 'vs/workbench/contrib/welcomeGettingStarted/browser/gettingStartedService';
Expand Down Expand Up @@ -1090,26 +1089,6 @@ export class RemoteAgentConnectionStatusListener extends Disposable implements I

hideProgress();
break;

case PersistentConnectionEventType.ConnectionHealthChanged:
type RemoteConnectionHealthClassification = {
owner: 'alexdima';
comment: 'The remote connection health has changed (round trip time)';
remoteName: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The name of the resolver.' };
reconnectionToken: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The identifier of the connection.' };
connectionHealth: { classification: 'SystemMetaData'; purpose: 'PerformanceAndHealth'; comment: 'The health of the connection: good or poor.' };
};
type RemoteConnectionHealthEvent = {
remoteName: string | undefined;
reconnectionToken: string;
connectionHealth: 'good' | 'poor';
};
telemetryService.publicLog2<RemoteConnectionHealthEvent, RemoteConnectionHealthClassification>('remoteConnectionHealth', {
remoteName: getRemoteName(environmentService.remoteAuthority),
reconnectionToken: e.reconnectionToken,
connectionHealth: connectionHealthToString(e.connectionHealth)
});
break;
}
});
}
Expand Down