diff --git a/src/change_stream.ts b/src/change_stream.ts
index e2056a49eb..6d019dd5d5 100644
--- a/src/change_stream.ts
+++ b/src/change_stream.ts
@@ -1,6 +1,5 @@
-import Denque = require('denque');
 import type { Readable } from 'stream';
-import { setTimeout } from 'timers';
+import { promisify } from 'util';
 
 import type { Binary, Document, Timestamp } from './bson';
 import { Collection } from './collection';
@@ -20,21 +19,9 @@ import { InferIdType, TypedEventEmitter } from './mongo_types';
 import type { AggregateOptions } from './operations/aggregate';
 import type { CollationOptions, OperationParent } from './operations/command';
 import type { ReadPreference } from './read_preference';
-import type { Topology } from './sdam/topology';
 import type { ServerSessionId } from './sessions';
-import {
-  calculateDurationInMs,
-  Callback,
-  filterOptions,
-  getTopology,
-  maxWireVersion,
-  maybePromise,
-  MongoDBNamespace,
-  now
-} from './utils';
+import { Callback, filterOptions, getTopology, maybePromise, MongoDBNamespace } from './utils';
 
-/** @internal */
-const kResumeQueue = Symbol('resumeQueue');
 /** @internal */
 const kCursorStream = Symbol('cursorStream');
 /** @internal */
@@ -57,19 +44,10 @@ const CHANGE_DOMAIN_TYPES = {
   CLUSTER: Symbol('Cluster')
 };
 
-interface TopologyWaitOptions {
-  start?: number;
-  timeout?: number;
-  readPreference?: ReadPreference;
-}
-
-const SELECTION_TIMEOUT = 30000;
-
 const CHANGE_STREAM_EVENTS = [RESUME_TOKEN_CHANGED, END, CLOSE];
 
 const NO_RESUME_TOKEN_ERROR =
   'A change stream document has been received that lacks a resume token (_id).';
-const NO_CURSOR_ERROR = 'ChangeStream has no cursor';
 const CHANGESTREAM_CLOSED_ERROR = 'ChangeStream is closed';
 
 /**
@@ -548,11 +526,9 @@ export class ChangeStream<
   namespace: MongoDBNamespace;
   type: symbol;
   /** @internal */
-  cursor: ChangeStreamCursor | undefined;
+  cursor: ChangeStreamCursor;
   streamOptions?: CursorStreamOptions;
   /** @internal */
-  [kResumeQueue]: Denque>>;
-  /** @internal */
   [kCursorStream]?: Readable & AsyncIterable;
   /** @internal */
   [kClosed]: boolean;
@@ -618,8 +594,6 @@ export class ChangeStream<
       this.options.readPreference = parent.readPreference;
     }
 
-    this[kResumeQueue] = new Denque();
-
     // Create contained Change Stream cursor
    this.cursor = this._createChangeStreamCursor(options);
 
