From 4f2ebd648561710cffff2818737169fecdd546b1 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 27 Jul 2022 15:40:13 -0400 Subject: [PATCH 01/12] wip --- src/cursor/abstract_cursor.ts | 38 +-- src/operations/command.ts | 1 + src/operations/get_more.ts | 6 +- .../change-streams/change_stream.test.ts | 88 +++++- test/integration/crud/maxTimeMS.test.ts | 252 ++++++++++++++++++ test/integration/crud/maxtimems.test.js | 120 --------- 6 files changed, 361 insertions(+), 144 deletions(-) create mode 100644 test/integration/crud/maxTimeMS.test.ts delete mode 100644 test/integration/crud/maxtimems.test.js diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 62661c04d7..cd4a46c0ea 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -79,7 +79,22 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { readPreference?: ReadPreferenceLike; readConcern?: ReadConcernLike; batchSize?: number; + /** + * For **`tailable=false` cursor** OR **`tailable=true && awaitData=false` cursor**, + * - the driver MUST set `maxTimeMS` on the `find` command and MUST NOT set `maxTimeMS` on the `getMore` command. + * - If `maxTimeMS` is not set in options, the driver SHOULD refrain from setting **maxTimeMS** + * + * For **`tailable=true && awaitData=true` cursor** + * - the driver MUST provide a cursor level option named `maxAwaitTimeMS`. + * - The `maxTimeMS` option on the `getMore` command MUST be set to the value of the option `maxAwaitTimeMS`. + * - If no `maxAwaitTimeMS` is specified, the driver MUST not set `maxTimeMS` on the `getMore` command. + * - `maxAwaitTimeMS` option is not set on the `aggregate` command nor `$changeStream` pipeline stage + * + * ## `maxCommitTimeMS` + * Note, this option is an alias for the `maxTimeMS` commitTransaction command option. + */ maxTimeMS?: number; + maxAwaitTimeMS?: number; /** * Comment to apply to the operation. * @@ -155,7 +170,7 @@ export abstract class AbstractCursor< } this[kClient] = client; this[kNamespace] = namespace; - this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230 + this[kDocuments] = []; this[kInitialized] = false; this[kClosed] = false; this[kKilled] = false; @@ -186,6 +201,10 @@ export abstract class AbstractCursor< this[kOptions].maxTimeMS = options.maxTimeMS; } + if (typeof options.maxAwaitTimeMS === 'number') { + this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS; + } + if (options.session instanceof ClientSession) { this[kSession] = options.session; } else { @@ -617,21 +636,8 @@ export abstract class AbstractCursor< /** @internal */ _getMore(batchSize: number, callback: Callback): void { - const cursorId = this[kId]; - const cursorNs = this[kNamespace]; - const server = this[kServer]; - - if (cursorId == null) { - callback(new MongoRuntimeError('Unable to iterate cursor with no id')); - return; - } - - if (server == null) { - callback(new MongoRuntimeError('Unable to iterate cursor without selected server')); - return; - } - - const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, { ...this[kOptions], session: this[kSession], batchSize diff --git a/src/operations/command.ts b/src/operations/command.ts index 57186ab42c..f5f0d613c9 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -44,6 +44,7 @@ export interface CommandOperationOptions readConcern?: ReadConcernLike; /** Collation */ collation?: CollationOptions; + /** TODO This is probably in the wrong place................. specs only mention this being a thing for createIndex/dropIndex */ maxTimeMS?: number; /** * Comment to apply to the operation. diff --git a/src/operations/get_more.ts b/src/operations/get_more.ts index 2d999e5ce2..c86c3f3685 100644 --- a/src/operations/get_more.ts +++ b/src/operations/get_more.ts @@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation { cursorId: Long; override options: GetMoreOptions; - constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) { + constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) { super(options); this.options = options; @@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation { ); } + if (this.cursorId == null || this.cursorId.isZero()) { + return callback(new MongoRuntimeError('Unable to iterate cursor with no id')); + } + const collection = this.ns.collection; if (collection == null) { // Cursors should have adopted the namespace returned by MongoDB diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index be8c155006..c289c8e962 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1111,7 +1111,7 @@ describe('Change Streams', function () { changeStream.next((err, doc) => { expect(err).to.exist; expect(doc).to.not.exist; - expect(err.message).to.equal('ChangeStream is closed'); + expect(err?.message).to.equal('ChangeStream is closed'); changeStream.close(() => client.close(done)); }); }); @@ -1372,23 +1372,97 @@ describe('Change Streams', function () { ) .run(); + UnifiedTestSuiteBuilder.describe('entity.watch() server-side options') + .runOnRequirement({ + topologies: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'], + minServerVersion: '4.4.0' + }) + .createEntities([ + { client: { id: 'client0', observeEvents: ['commandStartedEvent'] } }, + { database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } }, + { collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } } + ]) + .test( + TestBuilder.it('should use maxAwaitTimeMS to send maxTimeMS on getMore commands') + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxAwaitTimeMS: 5000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + events: [ + { commandStartedEvent: { commandName: 'aggregate' } }, + { commandStartedEvent: { commandName: 'insert' } }, + { commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } } + ] + }) + .toJSON() + ) + .test( + TestBuilder.it('should send maxTimeMS on aggregate command') + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxTimeMS: 5000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + events: [ + { commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } }, + { commandStartedEvent: { commandName: 'insert' } }, + { + commandStartedEvent: { + commandName: 'getMore', + command: { maxTimeMS: { $$exists: false } } + } + } + ] + }) + .toJSON() + ) + .run(); + describe('BSON Options', function () { let client: MongoClient; let db: Db; let collection: Collection; let cs: ChangeStream; + beforeEach(async function () { client = await this.configuration.newClient({ monitorCommands: true }).connect(); db = client.db('db'); collection = await db.createCollection('collection'); }); + afterEach(async function () { await db.dropCollection('collection'); await cs.close(); await client.close(); - client = undefined; - db = undefined; - collection = undefined; }); context('promoteLongs', () => { @@ -1452,7 +1526,7 @@ describe('Change Streams', function () { it('does not send invalid options on the aggregate command', { metadata: { requires: { topology: '!single' } }, test: async function () { - const started = []; + const started: CommandStartedEvent[] = []; client.on('commandStarted', filterForCommands(['aggregate'], started)); const doc = { invalidBSONOption: true }; @@ -1473,7 +1547,7 @@ describe('Change Streams', function () { it('does not send invalid options on the getMore command', { metadata: { requires: { topology: '!single' } }, test: async function () { - const started = []; + const started: CommandStartedEvent[] = []; client.on('commandStarted', filterForCommands(['aggregate'], started)); const doc = { invalidBSONOption: true }; @@ -1503,7 +1577,7 @@ describe('ChangeStream resumability', function () { const changeStreamResumeOptions: ChangeStreamOptions = { fullDocument: 'updateLookup', collation: { locale: 'en', maxVariable: 'punct' }, - maxAwaitTimeMS: 20000, + maxAwaitTimeMS: 2000, batchSize: 200 }; diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts new file mode 100644 index 0000000000..bd36cba5c3 --- /dev/null +++ b/test/integration/crud/maxTimeMS.test.ts @@ -0,0 +1,252 @@ +import { expect } from 'chai'; +import { inspect } from 'util'; + +import { + Collection, + CommandStartedEvent, + FindCursor, + MongoClient, + MongoCursorExhaustedError, + MongoServerError +} from '../../../src'; +import { sleep } from '../../tools/utils'; +import { assert as test, setupDatabase } from '../shared'; + +describe('MaxTimeMS', function () { + before(function () { + return setupDatabase(this.configuration); + }); + + it('Should Correctly respect the maxTimeMS property on count', function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function () { + const db = client.db(configuration.db); + const col = db.collection('max_time_ms'); + + // Insert a couple of docs + const docs_1 = [{ agg_pipe: 1 }]; + + // Simple insert + col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { + expect(err).to.not.exist; + + // Execute a find command + col + .find({ $where: 'sleep(100) || true' }) + .maxTimeMS(50) + .count(function (err) { + test.ok(err != null); + client.close(done); + }); + }); + }); + }); + + it('Should Correctly respect the maxTimeMS property on toArray', { + metadata: { + requires: { + topology: ['single', 'replicaset'] + } + }, + + test: function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function () { + const db = client.db(configuration.db); + const col = db.collection('max_time_ms_2'); + + // Insert a couple of docs + const docs_1 = [{ agg_pipe: 1 }]; + + // Simple insert + col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { + expect(err).to.not.exist; + + // Execute a find command + col + .find({ $where: 'sleep(100) || true' }) + .maxTimeMS(50) + .toArray(function (err) { + test.ok(err != null); + client.close(done); + }); + }); + }); + } + }); + + it('Should Correctly fail with maxTimeMS error', { + // Add a tag that our runner can trigger on + // in this case we are setting that node needs to be higher than 0.10.X to run + metadata: { + requires: { + topology: ['single', 'replicaset'] + } + }, + + test: function (done) { + const configuration = this.configuration; + const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); + client.connect(function () { + const db = client.db(configuration.db); + const col = db.collection('max_time_ms_5'); + + // Insert a couple of docs + const docs_1 = [{ agg_pipe: 10 }]; + + // Simple insert + col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { + expect(err).to.not.exist; + + db.admin().command( + { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'alwaysOn' }, + function (err, result) { + expect(err).to.not.exist; + test.equal(1, result?.ok); + + col + .find({}) + .maxTimeMS(10) + .toArray(function (err) { + test.ok(err != null); + + db.admin().command( + { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }, + function (err, result) { + expect(err).to.not.exist; + test.equal(1, result?.ok); + client.close(done); + } + ); + }); + } + ); + }); + }); + } + }); + + describe('awaitData, tailable, maxTimeMS, and maxAwaitTimeMS on cursors', () => { + const insertedDocs = [{ _id: 1 }]; + let client: MongoClient; + let cappedCollection: Collection<{ _id: number }>; + let cursor: FindCursor<{ _id: number }>; + let events: CommandStartedEvent[]; + + beforeEach(async function () { + client = this.configuration.newClient({ monitorCommands: true }); + await client + .db() + .dropCollection('cappedAt3') + .catch(() => null); + await sleep(500); + cappedCollection = await client + .db() + .createCollection('cappedAt3', { capped: true, size: 4096, max: 3 }); + cappedCollection.insertMany(insertedDocs); + + events = []; + client.on('commandStarted', event => + ['getMore', 'find'].includes(event.commandName) ? events.push(event) : null + ); + }); + + afterEach(async function () { + events = []; + await cursor?.close(); + await client?.close(); + }); + + const tailableValues = [true, false, undefined]; + const awaitDataValues = [true, false, undefined]; + const maxTimeMSValues = [100, 0, undefined]; + const maxAwaitTimeMSValues = [100, 0, undefined]; + + const tests = tailableValues.flatMap(tailable => + awaitDataValues.flatMap(awaitData => + maxAwaitTimeMSValues.flatMap(maxAwaitTimeMS => + maxTimeMSValues.flatMap(maxTimeMS => { + const awaitDataSet = Boolean(awaitData) === true; + const tailableSet = Boolean(tailable) === true; + const timeIsSetOnGetMore = typeof maxAwaitTimeMS === 'number'; + return [ + { + options: { tailable, awaitData, maxAwaitTimeMS, maxTimeMS }, + outcome: { + // Cannot set 'awaitData' without also setting 'tailable' + isFindError: awaitDataSet && !tailableSet, + // cannot set maxTimeMS on getMore command for a non-awaitData cursor + isGetMoreError: timeIsSetOnGetMore && !awaitDataSet + } + } + ]; + }) + ) + ) + ); + + for (const { options, outcome } of tests) { + let optionsString = inspect(options, { breakLength: Infinity }); + optionsString = optionsString + .slice(2, optionsString.length - 2) + .split('undefined') + .join('omit'); + + it(`should create find cursor with ${optionsString}`, async () => { + cursor = cappedCollection.find({ _id: { $gt: 0 } }, { ...options, batchSize: 1 }); + + const findDocOrError: { _id: number } | Error = await cursor.next().catch(error => error); + + const exhaustedByFind = !!cursor.id?.isZero(); + + const getMoreDocOrError: { _id: number } | Error | null = await cursor + .tryNext() + .catch(error => error); + + expect(events).to.have.length.of.at.least(1); // At least find must be sent + + if (outcome.isFindError) { + expect(findDocOrError).to.be.instanceOf(MongoServerError); + } else { + if (findDocOrError instanceof Error) { + throw findDocOrError; + } + expect(findDocOrError).to.have.property('_id', 1); + + expect(events[0].command).to.be.an('object').that.has.a.property('find'); + const findCommand = events[0].command; + + if (typeof options.maxTimeMS === 'number') { + expect(findCommand).to.have.property('maxTimeMS', options.maxTimeMS); + } else { + expect(findCommand).to.not.have.property('maxTimeMS'); + } + } + + if (outcome.isGetMoreError) { + expect(getMoreDocOrError).to.be.instanceOf(MongoServerError); + } else if (exhaustedByFind) { + expect(getMoreDocOrError).to.be.instanceOf(MongoCursorExhaustedError); + } else { + if (getMoreDocOrError instanceof Error) { + throw getMoreDocOrError; + } + expect(getMoreDocOrError).to.be.null; + + expect(events[1].command).to.be.an('object').that.has.a.property('getMore'); + const getMoreCommand = events[1].command; + + if (typeof options.maxAwaitTimeMS === 'number') { + expect(getMoreCommand).to.have.property('maxTimeMS', options.maxAwaitTimeMS); + } else { + expect(getMoreCommand).to.not.have.property('maxTimeMS'); + } + } + + await cursor.close(); + }); + } + }); +}); diff --git a/test/integration/crud/maxtimems.test.js b/test/integration/crud/maxtimems.test.js deleted file mode 100644 index f7ca1acb4b..0000000000 --- a/test/integration/crud/maxtimems.test.js +++ /dev/null @@ -1,120 +0,0 @@ -'use strict'; -const { assert: test, setupDatabase } = require('../shared'); -const { expect } = require('chai'); - -describe('MaxTimeMS', function () { - before(function () { - return setupDatabase(this.configuration); - }); - - it('Should Correctly respect the maxtimeMs property on count', function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .count(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); - }); - - it('Should Correctly respect the maxtimeMs property on toArray', { - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms_2'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .toArray(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); - } - }); - - it('Should Correctly fail with maxTimeMS error', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function (err, client) { - var db = client.db(configuration.db); - var col = db.collection('max_time_ms_5'); - - // Insert a couple of docs - var docs_1 = [{ agg_pipe: 10 }]; - - // Simple insert - col.insert(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'alwaysOn' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result.ok); - - col - .find({}) - .maxTimeMS(10) - .toArray(function (err) { - test.ok(err != null); - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result.ok); - client.close(done); - } - ); - }); - } - ); - }); - }); - } - }); -}); From afb5b563e8a3423de3ce5059a18a5fd154e3344e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 27 Jul 2022 16:56:03 -0400 Subject: [PATCH 02/12] organize tests, better names --- test/integration/crud/maxTimeMS.test.ts | 74 +++++++++++++++---------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index bd36cba5c3..409863ab3a 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -173,7 +173,10 @@ describe('MaxTimeMS', function () { const timeIsSetOnGetMore = typeof maxAwaitTimeMS === 'number'; return [ { - options: { tailable, awaitData, maxAwaitTimeMS, maxTimeMS }, + // Use JSON to drop explicit undefined + options: JSON.parse( + JSON.stringify({ tailable, awaitData, maxAwaitTimeMS, maxTimeMS }) + ), outcome: { // Cannot set 'awaitData' without also setting 'tailable' isFindError: awaitDataSet && !tailableSet, @@ -187,33 +190,55 @@ describe('MaxTimeMS', function () { ) ); + it('meta test: should setup test table correctly', () => { + expect(tests).to.have.lengthOf(81); + expect(tests.filter(t => t.outcome.isFindError)).to.have.lengthOf(18); + expect(tests.filter(t => t.outcome.isGetMoreError)).to.have.lengthOf(36); + expect( + tests.filter(t => { + return !t.outcome.isFindError && !t.outcome.isGetMoreError; + }) + ).to.have.lengthOf(27); + }); + for (const { options, outcome } of tests) { let optionsString = inspect(options, { breakLength: Infinity }); - optionsString = optionsString - .slice(2, optionsString.length - 2) - .split('undefined') - .join('omit'); + optionsString = optionsString.slice(1, optionsString.length - 1).trim(); + optionsString = optionsString === '' ? 'nothing set' : optionsString; - it(`should create find cursor with ${optionsString}`, async () => { + const operation = async () => { cursor = cappedCollection.find({ _id: { $gt: 0 } }, { ...options, batchSize: 1 }); - const findDocOrError: { _id: number } | Error = await cursor.next().catch(error => error); - const exhaustedByFind = !!cursor.id?.isZero(); - const getMoreDocOrError: { _id: number } | Error | null = await cursor .tryNext() .catch(error => error); - expect(events).to.have.length.of.at.least(1); // At least find must be sent + return { findDocOrError, exhaustedByFind, getMoreDocOrError }; + }; - if (outcome.isFindError) { + if (outcome.isFindError) { + it(`should error on find due to setting ${optionsString}`, async () => { + const { findDocOrError } = await operation(); expect(findDocOrError).to.be.instanceOf(MongoServerError); - } else { - if (findDocOrError instanceof Error) { - throw findDocOrError; + }); + } else if (outcome.isGetMoreError) { + it(`should error on getMore due to setting ${optionsString}`, async () => { + const { exhaustedByFind, getMoreDocOrError } = await operation(); + if (exhaustedByFind) { + expect(getMoreDocOrError).to.be.instanceOf(MongoCursorExhaustedError); + } else { + expect(getMoreDocOrError).to.be.instanceOf(MongoServerError); } - expect(findDocOrError).to.have.property('_id', 1); + }); + } else { + it(`should create find cursor with ${optionsString}`, async () => { + const { findDocOrError: findDoc, getMoreDocOrError: getMoreDoc } = await operation(); + + expect(findDoc).to.not.be.instanceOf(Error); + expect(getMoreDoc).to.not.be.instanceOf(Error); + + expect(findDoc).to.have.property('_id', 1); expect(events[0].command).to.be.an('object').that.has.a.property('find'); const findCommand = events[0].command; @@ -223,17 +248,8 @@ describe('MaxTimeMS', function () { } else { expect(findCommand).to.not.have.property('maxTimeMS'); } - } - - if (outcome.isGetMoreError) { - expect(getMoreDocOrError).to.be.instanceOf(MongoServerError); - } else if (exhaustedByFind) { - expect(getMoreDocOrError).to.be.instanceOf(MongoCursorExhaustedError); - } else { - if (getMoreDocOrError instanceof Error) { - throw getMoreDocOrError; - } - expect(getMoreDocOrError).to.be.null; + + expect(getMoreDoc).to.be.null; expect(events[1].command).to.be.an('object').that.has.a.property('getMore'); const getMoreCommand = events[1].command; @@ -243,10 +259,8 @@ describe('MaxTimeMS', function () { } else { expect(getMoreCommand).to.not.have.property('maxTimeMS'); } - } - - await cursor.close(); - }); + }); + } } }); }); From 1d159d30c1e73c09bf4b93966b9bada3235d6975 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 28 Jul 2022 16:16:07 -0400 Subject: [PATCH 03/12] fix: limit server versions --- test/integration/crud/maxTimeMS.test.ts | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index 409863ab3a..de20fdf43b 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -17,7 +17,7 @@ describe('MaxTimeMS', function () { return setupDatabase(this.configuration); }); - it('Should Correctly respect the maxTimeMS property on count', function (done) { + it('should correctly respect the maxTimeMS property on count', function (done) { const configuration = this.configuration; const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); client.connect(function () { @@ -43,7 +43,7 @@ describe('MaxTimeMS', function () { }); }); - it('Should Correctly respect the maxTimeMS property on toArray', { + it('should correctly respect the maxTimeMS property on toArray', { metadata: { requires: { topology: ['single', 'replicaset'] @@ -77,7 +77,7 @@ describe('MaxTimeMS', function () { } }); - it('Should Correctly fail with maxTimeMS error', { + it('should correctly fail with maxTimeMS error', { // Add a tag that our runner can trigger on // in this case we are setting that node needs to be higher than 0.10.X to run metadata: { @@ -201,11 +201,13 @@ describe('MaxTimeMS', function () { ).to.have.lengthOf(27); }); + const metadata = { requires: { mongodb: '>=5.0.0' } }; for (const { options, outcome } of tests) { let optionsString = inspect(options, { breakLength: Infinity }); optionsString = optionsString.slice(1, optionsString.length - 1).trim(); optionsString = optionsString === '' ? 'nothing set' : optionsString; + // Each test runs the same find operation, but asserts different outcomes const operation = async () => { cursor = cappedCollection.find({ _id: { $gt: 0 } }, { ...options, batchSize: 1 }); const findDocOrError: { _id: number } | Error = await cursor.next().catch(error => error); @@ -218,12 +220,12 @@ describe('MaxTimeMS', function () { }; if (outcome.isFindError) { - it(`should error on find due to setting ${optionsString}`, async () => { + it(`should error on find due to setting ${optionsString}`, metadata, async () => { const { findDocOrError } = await operation(); expect(findDocOrError).to.be.instanceOf(MongoServerError); }); } else if (outcome.isGetMoreError) { - it(`should error on getMore due to setting ${optionsString}`, async () => { + it(`should error on getMore due to setting ${optionsString}`, metadata, async () => { const { exhaustedByFind, getMoreDocOrError } = await operation(); if (exhaustedByFind) { expect(getMoreDocOrError).to.be.instanceOf(MongoCursorExhaustedError); @@ -232,7 +234,7 @@ describe('MaxTimeMS', function () { } }); } else { - it(`should create find cursor with ${optionsString}`, async () => { + it(`should create find cursor with ${optionsString}`, metadata, async () => { const { findDocOrError: findDoc, getMoreDocOrError: getMoreDoc } = await operation(); expect(findDoc).to.not.be.instanceOf(Error); From 34835193ed188f902d8155b84181e6719f170699 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 28 Jul 2022 17:20:32 -0400 Subject: [PATCH 04/12] wip --- test/integration/crud/maxTimeMS.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index de20fdf43b..0ee49cd8ff 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -141,7 +141,7 @@ describe('MaxTimeMS', function () { .db() .dropCollection('cappedAt3') .catch(() => null); - await sleep(500); + await sleep(100); // Need to make sure it has had time to drop entirely cappedCollection = await client .db() .createCollection('cappedAt3', { capped: true, size: 4096, max: 3 }); From 12ca746a0b660b996f638d0513e20f6062d96601 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 1 Aug 2022 10:29:25 -0400 Subject: [PATCH 05/12] test: on replicaset --- test/integration/crud/maxTimeMS.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index 0ee49cd8ff..5ed376ace7 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -201,7 +201,7 @@ describe('MaxTimeMS', function () { ).to.have.lengthOf(27); }); - const metadata = { requires: { mongodb: '>=5.0.0' } }; + const metadata = { requires: { mongodb: '>=4', topology: ['replicaset'] as const } }; for (const { options, outcome } of tests) { let optionsString = inspect(options, { breakLength: Infinity }); optionsString = optionsString.slice(1, optionsString.length - 1).trim(); From 9ad9ce9c5aba2040c46fa8459d20743aacc19d57 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Mon, 1 Aug 2022 10:58:44 -0400 Subject: [PATCH 06/12] docs: fixup --- src/cursor/abstract_cursor.ts | 33 ++++++++++++++++--------- src/operations/command.ts | 1 - test/integration/crud/maxTimeMS.test.ts | 2 +- 3 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index cd4a46c0ea..0323b9ffac 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -78,22 +78,19 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { session?: ClientSession; readPreference?: ReadPreferenceLike; readConcern?: ReadConcernLike; + /** + * Specifies the number of documents to return in each response from MongoDB + */ batchSize?: number; /** - * For **`tailable=false` cursor** OR **`tailable=true && awaitData=false` cursor**, - * - the driver MUST set `maxTimeMS` on the `find` command and MUST NOT set `maxTimeMS` on the `getMore` command. - * - If `maxTimeMS` is not set in options, the driver SHOULD refrain from setting **maxTimeMS** - * - * For **`tailable=true && awaitData=true` cursor** - * - the driver MUST provide a cursor level option named `maxAwaitTimeMS`. - * - The `maxTimeMS` option on the `getMore` command MUST be set to the value of the option `maxAwaitTimeMS`. - * - If no `maxAwaitTimeMS` is specified, the driver MUST not set `maxTimeMS` on the `getMore` command. - * - `maxAwaitTimeMS` option is not set on the `aggregate` command nor `$changeStream` pipeline stage - * - * ## `maxCommitTimeMS` - * Note, this option is an alias for the `maxTimeMS` commitTransaction command option. + * When applicable `maxTimeMS` controls the amount of time the initial command + * that constructs a cursor should take. (ex. find, aggregate, listCollections) */ maxTimeMS?: number; + /** + * When applicable `maxAwaitTimeMS` controls the amount of time subsequent getMores + * that a cursor uses to fetch more data should take. (ex. cursor.next()) + */ maxAwaitTimeMS?: number; /** * Comment to apply to the operation. @@ -104,7 +101,19 @@ export interface AbstractCursorOptions extends BSONSerializeOptions { * In server versions 4.4 and above, 'comment' can be any valid BSON type. */ comment?: unknown; + /** + * By default, MongoDB will automatically close a cursor when the + * client has exhausted all results in the cursor. However, for [capped collections](https://www.mongodb.com/docs/manual/core/capped-collections) + * you may use a Tailable Cursor that remains open after the client exhausts + * the results in the initial cursor. + */ tailable?: boolean; + /** + * If awaitData is set to true, when the cursor reaches the end of the capped collection, + * MongoDB blocks the query thread for a period of time waiting for new data to arrive. + * When new data is inserted into the capped collection, the blocked thread is signaled + * to wake up and return the next batch to the client. + */ awaitData?: boolean; noCursorTimeout?: boolean; } diff --git a/src/operations/command.ts b/src/operations/command.ts index f5f0d613c9..57186ab42c 100644 --- a/src/operations/command.ts +++ b/src/operations/command.ts @@ -44,7 +44,6 @@ export interface CommandOperationOptions readConcern?: ReadConcernLike; /** Collation */ collation?: CollationOptions; - /** TODO This is probably in the wrong place................. specs only mention this being a thing for createIndex/dropIndex */ maxTimeMS?: number; /** * Comment to apply to the operation. diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index 5ed376ace7..78906f443e 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -201,7 +201,7 @@ describe('MaxTimeMS', function () { ).to.have.lengthOf(27); }); - const metadata = { requires: { mongodb: '>=4', topology: ['replicaset'] as const } }; + const metadata = { requires: { mongodb: '>=5', topology: ['replicaset'] as const } }; for (const { options, outcome } of tests) { let optionsString = inspect(options, { breakLength: Infinity }); optionsString = optionsString.slice(1, optionsString.length - 1).trim(); From 0c0136a3dba6cd288daf3e904ebbc2b1598a2b88 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 3 Aug 2022 15:42:45 -0400 Subject: [PATCH 07/12] rm sleep --- test/integration/crud/maxTimeMS.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index 78906f443e..d80d66813f 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -141,7 +141,6 @@ describe('MaxTimeMS', function () { .db() .dropCollection('cappedAt3') .catch(() => null); - await sleep(100); // Need to make sure it has had time to drop entirely cappedCollection = await client .db() .createCollection('cappedAt3', { capped: true, size: 4096, max: 3 }); From 6f885734d68baba5083464d4717d6fd952652fa1 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Wed, 3 Aug 2022 15:54:49 -0400 Subject: [PATCH 08/12] test: error cases --- test/unit/operations/get_more.test.ts | 48 +++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/test/unit/operations/get_more.test.ts b/test/unit/operations/get_more.test.ts index c150aae800..c4802104c1 100644 --- a/test/unit/operations/get_more.test.ts +++ b/test/unit/operations/get_more.test.ts @@ -192,6 +192,54 @@ describe('GetMoreOperation', function () { } }); }); + + context('error cases', () => { + const server = new Server(new Topology([], {} as any), new ServerDescription(''), {} as any); + sinon.stub(server, 'command').yieldsRight(); + + it('should throw if the cursorId is undefined', async () => { + const getMoreOperation = new GetMoreOperation( + ns('db.collection'), + // @ts-expect-error: Testing undefined cursorId + undefined, + server, + options + ); + const error = await promisify(getMoreOperation.execute.bind(getMoreOperation))( + server, + undefined + ).catch(error => error); + expect(error).to.be.instanceOf(MongoRuntimeError); + }); + + it('should throw if the collection is undefined', async () => { + const getMoreOperation = new GetMoreOperation( + ns('db'), + Long.fromNumber(1), + server, + options + ); + const error = await promisify(getMoreOperation.execute.bind(getMoreOperation))( + server, + undefined + ).catch(error => error); + expect(error).to.be.instanceOf(MongoRuntimeError); + }); + + it('should throw if the cursorId is zero', async () => { + const getMoreOperation = new GetMoreOperation( + ns('db.collection'), + Long.fromNumber(0), + server, + options + ); + const error = await promisify(getMoreOperation.execute.bind(getMoreOperation))( + server, + undefined + ).catch(error => error); + expect(error).to.be.instanceOf(MongoRuntimeError); + }); + }); }); describe('#hasAspect', function () { From 053b194261266aedc294e6edec914b1652c4743b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 4 Aug 2022 13:36:28 -0400 Subject: [PATCH 09/12] fix: lint --- test/integration/crud/maxTimeMS.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index d80d66813f..9b42a41dc3 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -9,7 +9,6 @@ import { MongoCursorExhaustedError, MongoServerError } from '../../../src'; -import { sleep } from '../../tools/utils'; import { assert as test, setupDatabase } from '../shared'; describe('MaxTimeMS', function () { From 586b3ed4a9b4754e77ef34c87575cffbc670a17b Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 5 Aug 2022 12:28:03 -0400 Subject: [PATCH 10/12] test: asyncify exisiting tests --- test/integration/crud/maxTimeMS.test.ts | 165 +++++++++--------------- 1 file changed, 59 insertions(+), 106 deletions(-) diff --git a/test/integration/crud/maxTimeMS.test.ts b/test/integration/crud/maxTimeMS.test.ts index 9b42a41dc3..8bc0de587d 100644 --- a/test/integration/crud/maxTimeMS.test.ts +++ b/test/integration/crud/maxTimeMS.test.ts @@ -9,121 +9,74 @@ import { MongoCursorExhaustedError, MongoServerError } from '../../../src'; -import { assert as test, setupDatabase } from '../shared'; +import { getSymbolFrom } from '../../tools/utils'; describe('MaxTimeMS', function () { - before(function () { - return setupDatabase(this.configuration); + let client: MongoClient; + let commandStartedEvents: CommandStartedEvent[]; + + beforeEach(async function () { + client = this.configuration.newClient({ monitorCommands: true }); + commandStartedEvents = []; + client.on('commandStarted', ev => commandStartedEvents.push(ev)); }); - it('should correctly respect the maxTimeMS property on count', function (done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function () { - const db = client.db(configuration.db); - const col = db.collection('max_time_ms'); - - // Insert a couple of docs - const docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .count(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); + afterEach(async function () { + commandStartedEvents = []; + await client.close(); }); - it('should correctly respect the maxTimeMS property on toArray', { - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function () { - const db = client.db(configuration.db); - const col = db.collection('max_time_ms_2'); - - // Insert a couple of docs - const docs_1 = [{ agg_pipe: 1 }]; - - // Simple insert - col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - // Execute a find command - col - .find({ $where: 'sleep(100) || true' }) - .maxTimeMS(50) - .toArray(function (err) { - test.ok(err != null); - client.close(done); - }); - }); - }); - } + it('should correctly respect the maxTimeMS property on count', async function () { + const col = client.db().collection('max_time_ms'); + await col.insertMany([{ agg_pipe: 1 }], { writeConcern: { w: 1 } }); + const cursor = col.find({ $where: 'sleep(100) || true' }).maxTimeMS(50); + const kBuiltOptions = getSymbolFrom(cursor, 'builtOptions'); + expect(cursor[kBuiltOptions]).to.have.property('maxTimeMS', 50); + + const error = await cursor.count().catch(error => error); + expect(error).to.be.instanceOf(MongoServerError); + + const countCommandEvent = commandStartedEvents.find(ev => ev.commandName === 'count'); + expect(countCommandEvent).to.have.nested.property('command.maxTimeMS', 50); }); - it('should correctly fail with maxTimeMS error', { - // Add a tag that our runner can trigger on - // in this case we are setting that node needs to be higher than 0.10.X to run - metadata: { - requires: { - topology: ['single', 'replicaset'] - } - }, - - test: function (done) { - const configuration = this.configuration; - const client = configuration.newClient(configuration.writeConcernMax(), { maxPoolSize: 1 }); - client.connect(function () { - const db = client.db(configuration.db); - const col = db.collection('max_time_ms_5'); - - // Insert a couple of docs - const docs_1 = [{ agg_pipe: 10 }]; - - // Simple insert - col.insertMany(docs_1, { writeConcern: { w: 1 } }, function (err) { - expect(err).to.not.exist; - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'alwaysOn' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result?.ok); - - col - .find({}) - .maxTimeMS(10) - .toArray(function (err) { - test.ok(err != null); - - db.admin().command( - { configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }, - function (err, result) { - expect(err).to.not.exist; - test.equal(1, result?.ok); - client.close(done); - } - ); - }); - } - ); - }); + it('should correctly respect the maxTimeMS property on toArray', async function () { + const col = client.db().collection('max_time_ms'); + await col.insertMany([{ agg_pipe: 1 }], { writeConcern: { w: 1 } }); + const cursor = col.find({ $where: 'sleep(100) || true' }).maxTimeMS(50); + const kBuiltOptions = getSymbolFrom(cursor, 'builtOptions'); + expect(cursor[kBuiltOptions]).to.have.property('maxTimeMS', 50); + + const error = await cursor.toArray().catch(error => error); + expect(error).to.be.instanceOf(MongoServerError); + + const findCommandEvent = commandStartedEvents.find(ev => ev.commandName === 'find'); + expect(findCommandEvent).to.have.nested.property('command.maxTimeMS', 50); + }); + + it('should correctly fail with maxTimeMS error', async function () { + const admin = client.db().admin(); + const col = client.db().collection('max_time_ms_5'); + + await col.insertMany([{ agg_pipe: 10 }], { writeConcern: { w: 1 } }); + + try { + const res = await admin.command({ + configureFailPoint: 'maxTimeAlwaysTimeOut', + mode: 'alwaysOn' }); + expect(res).to.have.property('ok', 1); + + const error = await col + .find({}) + .maxTimeMS(10) + .toArray() + .catch(error => error); + + expect(error).to.be.instanceOf(MongoServerError); + } finally { + const res = await admin.command({ configureFailPoint: 'maxTimeAlwaysTimeOut', mode: 'off' }); + expect(res).to.have.property('ok', 1); } }); From 83c1b56e6a829ab3b823239d386aab447a6c8b6d Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 9 Aug 2022 14:16:23 -0400 Subject: [PATCH 11/12] test: fix sharded cluster testing --- test/integration/change-streams/change_stream.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index c289c8e962..a0cf6748fc 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1432,6 +1432,7 @@ describe('Change Streams', function () { }) .expectEvents({ client: 'client0', + ignoreExtraEvents: true, // Sharded clusters have extra getMores events: [ { commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } }, { commandStartedEvent: { commandName: 'insert' } }, From e1dc674e2a8801cb2e3eb15fed738697cfd76eee Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Tue, 9 Aug 2022 17:52:28 -0400 Subject: [PATCH 12/12] test: fix up names, add existence assertion, combo test case --- .../change-streams/change_stream.test.ts | 47 +++++++++++++++++-- 1 file changed, 44 insertions(+), 3 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a0cf6748fc..d7abc7a003 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1383,7 +1383,9 @@ describe('Change Streams', function () { { collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } } ]) .test( - TestBuilder.it('should use maxAwaitTimeMS to send maxTimeMS on getMore commands') + TestBuilder.it( + 'should use maxAwaitTimeMS option to set maxTimeMS on getMore and should not set maxTimeMS on aggregate' + ) .operation({ object: 'collection0', name: 'createChangeStream', @@ -1404,7 +1406,12 @@ describe('Change Streams', function () { .expectEvents({ client: 'client0', events: [ - { commandStartedEvent: { commandName: 'aggregate' } }, + { + commandStartedEvent: { + commandName: 'aggregate', + command: { maxTimeMS: { $$exists: false } } + } + }, { commandStartedEvent: { commandName: 'insert' } }, { commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } } ] @@ -1412,7 +1419,9 @@ describe('Change Streams', function () { .toJSON() ) .test( - TestBuilder.it('should send maxTimeMS on aggregate command') + TestBuilder.it( + 'should use maxTimeMS option to set maxTimeMS on aggregate and not set maxTimeMS on getMore' + ) .operation({ object: 'collection0', name: 'createChangeStream', @@ -1446,6 +1455,38 @@ describe('Change Streams', function () { }) .toJSON() ) + .test( + TestBuilder.it( + 'should use maxTimeMS option to set maxTimeMS on aggregate and maxAwaitTimeMS option to set maxTimeMS on getMore' + ) + .operation({ + object: 'collection0', + name: 'createChangeStream', + saveResultAsEntity: 'changeStreamOnClient', + arguments: { maxTimeMS: 5000, maxAwaitTimeMS: 6000 } + }) + .operation({ + name: 'insertOne', + object: 'collection0', + arguments: { document: { a: 1 } }, + ignoreResultAndError: true + }) + .operation({ + object: 'changeStreamOnClient', + name: 'iterateUntilDocumentOrError', + ignoreResultAndError: true + }) + .expectEvents({ + client: 'client0', + ignoreExtraEvents: true, // Sharded clusters have extra getMores + events: [ + { commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } }, + { commandStartedEvent: { commandName: 'insert' } }, + { commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 6000 } } } + ] + }) + .toJSON() + ) .run(); describe('BSON Options', function () {