diff --git a/src/change_stream.ts b/src/change_stream.ts index 0772bf701e..0304a527c3 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -648,21 +648,25 @@ export class ChangeStream< hasNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const hasNext = await this.cursor.hasNext(); - return hasNext; - } catch (error) { + // Change streams must resume indefinitely while each resume event succeeds. + // This loop continues until either a change event is received or until a resume attempt + // fails. + // eslint-disable-next-line no-constant-condition + while (true) { try { - await this._processErrorIteratorMode(error); const hasNext = await this.cursor.hasNext(); return hasNext; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); @@ -675,23 +679,26 @@ export class ChangeStream< next(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const change = await this.cursor.next(); - const processedChange = this._processChange(change ?? null); - return processedChange; - } catch (error) { + // Change streams must resume indefinitely while each resume event succeeds. + // This loop continues until either a change event is received or until a resume attempt + // fails. + // eslint-disable-next-line no-constant-condition + while (true) { try { - await this._processErrorIteratorMode(error); const change = await this.cursor.next(); const processedChange = this._processChange(change ?? null); return processedChange; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); @@ -706,21 +713,25 @@ export class ChangeStream< tryNext(callback?: Callback): Promise | void { this._setIsIterator(); return maybeCallback(async () => { - try { - const change = await this.cursor.tryNext(); - return change ?? null; - } catch (error) { + // Change streams must resume indefinitely while each resume event succeeds. + // This loop continues until either a change event is received or until a resume attempt + // fails. + // eslint-disable-next-line no-constant-condition + while (true) { try { - await this._processErrorIteratorMode(error); const change = await this.cursor.tryNext(); return change ?? null; } catch (error) { try { - await this.close(); - } catch { - // We are not concerned with errors from close() + await this._processErrorIteratorMode(error); + } catch (error) { + try { + await this.close(); + } catch { + // We are not concerned with errors from close() + } + throw error; } - throw error; } } }, callback); diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a22e4bcf11..a4f6999e44 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -10,6 +10,7 @@ import { promisify } from 'util'; import { AbstractCursor, ChangeStream, + ChangeStreamDocument, ChangeStreamOptions, Collection, CommandStartedEvent, @@ -1037,56 +1038,6 @@ describe('Change Streams', function () { }); describe('Change Stream Resume Error Tests', function () { - describe('TODO(NODE-4670): fix consecutive resumes unified tests', function () { - let client: MongoClient; - let changeStream: ChangeStream; - - beforeEach(async function () { - client = this.configuration.newClient(); - await client.connect(); - }); - - afterEach(async function () { - await changeStream.close(); - await client.close(); - }); - - it('should support consecutive resumes', { - metadata: { requires: { topology: 'replicaset', mongodb: '>=4.2' } }, - async test() { - const failCommand: FailPoint = { - configureFailPoint: 'failCommand', - mode: { - times: 2 - }, - data: { - failCommands: ['getMore'], - closeConnection: true - } - }; - - await client.db('admin').command(failCommand); - - const collection = client.db('test_consecutive_resume').collection('collection'); - - changeStream = collection.watch([], { batchSize: 1 }); - - await initIteratorMode(changeStream); - - await collection.insertOne({ name: 'bumpy' }); - await collection.insertOne({ name: 'bumpy' }); - await collection.insertOne({ name: 'bumpy' }); - - await sleep(1000); - - for (let i = 0; i < 3; ++i) { - const change = await changeStream.next(); - expect(change).not.to.be.null; - } - } - }); - }); - it.skip('should continue piping changes after a resumable error', { metadata: { requires: { topology: 'replicaset' } }, test: done => { @@ -1767,7 +1718,44 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} ${error}`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. So we defer the insert until a period of time + // after the change stream has started listening for a change. 2000ms is long enough for the change + // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. + const [, value] = await Promise.allSettled([ + sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), + changeStream.next() + ]); + + const change = (value as PromiseFulfilledResult).value; + + expect(change).to.have.property('operationType', 'insert'); + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, @@ -1896,6 +1884,42 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} ${error}`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. So we defer the insert until a period of time + // after the change stream has started listening for a change. 2000ms is long enough for the change + // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. + const [, value] = await Promise.allSettled([ + sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), + changeStream.hasNext() + ]); + + const change = (value as PromiseFulfilledResult).value; + + expect(change).to.be.true; + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } for (const { error, code, message } of resumableErrorCodes) { @@ -2033,6 +2057,42 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} ${error}`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + try { + // tryNext is not blocking and on sharded clusters we don't have control of when + // the actual change event will be ready on the change stream pipeline. This introduces + // a race condition, where sometimes we receive the change event and sometimes + // we don't when we call tryNext, depending on the timing of the sharded cluster. + + // Since we really only care about the resumability, it's enough for this test to throw + // if tryNext ever throws and assert on the number of aggregate events. + await changeStream.tryNext(); + } catch (err) { + expect.fail(`expected tryNext to resume, received error instead: ${err}`); + } + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } for (const { error, code, message } of resumableErrorCodes) { @@ -2171,6 +2231,43 @@ describe('ChangeStream resumability', function () { expect(aggregateEvents).to.have.lengthOf(2); } ); + + it( + `supports consecutive resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 5 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + const changes = once(changeStream, 'change'); + await once(changeStream.cursor, 'init'); + + // There's an inherent race condition here because we need to make sure that the `aggregates` that succeed when + // resuming a change stream don't return the change event. So we defer the insert until a period of time + // after the change stream has started listening for a change. 2000ms is long enough for the change + // stream to attempt to resume and fail multiple times before exhausting the failpoint and succeeding. + const [, value] = await Promise.allSettled([ + sleep(2000).then(() => collection.insertOne({ name: 'bailey' })), + changes + ]); + + const [change] = (value as PromiseFulfilledResult).value; + expect(change).to.have.property('operationType', 'insert'); + + expect(aggregateEvents).to.have.lengthOf(6); + } + ); } it( diff --git a/test/integration/change-streams/change_streams.spec.test.ts b/test/integration/change-streams/change_streams.spec.test.ts index 48a56b99df..451acc7a2d 100644 --- a/test/integration/change-streams/change_streams.spec.test.ts +++ b/test/integration/change-streams/change_streams.spec.test.ts @@ -4,9 +4,5 @@ import { loadSpecTests } from '../../spec'; import { runUnifiedSuite } from '../../tools/unified-spec-runner/runner'; describe('Change Streams Spec - Unified', function () { - runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified')), test => - test.description === 'Test consecutive resume' - ? 'TODO(NODE-4670): fix consecutive resume change stream test' - : false - ); + runUnifiedSuite(loadSpecTests(path.join('change-streams', 'unified'))); }); diff --git a/test/integration/unified-test-format/unified_test_format.spec.test.ts b/test/integration/unified-test-format/unified_test_format.spec.test.ts index 9d1699cf54..0af0a0fe68 100644 --- a/test/integration/unified-test-format/unified_test_format.spec.test.ts +++ b/test/integration/unified-test-format/unified_test_format.spec.test.ts @@ -23,10 +23,6 @@ const filter: TestFilter = ({ description }) => { return 'TODO(NODE-3308): failures due unnecessary getMore and killCursors calls in 5.0'; } - if (description === 'Test consecutive resume') { - return 'TODO(NODE-4670): fix consecutive resume change stream test'; - } - if ( process.env.AUTH === 'auth' && [