@@ -655,11 +629,28 @@ export class ChangeStream<
   hasNext(callback: Callback): void;
   hasNext(callback?: Callback): Promise | void {
     this._setIsIterator();
-    return maybePromise(callback, cb => {
-      this._getCursor((err, cursor) => {
-        if (err || !cursor) return cb(err); // failed to resume, raise an error
-        cursor.hasNext(cb);
-      });
+    // TODO(NODE-4319): Add eslint rule preventing accidental variable shadowing
+    // Shadowing is intentional here. We want to override the `callback` variable
+    // from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
+    return maybePromise(callback, callback => {
+      (async () => {
+        try {
+          const hasNext = await this.cursor.hasNext();
+          return hasNext;
+        } catch (error) {
+          try {
+            await this._processErrorIteratorMode(error);
+            const hasNext = await this.cursor.hasNext();
+            return hasNext;
+          } catch (error) {
+            await this.close().catch(err => err);
+            throw error;
+          }
+        }
+      })().then(
+        hasNext => callback(undefined, hasNext),
+        error => callback(error)
+      );
     });
   }
 
@@ -668,18 +659,30 @@
   next(callback: Callback): void;
   next(callback?: Callback): Promise | void {
     this._setIsIterator();
-    return maybePromise(callback, cb => {
-      this._getCursor((err, cursor) => {
-        if (err || !cursor) return cb(err); // failed to resume, raise an error
-        cursor.next((error, change) => {
-          if (error) {
-            this[kResumeQueue].push(() => this.next(cb));
-            this._processError(error, cb);
-            return;
+    // TODO(NODE-4319): Add eslint rule preventing accidental variable shadowing
+    // Shadowing is intentional here. We want to override the `callback` variable
+    // from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
+    return maybePromise(callback, callback => {
+      (async () => {
+        try {
+          const change = await this.cursor.next();
+          const processedChange = this._processChange(change ?? null);
+          return processedChange;
+        } catch (error) {
+          try {
+            await this._processErrorIteratorMode(error);
+            const change = await this.cursor.next();
+            const processedChange = this._processChange(change ?? null);
+            return processedChange;
+          } catch (error) {
+            await this.close().catch(err => err);
+            throw error;
           }
-          this._processNewChange(change ?? null, cb);
-        });
-      });
+        }
+      })().then(
+        change => callback(undefined, change),
+        error => callback(error)
+      );
     });
   }
 
@@ -690,32 +693,46 @@
   tryNext(callback: Callback): void;
   tryNext(callback?: Callback): Promise | void {
     this._setIsIterator();
-    return maybePromise(callback, cb => {
-      this._getCursor((err, cursor) => {
-        if (err || !cursor) return cb(err); // failed to resume, raise an error
-        return cursor.tryNext(cb);
-      });
+    // TODO(NODE-4319): Add eslint rule preventing accidental variable shadowing
+    // Shadowing is intentional here. We want to override the `callback` variable
+    // from the outer scope so that the inner scope doesn't accidentally call the wrong callback.
+    return maybePromise(callback, callback => {
+      (async () => {
+        try {
+          const change = await this.cursor.tryNext();
+          return change ?? null;
+        } catch (error) {
+          try {
+            await this._processErrorIteratorMode(error);
+            const change = await this.cursor.tryNext();
+            return change ?? null;
+          } catch (error) {
+            await this.close().catch(err => err);
+            throw error;
+          }
+        }
+      })().then(
+        change => callback(undefined, change),
+        error => callback(error)
+      );
     });
   }
 
   /** Is the cursor closed */
   get closed(): boolean {
-    return this[kClosed] || (this.cursor?.closed ?? false);
+    return this[kClosed] || this.cursor.closed;
   }
 
   /** Close the Change Stream */
+  close(): Promise;
+  close(callback: Callback): void;
   close(callback?: Callback): Promise | void {
     this[kClosed] = true;
 
     return maybePromise(callback, cb => {
-      if (!this.cursor) {
-        return cb();
-      }
-
       const cursor = this.cursor;
       return cursor.close(err => {
         this._endStream();
-        this.cursor = undefined;
         return cb(err);
       });
     });
@@ -723,11 +740,18 @@
   /**
    * Return a modified Readable stream including a possible transform method.
- * @throws MongoDriverError if this.cursor is undefined + * + * NOTE: When using a Stream to process change stream events, the stream will + * NOT automatically resume in the case a resumable error is encountered. + * + * @throws MongoChangeStreamError if the underlying cursor or the change stream is closed */ stream(options?: CursorStreamOptions): Readable & AsyncIterable { + if (this.closed) { + throw new MongoChangeStreamError(CHANGESTREAM_CLOSED_ERROR); + } + this.streamOptions = options; - if (!this.cursor) throw new MongoChangeStreamError(NO_CURSOR_ERROR); return this.cursor.stream(options); } @@ -800,43 +824,13 @@ export class ChangeStream< return changeStreamCursor; } - /** - * This method performs a basic server selection loop, satisfying the requirements of - * ChangeStream resumability until the new SDAM layer can be used. - * @internal - */ - private _waitForTopologyConnected( - topology: Topology, - options: TopologyWaitOptions, - callback: Callback - ) { - setTimeout(() => { - if (options && options.start == null) { - options.start = now(); - } - - const start = options.start || now(); - const timeout = options.timeout || SELECTION_TIMEOUT; - if (topology.isConnected()) { - return callback(); - } - - if (calculateDurationInMs(start) > timeout) { - // TODO(NODE-3497): Replace with MongoNetworkTimeoutError - return callback(new MongoRuntimeError('Timed out waiting for connection')); - } - - this._waitForTopologyConnected(topology, options, callback); - }, 500); // this is an arbitrary wait time to allow SDAM to transition - } - /** @internal */ - private _closeWithError(error: AnyError, callback?: Callback): void { - if (!callback) { - this.emit(ChangeStream.ERROR, error); - } + private _closeEmitterModeWithError(error: AnyError): void { + this.emit(ChangeStream.ERROR, error); - this.close(() => callback && callback(error)); + this.close(() => { + // nothing to do + }); } /** @internal */ @@ -844,8 +838,15 @@ export class ChangeStream< this._setIsEmitter(); const stream = this[kCursorStream] ?? 
cursor.stream(); this[kCursorStream] = stream; - stream.on('data', change => this._processNewChange(change)); - stream.on('error', error => this._processError(error)); + stream.on('data', change => { + try { + const processedChange = this._processChange(change); + this.emit(ChangeStream.CHANGE, processedChange); + } catch (error) { + this.emit(ChangeStream.ERROR, error); + } + }); + stream.on('error', error => this._processErrorStreamMode(error)); } /** @internal */ @@ -860,127 +861,80 @@ export class ChangeStream< } /** @internal */ - private _processNewChange(change: TChange | null, callback?: Callback) { + private _processChange(change: TChange | null): TChange { if (this[kClosed]) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError - if (callback) callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR)); - return; + throw new MongoAPIError(CHANGESTREAM_CLOSED_ERROR); } // a null change means the cursor has been notified, implicitly closing the change stream if (change == null) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError - return this._closeWithError(new MongoRuntimeError(CHANGESTREAM_CLOSED_ERROR), callback); + throw new MongoRuntimeError(CHANGESTREAM_CLOSED_ERROR); } if (change && !change._id) { - return this._closeWithError(new MongoChangeStreamError(NO_RESUME_TOKEN_ERROR), callback); + throw new MongoChangeStreamError(NO_RESUME_TOKEN_ERROR); } // cache the resume token - this.cursor?.cacheResumeToken(change._id); + this.cursor.cacheResumeToken(change._id); // wipe the startAtOperationTime if there was one so that there won't be a conflict // between resumeToken and startAtOperationTime if we need to reconnect the cursor this.options.startAtOperationTime = undefined; - // Return the change - if (!callback) return this.emit(ChangeStream.CHANGE, change); - return callback(undefined, change); + return change; } /** @internal */ - private _processError(error: AnyError, callback?: Callback) { - const cursor = this.cursor; - + private _processErrorStreamMode(changeStreamError: AnyError) { // If the change stream has been closed explicitly, do not process error. 
-    if (this[kClosed]) {
-      // TODO(NODE-3485): Replace with MongoChangeStreamClosedError
-      if (callback) callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR));
-      return;
-    }
+    if (this[kClosed]) return;
 
-    // if the resume succeeds, continue with the new cursor
-    const resumeWithCursor = (newCursor: ChangeStreamCursor) => {
-      this.cursor = newCursor;
-      this._processResumeQueue();
-    };
-
-    if (cursor && isResumableError(error, maxWireVersion(cursor.server))) {
-      this.cursor = undefined;
-
-      // stop listening to all events from old cursor
+    if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) {
       this._endStream();
-
-      // close internal cursor, ignore errors
-      cursor.close();
+      this.cursor.close();
 
       const topology = getTopology(this.parent);
-      this._waitForTopologyConnected(topology, { readPreference: cursor.readPreference }, err => {
-        // if the topology can't reconnect, close the stream
-        if (err) return this._closeWithError(err, callback);
-
-        // create a new cursor, preserving the old cursor's options
-        const newCursor = this._createChangeStreamCursor(cursor.resumeOptions);
-
-        // attempt to continue in emitter mode
-        if (!callback) return resumeWithCursor(newCursor);
-
-        // attempt to continue in iterator mode
-        newCursor.hasNext(err => {
-          // if there's an error immediately after resuming, close the stream
-          if (err) return this._closeWithError(err);
-          resumeWithCursor(newCursor);
-        });
+      topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => {
+        if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
+        this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
       });
-      return;
+    } else {
+      this._closeEmitterModeWithError(changeStreamError);
     }
-
-    // if initial error wasn't resumable, raise an error and close the change stream
-    return this._closeWithError(error, callback);
   }
 
+  /**
+   * @internal
+   *
+   * TODO(NODE-4320): promisify selectServer and refactor this code to be async
+   *
+   * We promisify _processErrorIteratorModeCallback until we have a promisified version of selectServer.
+ */ + private _processErrorIteratorMode = promisify(this._processErrorIteratorModeCallback); + /** @internal */ - private _getCursor(callback: Callback>) { + private _processErrorIteratorModeCallback(changeStreamError: AnyError, callback: Callback) { if (this[kClosed]) { // TODO(NODE-3485): Replace with MongoChangeStreamClosedError - callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR)); - return; + return callback(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR)); } - // if a cursor exists and it is open, return it - if (this.cursor) { - callback(undefined, this.cursor); - return; - } + if (isResumableError(changeStreamError, this.cursor.maxWireVersion)) { + this.cursor.close(); - // no cursor, queue callback until topology reconnects - this[kResumeQueue].push(callback); - } + const topology = getTopology(this.parent); + topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => { + // if the topology can't reconnect, close the stream + if (serverSelectionError) return this.close(() => callback(changeStreamError)); - /** - * Drain the resume queue when a new has become available - * @internal - * - * @param error - error getting a new cursor - */ - private _processResumeQueue(error?: Error) { - while (this[kResumeQueue].length) { - const request = this[kResumeQueue].pop(); - if (!request) break; // Should never occur but TS can't use the length check in the while condition - - if (!error) { - if (this[kClosed]) { - // TODO(NODE-3485): Replace with MongoChangeStreamClosedError - request(new MongoAPIError(CHANGESTREAM_CLOSED_ERROR)); - return; - } - if (!this.cursor) { - request(new MongoChangeStreamError(NO_CURSOR_ERROR)); - return; - } - } - request(error, this.cursor ?? undefined); + this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions); + callback(); + }); + } else { + this.close(() => callback(changeStreamError)); } } } diff --git a/src/cursor/change_stream_cursor.ts b/src/cursor/change_stream_cursor.ts index b75c38f18d..b5a4ae4630 100644 --- a/src/cursor/change_stream_cursor.ts +++ b/src/cursor/change_stream_cursor.ts @@ -53,6 +53,13 @@ export class ChangeStreamCursor< postBatchResumeToken?: ResumeToken; pipeline: Document[]; + /** + * @internal + * + * used to determine change stream resumability + */ + maxWireVersion: number | undefined; + constructor( client: MongoClient, namespace: MongoDBNamespace, @@ -148,11 +155,13 @@ export class ChangeStreamCursor< } const server = aggregateOperation.server; + this.maxWireVersion = maxWireVersion(server); + if ( this.startAtOperationTime == null && this.resumeAfter == null && this.startAfter == null && - maxWireVersion(server) >= 7 + this.maxWireVersion >= 7 ) { this.startAtOperationTime = response.operationTime; } @@ -174,6 +183,7 @@ export class ChangeStreamCursor< return callback(err); } + this.maxWireVersion = maxWireVersion(this.server); this._processBatch(response as TODO_NODE_3286 as ChangeStreamAggregateRawResult); this.emit(ChangeStream.MORE, response); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 2e6e96938f..48c5ac1dcc 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1,6 +1,8 @@ import { strict as assert } from 'assert'; import { expect } from 'chai'; import { on, once } from 'events'; +import { gte, lt } from 'semver'; +import * as sinon from 'sinon'; import { PassThrough } from 'stream'; import { setTimeout } from 'timers'; import { 
promisify } from 'util'; @@ -10,18 +12,24 @@ import { ChangeStream, ChangeStreamOptions, Collection, + CommandStartedEvent, Db, Long, MongoChangeStreamError, MongoClient, - MongoNetworkError, MongoServerError, ReadPreference, ResumeToken } from '../../../src'; import { isHello } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; -import { getSymbolFrom, sleep, TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/utils'; +import { + FailPoint, + getSymbolFrom, + sleep, + TestBuilder, + UnifiedTestSuiteBuilder +} from '../../tools/utils'; import { delay, filterForCommands } from '../shared'; const initIteratorMode = async (cs: ChangeStream) => { @@ -353,8 +361,8 @@ describe('Change Streams', function () { expect(err).to.not.exist; // Check the cursor is closed - assert.equal(changeStream.closed, true); - assert.ok(!changeStream.cursor); + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor.closed).to.be.true; done(); }); }); @@ -765,30 +773,6 @@ describe('Change Streams', function () { } }); - it('should maintain change stream options on resume', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - const collectionName = 'resumeAfterKillCursor'; - const changeStreamOptions: ChangeStreamOptions = { - fullDocument: 'updateLookup', - collation: { locale: 'en', maxVariable: 'punct' }, - maxAwaitTimeMS: 20000, - batchSize: 200 - }; - - const db = client.db('integration_tests'); - const coll = db.collection(collectionName); - const changeStream = coll.watch([], changeStreamOptions); - - await initIteratorMode(changeStream); - await changeStream.tryNext(); - - expect(changeStream.cursor.resumeOptions).to.containSubset(changeStreamOptions); - - await changeStream.close(); - } - }); - describe('should error when used as iterator and emitter concurrently', function () { let client, coll, changeStream, kMode; @@ -1053,60 +1037,6 @@ describe('Change Streams', function () { }); describe('Change Stream Resume Error Tests', function () { - it.skip('should continue emitting change events after a resumable error', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - const changes = on(changeStream, 'change'); - await once(changeStream.cursor, 'init'); - - await collection.insertOne({ a: 42 }); - - changeStream.cursorStream.emit('error', new MongoNetworkError('error triggered from test')); - - await collection.insertOne({ b: 24 }); - - const changesCollected = []; - for await (const [change] of changes) { - changesCollected.push(change); - if (changesCollected.length === 2) { - break; - } - } - - expect(changesCollected[0]).to.have.nested.property('fullDocument.a'); - expect(changesCollected[1]).to.have.nested.property('fullDocument.b'); - } - }).skipReason = 'TODO(NODE-4125): resumability needs fixing (this test fails only on 3.6)'; - - it.skip('should continue iterating changes after a resumable error', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - await initIteratorMode(changeStream); - await collection.insertOne({ a: 42 }); - - // NOTE the error comes from cursor.next. 
ChangeStream squashes it - const oldNext = changeStream.cursor.next.bind(changeStream.cursor); - - // @ts-expect-error: simulating network error - changeStream.cursor.next = function (callback) { - changeStream.cursor.next = oldNext; - return callback(new MongoNetworkError('error triggered from test')); - }; - - await changeStream.hasNext(); - const changeA = await changeStream.next(); - expect(changeA).to.have.property('operationType', 'insert'); - expect(changeA).to.have.nested.property('fullDocument.a', 42); - - await collection.insertOne({ b: 24 }); - - await changeStream.hasNext(); - const changeB = await changeStream.next(); - expect(changeB).to.have.property('operationType', 'insert'); - expect(changeB).to.have.nested.property('fullDocument.b', 24); - } - }).skipReason = 'TODO(NODE-4125): resumability needs fixing (this test fails only on 3.6)'; - it.skip('should continue piping changes after a resumable error', { metadata: { requires: { topology: 'replicaset' } }, test: done => { @@ -1476,7 +1406,7 @@ describe('Change Streams', function () { const change = await willBeChange; - expect(typeof change.fullDocument.a).to.equal('number'); + expect(change.fullDocument.a).to.be.a('number'); } }); }); @@ -1563,3 +1493,609 @@ describe('Change Streams', function () { }); }); }); + +describe('ChangeStream resumability', function () { + let client: MongoClient; + let collection: Collection; + let changeStream: ChangeStream; + let aggregateEvents: CommandStartedEvent[] = []; + + const changeStreamResumeOptions: ChangeStreamOptions = { + fullDocument: 'updateLookup', + collation: { locale: 'en', maxVariable: 'punct' }, + maxAwaitTimeMS: 20000, + batchSize: 200 + }; + + const resumableErrorCodes = [ + { error: 'HostUnreachable', code: 6 }, + { error: 'HostNotFound', code: 7 }, + { error: 'NetworkTimeout', code: 89 }, + { error: 'ShutdownInProgress', code: 91 }, + { error: 'PrimarySteppedDown', code: 189 }, + { error: 'ExceededTimeLimit', code: 262 }, + { error: 'SocketException', code: 9001 }, + { error: 'NotWritablePrimary', code: 10107 }, + { error: 'InterruptedAtShutdown', code: 11600 }, + { error: 'InterruptedDueToReplStateChange', code: 11602 }, + { error: 'NotPrimaryNoSecondaryOk', code: 13435 }, + { error: 'StaleShardVersion', code: 63 }, + { error: 'StaleEpoch', code: 150 }, + { error: 'RetryChangeStream', code: 234 }, + { error: 'FailedToSatisfyReadPreference', code: 133 }, + { error: 'CursorNotFound', code: 43 } + ]; + + const is4_2Server = (serverVersion: string) => + gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0'); + + beforeEach(async function () { + const dbName = 'resumabilty_tests'; + const collectionName = 'foo'; + const utilClient = this.configuration.newClient(); + // 3.6 servers do not support creating a change stream on a database that doesn't exist + await utilClient + .db(dbName) + .dropDatabase() + .catch(e => e); + await utilClient.db(dbName).createCollection(collectionName); + await utilClient.close(); + + client = this.configuration.newClient({ monitorCommands: true }); + client.on('commandStarted', filterForCommands(['aggregate'], aggregateEvents)); + collection = client.db(dbName).collection(collectionName); + }); + + afterEach(async function () { + await changeStream.close(); + await client.close(); + aggregateEvents = []; + }); + + context('iterator api', function () { + context('#next', function () { + for (const { error, code } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: 
'>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: code + } + } as FailPoint); + + await collection.insertOne({ name: 'bailey' }); + + const change = await changeStream.next(); + expect(change).to.have.property('operationType', 'insert'); + + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + for (const { error, code } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '<4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + // on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response. + // This means that a resume token isn't cached until the first change has been iterated. + // In order to test the resume, we need to ensure that at least one document has + // been iterated so we have a resume token to resume on. + await collection.insertOne({ name: 'bailey' }); + await changeStream.next(); + + const mock = sinon + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .stub(changeStream.cursor.server!, 'getMore') + .callsFake((_ns, _cursorId, _options, callback) => { + mock.restore(); + const error = new MongoServerError({ message: 'Something went wrong' }); + error.code = code; + callback(error); + }); + + await collection.insertOne({ name: 'bailey' }); + + const change = await changeStream.next(); + + expect(change).to.have.property('operationType', 'insert'); + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + it( + 'maintains change stream options on resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableErrorCodes[0].code + } + } as FailPoint); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + + await collection.insertOne({ name: 'bailey' }); + + await changeStream.next(); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + } + ); + + context('when the error is not a resumable error', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 
'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await initIteratorMode(changeStream); + + await collection.insertOne({ name: 'bailey' }); + + const error = await changeStream.next().catch(err => err); + + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); + } + ); + }); + }); + + context('#hasNext', function () { + for (const { error, code } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: code + } + } as FailPoint); + + await collection.insertOne({ name: 'bailey' }); + + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + for (const { error, code } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '<4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + // on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response. + // This means that a resume token isn't cached until the first change has been iterated. + // In order to test the resume, we need to ensure that at least one document has + // been iterated so we have a resume token to resume on. + await collection.insertOne({ name: 'bailey' }); + await changeStream.next(); + + const mock = sinon + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .stub(changeStream.cursor.server!, 'getMore') + .callsFake((_ns, _cursorId, _options, callback) => { + mock.restore(); + const error = new MongoServerError({ message: 'Something went wrong' }); + error.code = code; + callback(error); + }); + + await collection.insertOne({ name: 'bailey' }); + + const hasNext = await changeStream.hasNext(); + expect(hasNext).to.be.true; + + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + it( + 'maintains change stream options on resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 
'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableErrorCodes[0].code + } + } as FailPoint); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + + await collection.insertOne({ name: 'bailey' }); + + await changeStream.hasNext(); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + } + ); + + context('when the error is not a resumable error', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await initIteratorMode(changeStream); + + await collection.insertOne({ name: 'bailey' }); + + const error = await changeStream.hasNext().catch(err => err); + + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); + } + ); + }); + }); + + context('#tryNext', function () { + for (const { error, code } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: code + } + } as FailPoint); + + try { + // tryNext is not blocking and on sharded clusters we don't have control of when + // the actual change event will be ready on the change stream pipeline. This introduces + // a race condition, where sometimes we receive the change event and sometimes + // we don't when we call tryNext, depending on the timing of the sharded cluster. + + // Since we really only care about the resumability, it's enough for this test to throw + // if tryNext ever throws and assert on the number of aggregate events. + await changeStream.tryNext(); + } catch (err) { + expect.fail(`expected tryNext to resume, received error instead: ${err}`); + } + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + for (const { error, code } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '<4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + // on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response. + // This means that a resume token isn't cached until the first change has been iterated. + // In order to test the resume, we need to ensure that at least one document has + // been iterated so we have a resume token to resume on. 
+ await collection.insertOne({ name: 'bailey' }); + await changeStream.next(); + + const mock = sinon + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + .stub(changeStream.cursor.server!, 'getMore') + .callsFake((_ns, _cursorId, _options, callback) => { + mock.restore(); + const error = new MongoServerError({ message: 'Something went wrong' }); + error.code = code; + callback(error); + }); + + try { + // tryNext is not blocking and on sharded clusters we don't have control of when + // the actual change event will be ready on the change stream pipeline. This introduces + // a race condition, where sometimes we receive the change event and sometimes + // we don't when we call tryNext, depending on the timing of the sharded cluster. + + // Since we really only care about the resumability, it's enough for this test to throw + // if tryNext ever throws and assert on the number of aggregate events. + await changeStream.tryNext(); + } catch (err) { + expect.fail(`expected tryNext to resume, received error instead: ${err}`); + } + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + it( + 'maintains change stream options on resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableErrorCodes[0].code + } + } as FailPoint); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + + await collection.insertOne({ name: 'bailey' }); + + await changeStream.tryNext(); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + } + ); + + context('when the error is not a resumable error', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await initIteratorMode(changeStream); + + const error = await changeStream.tryNext().catch(err => err); + + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); + } + ); + }); + }); + }); + + describe('event emitter based iteration', function () { + for (const { error, code } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 
'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: code + } + } as FailPoint); + + const changes = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + await collection.insertOne({ name: 'bailey' }); + + const [change] = await changes; + expect(change).to.have.property('operationType', 'insert'); + + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + it( + 'maintains the change stream options on resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableErrorCodes[0].code + } + } as FailPoint); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + + const changes = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + await collection.insertOne({ name: 'bailey' }); + + await changes; + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + } + ); + + context('when the error is not a resumable error', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + const willBeError = once(changeStream, 'change').catch(error => error); + await once(changeStream.cursor, 'init'); + await collection.insertOne({ name: 'bailey' }); + + const error = await willBeError; + + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); + } + ); + }); + }); + + it( + 'caches the server version after the initial aggregate call', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + expect(changeStream.cursor.maxWireVersion).to.be.undefined; + await initIteratorMode(changeStream); + + expect(changeStream.cursor.maxWireVersion).to.be.a('number'); + } + ); + + it( + 'updates the cached server version after the first getMore call', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + await initIteratorMode(changeStream); + + const maxWireVersion = changeStream.cursor.maxWireVersion; + changeStream.cursor.maxWireVersion = -1; + + await changeStream.tryNext(); + + expect(changeStream.cursor.maxWireVersion).equal(maxWireVersion); + } + ); + + it( + 'updates the cached server version after each getMore call', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + await initIteratorMode(changeStream); + + const maxWireVersion = changeStream.cursor.maxWireVersion; + changeStream.cursor.maxWireVersion = -1; + + await changeStream.tryNext(); + + expect(changeStream.cursor.maxWireVersion).equal(maxWireVersion); + + changeStream.cursor.maxWireVersion = -1; + + await 
changeStream.tryNext(); + expect(changeStream.cursor.maxWireVersion).equal(maxWireVersion); + } + ); +}); diff --git a/test/tools/unified-spec-runner/operations.ts b/test/tools/unified-spec-runner/operations.ts index 0ef38d0f99..c68a7e02c2 100644 --- a/test/tools/unified-spec-runner/operations.ts +++ b/test/tools/unified-spec-runner/operations.ts @@ -328,7 +328,7 @@ operations.set('iterateUntilDocumentOrError', async ({ entities, operation }) => return await cursor.next(); } - return changeStream.next(); + return await changeStream.next(); }); operations.set('listCollections', async ({ entities, operation }) => { diff --git a/test/tools/utils.ts b/test/tools/utils.ts index 04b082a64c..d2d4f2400a 100644 --- a/test/tools/utils.ts +++ b/test/tools/utils.ts @@ -279,7 +279,7 @@ export function extractAuthFromConnectionString(connectionString: string | any[] } export interface FailPoint { - configureFailPoint: 'failCommand'; + configureFailPoint: 'failCommand' | 'failGetMoreAfterCursorCheckout'; mode: { activationProbability: number } | { times: number } | 'alwaysOn' | 'off'; data: { failCommands: string[];