Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tunnels: implement default port forwarding for managed RA's #191099

Merged
merged 5 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
119 changes: 119 additions & 0 deletions src/vs/platform/remote/common/managedSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
*--------------------------------------------------------------------------------------------*/

import { VSBuffer, encodeBase64 } from 'vs/base/common/buffer';
import { Emitter, Event, PauseableEmitter } from 'vs/base/common/event';
import { Disposable, DisposableStore } from 'vs/base/common/lifecycle';
import { ISocket, SocketCloseEvent, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';

export const makeRawSocketHeaders = (path: string, query: string, deubgLabel: string) => {
// https://tools.ietf.org/html/rfc6455#section-4
Expand All @@ -24,3 +27,119 @@ export const makeRawSocketHeaders = (path: string, query: string, deubgLabel: st
};

export const socketRawEndHeaderSequence = VSBuffer.fromString('\r\n\r\n');

export interface RemoteSocketHalf {
onData: Emitter<VSBuffer>;
onClose: Emitter<SocketCloseEvent>;
onEnd: Emitter<void>;
}

/** Should be called immediately after making a ManagedSocket to make it ready for data flow. */
export async function connectManagedSocket<T extends ManagedSocket>(
socket: T,
path: string, query: string, debugLabel: string,
half: RemoteSocketHalf
): Promise<T> {
socket.write(VSBuffer.fromString(makeRawSocketHeaders(path, query, debugLabel)));

const d = new DisposableStore();
try {
return await new Promise<T>((resolve, reject) => {
let dataSoFar: VSBuffer | undefined;
d.add(socket.onData(d_1 => {
if (!dataSoFar) {
dataSoFar = d_1;
} else {
dataSoFar = VSBuffer.concat([dataSoFar, d_1], dataSoFar.byteLength + d_1.byteLength);
}

const index = dataSoFar.indexOf(socketRawEndHeaderSequence);
if (index === -1) {
return;
}

resolve(socket);
// pause data events until the socket consumer is hooked up. We may
// immediately emit remaining data, but if not there may still be
// microtasks queued which would fire data into the abyss.
socket.pauseData();

const rest = dataSoFar.slice(index + socketRawEndHeaderSequence.byteLength);
if (rest.byteLength) {
half.onData.fire(rest);
}
}));

d.add(socket.onClose(err => reject(err ?? new Error('socket closed'))));
d.add(socket.onEnd(() => reject(new Error('socket ended'))));
});
} catch (e) {
socket.dispose();
throw e;
} finally {
d.dispose();
}
}

export abstract class ManagedSocket extends Disposable implements ISocket {
private readonly pausableDataEmitter = this._register(new PauseableEmitter<VSBuffer>());

public onData: Event<VSBuffer> = (...args) => {
if (this.pausableDataEmitter.isPaused) {
queueMicrotask(() => this.pausableDataEmitter.resume());
}
return this.pausableDataEmitter.event(...args);
};
public onClose: Event<SocketCloseEvent>;
public onEnd: Event<void>;

private readonly didDisposeEmitter = this._register(new Emitter<void>());
public onDidDispose = this.didDisposeEmitter.event;

private ended = false;

protected constructor(
private readonly debugLabel: string,
half: RemoteSocketHalf,
) {
super();

this._register(half.onData);
this._register(half.onData.event(data => this.pausableDataEmitter.fire(data)));

this.onClose = this._register(half.onClose).event;
this.onEnd = this._register(half.onEnd).event;
}

/** Pauses data events until a new listener comes in onData() */
public pauseData() {
this.pausableDataEmitter.pause();
}

/** Flushes data to the socket. */
public drain(): Promise<void> {
return Promise.resolve();
}

/** Ends the remote socket. */
public end(): void {
this.ended = true;
this.closeRemote();
}

public abstract write(buffer: VSBuffer): void;
protected abstract closeRemote(): void;

traceSocketEvent(type: SocketDiagnosticsEventType, data?: any): void {
SocketDiagnostics.traceSocketEvent(this, this.debugLabel, type, data);
}

override dispose(): void {
if (!this.ended) {
this.closeRemote();
}

this.didDisposeEmitter.fire();
super.dispose();
}
}
50 changes: 36 additions & 14 deletions src/vs/platform/tunnel/node/tunnelService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import { NodeSocket } from 'vs/base/parts/ipc/node/ipc.net';

import { Barrier } from 'vs/base/common/async';
import { Disposable } from 'vs/base/common/lifecycle';
import { OS } from 'vs/base/common/platform';
import { ISocket } from 'vs/base/parts/ipc/common/ipc.net';
import { IConfigurationService } from 'vs/platform/configuration/common/configuration';
import { ILogService } from 'vs/platform/log/common/log';
import { IProductService } from 'vs/platform/product/common/productService';
import { connectRemoteAgentTunnel, IAddressProvider, IConnectionOptions } from 'vs/platform/remote/common/remoteAgentConnection';
import { AbstractTunnelService, isAllInterfaces, ISharedTunnelsService as ISharedTunnelsService, isLocalhost, isPortPrivileged, isTunnelProvider, ITunnelProvider, ITunnelService, RemoteTunnel, TunnelPrivacyId } from 'vs/platform/tunnel/common/tunnel';
import { ISignService } from 'vs/platform/sign/common/sign';
import { OS } from 'vs/base/common/platform';
import { IAddressProvider, IConnectionOptions, connectRemoteAgentTunnel } from 'vs/platform/remote/common/remoteAgentConnection';
import { IRemoteSocketFactoryService } from 'vs/platform/remote/common/remoteSocketFactoryService';
import { ISignService } from 'vs/platform/sign/common/sign';
import { AbstractTunnelService, ISharedTunnelsService, ITunnelProvider, ITunnelService, RemoteTunnel, TunnelPrivacyId, isAllInterfaces, isLocalhost, isPortPrivileged, isTunnelProvider } from 'vs/platform/tunnel/common/tunnel';

async function createRemoteTunnel(options: IConnectionOptions, defaultTunnelHost: string, tunnelRemoteHost: string, tunnelRemotePort: number, tunnelLocalPort?: number): Promise<RemoteTunnel> {
let readyTunnel: NodeRemoteTunnel | undefined;
Expand All @@ -32,7 +33,7 @@ async function createRemoteTunnel(options: IConnectionOptions, defaultTunnelHost
return readyTunnel!;
}

class NodeRemoteTunnel extends Disposable implements RemoteTunnel {
export class NodeRemoteTunnel extends Disposable implements RemoteTunnel {

public readonly tunnelRemotePort: number;
public tunnelLocalPort!: number;
Expand Down Expand Up @@ -113,7 +114,7 @@ class NodeRemoteTunnel extends Disposable implements RemoteTunnel {

const tunnelRemoteHost = (isLocalhost(this.tunnelRemoteHost) || isAllInterfaces(this.tunnelRemoteHost)) ? 'localhost' : this.tunnelRemoteHost;
const protocol = await connectRemoteAgentTunnel(this._options, tunnelRemoteHost, this.tunnelRemotePort);
const remoteSocket = (<NodeSocket>protocol.getSocket()).socket;
const remoteSocket = protocol.getSocket();
const dataChunk = protocol.readEntireBuffer();
protocol.dispose();

Expand All @@ -132,17 +133,19 @@ class NodeRemoteTunnel extends Disposable implements RemoteTunnel {
if (localSocket.localAddress) {
this._socketsDispose.delete(localSocket.localAddress);
}
remoteSocket.destroy();
if (remoteSocket instanceof NodeSocket) {
remoteSocket.socket.destroy();
} else {
remoteSocket.end();
}
});

remoteSocket.on('end', () => localSocket.end());
remoteSocket.on('close', () => localSocket.end());
remoteSocket.on('error', () => {
localSocket.destroy();
});
if (remoteSocket instanceof NodeSocket) {
this._mirrorNodeSocket(localSocket, remoteSocket);
} else {
this._mirrorGenericSocket(localSocket, remoteSocket);
}

localSocket.pipe(remoteSocket);
remoteSocket.pipe(localSocket);
if (localSocket.localAddress) {
this._socketsDispose.set(localSocket.localAddress, () => {
// Need to end instead of unpipe, otherwise whatever is connected locally could end up "stuck" with whatever state it had until manually exited.
Expand All @@ -151,6 +154,25 @@ class NodeRemoteTunnel extends Disposable implements RemoteTunnel {
});
}
}

private _mirrorGenericSocket(localSocket: net.Socket, remoteSocket: ISocket) {
remoteSocket.onClose(() => localSocket.destroy());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inverse question: why is this one destroy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destroy closes the local half of the socket without waiting for the remote half. When close happens, and end would have already happened to cause us to start closing the remote socket gracefully, so do it forcefully at this point

remoteSocket.onEnd(() => localSocket.end());
remoteSocket.onData(d => localSocket.write(d.buffer));
localSocket.resume();
}

private _mirrorNodeSocket(localSocket: net.Socket, remoteNodeSocket: NodeSocket) {
const remoteSocket = remoteNodeSocket.socket;
remoteSocket.on('end', () => localSocket.end());
remoteSocket.on('close', () => localSocket.end());
remoteSocket.on('error', () => {
localSocket.destroy();
});

remoteSocket.pipe(localSocket);
localSocket.pipe(remoteSocket);
}
}

export class BaseTunnelService extends AbstractTunnelService {
Expand Down
118 changes: 18 additions & 100 deletions src/vs/workbench/api/browser/mainThreadManagedSockets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/

import { MainContext, ExtHostContext, MainThreadManagedSocketsShape, ExtHostManagedSocketsShape } from 'vs/workbench/api/common/extHost.protocol';
import { extHostNamedCustomer, IExtHostContext } from 'vs/workbench/services/extensions/common/extHostCustomers';
import { Disposable, DisposableStore, IDisposable } from 'vs/base/common/lifecycle';
import { ManagedRemoteConnection, RemoteConnectionType } from 'vs/platform/remote/common/remoteAuthorityResolver';
import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter } from 'vs/base/common/event';
import { Disposable, IDisposable } from 'vs/base/common/lifecycle';
import { ISocket, SocketCloseEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { ManagedSocket, RemoteSocketHalf, connectManagedSocket } from 'vs/platform/remote/common/managedSocket';
import { ManagedRemoteConnection, RemoteConnectionType } from 'vs/platform/remote/common/remoteAuthorityResolver';
import { IRemoteSocketFactoryService, ISocketFactory } from 'vs/platform/remote/common/remoteSocketFactoryService';
import { ISocket, SocketCloseEvent, SocketCloseEventType, SocketDiagnostics, SocketDiagnosticsEventType } from 'vs/base/parts/ipc/common/ipc.net';
import { Emitter, Event, PauseableEmitter } from 'vs/base/common/event';
import { makeRawSocketHeaders, socketRawEndHeaderSequence } from 'vs/platform/remote/common/managedSocket';
import { ExtHostContext, ExtHostManagedSocketsShape, MainContext, MainThreadManagedSocketsShape } from 'vs/workbench/api/common/extHost.protocol';
import { IExtHostContext, extHostNamedCustomer } from 'vs/workbench/services/extensions/common/extHostCustomers';

@extHostNamedCustomer(MainContext.MainThreadManagedSockets)
export class MainThreadManagedSockets extends Disposable implements MainThreadManagedSocketsShape {
Expand Down Expand Up @@ -51,7 +51,7 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
};
that._remoteSockets.set(socketId, half);

ManagedSocket.connect(socketId, that._proxy, path, query, debugLabel, half)
MainThreadManagedSocket.connect(socketId, that._proxy, path, query, debugLabel, half)
.then(
socket => {
socket.onDidDispose(() => that._remoteSockets.delete(socketId));
Expand Down Expand Up @@ -91,117 +91,35 @@ export class MainThreadManagedSockets extends Disposable implements MainThreadMa
}
}

export interface RemoteSocketHalf {
onData: Emitter<VSBuffer>;
onClose: Emitter<SocketCloseEvent>;
onEnd: Emitter<void>;
}

export class ManagedSocket extends Disposable implements ISocket {
export class MainThreadManagedSocket extends ManagedSocket {
public static connect(
socketId: number,
proxy: ExtHostManagedSocketsShape,
path: string, query: string, debugLabel: string,

half: RemoteSocketHalf
): Promise<ManagedSocket> {
const socket = new ManagedSocket(socketId, proxy, debugLabel, half.onClose, half.onData, half.onEnd);

socket.write(VSBuffer.fromString(makeRawSocketHeaders(path, query, debugLabel)));

const d = new DisposableStore();
return new Promise<ManagedSocket>((resolve, reject) => {
let dataSoFar: VSBuffer | undefined;
d.add(socket.onData(d => {
if (!dataSoFar) {
dataSoFar = d;
} else {
dataSoFar = VSBuffer.concat([dataSoFar, d], dataSoFar.byteLength + d.byteLength);
}

const index = dataSoFar.indexOf(socketRawEndHeaderSequence);
if (index === -1) {
return;
}

resolve(socket);
// pause data events until the socket consumer is hooked up. We may
// immediately emit remaining data, but if not there may still be
// microtasks queued which would fire data into the abyss.
socket.pauseData();

const rest = dataSoFar.slice(index + socketRawEndHeaderSequence.byteLength);
if (rest.byteLength) {
half.onData.fire(rest);
}
}));

d.add(socket.onClose(err => reject(err ?? new Error('socket closed'))));
d.add(socket.onEnd(() => reject(new Error('socket ended'))));
}).finally(() => d.dispose());
): Promise<MainThreadManagedSocket> {
const socket = new MainThreadManagedSocket(socketId, proxy, debugLabel, half);
return connectManagedSocket(socket, path, query, debugLabel, half);
}

private readonly pausableDataEmitter = this._register(new PauseableEmitter<VSBuffer>());

public onData: Event<VSBuffer> = (...args) => {
if (this.pausableDataEmitter.isPaused) {
queueMicrotask(() => this.pausableDataEmitter.resume());
}
return this.pausableDataEmitter.event(...args);
};
public onClose: Event<SocketCloseEvent>;
public onEnd: Event<void>;

private readonly didDisposeEmitter = this._register(new Emitter<void>());
public onDidDispose = this.didDisposeEmitter.event;

private ended = false;

private constructor(
private readonly socketId: number,
private readonly proxy: ExtHostManagedSocketsShape,
private readonly debugLabel: string,
onCloseEmitter: Emitter<SocketCloseEvent>,
onDataEmitter: Emitter<VSBuffer>,
onEndEmitter: Emitter<void>,
debugLabel: string,
half: RemoteSocketHalf,
) {
super();

this._register(onDataEmitter);
this._register(onDataEmitter.event(data => this.pausableDataEmitter.fire(data)));

this.onClose = this._register(onCloseEmitter).event;
this.onEnd = this._register(onEndEmitter).event;
super(debugLabel, half);
}

/** Pauses data events until a new listener comes in onData() */
pauseData() {
this.pausableDataEmitter.pause();
}

write(buffer: VSBuffer): void {
public override write(buffer: VSBuffer): void {
this.proxy.$remoteSocketWrite(this.socketId, buffer);
}

end(): void {
this.ended = true;
protected override closeRemote(): void {
this.proxy.$remoteSocketEnd(this.socketId);
}

drain(): Promise<void> {
public override drain(): Promise<void> {
return this.proxy.$remoteSocketDrain(this.socketId);
}

traceSocketEvent(type: SocketDiagnosticsEventType, data?: any): void {
SocketDiagnostics.traceSocketEvent(this, this.debugLabel, type, data);
}

override dispose(): void {
if (!this.ended) {
this.proxy.$remoteSocketEnd(this.socketId);
}

this.didDisposeEmitter.fire();
super.dispose();
}
}
6 changes: 4 additions & 2 deletions src/vs/workbench/api/common/extHostExtensionService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -861,9 +861,11 @@ export abstract class AbstractExtHostExtensionService extends Disposable impleme
performance.mark(`code/extHost/willResolveAuthority/${authorityPrefix}`);
result = await resolver.resolve(remoteAuthority, { resolveAttempt, execServer });
performance.mark(`code/extHost/didResolveAuthorityOK/${authorityPrefix}`);
// todo@connor4312: we probably need to chain tunnels too, how does this work with 'public' tunnels?
logInfo(`setting tunnel factory...`);
this._register(await this._extHostTunnelService.setTunnelFactory(resolver));
this._register(await this._extHostTunnelService.setTunnelFactory(
resolver,
ExtHostManagedResolvedAuthority.isManagedResolvedAuthority(result) ? result : undefined
));
} else {
logInfo(`invoking resolveExecServer() for ${remoteAuthority}`);
performance.mark(`code/extHost/willResolveExecServer/${authorityPrefix}`);
Expand Down