Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-4413): set maxTimeMS on getMores when maxAwaitTimeMS is specified #3319

Merged
merged 12 commits into from
Aug 10, 2022
47 changes: 31 additions & 16 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,20 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
session?: ClientSession;
readPreference?: ReadPreferenceLike;
readConcern?: ReadConcernLike;
/**
* Specifies the number of documents to return in each response from MongoDB
*/
batchSize?: number;
/**
* When applicable `maxTimeMS` controls the amount of time the initial command
* that constructs a cursor should take. (ex. find, aggregate, listCollections)
*/
maxTimeMS?: number;
/**
* When applicable `maxAwaitTimeMS` controls the amount of time subsequent getMores
* that a cursor uses to fetch more data should take. (ex. cursor.next())
*/
maxAwaitTimeMS?: number;
/**
* Comment to apply to the operation.
*
Expand All @@ -89,7 +101,19 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
* In server versions 4.4 and above, 'comment' can be any valid BSON type.
*/
comment?: unknown;
/**
* By default, MongoDB will automatically close a cursor when the
* client has exhausted all results in the cursor. However, for [capped collections](https://www.mongodb.com/docs/manual/core/capped-collections)
* you may use a Tailable Cursor that remains open after the client exhausts
* the results in the initial cursor.
*/
tailable?: boolean;
/**
* If awaitData is set to true, when the cursor reaches the end of the capped collection,
* MongoDB blocks the query thread for a period of time waiting for new data to arrive.
* When new data is inserted into the capped collection, the blocked thread is signaled
* to wake up and return the next batch to the client.
*/
awaitData?: boolean;
noCursorTimeout?: boolean;
}
Expand Down Expand Up @@ -155,7 +179,7 @@ export abstract class AbstractCursor<
}
this[kClient] = client;
this[kNamespace] = namespace;
this[kDocuments] = []; // TODO: https://github.com/microsoft/TypeScript/issues/36230
this[kDocuments] = [];
this[kInitialized] = false;
this[kClosed] = false;
this[kKilled] = false;
Expand Down Expand Up @@ -186,6 +210,10 @@ export abstract class AbstractCursor<
this[kOptions].maxTimeMS = options.maxTimeMS;
}

if (typeof options.maxAwaitTimeMS === 'number') {
this[kOptions].maxAwaitTimeMS = options.maxAwaitTimeMS;
}

