Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 38 additions & 10 deletions src/vs/base/parts/ipc/common/ipc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ export class BufferReader implements IReader {
}
}

export class BufferWriter implements IWriter {
export class BufferWriter implements IWriter, IDisposable {

private buffers: VSBuffer[] = [];

Expand All @@ -232,6 +232,11 @@ export class BufferWriter implements IWriter {
write(buffer: VSBuffer): void {
this.buffers.push(buffer);
}

dispose(): void {
// Release the buffers so a thrown serialization error's stack can't pin them.
this.buffers.length = 0;
}
}

enum DataType {
Expand Down Expand Up @@ -367,9 +372,13 @@ export class ChannelServer<TContext = string> implements IChannelServer<TContext

private send(header: unknown, body: any = undefined): number {
const writer = new BufferWriter();
serialize(writer, header);
serialize(writer, body);
return this.sendBuffer(writer.buffer);
try {
serialize(writer, header);
serialize(writer, body);
return this.sendBuffer(writer.buffer);
} finally {
writer.dispose();
}
}

private sendBuffer(message: VSBuffer): number {
Expand Down Expand Up @@ -609,7 +618,18 @@ export class ChannelClient implements IChannelClient, IDisposable {
};

this.handlers.set(id, handler);
this.sendRequest(request);

try {
this.sendRequest(request);
} catch (err) {
// `sendRequest` can throw synchronously while serializing the
// request (e.g. an oversized argument). The handler was just
// registered but no request went out and it's only removed on a
// response, so without this it would leak (along with the rejected
// promise and error it retains). Clean up and reject.
this.handlers.delete(id);
e(err);
}
};

let uninitializedPromise: CancelablePromise<void> | null = null;
Expand Down Expand Up @@ -712,9 +732,13 @@ export class ChannelClient implements IChannelClient, IDisposable {

private send(header: unknown, body: any = undefined): number {
const writer = new BufferWriter();
serialize(writer, header);
serialize(writer, body);
return this.sendBuffer(writer.buffer);
try {
serialize(writer, header);
serialize(writer, body);
return this.sendBuffer(writer.buffer);
} finally {
writer.dispose();
}
}

private sendBuffer(message: VSBuffer): number {
Expand Down Expand Up @@ -996,8 +1020,12 @@ export class IPCClient<TContext = string> implements IChannelClient, IChannelSer

constructor(protocol: IMessagePassingProtocol, ctx: TContext, ipcLogger: IIPCLogger | null = null) {
const writer = new BufferWriter();
serialize(writer, ctx);
protocol.send(writer.buffer);
try {
serialize(writer, ctx);
protocol.send(writer.buffer);
} finally {
writer.dispose();
}

this.channelClient = new ChannelClient(protocol, ipcLogger);
this.channelServer = new ChannelServer(protocol, ctx, ipcLogger);
Expand Down
49 changes: 48 additions & 1 deletion src/vs/base/parts/ipc/test/common/ipc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { Emitter, Event } from '../../../../common/event.js';
import { DisposableStore } from '../../../../common/lifecycle.js';
import { isEqual } from '../../../../common/resources.js';
import { URI } from '../../../../common/uri.js';
import { BufferReader, BufferWriter, ClientConnectionEvent, deserialize, IChannel, IMessagePassingProtocol, IPCClient, IPCServer, IServerChannel, ProxyChannel, serialize } from '../../common/ipc.js';
import { BufferReader, BufferWriter, ChannelClient, ChannelServer, ClientConnectionEvent, deserialize, IChannel, IMessagePassingProtocol, IPCClient, IPCServer, IServerChannel, ProxyChannel, serialize } from '../../common/ipc.js';
import { ensureNoDisposablesAreLeakedInTestSuite } from '../../../../test/common/utils.js';

class QueueProtocol implements IMessagePassingProtocol {
Expand Down Expand Up @@ -340,6 +340,53 @@ suite('Base IPC', function () {
serialize(writer, input);
assert.deepStrictEqual(deserialize(new BufferReader(writer.buffer)), input);
});

test('BufferWriter releases its buffers on dispose', () => {
const writer = new BufferWriter();
serialize(writer, ['a', 'b', 'c']);
assert.ok(writer.buffer.byteLength > 0);

writer.dispose();

// After dispose the writer no longer retains the serialized buffers, so
// `buffer` is empty. This guards against a thrown error's captured stack
// pinning large intermediate buffers (see ChannelClient/ChannelServer.send).
assert.strictEqual(writer.buffer.byteLength, 0);
});

test('request rejects (and cleans up) when serialization throws on the deferred path', async function () {
// Reproduces the leak where a synchronous serialization failure left a
// dangling entry in `ChannelClient.handlers` (and, on the uninitialized
// path, a permanently pending promise). We make a call *before* the
// client is initialized so the request is deferred until init; when it
// finally serializes, a circular argument makes `JSON.stringify` throw.
const clientIncoming = store.add(new Emitter<VSBuffer>());
const clientProtocol: IMessagePassingProtocol = {
onMessage: clientIncoming.event,
send: () => { /* client outbound is irrelevant to this test */ }
};
const serverOutbox: VSBuffer[] = [];
const serverProtocol: IMessagePassingProtocol = {
onMessage: Event.None,
send: buffer => serverOutbox.push(buffer)
};

const channelClient = store.add(new ChannelClient(clientProtocol));
// Constructing the server emits an Initialize message into its outbox.
store.add(new ChannelServer(serverProtocol, 'ctx'));

// Issue the call while the client is still uninitialized: it is queued
// behind `whenInitialized()` rather than serialized immediately.
const circular: Record<string, unknown> = {};
circular.self = circular;
const resultPromise = channelClient.getChannel('testchannel').call('cmd', circular);

// Deliver the server's Initialize so the deferred request runs and throws.
assert.strictEqual(serverOutbox.length, 1);
clientIncoming.fire(serverOutbox[0]);

await assert.rejects(resultPromise);
});
});

suite('one to one (proxy)', function () {
Expand Down
68 changes: 58 additions & 10 deletions src/vs/platform/agentHost/common/ahpJsonlLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import { ILogService } from '../../log/common/log.js';

export type AhpLogDirection = 'c2s' | 's2c';

interface IAhpLogMeta {
readonly ts: string;
readonly dir: AhpLogDirection;
readonly connectionId: string;
readonly transport: string;
readonly byteLength?: number;
/** Set when oversized string values in the entry were elided (see {@link stringifyAhpLogEntryTruncated}). */
truncated?: boolean;
}

export interface IAhpJsonlLoggerOptions {
readonly logsHome: URI;
readonly connectionId: string;
Expand All @@ -30,6 +40,19 @@ const DEFAULT_MAX_FILES = 5;
// and keeping per-write allocations modest.
const MAX_BATCH_BYTES = 1024 * 1024;

// A single AHP protocol message can be enormous (e.g. a `resourceRead` carrying
// a base64-encoded file, or an `action` carrying a full session snapshot). We
// don't want to write hundreds of MB on a single JSONL line — it bloats the log
// directory and, more importantly, building/holding that line creates exactly
// the GC pressure these logs are meant to help diagnose. When a serialized
// entry exceeds this size we re-serialize it with oversized string values
// elided so the line stays well-formed JSONL.
const MAX_LOG_LINE_LENGTH = 1024 * 1024;
// When trimming an oversized entry, individual string values are capped to this
// length. Generous enough to keep messages useful for debugging.
const MAX_LOGGED_STRING_LENGTH = 16 * 1024;


export class AhpJsonlLogger extends Disposable {

private readonly _directory: URI;
Expand Down Expand Up @@ -64,17 +87,26 @@ export class AhpJsonlLogger extends Disposable {
}

log(message: object, dir: AhpLogDirection, byteLength?: number): void {
const entry = {
...message,
_ahpLog: {
ts: new Date().toISOString(),
dir,
connectionId: this._options.connectionId,
transport: this._options.transport,
...(typeof byteLength === 'number' ? { byteLength } : {}),
}
const meta: IAhpLogMeta = {
ts: new Date().toISOString(),
dir,
connectionId: this._options.connectionId,
transport: this._options.transport,
...(typeof byteLength === 'number' ? { byteLength } : {}),
};
const line = `${stringifyAhpLogEntry(entry)}\n`;
const entry = { ...message, _ahpLog: meta };
// Fast path: serialize once. The vast majority of messages are small, so
// we only pay a single stringify and use its length to decide whether the
// rare oversized-message path below is needed.
let body = stringifyAhpLogEntry(entry);
if (body.length > MAX_LOG_LINE_LENGTH) {
// Slow path (rare): a single message carried very large payloads. Walk
// the object via a replacer that elides long string values, keeping the
// line valid JSONL instead of writing/holding the full multi-MB payload.
meta.truncated = true;
body = stringifyAhpLogEntryTruncated(entry, MAX_LOGGED_STRING_LENGTH);
}
const line = `${body}\n`;
this._pending.push(VSBuffer.fromString(line));
this._scheduleDrain();
Comment thread
roblourens marked this conversation as resolved.
}
Expand Down Expand Up @@ -184,6 +216,22 @@ export function stringifyAhpLogEntry(value: unknown): string {
return JSON.stringify(value, _ahpReplacer);
}

/**
* Like {@link stringifyAhpLogEntry} but additionally elides any string value
* longer than {@param maxStringLength}, replacing the overflow with a short
* marker. The result is still well-formed JSON, so the log remains valid JSONL.
* Only used for the rare oversized entry, so the extra per-value work is fine.
*/
function stringifyAhpLogEntryTruncated(value: unknown, maxStringLength: number): string {
return JSON.stringify(value, function (this: unknown, key: string, val: unknown): unknown {
const revived = _ahpReplacer.call(this, key, val);
if (typeof revived === 'string' && revived.length > maxStringLength) {
return `${revived.slice(0, maxStringLength)}…[${revived.length - maxStringLength} more chars elided]`;
}
return revived;
});
}

/**
* JSON.stringify replacer that converts URI values to their canonical string
* form. `URI.prototype.toJSON()` runs before this replacer is invoked and
Expand Down
31 changes: 31 additions & 0 deletions src/vs/platform/agentHost/test/common/ahpJsonlLogger.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,37 @@ suite('AhpJsonlLogger', () => {
assert.deepStrictEqual(ids, [1, 2, 3, 4]);
});

test('elides oversized string payloads while keeping the line valid JSONL', async () => {
const fileService = store.add(new FileService(new NullLogService()));
store.add(fileService.registerProvider('file', store.add(new InMemoryFileSystemProvider())));

const logger = store.add(new AhpJsonlLogger(
{ logsHome: URI.file('/logs'), connectionId: 'conn:1', transport: 'websocket' },
fileService,
new NullLogService(),
));

// A normal small message is written verbatim and is not marked truncated.
logger.log({ jsonrpc: '2.0', id: 1, method: 'ping' }, 'c2s');
// A message carrying a multi-MB string (e.g. a base64 resourceRead) is trimmed.
const huge = 'x'.repeat(4 * 1024 * 1024);
logger.log({ jsonrpc: '2.0', id: 2, result: { data: huge } }, 's2c');
await logger.flush();

const content = (await fileService.readFile(logger.resource)).value.toString();
const lines = content.split('\n').filter(Boolean);
// Both lines must be valid JSON (the trimmed line stays well-formed JSONL).
const parsed = lines.map(line => JSON.parse(line));

assert.strictEqual(parsed[0]._ahpLog.truncated, undefined);
assert.strictEqual(parsed[1]._ahpLog.truncated, true);
// The huge string was elided rather than written in full.
assert.ok(parsed[1].result.data.length < huge.length);
assert.ok(parsed[1].result.data.includes('chars elided'));
// The whole serialized line stays modest in size.
assert.ok(lines[1].length < 1024 * 1024);
});

suite('stringifyAhpLogEntry', () => {

test('serialises a top-level URI as its string form', () => {
Expand Down
Loading