diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 731d88b96a8..510e4e82093 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -1,4 +1,4 @@ -import { on } from 'stream'; +import { type Readable, Transform, type TransformCallback } from 'stream'; import { clearTimeout, setTimeout } from 'timers'; import { promisify } from 'util'; @@ -61,6 +61,7 @@ import type { ClientMetadata } from './handshake/client_metadata'; import { MessageStream, type OperationDescription } from './message_stream'; import { StreamDescription, type StreamDescriptionOptions } from './stream_description'; import { decompressResponse } from './wire_protocol/compression'; +import { onData } from './wire_protocol/on_data'; import { getReadPreference, isSharded } from './wire_protocol/shared'; /** @internal */ @@ -807,7 +808,6 @@ export class ModernConnection extends TypedEventEmitter { /** @internal */ authContext?: AuthContext; - /**@internal */ delayedTimeoutId: NodeJS.Timeout | null = null; /** @internal */ [kDescription]: StreamDescription; @@ -815,9 +815,12 @@ export class ModernConnection extends TypedEventEmitter { [kGeneration]: number; /** @internal */ [kLastUseTime]: number; - /** @internal */ - socket: Stream; - controller: AbortController; + + private socket: Stream; + private controller: AbortController; + private messageStream: Readable; + private socketWrite: (buffer: Uint8Array) => Promise; + /** @internal */ [kHello]: Document | null; /** @internal */ @@ -857,9 +860,18 @@ export class ModernConnection extends TypedEventEmitter { this.socket = stream; this.controller = new AbortController(); - this.socket.on('error', this.onError.bind(this)); + + this.messageStream = this.socket + .on('error', this.onError.bind(this)) + .pipe(new SizedMessageTransform({ connection: this })) + .on('error', this.onError.bind(this)); this.socket.on('close', this.onClose.bind(this)); this.socket.on('timeout', this.onTimeout.bind(this)); + + const socketWrite = promisify(this.socket.write.bind(this.socket)); + this.socketWrite = async buffer => { + return abortable(socketWrite(buffer), { signal: this.controller.signal }); + }; } async commandAsync(...args: Parameters) { @@ -1060,15 +1072,11 @@ export class ModernConnection extends TypedEventEmitter { } try { - await writeCommand(this, message, { + await this.writeCommand(message, { agreedCompressor: this.description.compressor ?? 'none', - zlibCompressionLevel: this.description.zlibCompressionLevel, - signal: this.controller.signal + zlibCompressionLevel: this.description.zlibCompressionLevel }); - // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners - this.controller = new AbortController(); - if (options.noResponse) { yield { ok: 1 }; return; @@ -1076,7 +1084,7 @@ export class ModernConnection extends TypedEventEmitter { this.controller.signal.throwIfAborted(); - for await (const response of readMany(this, { signal: this.controller.signal })) { + for await (const response of this.readMany()) { this.socket.setTimeout(0); response.parse(options); @@ -1094,9 +1102,6 @@ export class ModernConnection extends TypedEventEmitter { } } - // TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners - this.controller = new AbortController(); - yield document; this.controller.signal.throwIfAborted(); @@ -1214,121 +1219,83 @@ export class ModernConnection extends TypedEventEmitter { }; exhaustLoop().catch(replyListener); } -} -const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; - -/** - * @internal - * - * This helper reads chucks of data out of a socket and buffers them until it has received a - * full wire protocol message. - * - * By itself, produces an infinite async generator of wire protocol messages and consumers must end - * the stream by calling `return` on the generator. - * - * Note that `for-await` loops call `return` automatically when the loop is exited. - */ -export async function* readWireProtocolMessages( - connection: ModernConnection, - { signal }: { signal?: AbortSignal } = {} -): AsyncGenerator { - const bufferPool = new BufferPool(); - const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize; - for await (const [chunk] of on(connection.socket, 'data', { signal })) { - if (connection.delayedTimeoutId) { - clearTimeout(connection.delayedTimeoutId); - connection.delayedTimeoutId = null; - } - - bufferPool.append(chunk); - const sizeOfMessage = bufferPool.getInt32(); + /** + * @internal + * + * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method + * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired). + */ + async writeCommand( + command: WriteProtocolMessageType, + options: Partial> + ): Promise { + const finalCommand = + options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) + ? command + : new OpCompressedRequest(command, { + agreedCompressor: options.agreedCompressor ?? 'none', + zlibCompressionLevel: options.zlibCompressionLevel ?? 0 + }); - if (sizeOfMessage == null) { - continue; - } + const buffer = Buffer.concat(await finalCommand.toBin()); - if (sizeOfMessage < 0) { - throw new MongoParseError(`Invalid message size: ${sizeOfMessage}`); - } + return this.socketWrite(buffer); + } - if (sizeOfMessage > maxBsonMessageSize) { - throw new MongoParseError( - `Invalid message size: ${sizeOfMessage}, max allowed: ${maxBsonMessageSize}` - ); - } + /** + * @internal + * + * Returns an async generator that yields full wire protocol messages from the underlying socket. This function + * yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request + * by calling `return` on the generator. + * + * Note that `for-await` loops call `return` automatically when the loop is exited. + */ + async *readMany(): AsyncGenerator { + for await (const message of onData(this.messageStream, { signal: this.controller.signal })) { + const response = await decompressResponse(message); + yield response; - if (sizeOfMessage > bufferPool.length) { - continue; + if (!response.moreToCome) { + return; + } } - - yield bufferPool.read(sizeOfMessage); } } -/** - * @internal - * - * Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method - * waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired). - */ -export async function writeCommand( - connection: ModernConnection, - command: WriteProtocolMessageType, - options: Partial> & { - signal?: AbortSignal; - } -): Promise { - const finalCommand = - options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command) - ? command - : new OpCompressedRequest(command, { - agreedCompressor: options.agreedCompressor ?? 'none', - zlibCompressionLevel: options.zlibCompressionLevel ?? 0 - }); +/** @internal */ +export class SizedMessageTransform extends Transform { + bufferPool: BufferPool; + connection: ModernConnection; + + constructor({ connection }: { connection: ModernConnection }) { + super({ objectMode: false }); + this.bufferPool = new BufferPool(); + this.connection = connection; + } + override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void { + if (this.connection.delayedTimeoutId != null) { + clearTimeout(this.connection.delayedTimeoutId); + this.connection.delayedTimeoutId = null; + } - const buffer = Buffer.concat(await finalCommand.toBin()); + this.bufferPool.append(chunk); + const sizeOfMessage = this.bufferPool.getInt32(); - const socketWriteFn = promisify(connection.socket.write.bind(connection.socket)); + if (sizeOfMessage == null) { + return callback(); + } - return abortable(socketWriteFn(buffer), options); -} + if (sizeOfMessage < 0) { + return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`)); + } -/** - * @internal - * - * Returns an async generator that yields full wire protocol messages from the underlying socket. This function - * yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request - * by calling `return` on the generator. - * - * Note that `for-await` loops call `return` automatically when the loop is exited. - */ -export async function* readMany( - connection: ModernConnection, - options: { signal?: AbortSignal } = {} -): AsyncGenerator { - for await (const message of readWireProtocolMessages(connection, options)) { - const response = await decompressResponse(message); - yield response; - - if (!response.moreToCome) { - return; + if (sizeOfMessage > this.bufferPool.length) { + return callback(); } - } -} -/** - * @internal - * - * Reads a single wire protocol message out of a connection. - */ -export async function read( - connection: ModernConnection, - options: { signal?: AbortSignal } = {} -): Promise { - for await (const value of readMany(connection, options)) { - return value; + const message = this.bufferPool.read(sizeOfMessage); + return callback(null, message); } - - throw new MongoRuntimeError('unable to read message off of connection'); } diff --git a/src/cmap/wire_protocol/on_data.ts b/src/cmap/wire_protocol/on_data.ts new file mode 100644 index 00000000000..04c82f709d3 --- /dev/null +++ b/src/cmap/wire_protocol/on_data.ts @@ -0,0 +1,132 @@ +import { type EventEmitter } from 'events'; + +import { List, promiseWithResolvers } from '../../utils'; + +/** + * @internal + * An object holding references to a promise's resolve and reject functions. + */ +type PendingPromises = Omit< + ReturnType>>, + 'promise' +>; + +/** + * onData is adapted from Node.js' events.on helper + * https://nodejs.org/api/events.html#eventsonemitter-eventname-options + * + * Returns an AsyncIterator that iterates each 'data' event emitted from emitter. + * It will reject upon an error event or if the provided signal is aborted. + */ +export function onData(emitter: EventEmitter, options: { signal: AbortSignal }) { + const signal = options.signal; + + // Setup pending events and pending promise lists + /** + * When the caller has not yet called .next(), we store the + * value from the event in this list. Next time they call .next() + * we pull the first value out of this list and resolve a promise with it. + */ + const unconsumedEvents = new List(); + /** + * When there has not yet been an event, a new promise will be created + * and implicitly stored in this list. When an event occurs we take the first + * promise in this list and resolve it. + */ + const unconsumedPromises = new List(); + + /** + * Stored an error created by an error event. + * This error will turn into a rejection for the subsequent .next() call + */ + let error: Error | null = null; + + /** Set to true only after event listeners have been removed. */ + let finished = false; + + const iterator: AsyncGenerator = { + next() { + // First, we consume all unread events + const value = unconsumedEvents.shift(); + if (value != null) { + return Promise.resolve({ value, done: false }); + } + + // Then we error, if an error happened + // This happens one time if at all, because after 'error' + // we stop listening + if (error != null) { + const p = Promise.reject(error); + // Only the first element errors + error = null; + return p; + } + + // If the iterator is finished, resolve to done + if (finished) return closeHandler(); + + // Wait until an event happens + const { promise, resolve, reject } = promiseWithResolvers>(); + unconsumedPromises.push({ resolve, reject }); + return promise; + }, + + return() { + return closeHandler(); + }, + + throw(err: Error) { + errorHandler(err); + return Promise.resolve({ value: undefined, done: true }); + }, + + [Symbol.asyncIterator]() { + return this; + } + }; + + // Adding event handlers + emitter.on('data', eventHandler); + emitter.on('error', errorHandler); + + if (signal.aborted) { + // If the signal is aborted, set up the first .next() call to be a rejection + queueMicrotask(abortListener); + } else { + signal.addEventListener('abort', abortListener, { once: true }); + } + + return iterator; + + function abortListener() { + errorHandler(signal.reason); + } + + function eventHandler(value: Buffer) { + const promise = unconsumedPromises.shift(); + if (promise != null) promise.resolve({ value, done: false }); + else unconsumedEvents.push(value); + } + + function errorHandler(err: Error) { + const promise = unconsumedPromises.shift(); + if (promise != null) promise.reject(err); + else error = err; + void closeHandler(); + } + + function closeHandler() { + // Adding event handlers + emitter.off('data', eventHandler); + emitter.off('error', errorHandler); + signal.removeEventListener('abort', abortListener); + finished = true; + const doneResult = { value: undefined, done: finished } as const; + + for (const promise of unconsumedPromises) { + promise.resolve(doneResult); + } + + return Promise.resolve(doneResult); + } +} diff --git a/src/index.ts b/src/index.ts index 4db500efebe..f1d1c5ac08b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -276,10 +276,7 @@ export type { ConnectionOptions, DestroyOptions, ModernConnection, - ProxyOptions, - read, - readMany, - writeCommand + ProxyOptions } from './cmap/connection'; export type { CloseOptions, diff --git a/src/utils.ts b/src/utils.ts index 8526930e555..e6d58d30bcd 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1325,36 +1325,30 @@ export function isHostMatch(match: RegExp, host?: string): boolean { */ export async function abortable( promise: Promise, - { signal }: { signal?: AbortSignal } = {} + { signal }: { signal: AbortSignal } ): Promise { - const { abort, done } = aborted(signal); + const { promise: aborted, reject } = promiseWithResolvers(); + + function rejectOnAbort() { + reject(signal.reason); + } + + if (signal.aborted) rejectOnAbort(); + else signal.addEventListener('abort', rejectOnAbort, { once: true }); + try { - return await Promise.race([promise, abort]); + return await Promise.race([promise, aborted]); } finally { - done.abort(); + signal.removeEventListener('abort', rejectOnAbort); } } -/** - * Takes an AbortSignal and creates a promise that will reject when the signal aborts - * If the argument provided is nullish the returned promise will **never** resolve. - * Also returns a done controller - abort the done controller when your task is done to remove the abort listeners - * @param signal - an optional abort signal to link to a promise rejection - */ -function aborted(signal?: AbortSignal): { - abort: Promise; - done: AbortController; -} { - const done = new AbortController(); - if (signal?.aborted) { - return { abort: Promise.reject(signal.reason), done }; - } - const abort = new Promise((_, reject) => - signal?.addEventListener('abort', () => reject(signal.reason), { - once: true, - // @ts-expect-error: @types/node erroneously claim this does not exist - signal: done.signal - }) - ); - return { abort, done }; +export function promiseWithResolvers() { + let resolve!: Parameters>[0]>[0]; + let reject!: Parameters>[0]>[1]; + const promise = new Promise(function withResolversExecutor(promiseResolve, promiseReject) { + resolve = promiseResolve; + reject = promiseReject; + }); + return { promise, resolve, reject } as const; } diff --git a/test/benchmarks/driverBench/common.js b/test/benchmarks/driverBench/common.js index e194ea91874..b41ab406343 100644 --- a/test/benchmarks/driverBench/common.js +++ b/test/benchmarks/driverBench/common.js @@ -6,6 +6,8 @@ const { Readable } = require('stream'); const { pipeline } = require('stream/promises'); const { MongoClient } = require('../../..'); const { GridFSBucket } = require('../../..'); +// eslint-disable-next-line no-restricted-modules +const { ModernConnection } = require('../../../lib/cmap/connection'); // eslint-disable-next-line no-restricted-modules const { MONGODB_ERROR_CODES } = require('../../../lib/error'); @@ -25,7 +27,9 @@ function loadSpecString(filePath) { } function makeClient() { - this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://localhost:27017'); + this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://127.0.0.1:27017', { + connectionType: ModernConnection + }); } function connectClient() { diff --git a/test/benchmarks/driverBench/index.js b/test/benchmarks/driverBench/index.js index 1a1d847822f..fd8f79ffd50 100644 --- a/test/benchmarks/driverBench/index.js +++ b/test/benchmarks/driverBench/index.js @@ -1,6 +1,7 @@ 'use strict'; const MongoBench = require('../mongoBench'); +const os = require('node:os'); const Runner = MongoBench.Runner; @@ -11,6 +12,21 @@ const { inspect } = require('util'); const { writeFile } = require('fs/promises'); const { makeParallelBenchmarks, makeSingleBench, makeMultiBench } = require('../mongoBench/suites'); +const hw = os.cpus(); +const ram = os.totalmem() / 1024 ** 3; +const platform = { name: hw[0].model, cores: hw.length, ram: `${ram}GB` }; + +const systemInfo = () => + [ + `ModernConnection`, + `\n- cpu: ${platform.name}`, + `- cores: ${platform.cores}`, + `- arch: ${os.arch()}`, + `- os: ${process.platform} (${os.release()})`, + `- ram: ${platform.ram}\n` + ].join('\n'); +console.log(systemInfo()); + function average(arr) { return arr.reduce((x, y) => x + y, 0) / arr.length; } diff --git a/test/unit/utils.test.ts b/test/unit/utils.test.ts index 980daf498a6..8be3753be72 100644 --- a/test/unit/utils.test.ts +++ b/test/unit/utils.test.ts @@ -1174,21 +1174,6 @@ describe('driver utils', function () { const badError = new Error('unexpected bad error!'); const expectedValue = "don't panic"; - context('when not given a signal', () => { - it('returns promise fulfillment if the promise resolves or rejects', async () => { - expect(await abortable(Promise.resolve(expectedValue))).to.equal(expectedValue); - expect(await abortable(Promise.reject(goodError)).catch(e => e)).to.equal(goodError); - }); - - it('pends indefinitely if the promise is never settled', async () => { - const forever = abortable(new Promise(() => null)); - // Assume 100ms is good enough to prove "forever" - expect(await Promise.race([forever, sleep(100).then(() => expectedValue)])).to.equal( - expectedValue - ); - }); - }); - context('always removes the abort listener it attaches', () => { let controller; let removeEventListenerSpy;