diff --git a/src/change_stream.ts b/src/change_stream.ts index 0304a527c3..73526f0d46 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -737,6 +737,26 @@ export class ChangeStream< }, callback); } + async *[Symbol.asyncIterator](): AsyncGenerator { + if (this.closed) { + return; + } + + try { + // Change streams run indefinitely as long as errors are resumable + // So the only loop breaking condition is if `next()` throws + while (true) { + yield await this.next(); + } + } finally { + try { + await this.close(); + } catch { + // we're not concerned with errors from close() + } + } + } + /** Is the cursor closed */ get closed(): boolean { return this[kClosed] || this.cursor.closed; diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a4f6999e44..81e559a89d 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -10,12 +10,12 @@ import { promisify } from 'util'; import { AbstractCursor, ChangeStream, - ChangeStreamDocument, ChangeStreamOptions, Collection, CommandStartedEvent, Db, Long, + MongoAPIError, MongoChangeStreamError, MongoClient, MongoServerError, @@ -41,6 +41,9 @@ const initIteratorMode = async (cs: ChangeStream) => { return; }; +const is4_2Server = (serverVersion: string) => + gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0'); + // Define the pipeline processing changes const pipeline = [ { $addFields: { addedField: 'This is a field added using $addFields' } }, @@ -70,6 +73,7 @@ describe('Change Streams', function () { }); afterEach(async () => { + sinon.restore(); await changeStream.close(); await client.close(); await mock.cleanup(); @@ -363,7 +367,7 @@ describe('Change Streams', function () { // Check the cursor is closed expect(changeStream.closed).to.be.true; - expect(changeStream.cursor.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); done(); }); }); @@ -949,31 +953,222 @@ describe('Change Streams', function () { 'This test only worked because of timing, changeStream.close does not remove the change listener'; }); - describe('#tryNext()', function () { - it('should return null on single iteration of empty cursor', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - const doc = await changeStream.tryNext(); - expect(doc).to.be.null; - } + describe('iterator api', function () { + describe('#tryNext()', function () { + it('should return null on single iteration of empty cursor', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + const doc = await changeStream.tryNext(); + expect(doc).to.be.null; + } + }); + + it('should iterate a change stream until first empty batch', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + // tryNext doesn't send the initial agg, just checks the driver document batch cache + const firstTry = await changeStream.tryNext(); + expect(firstTry).to.be.null; + + await initIteratorMode(changeStream); + await collection.insertOne({ a: 42 }); + + const secondTry = await changeStream.tryNext(); + expect(secondTry).to.be.an('object'); + + const thirdTry = await changeStream.tryNext(); + expect(thirdTry).to.be.null; + } + }); }); - it('should iterate a change stream until first empty batch', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - // tryNext doesn't send the initial agg, just checks the driver document batch cache - const firstTry = await changeStream.tryNext(); - expect(firstTry).to.be.null; + describe('#asyncIterator', function () { + describe('for-await iteration', function () { + it( + 'can iterate through changes', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); - await initIteratorMode(changeStream); - await collection.insertOne({ a: 42 }); + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); - const secondTry = await changeStream.tryNext(); - expect(secondTry).to.be.an('object'); + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + if (docs.length === 0) { + break; + } + } - const thirdTry = await changeStream.tryNext(); - expect(thirdTry).to.be.null; - } + expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); + } + ); + + it( + 'cannot be resumed from partial iteration', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + break; + } + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + expect.fail('Change stream was resumed after partial iteration'); + } + + expect(docs).to.have.length( + 2, + 'expected to find remaining docs after partial iteration' + ); + } + ); + + it( + 'cannot be used with emitter-based iteration', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + changeStream.on('change', sinon.stub()); + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + expect.fail('Async iterator was used with emitter-based iteration'); + } + } catch (error) { + expect(error).to.be.instanceOf(MongoAPIError); + } + } + ); + + it( + 'can be used with raw iterator API', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const docs = [{ city: 'Los Angeles' }, { city: 'Miami' }]; + await collection.insertMany(docs); + + await changeStream.next(); + docs.shift(); + + try { + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + + if (docs.length === 0) { + break; + } + } + } catch { + expect.fail('Async could not be used with raw iterator API'); + } + } + ); + }); + + describe('#return', function () { + it( + 'should close the change stream when return is called', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + await changeStreamIterator.next(); + await changeStreamIterator.return(); + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); + } + ); + + it( + 'ignores errors thrown from close', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + sinon.stub(changeStream.cursor, 'close').throws(new MongoAPIError('testing')); + + try { + await changeStreamIterator.return(); + } catch { + expect.fail('Async iterator threw an error on close'); + } + } + ); + }); + + describe('#next', function () { + it( + 'should close the change stream when an error is thrown', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await collection.insertOne({ city: 'New York City' }); + try { + await changeStreamIterator.next(); + expect.fail( + 'Change stream did not throw unresumable error and did not produce any events' + ); + } catch { + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); + } + } + ); + + it( + 'should not produce events on closed stream', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + changeStream.close(); + + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + const change = await changeStreamIterator.next(); + + expect(change.value).to.be.undefined; + } + ); + }); }); }); @@ -1651,9 +1846,6 @@ describe('ChangeStream resumability', function () { { error: 'CursorNotFound', code: 43, message: 'cursor not found' } ]; - const is4_2Server = (serverVersion: string) => - gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0'); - beforeEach(function () { assert(this.currentTest != null); if ( @@ -2198,6 +2390,158 @@ describe('ChangeStream resumability', function () { ); }); }); + + context('#asyncIterator', function () { + for (const { error, code, message } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + if (docs.length === 0) { + break; + } + } + + expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + for (const { error, code, message } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '<4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + // on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response. + // This means that a resume token isn't cached until the first change has been iterated. + // In order to test the resume, we need to ensure that at least one document has + // been iterated so we have a resume token to resume on. + await collection.insertOne({ city: 'New York City' }); + await changeStreamIterator.next(); + + const mock = sinon + .stub(changeStream.cursor, '_getMore') + .callsFake((_batchSize, callback) => { + mock.restore(); + const error = new MongoServerError({ message }); + error.code = code; + callback(error); + }); + + const docs = [{ city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + if (docs.length === 0) { + break; + } + } + + expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + + it( + 'maintains change stream options on resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableErrorCodes[0].code, + errmsg: resumableErrorCodes[0].message + } + } as FailPoint); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + + await collection.insertOne({ city: 'New York City' }); + await changeStreamIterator.next(); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + } + ); + + context('when the error is not a resumable error', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + expect.fail('Change stream produced events on an unresumable error'); + } + } catch (error) { + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); + } + } + ); + }); + }); }); describe('event emitter based iteration', function () { diff --git a/test/types/change_stream.test-d.ts b/test/types/change_stream.test-d.ts index 5f36d10d99..6358b0a9a3 100644 --- a/test/types/change_stream.test-d.ts +++ b/test/types/change_stream.test-d.ts @@ -214,3 +214,7 @@ expectError(collection.watch()); collection .watch<{ a: number }, { b: boolean }>() .on('change', change => expectType<{ b: boolean }>(change)); + +expectType, void, void>>( + collection.watch()[Symbol.asyncIterator]() +);