diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 9a617874b3..7aa913fbe4 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -17,6 +17,7 @@ import { MongoMissingDependencyError, MongoNetworkError, MongoNetworkTimeoutError, + MongoRuntimeError, MongoServerError, MongoWriteConcernError } from '../error'; @@ -68,6 +69,8 @@ const kAutoEncrypter = Symbol('autoEncrypter'); /** @internal */ const kDelayedTimeoutId = Symbol('delayedTimeoutId'); +const INVALID_QUEUE_SIZE = 'Connection internal queue contains more than 1 operation description'; + /** @internal */ export interface CommandOptions extends BSONSerializeOptions { command?: boolean; @@ -369,7 +372,28 @@ export class Connection extends TypedEventEmitter { // always emit the message, in case we are streaming this.emit('message', message); - const operationDescription = this[kQueue].get(message.responseTo); + let operationDescription = this[kQueue].get(message.responseTo); + + if (!operationDescription && this.isMonitoringConnection) { + // This is how we recover when the initial hello's requestId is not + // the responseTo when hello responses have been skipped: + + // First check if the map is of invalid size + if (this[kQueue].size > 1) { + this.onError(new MongoRuntimeError(INVALID_QUEUE_SIZE)); + } else { + // Get the first orphaned operation description. + const entry = this[kQueue].entries().next(); + if (entry) { + const [requestId, orphaned]: [number, OperationDescription] = entry.value; + // If the orphaned operation description exists then set it. + operationDescription = orphaned; + // Remove the entry with the bad request id from the queue. + this[kQueue].delete(requestId); + } + } + } + if (!operationDescription) { return; } @@ -381,7 +405,10 @@ export class Connection extends TypedEventEmitter { // making the `responseTo` change on each response this[kQueue].delete(message.responseTo); if ('moreToCome' in message && message.moreToCome) { - // requeue the callback for next synthetic request + // If the operation description check above does find an orphaned + // description and sets the operationDescription then this line will put one + // back in the queue with the correct requestId and will resolve not being able + // to find the next one via the responseTo of the next streaming hello. this[kQueue].set(message.requestId, operationDescription); } else if (operationDescription.socketTimeoutOverride) { this[kStream].setTimeout(this.socketTimeoutMS); diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 0ecaaea85a..35b17b5e1c 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -1,6 +1,7 @@ import { EJSON } from 'bson'; import * as BSON from 'bson'; import { expect } from 'chai'; +import { Readable } from 'stream'; import { setTimeout } from 'timers'; import { inspect, promisify } from 'util'; @@ -354,6 +355,18 @@ export class TestBuilder { } } +export function bufferToStream(buffer) { + const stream = new Readable(); + if (Array.isArray(buffer)) { + buffer.forEach(b => stream.push(b)); + } else { + stream.push(buffer); + } + + stream.push(null); + return stream; +} + export function generateOpMsgBuffer(document: Document): Buffer { const header = Buffer.alloc(4 * 4 + 4); diff --git a/test/unit/cmap/connection.test.ts b/test/unit/cmap/connection.test.ts index b1b25db28d..a9b95bcea2 100644 --- a/test/unit/cmap/connection.test.ts +++ b/test/unit/cmap/connection.test.ts @@ -1,16 +1,18 @@ import { expect } from 'chai'; -import { EventEmitter } from 'events'; +import { EventEmitter, once } from 'events'; import { Socket } from 'net'; import * as sinon from 'sinon'; +import { Readable } from 'stream'; import { setTimeout } from 'timers'; +import { BinMsg } from '../../../src/cmap/commands'; import { connect } from '../../../src/cmap/connect'; import { Connection, hasSessionSupport } from '../../../src/cmap/connection'; import { MessageStream } from '../../../src/cmap/message_stream'; -import { MongoNetworkTimeoutError } from '../../../src/error'; +import { MongoNetworkTimeoutError, MongoRuntimeError } from '../../../src/error'; import { isHello, ns } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; -import { getSymbolFrom } from '../../tools/utils'; +import { generateOpMsgBuffer, getSymbolFrom } from '../../tools/utils'; import { createTimerSandbox } from '../timer_sandbox'; const connectionOptionsDefaults = { @@ -22,6 +24,25 @@ const connectionOptionsDefaults = { loadBalanced: false }; +/** The absolute minimum socket API needed by Connection as of writing this test */ +class FakeSocket extends EventEmitter { + address() { + // is never called + } + pipe() { + // does not need to do anything + } + destroy() { + // is called, has no side effects + } + get remoteAddress() { + return 'iLoveJavaScript'; + } + get remotePort() { + return 123; + } +} + describe('new Connection()', function () { let server; after(() => mock.cleanup()); @@ -137,6 +158,189 @@ describe('new Connection()', function () { }); }); + describe('#onMessage', function () { + context('when the connection is a monitoring connection', function () { + let queue: Map; + let driverSocket: FakeSocket; + let connection: Connection; + + beforeEach(function () { + driverSocket = sinon.spy(new FakeSocket()); + }); + + context('when multiple hellos exist on the stream', function () { + let callbackSpy; + const inputStream = new Readable(); + const document = { ok: 1 }; + const last = { isWritablePrimary: true }; + + beforeEach(function () { + callbackSpy = sinon.spy(); + const firstHello = generateOpMsgBuffer(document); + const secondHello = generateOpMsgBuffer(document); + const thirdHello = generateOpMsgBuffer(last); + const buffer = Buffer.concat([firstHello, secondHello, thirdHello]); + + connection = sinon.spy(new Connection(inputStream, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + // Stick an operation description in the queue. + queue.set(1, operationDescription); + + // Push the buffer of 3 hellos to the input stream + inputStream.push(buffer); + inputStream.push(null); + }); + + it('calls the callback with the last hello document', async function () { + const messages = await once(connection, 'message'); + expect(messages[0].responseTo).to.equal(0); + expect(callbackSpy).to.be.calledOnceWith(undefined, last); + }); + }); + + context('when requestId/responseTo do not match', function () { + let callbackSpy; + const document = { ok: 1 }; + + beforeEach(function () { + callbackSpy = sinon.spy(); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + // Stick an operation description in the queue. + queue.set(1, operationDescription); + // Emit a message that won't match the existing operation description. + const msg = generateOpMsgBuffer(document); + const msgHeader: MessageHeader = { + length: msg.readInt32LE(0), + requestId: 1, + responseTo: 0, // This will not match. + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + + const message = new BinMsg(msg, msgHeader, msgBody); + connection.onMessage(message); + }); + + it('calls the operation description callback with the document', function () { + expect(callbackSpy).to.be.calledOnceWith(undefined, document); + }); + }); + + context('when requestId/reponseTo match', function () { + let callbackSpy; + const document = { ok: 1 }; + + beforeEach(function () { + callbackSpy = sinon.spy(); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + + // Create the operation description. + const operationDescription: OperationDescription = { + requestId: 1, + cb: callbackSpy + }; + + // Stick an operation description in the queue. + queue.set(1, operationDescription); + // Emit a message that matches the existing operation description. + const msg = generateOpMsgBuffer(document); + const msgHeader: MessageHeader = { + length: msg.readInt32LE(0), + requestId: 2, + responseTo: 1, + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + + const message = new BinMsg(msg, msgHeader, msgBody); + connection.onMessage(message); + }); + + it('calls the operation description callback with the document', function () { + expect(callbackSpy).to.be.calledOnceWith(undefined, document); + }); + }); + + context('when more than one operation description is in the queue', function () { + let spyOne; + let spyTwo; + const document = { ok: 1 }; + + beforeEach(function () { + spyOne = sinon.spy(); + spyTwo = sinon.spy(); + + // @ts-expect-error: driverSocket does not fully satisfy the stream type, but that's okay + connection = sinon.spy(new Connection(driverSocket, connectionOptionsDefaults)); + connection.isMonitoringConnection = true; + const queueSymbol = getSymbolFrom(connection, 'queue'); + queue = connection[queueSymbol]; + + // Create the operation descriptions. + const descriptionOne: OperationDescription = { + requestId: 1, + cb: spyOne + }; + const descriptionTwo: OperationDescription = { + requestId: 2, + cb: spyTwo + }; + + // Stick an operation description in the queue. + queue.set(2, descriptionOne); + queue.set(3, descriptionTwo); + // Emit a message that matches the existing operation description. + const msg = generateOpMsgBuffer(document); + const msgHeader: MessageHeader = { + length: msg.readInt32LE(0), + requestId: 2, + responseTo: 1, + opCode: msg.readInt32LE(12) + }; + const msgBody = msg.subarray(16); + + const message = new BinMsg(msg, msgHeader, msgBody); + connection.onMessage(message); + }); + + it('calls all operation description callbacks with an error', function () { + expect(spyOne).to.be.calledOnce; + expect(spyTwo).to.be.calledOnce; + const errorOne = spyOne.firstCall.args[0]; + const errorTwo = spyTwo.firstCall.args[0]; + expect(errorOne).to.be.instanceof(MongoRuntimeError); + expect(errorTwo).to.be.instanceof(MongoRuntimeError); + }); + }); + }); + }); + describe('onTimeout()', () => { let connection: sinon.SinonSpiedInstance; let clock: sinon.SinonFakeTimers; @@ -146,25 +350,6 @@ describe('new Connection()', function () { let kDelayedTimeoutId: symbol; let NodeJSTimeoutClass: any; - /** The absolute minimum socket API needed by Connection as of writing this test */ - class FakeSocket extends EventEmitter { - address() { - // is never called - } - pipe() { - // does not need to do anything - } - destroy() { - // is called, has no side effects - } - get remoteAddress() { - return 'iLoveJavaScript'; - } - get remotePort() { - return 123; - } - } - beforeEach(() => { timerSandbox = createTimerSandbox(); clock = sinon.useFakeTimers(); diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index 3158a9144a..69ffc08496 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -6,19 +6,7 @@ const { MessageStream } = require('../../../src/cmap/message_stream'); const { Msg } = require('../../../src/cmap/commands'); const expect = require('chai').expect; const { LEGACY_HELLO_COMMAND } = require('../../../src/constants'); -const { generateOpMsgBuffer } = require('../../tools/utils'); - -function bufferToStream(buffer) { - const stream = new Readable(); - if (Array.isArray(buffer)) { - buffer.forEach(b => stream.push(b)); - } else { - stream.push(buffer); - } - - stream.push(null); - return stream; -} +const { bufferToStream, generateOpMsgBuffer } = require('../../tools/utils'); describe('MessageStream', function () { context('when the stream is for a monitoring connection', function () {