Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(NODE-5771): improve new connection #3948

Merged
merged 12 commits into from
Dec 19, 2023
201 changes: 84 additions & 117 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { on } from 'stream';
import { type Readable, Transform, type TransformCallback } from 'stream';
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

Expand Down Expand Up @@ -61,6 +61,7 @@ import type { ClientMetadata } from './handshake/client_metadata';
import { MessageStream, type OperationDescription } from './message_stream';
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
import { decompressResponse } from './wire_protocol/compression';
import { onData } from './wire_protocol/on_data';
import { getReadPreference, isSharded } from './wire_protocol/shared';

/** @internal */
Expand Down Expand Up @@ -786,17 +787,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
/** @internal */
authContext?: AuthContext;

/**@internal */
delayedTimeoutId: NodeJS.Timeout | null = null;
/** @internal */
[kDescription]: StreamDescription;
/** @internal */
[kGeneration]: number;
/** @internal */
[kLastUseTime]: number;
/** @internal */
socket: Stream;
controller: AbortController;

private socket: Stream;
private controller: AbortController;
private messageStream: Readable;
private socketWrite: (buffer: Uint8Array) => Promise<void>;

/** @internal */
[kHello]: Document | null;
/** @internal */
Expand Down Expand Up @@ -836,9 +839,18 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {

this.socket = stream;
this.controller = new AbortController();
this.socket.on('error', this.onError.bind(this));

this.messageStream = this.socket
.on('error', this.onError.bind(this))
.pipe(new SizedMessageTransform({ connection: this }))
.on('error', this.onError.bind(this));
durran marked this conversation as resolved.
Show resolved Hide resolved
this.socket.on('close', this.onClose.bind(this));
this.socket.on('timeout', this.onTimeout.bind(this));

const socketWrite = promisify(this.socket.write.bind(this.socket));
this.socketWrite = async buffer => {
return abortable(socketWrite(buffer), { signal: this.controller.signal });
};
}

async commandAsync(...args: Parameters<typeof this.command>) {
Expand Down Expand Up @@ -1039,23 +1051,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
}

try {
await writeCommand(this, message, {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel,
signal: this.controller.signal
zlibCompressionLevel: this.description.zlibCompressionLevel
});

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

if (options.noResponse) {
yield { ok: 1 };
return;
}

this.controller.signal.throwIfAborted();

for await (const response of readMany(this, { signal: this.controller.signal })) {
for await (const response of this.readMany()) {
this.socket.setTimeout(0);
response.parse(options);

Expand All @@ -1073,9 +1081,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
}
}

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

yield document;
this.controller.signal.throwIfAborted();

Expand Down Expand Up @@ -1181,121 +1186,83 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
};
exhaustLoop().catch(replyListener);
}
}

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;

/**
* @internal
*
* This helper reads chucks of data out of a socket and buffers them until it has received a
* full wire protocol message.
*
* By itself, produces an infinite async generator of wire protocol messages and consumers must end
* the stream by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
export async function* readWireProtocolMessages(
connection: ModernConnection,
{ signal }: { signal?: AbortSignal } = {}
): AsyncGenerator<Buffer> {
const bufferPool = new BufferPool();
const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize;
for await (const [chunk] of on(connection.socket, 'data', { signal })) {
if (connection.delayedTimeoutId) {
clearTimeout(connection.delayedTimeoutId);
connection.delayedTimeoutId = null;
}

bufferPool.append(chunk);
const sizeOfMessage = bufferPool.getInt32();
/**
durran marked this conversation as resolved.
Show resolved Hide resolved
* @internal
*
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
*/
async writeCommand(
command: WriteProtocolMessageType,
options: Partial<Pick<OperationDescription, 'agreedCompressor' | 'zlibCompressionLevel'>>
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
? command
: new OpCompressedRequest(command, {
agreedCompressor: options.agreedCompressor ?? 'none',
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
});

if (sizeOfMessage == null) {
continue;
}
const buffer = Buffer.concat(await finalCommand.toBin());

if (sizeOfMessage < 0) {
throw new MongoParseError(`Invalid message size: ${sizeOfMessage}`);
}
return this.socketWrite(buffer);
}

if (sizeOfMessage > maxBsonMessageSize) {
throw new MongoParseError(
`Invalid message size: ${sizeOfMessage}, max allowed: ${maxBsonMessageSize}`
);
}
/**
* @internal
*
* Returns an async generator that yields full wire protocol messages from the underlying socket. This function
* yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
* by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of onData(this.messageStream, { signal: this.controller.signal })) {
durran marked this conversation as resolved.
Show resolved Hide resolved
const response = await decompressResponse(message);
yield response;

if (sizeOfMessage > bufferPool.length) {
continue;
if (!response.moreToCome) {
return;
}
}

yield bufferPool.read(sizeOfMessage);
}
}

/**
* @internal
*
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
*/
export async function writeCommand(
connection: ModernConnection,
command: WriteProtocolMessageType,
options: Partial<Pick<OperationDescription, 'agreedCompressor' | 'zlibCompressionLevel'>> & {
signal?: AbortSignal;
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
? command
: new OpCompressedRequest(command, {
agreedCompressor: options.agreedCompressor ?? 'none',
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
});
/** @internal */
durran marked this conversation as resolved.
Show resolved Hide resolved
export class SizedMessageTransform extends Transform {
bufferPool: BufferPool;
connection: ModernConnection;

constructor({ connection }: { connection: ModernConnection }) {
super({ objectMode: false });
this.bufferPool = new BufferPool();
this.connection = connection;
}
override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void {
if (this.connection.delayedTimeoutId != null) {
clearTimeout(this.connection.delayedTimeoutId);
this.connection.delayedTimeoutId = null;
}

const buffer = Buffer.concat(await finalCommand.toBin());
this.bufferPool.append(chunk);
const sizeOfMessage = this.bufferPool.getInt32();

const socketWriteFn = promisify(connection.socket.write.bind(connection.socket));
if (sizeOfMessage == null) {
return callback();
}

return abortable(socketWriteFn(buffer), options);
}
if (sizeOfMessage < 0) {
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`));
}

/**
* @internal
*
* Returns an async generator that yields full wire protocol messages from the underlying socket. This function
* yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
* by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
export async function* readMany(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of readWireProtocolMessages(connection, options)) {
const response = await decompressResponse(message);
yield response;

if (!response.moreToCome) {
return;
if (sizeOfMessage > this.bufferPool.length) {
return callback();
}
}
}

/**
* @internal
*
* Reads a single wire protocol message out of a connection.
*/
export async function read(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
): Promise<OpMsgResponse | OpQueryResponse> {
for await (const value of readMany(connection, options)) {
return value;
const message = this.bufferPool.read(sizeOfMessage);
return callback(null, message);
}

throw new MongoRuntimeError('unable to read message off of connection');
}
Loading