if (options.session instanceof ClientSession) {
this[kSession] = options.session;
} else {
Expand Down Expand Up @@ -617,21 +645,8 @@ export abstract class AbstractCursor<

/** @internal */
_getMore(batchSize: number, callback: Callback<Document>): void {
const cursorId = this[kId];
const cursorNs = this[kNamespace];
const server = this[kServer];

if (cursorId == null) {
callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
return;
}

if (server == null) {
callback(new MongoRuntimeError('Unable to iterate cursor without selected server'));
return;
}

const getMoreOperation = new GetMoreOperation(cursorNs, cursorId, server, {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this[kNamespace], this[kId]!, this[kServer]!, {
Comment on lines +648 to +649
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have tests that test for errors being thrown when execute is called on a zero-ed cursor or if the server is undefined? If not, can we add them to demonstrate this behavior still exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added! :)

...this[kOptions],
session: this[kSession],
batchSize
Expand Down
6 changes: 5 additions & 1 deletion src/operations/get_more.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class GetMoreOperation extends AbstractOperation {
cursorId: Long;
override options: GetMoreOptions;

constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions = {}) {
constructor(ns: MongoDBNamespace, cursorId: Long, server: Server, options: GetMoreOptions) {
super(options);

this.options = options;
Expand All @@ -63,6 +63,10 @@ export class GetMoreOperation extends AbstractOperation {
);
}

if (this.cursorId == null || this.cursorId.isZero()) {
return callback(new MongoRuntimeError('Unable to iterate cursor with no id'));
}

const collection = this.ns.collection;
if (collection == null) {
// Cursors should have adopted the namespace returned by MongoDB
Expand Down
130 changes: 123 additions & 7 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ describe('Change Streams', function () {
changeStream.next((err, doc) => {
expect(err).to.exist;
expect(doc).to.not.exist;
expect(err.message).to.equal('ChangeStream is closed');
expect(err?.message).to.equal('ChangeStream is closed');
changeStream.close(() => client.close(done));
});
});
Expand Down Expand Up @@ -1372,23 +1372,139 @@ describe('Change Streams', function () {
)
.run();

UnifiedTestSuiteBuilder.describe('entity.watch() server-side options')
.runOnRequirement({
topologies: ['replicaset', 'sharded-replicaset', 'sharded', 'load-balanced'],
minServerVersion: '4.4.0'
})
.createEntities([
{ client: { id: 'client0', observeEvents: ['commandStartedEvent'] } },
{ database: { id: 'db0', client: 'client0', databaseName: 'watchOpts' } },
{ collection: { id: 'collection0', database: 'db0', collectionName: 'watchOpts' } }
])
.test(
TestBuilder.it(
'should use maxAwaitTimeMS option to set maxTimeMS on getMore and should not set maxTimeMS on aggregate'
)
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxAwaitTimeMS: 5000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
events: [
{
commandStartedEvent: {
commandName: 'aggregate',
command: { maxTimeMS: { $$exists: false } }
}
},
{ commandStartedEvent: { commandName: 'insert' } },
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 5000 } } }
]
})
.toJSON()
)
.test(
TestBuilder.it(
'should use maxTimeMS option to set maxTimeMS on aggregate and not set maxTimeMS on getMore'
)
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxTimeMS: 5000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
ignoreExtraEvents: true, // Sharded clusters have extra getMores
events: [
{ commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } },
{ commandStartedEvent: { commandName: 'insert' } },
{
commandStartedEvent: {
commandName: 'getMore',
command: { maxTimeMS: { $$exists: false } }
dariakp marked this conversation as resolved.
Show resolved Hide resolved
}
}
]
})
.toJSON()
)
.test(
TestBuilder.it(
'should use maxTimeMS option to set maxTimeMS on aggregate and maxAwaitTimeMS option to set maxTimeMS on getMore'
)
.operation({
object: 'collection0',
name: 'createChangeStream',
saveResultAsEntity: 'changeStreamOnClient',
arguments: { maxTimeMS: 5000, maxAwaitTimeMS: 6000 }
})
.operation({
name: 'insertOne',
object: 'collection0',
arguments: { document: { a: 1 } },
ignoreResultAndError: true
})
.operation({
object: 'changeStreamOnClient',
name: 'iterateUntilDocumentOrError',
ignoreResultAndError: true
})
.expectEvents({
client: 'client0',
ignoreExtraEvents: true, // Sharded clusters have extra getMores
events: [
{ commandStartedEvent: { commandName: 'aggregate', command: { maxTimeMS: 5000 } } },
{ commandStartedEvent: { commandName: 'insert' } },
{ commandStartedEvent: { commandName: 'getMore', command: { maxTimeMS: 6000 } } }
]
})
.toJSON()
)
.run();

describe('BSON Options', function () {
let client: MongoClient;
let db: Db;
let collection: Collection;
let cs: ChangeStream;

beforeEach(async function () {
client = await this.configuration.newClient({ monitorCommands: true }).connect();
db = client.db('db');
collection = await db.createCollection('collection');
});

afterEach(async function () {
await db.dropCollection('collection');
await cs.close();
await client.close();
client = undefined;
db = undefined;
collection = undefined;
});

context('promoteLongs', () => {
Expand Down Expand Up @@ -1452,7 +1568,7 @@ describe('Change Streams', function () {
it('does not send invalid options on the aggregate command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
Expand All @@ -1473,7 +1589,7 @@ describe('Change Streams', function () {
it('does not send invalid options on the getMore command', {
metadata: { requires: { topology: '!single' } },
test: async function () {
const started = [];
const started: CommandStartedEvent[] = [];

client.on('commandStarted', filterForCommands(['aggregate'], started));
const doc = { invalidBSONOption: true };
Expand Down Expand Up @@ -1503,7 +1619,7 @@ describe('ChangeStream resumability', function () {
const changeStreamResumeOptions: ChangeStreamOptions = {
fullDocument: 'updateLookup',
collation: { locale: 'en', maxVariable: 'punct' },
maxAwaitTimeMS: 20000,
maxAwaitTimeMS: 2000,
batchSize: 200
};

Expand Down