From e71baa62182179ab2b66d0a23686d34a412b412d Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Thu, 20 Oct 2022 14:31:53 -0400 Subject: [PATCH 01/22] feat: add asyncIterator --- src/change_stream.ts | 22 ++++++++++++++ .../change-streams/change_stream.test.ts | 30 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/src/change_stream.ts b/src/change_stream.ts index 0304a527c3..bce4dae3b2 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -737,6 +737,28 @@ export class ChangeStream< }, callback); } + [Symbol.asyncIterator](): AsyncIterator { + async function* nativeAsyncIterator(this: ChangeStream) { + if (this.closed) { + return; + } + + while (true) { + if (!(await this.hasNext())) { + break; + } + + yield await this.next(); + } + } + + const iterator = nativeAsyncIterator.call(this); + + return { + next: () => iterator.next() + }; + } + /** Is the cursor closed */ get closed(): boolean { return this[kClosed] || this.cursor.closed; diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a4f6999e44..2a7eaecb80 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1688,6 +1688,7 @@ describe('ChangeStream resumability', function () { aggregateEvents = []; }); + // TODO(andymina): resumable error tests here context('iterator api', function () { context('#next', function () { for (const { error, code, message } of resumableErrorCodes) { @@ -2198,6 +2199,35 @@ describe('ChangeStream resumability', function () { ); }); }); + + context.only('#asyncIterator', function () { + /** + * TODO(andymina): three test cases to cover + * + * happy path - asyncIterable works + * unhappy path - it errors out + * resumable error - continues but also throws the error out + */ + // happy path + it('happy path', async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + let count = 0; + for await (const change of changeStream) { + const { fullDocument } = change; + expect(fullDocument.city).to.equal(docs[count].city); + + count++; + if (count === 3) { + changeStream.close(); + } + } + }); + }); }); describe('event emitter based iteration', function () { From e90d3c7478d760355e23e60df3122e3c45517bf6 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Thu, 20 Oct 2022 15:42:04 -0400 Subject: [PATCH 02/22] test: add unresumable error test --- .../change-streams/change_stream.test.ts | 78 ++++++++++++++++++- 1 file changed, 76 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 2a7eaecb80..9d5d9a4632 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -10,7 +10,6 @@ import { promisify } from 'util'; import { AbstractCursor, ChangeStream, - ChangeStreamDocument, ChangeStreamOptions, Collection, CommandStartedEvent, @@ -2208,8 +2207,50 @@ describe('ChangeStream resumability', function () { * unhappy path - it errors out * resumable error - continues but also throws the error out */ + for (const { error, code, message } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '<4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + // on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response. + // This means that a resume token isn't cached until the first change has been iterated. + // In order to test the resume, we need to ensure that at least one document has + // been iterated so we have a resume token to resume on. + + // insert the doc + await collection.insertOne({ city: 'New York City' }); + + // fail the call + const mock = sinon + .stub(changeStream.cursor, '_getMore') + .callsFake((_batchSize, callback) => { + mock.restore(); + const error = new MongoServerError({ message }); + error.code = code; + callback(error); + }); + + // insert another doc + await collection.insertOne({ city: 'New York City' }); + + let total_changes = 0; + for await (const change of changeStream) { + total_changes++; + if (total_changes === 2) { + changeStream.close(); + } + } + + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + // happy path - it('happy path', async function () { + it('happy path', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); @@ -2223,10 +2264,43 @@ describe('ChangeStream resumability', function () { count++; if (count === 3) { + expect(docs.length).to.equal(count); changeStream.close(); } } }); + + // unhappy path + it( + 'unhappy path', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await collection.insertOne({ city: 'New York City' }); + + try { + for await (const change of changeStream) { + // should not run + } + } catch (error) { + expect(error).to.be.instanceOf(MongoServerError); + } + } + ); }); }); From c6b409960f3b2156bf12e51c6165c4efd3f4e971 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 24 Oct 2022 10:07:47 -0400 Subject: [PATCH 03/22] test: finish tests for asyncIterator --- .../change-streams/change_stream.test.ts | 130 +++++++++++------- 1 file changed, 81 insertions(+), 49 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 9d5d9a4632..4586b642c7 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2200,13 +2200,41 @@ describe('ChangeStream resumability', function () { }); context.only('#asyncIterator', function () { - /** - * TODO(andymina): three test cases to cover - * - * happy path - asyncIterable works - * unhappy path - it errors out - * resumable error - continues but also throws the error out - */ + for (const { error, code, message } of resumableErrorCodes) { + it( + `resumes on error code ${code} (${error})`, + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: code, + errmsg: message + } + } as FailPoint); + + await collection.insertOne({ city: 'New York City' }); + + let total_changes = 0; + for await (const change of changeStream) { + total_changes++; + if (total_changes === 1) { + changeStream.close(); + } + } + + expect(aggregateEvents).to.have.lengthOf(2); + } + ); + } + for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, @@ -2220,10 +2248,8 @@ describe('ChangeStream resumability', function () { // In order to test the resume, we need to ensure that at least one document has // been iterated so we have a resume token to resume on. - // insert the doc await collection.insertOne({ city: 'New York City' }); - // fail the call const mock = sinon .stub(changeStream.cursor, '_getMore') .callsFake((_batchSize, callback) => { @@ -2249,58 +2275,64 @@ describe('ChangeStream resumability', function () { ); } - // happy path - it('happy path', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); - - const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; - await collection.insertMany(docs); - - let count = 0; - for await (const change of changeStream) { - const { fullDocument } = change; - expect(fullDocument.city).to.equal(docs[count].city); - - count++; - if (count === 3) { - expect(docs.length).to.equal(count); - changeStream.close(); - } - } - }); - - // unhappy path it( - 'unhappy path', + 'can iterate through changes', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); - const unresumableErrorCode = 1000; - await client.db('admin').command({ - configureFailPoint: is4_2Server(this.configuration.version) - ? 'failCommand' - : 'failGetMoreAfterCursorCheckout', - mode: { times: 1 }, - data: { - failCommands: ['getMore'], - errorCode: unresumableErrorCode - } - } as FailPoint); + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); - await collection.insertOne({ city: 'New York City' }); + let count = 0; + for await (const change of changeStream) { + const { fullDocument } = change; + expect(fullDocument.city).to.equal(docs[count].city); - try { - for await (const change of changeStream) { - // should not run + count++; + if (count === 3) { + changeStream.close(); } - } catch (error) { - expect(error).to.be.instanceOf(MongoServerError); } + + expect(docs.length).to.equal(count); } ); + + context('when the error is not a resumable error', function () { + it( + 'does not resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await collection.insertOne({ city: 'New York City' }); + + try { + for await (const change of changeStream) { + // should not run + } + } catch (error) { + expect(error).to.be.instanceOf(MongoServerError); + } + } + ); + }); + }); }); From 3c22f5a90d1794a932ff7e5976e1d73d8663949a Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 24 Oct 2022 10:23:22 -0400 Subject: [PATCH 04/22] chore: fix linter error --- test/integration/change-streams/change_stream.test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 4586b642c7..58d367214c 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2199,7 +2199,7 @@ describe('ChangeStream resumability', function () { }); }); - context.only('#asyncIterator', function () { + context('#asyncIterator', function () { for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, @@ -2332,7 +2332,6 @@ describe('ChangeStream resumability', function () { } ); }); - }); }); From 21bc430395e9f0b7a7f30490ada8630f37276f45 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 24 Oct 2022 11:22:27 -0400 Subject: [PATCH 05/22] test: add tests to check options consistency --- src/change_stream.ts | 26 ++++------ .../change-streams/change_stream.test.ts | 52 +++++++++++++++++-- 2 files changed, 56 insertions(+), 22 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index bce4dae3b2..7955c60ea1 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -737,26 +737,18 @@ export class ChangeStream< }, callback); } - [Symbol.asyncIterator](): AsyncIterator { - async function* nativeAsyncIterator(this: ChangeStream) { - if (this.closed) { - return; - } - - while (true) { - if (!(await this.hasNext())) { - break; - } - - yield await this.next(); - } + async *[Symbol.asyncIterator](): AsyncIterator { + if (this.closed) { + return; } - const iterator = nativeAsyncIterator.call(this); + while (true) { + if (!(await this.hasNext())) { + break; + } - return { - next: () => iterator.next() - }; + yield await this.next(); + } } /** Is the cursor closed */ diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 58d367214c..59382b2549 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1687,7 +1687,6 @@ describe('ChangeStream resumability', function () { aggregateEvents = []; }); - // TODO(andymina): resumable error tests here context('iterator api', function () { context('#next', function () { for (const { error, code, message } of resumableErrorCodes) { @@ -2199,7 +2198,7 @@ describe('ChangeStream resumability', function () { }); }); - context('#asyncIterator', function () { + context.only('#asyncIterator', function () { for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, @@ -2300,13 +2299,51 @@ describe('ChangeStream resumability', function () { } ); + it( + 'maintains change stream options on resume', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([], changeStreamResumeOptions); + await initIteratorMode(changeStream); + + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: resumableErrorCodes[0].code, + errmsg: resumableErrorCodes[0].message + } + } as FailPoint); + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + + await collection.insertOne({ city: 'New York City' }); + + let total_changes = 0; + for await (const change of changeStream) { + total_changes++; + if (total_changes === 1) { + changeStream.close(); + } + } + + expect(changeStream.cursor) + .to.have.property('options') + .that.containSubset(changeStreamResumeOptions); + } + ); + context('when the error is not a resumable error', function () { it( 'does not resume', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { changeStream = collection.watch([]); - await initIteratorMode(changeStream); const unresumableErrorCode = 1000; await client.db('admin').command({ @@ -2320,15 +2357,20 @@ describe('ChangeStream resumability', function () { } } as FailPoint); - await collection.insertOne({ city: 'New York City' }); + await initIteratorMode(changeStream); + await collection.insertOne({ city: 'New York City' }); try { for await (const change of changeStream) { - // should not run + // DOESN'T REACH + expect.fail('Change stream produced changes on an unresumable error'); } } catch (error) { expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); } + // fails here + expect.fail('Change stream did not throw unresumable error'); } ); }); From c9ef9e91dc81315c006ba10b9d5d331bc8de1202 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 24 Oct 2022 13:26:57 -0400 Subject: [PATCH 06/22] test: fix assertion order --- src/change_stream.ts | 6 +----- test/integration/change-streams/change_stream.test.ts | 11 ++++++----- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 7955c60ea1..698a341e3a 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -742,11 +742,7 @@ export class ChangeStream< return; } - while (true) { - if (!(await this.hasNext())) { - break; - } - + while (await this.hasNext()) { yield await this.next(); } } diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 59382b2549..5538df498a 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2198,7 +2198,7 @@ describe('ChangeStream resumability', function () { }); }); - context.only('#asyncIterator', function () { + context('#asyncIterator', function () { for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, @@ -2339,7 +2339,7 @@ describe('ChangeStream resumability', function () { ); context('when the error is not a resumable error', function () { - it( + it.only( 'does not resume', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { @@ -2362,15 +2362,16 @@ describe('ChangeStream resumability', function () { await collection.insertOne({ city: 'New York City' }); try { for await (const change of changeStream) { - // DOESN'T REACH expect.fail('Change stream produced changes on an unresumable error'); } + + expect.fail( + 'Change stream did not throw unresumable error and did not produce any events' + ); } catch (error) { expect(error).to.be.instanceOf(MongoServerError); expect(aggregateEvents).to.have.lengthOf(1); } - // fails here - expect.fail('Change stream did not throw unresumable error'); } ); }); From 33c4fa69e33164b0de1387ea260259c57d8c6b51 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 24 Oct 2022 13:29:47 -0400 Subject: [PATCH 07/22] chore: fix linter error --- test/integration/change-streams/change_stream.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 5538df498a..24c1e2c6ca 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2339,7 +2339,7 @@ describe('ChangeStream resumability', function () { ); context('when the error is not a resumable error', function () { - it.only( + it( 'does not resume', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { @@ -2362,7 +2362,7 @@ describe('ChangeStream resumability', function () { await collection.insertOne({ city: 'New York City' }); try { for await (const change of changeStream) { - expect.fail('Change stream produced changes on an unresumable error'); + expect.fail('Change stream produced events on an unresumable error'); } expect.fail( From 46b9ff19b6e3a739689228d3f830ddc1723af9dd Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 24 Oct 2022 15:40:15 -0400 Subject: [PATCH 08/22] fix: close leaking ChangeStream in test --- test/integration/change-streams/change_stream.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 24c1e2c6ca..f7f5e9d1b9 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2372,6 +2372,8 @@ describe('ChangeStream resumability', function () { expect(error).to.be.instanceOf(MongoServerError); expect(aggregateEvents).to.have.lengthOf(1); } + + changeStream.close(); } ); }); From eb2fc2668b5299e7455aa915a4cb4086f2720760 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Tue, 25 Oct 2022 10:22:31 -0400 Subject: [PATCH 09/22] test: refactor tests to manually call asyncIterator --- src/change_stream.ts | 8 +- .../change-streams/change_stream.test.ts | 134 ++++++++---------- 2 files changed, 65 insertions(+), 77 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 698a341e3a..3f0922c1c1 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -742,8 +742,12 @@ export class ChangeStream< return; } - while (await this.hasNext()) { - yield await this.next(); + try { + while (await this.hasNext()) { + yield await this.next(); + } + } finally { + await this.close(); } } diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index f7f5e9d1b9..1af3a8084e 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -948,31 +948,57 @@ describe('Change Streams', function () { 'This test only worked because of timing, changeStream.close does not remove the change listener'; }); - describe('#tryNext()', function () { - it('should return null on single iteration of empty cursor', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - const doc = await changeStream.tryNext(); - expect(doc).to.be.null; - } + // TODO(andymina): ask about testing word semantics here + context('iterator api', function () { + describe('#tryNext()', function () { + it('should return null on single iteration of empty cursor', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + const doc = await changeStream.tryNext(); + expect(doc).to.be.null; + } + }); + + it('should iterate a change stream until first empty batch', { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + // tryNext doesn't send the initial agg, just checks the driver document batch cache + const firstTry = await changeStream.tryNext(); + expect(firstTry).to.be.null; + + await initIteratorMode(changeStream); + await collection.insertOne({ a: 42 }); + + const secondTry = await changeStream.tryNext(); + expect(secondTry).to.be.an('object'); + + const thirdTry = await changeStream.tryNext(); + expect(thirdTry).to.be.null; + } + }); }); - it('should iterate a change stream until first empty batch', { - metadata: { requires: { topology: 'replicaset' } }, - async test() { - // tryNext doesn't send the initial agg, just checks the driver document batch cache - const firstTry = await changeStream.tryNext(); - expect(firstTry).to.be.null; + describe('#asyncIterator', function () { + it( + 'can iterate through changes', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); - await initIteratorMode(changeStream); - await collection.insertOne({ a: 42 }); + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); - const secondTry = await changeStream.tryNext(); - expect(secondTry).to.be.an('object'); + for (const doc of docs) { + const change = await changeStreamIterator.next(); + const { fullDocument } = change; + expect(fullDocument.city).to.equal(doc.city); + } - const thirdTry = await changeStream.tryNext(); - expect(thirdTry).to.be.null; - } + changeStream.close(); + } + ); }); }); @@ -2206,6 +2232,7 @@ describe('ChangeStream resumability', function () { async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); await client.db('admin').command({ configureFailPoint: is4_2Server(this.configuration.version) @@ -2220,16 +2247,11 @@ describe('ChangeStream resumability', function () { } as FailPoint); await collection.insertOne({ city: 'New York City' }); - - let total_changes = 0; - for await (const change of changeStream) { - total_changes++; - if (total_changes === 1) { - changeStream.close(); - } - } + await changeStreamIterator.next(); expect(aggregateEvents).to.have.lengthOf(2); + + changeStream.close(); } ); } @@ -2241,13 +2263,14 @@ describe('ChangeStream resumability', function () { async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); // on 3.6 servers, no postBatchResumeToken is sent back in the initial aggregate response. // This means that a resume token isn't cached until the first change has been iterated. // In order to test the resume, we need to ensure that at least one document has // been iterated so we have a resume token to resume on. - await collection.insertOne({ city: 'New York City' }); + await changeStreamIterator.next(); const mock = sinon .stub(changeStream.cursor, '_getMore') @@ -2258,53 +2281,23 @@ describe('ChangeStream resumability', function () { callback(error); }); - // insert another doc await collection.insertOne({ city: 'New York City' }); - - let total_changes = 0; - for await (const change of changeStream) { - total_changes++; - if (total_changes === 2) { - changeStream.close(); - } - } + await changeStreamIterator.next(); expect(aggregateEvents).to.have.lengthOf(2); + + changeStream.close(); } ); } - it( - 'can iterate through changes', - { requires: { topology: '!single', mongodb: '>=4.2' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); - - const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; - await collection.insertMany(docs); - - let count = 0; - for await (const change of changeStream) { - const { fullDocument } = change; - expect(fullDocument.city).to.equal(docs[count].city); - - count++; - if (count === 3) { - changeStream.close(); - } - } - - expect(docs.length).to.equal(count); - } - ); - it( 'maintains change stream options on resume', { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { changeStream = collection.watch([], changeStreamResumeOptions); await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); await client.db('admin').command({ configureFailPoint: is4_2Server(this.configuration.version) @@ -2323,14 +2316,7 @@ describe('ChangeStream resumability', function () { .that.containSubset(changeStreamResumeOptions); await collection.insertOne({ city: 'New York City' }); - - let total_changes = 0; - for await (const change of changeStream) { - total_changes++; - if (total_changes === 1) { - changeStream.close(); - } - } + await changeStreamIterator.next(); expect(changeStream.cursor) .to.have.property('options') @@ -2358,13 +2344,11 @@ describe('ChangeStream resumability', function () { } as FailPoint); await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); await collection.insertOne({ city: 'New York City' }); try { - for await (const change of changeStream) { - expect.fail('Change stream produced events on an unresumable error'); - } - + const change = await changeStreamIterator.next(); expect.fail( 'Change stream did not throw unresumable error and did not produce any events' ); From 1a3bf2ec0289758b42aadc14c77e32cbf27bae71 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Tue, 25 Oct 2022 10:38:56 -0400 Subject: [PATCH 10/22] test: add test for closing change stream --- .../change-streams/change_stream.test.ts | 43 +++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 1af3a8084e..729a1edc2a 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -53,6 +53,9 @@ describe('Change Streams', function () { let changeStream: ChangeStream; let db: Db; + const is4_2Server = (serverVersion: string) => + gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0'); + beforeEach(async function () { const configuration = this.configuration; client = configuration.newClient(); @@ -999,6 +1002,41 @@ describe('Change Streams', function () { changeStream.close(); } ); + + context('when an error is thrown', function () { + it( + 'should close the change stream', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await collection.insertOne({ city: 'New York City' }); + try { + const change = await changeStreamIterator.next(); + expect.fail( + 'Change stream did not throw unresumable error and did not produce any events' + ); + } catch (error) { + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor.closed).to.be.true; + } + } + ); + }); }); }); @@ -2330,6 +2368,8 @@ describe('ChangeStream resumability', function () { { requires: { topology: '!single', mongodb: '>=4.2' } }, async function () { changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); const unresumableErrorCode = 1000; await client.db('admin').command({ @@ -2343,9 +2383,6 @@ describe('ChangeStream resumability', function () { } } as FailPoint); - await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); - await collection.insertOne({ city: 'New York City' }); try { const change = await changeStreamIterator.next(); From f76107a49517d6e63cc461b1f68246a13ba9951a Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Tue, 25 Oct 2022 13:45:08 -0400 Subject: [PATCH 11/22] test: add more test cases --- src/change_stream.ts | 9 +- src/cursor/abstract_cursor.ts | 2 +- .../change-streams/change_stream.test.ts | 141 ++++++++++++++---- 3 files changed, 118 insertions(+), 34 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 3f0922c1c1..2e94c30f6e 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -737,7 +737,8 @@ export class ChangeStream< }, callback); } - async *[Symbol.asyncIterator](): AsyncIterator { + // TODO(andymina): ask about never as third template parameter + async *[Symbol.asyncIterator](): AsyncGenerator { if (this.closed) { return; } @@ -747,7 +748,11 @@ export class ChangeStream< yield await this.next(); } } finally { - await this.close(); + try { + await this.close(); + } catch (error) { + // we're not concerned with errors from close() + } } } diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 8aa59aa651..f2a1782872 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -297,7 +297,7 @@ export abstract class AbstractCursor< return bufferedDocs; } - [Symbol.asyncIterator](): AsyncIterator { + [Symbol.asyncIterator](): AsyncIterator { async function* nativeAsyncIterator(this: AbstractCursor) { if (this.closed) { return; diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 729a1edc2a..6ee98b9cf8 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -15,12 +15,14 @@ import { CommandStartedEvent, Db, Long, + MongoAPIError, MongoChangeStreamError, MongoClient, MongoServerError, ReadPreference, ResumeToken } from '../../../src'; +import { next } from '../../../src/cursor/abstract_cursor'; import { isHello } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; import { @@ -995,7 +997,7 @@ describe('Change Streams', function () { for (const doc of docs) { const change = await changeStreamIterator.next(); - const { fullDocument } = change; + const { fullDocument } = change.value; expect(fullDocument.city).to.equal(doc.city); } @@ -1003,40 +1005,117 @@ describe('Change Streams', function () { } ); - context('when an error is thrown', function () { - it( - 'should close the change stream', - { requires: { topology: '!single', mongodb: '>=4.2' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); + it( + 'should close the change stream when return is called', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); - const unresumableErrorCode = 1000; - await client.db('admin').command({ - configureFailPoint: is4_2Server(this.configuration.version) - ? 'failCommand' - : 'failGetMoreAfterCursorCheckout', - mode: { times: 1 }, - data: { - failCommands: ['getMore'], - errorCode: unresumableErrorCode - } - } as FailPoint); + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); - await collection.insertOne({ city: 'New York City' }); - try { - const change = await changeStreamIterator.next(); - expect.fail( - 'Change stream did not throw unresumable error and did not produce any events' - ); - } catch (error) { - expect(changeStream.closed).to.be.true; - expect(changeStream.cursor.closed).to.be.true; + const changeStreamAsyncIteratorHelper = async function (changeStream: ChangeStream) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + return; } + }; + + await changeStreamAsyncIteratorHelper(changeStream); + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor.closed).to.be.true; + } + ); + + it( + 'should close the change stream when an error is thrown', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); + + await collection.insertOne({ city: 'New York City' }); + try { + await changeStreamIterator.next(); + expect.fail( + 'Change stream did not throw unresumable error and did not produce any events' + ); + } catch (error) { + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor.closed).to.be.true; } - ); - }); + } + ); + + it( + 'should not produce events on closed stream', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + changeStream.close(); + + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + const change = await changeStreamIterator.next(); + + expect(change.value).to.be.undefined; + } + ); + + it( + 'cannot be used with emitter-based iteration', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + changeStream.on('change', sinon.stub()); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + try { + await changeStreamIterator.next(); + expect.fail('Async iterator was used with emitter-based iteration'); + } catch (error) { + expect(error).to.be.instanceOf(MongoAPIError); + } + } + ); + + it.only( + 'can be used with raw iterator API', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + const docs = [{ city: 'Los Angeles' }, { city: 'Miami' }]; + await collection.insertMany(docs); + + await changeStream.next(); + + try { + const change = await changeStreamIterator.next(); + expect(change.value).to.not.be.undefined; + + const { fullDocument } = change.value; + expect(fullDocument.city).to.equal(docs[1].city); + } catch (error) { + expect.fail('Async could not be used with raw iterator API') + } + } + ); }); }); From c7f5599035f7167b2a5f3b61de3f27c1eecd20db Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Tue, 25 Oct 2022 14:00:17 -0400 Subject: [PATCH 12/22] fix: revert change to abstract cursor async iterator --- src/change_stream.ts | 1 - src/cursor/abstract_cursor.ts | 2 +- test/integration/change-streams/change_stream.test.ts | 8 +++----- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 2e94c30f6e..11225944bc 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -737,7 +737,6 @@ export class ChangeStream< }, callback); } - // TODO(andymina): ask about never as third template parameter async *[Symbol.asyncIterator](): AsyncGenerator { if (this.closed) { return; diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index f2a1782872..8aa59aa651 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -297,7 +297,7 @@ export abstract class AbstractCursor< return bufferedDocs; } - [Symbol.asyncIterator](): AsyncIterator { + [Symbol.asyncIterator](): AsyncIterator { async function* nativeAsyncIterator(this: AbstractCursor) { if (this.closed) { return; diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 6ee98b9cf8..ae6f9126e5 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -22,7 +22,6 @@ import { ReadPreference, ResumeToken } from '../../../src'; -import { next } from '../../../src/cursor/abstract_cursor'; import { isHello } from '../../../src/utils'; import * as mock from '../../tools/mongodb-mock/index'; import { @@ -953,7 +952,6 @@ describe('Change Streams', function () { 'This test only worked because of timing, changeStream.close does not remove the change listener'; }); - // TODO(andymina): ask about testing word semantics here context('iterator api', function () { describe('#tryNext()', function () { it('should return null on single iteration of empty cursor', { @@ -1092,7 +1090,7 @@ describe('Change Streams', function () { } ); - it.only( + it( 'can be used with raw iterator API', { requires: { topology: '!single' } }, async function () { @@ -1112,7 +1110,7 @@ describe('Change Streams', function () { const { fullDocument } = change.value; expect(fullDocument.city).to.equal(docs[1].city); } catch (error) { - expect.fail('Async could not be used with raw iterator API') + expect.fail('Async could not be used with raw iterator API'); } } ); @@ -2464,7 +2462,7 @@ describe('ChangeStream resumability', function () { await collection.insertOne({ city: 'New York City' }); try { - const change = await changeStreamIterator.next(); + await changeStreamIterator.next(); expect.fail( 'Change stream did not throw unresumable error and did not produce any events' ); From 96902498b4553b1aaeb20639716c0598ee7983cd Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Tue, 25 Oct 2022 14:08:26 -0400 Subject: [PATCH 13/22] refactor: lift is4_2 method to global --- test/integration/change-streams/change_stream.test.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index ae6f9126e5..e1b9d08763 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -41,6 +41,9 @@ const initIteratorMode = async (cs: ChangeStream) => { return; }; +const is4_2Server = (serverVersion: string) => + gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0'); + // Define the pipeline processing changes const pipeline = [ { $addFields: { addedField: 'This is a field added using $addFields' } }, @@ -54,9 +57,6 @@ describe('Change Streams', function () { let changeStream: ChangeStream; let db: Db; - const is4_2Server = (serverVersion: string) => - gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0'); - beforeEach(async function () { const configuration = this.configuration; client = configuration.newClient(); @@ -1791,9 +1791,6 @@ describe('ChangeStream resumability', function () { { error: 'CursorNotFound', code: 43, message: 'cursor not found' } ]; - const is4_2Server = (serverVersion: string) => - gte(serverVersion, '4.2.0') && lt(serverVersion, '4.3.0'); - beforeEach(function () { assert(this.currentTest != null); if ( From 232eaaa914d117458a57b764a91b68da4de18e76 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Tue, 25 Oct 2022 16:25:56 -0400 Subject: [PATCH 14/22] test: add test for ignoring errors in close --- .../change-streams/change_stream.test.ts | 39 +++++++++++-------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index e1b9d08763..d85eda9465 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -73,6 +73,7 @@ describe('Change Streams', function () { }); afterEach(async () => { + sinon.restore(); await changeStream.close(); await client.close(); await mock.cleanup(); @@ -952,7 +953,7 @@ describe('Change Streams', function () { 'This test only worked because of timing, changeStream.close does not remove the change listener'; }); - context('iterator api', function () { + describe('iterator api', function () { describe('#tryNext()', function () { it('should return null on single iteration of empty cursor', { metadata: { requires: { topology: 'replicaset' } }, @@ -998,8 +999,6 @@ describe('Change Streams', function () { const { fullDocument } = change.value; expect(fullDocument.city).to.equal(doc.city); } - - changeStream.close(); } ); @@ -1009,18 +1008,12 @@ describe('Change Streams', function () { async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; await collection.insertMany(docs); - const changeStreamAsyncIteratorHelper = async function (changeStream: ChangeStream) { - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const change of changeStream) { - return; - } - }; - - await changeStreamAsyncIteratorHelper(changeStream); + await changeStreamIterator.return(); expect(changeStream.closed).to.be.true; expect(changeStream.cursor.closed).to.be.true; } @@ -1114,6 +1107,24 @@ describe('Change Streams', function () { } } ); + + it( + 'ignores errors thrown from close', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + sinon.stub(changeStream.cursor, 'close').throws(new MongoAPIError('testing')); + + try { + await changeStreamIterator.return(); + } catch (error) { + expect.fail('Async iterator threw an error on close'); + } + } + ); }); }); @@ -2362,8 +2373,6 @@ describe('ChangeStream resumability', function () { await changeStreamIterator.next(); expect(aggregateEvents).to.have.lengthOf(2); - - changeStream.close(); } ); } @@ -2397,8 +2406,6 @@ describe('ChangeStream resumability', function () { await changeStreamIterator.next(); expect(aggregateEvents).to.have.lengthOf(2); - - changeStream.close(); } ); } @@ -2467,8 +2474,6 @@ describe('ChangeStream resumability', function () { expect(error).to.be.instanceOf(MongoServerError); expect(aggregateEvents).to.have.lengthOf(1); } - - changeStream.close(); } ); }); From d9206e8ff6ff35f977b9fc4415b6c8e36346b153 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Thu, 27 Oct 2022 09:59:42 -0400 Subject: [PATCH 15/22] test: fix failing tests --- src/change_stream.ts | 6 +++-- .../change-streams/change_stream.test.ts | 22 ++++++------------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 11225944bc..867badf955 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -737,13 +737,15 @@ export class ChangeStream< }, callback); } - async *[Symbol.asyncIterator](): AsyncGenerator { + async *[Symbol.asyncIterator](): AsyncGenerator { if (this.closed) { return; } try { - while (await this.hasNext()) { + // Change streams run indefinitely as long as errors are resumable + // So the only loop breaking condition is if `next()` throws + while (true) { yield await this.next(); } } finally { diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index d85eda9465..9e1bf988cc 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1013,6 +1013,7 @@ describe('Change Streams', function () { const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; await collection.insertMany(docs); + await changeStreamIterator.next(); await changeStreamIterator.return(); expect(changeStream.closed).to.be.true; expect(changeStream.cursor.closed).to.be.true; @@ -1074,12 +1075,8 @@ describe('Change Streams', function () { changeStream.on('change', sinon.stub()); const changeStreamIterator = changeStream[Symbol.asyncIterator](); - try { - await changeStreamIterator.next(); - expect.fail('Async iterator was used with emitter-based iteration'); - } catch (error) { - expect(error).to.be.instanceOf(MongoAPIError); - } + const error = await changeStreamIterator.next().catch(e => e); + expect(error).to.be.instanceOf(MongoAPIError); } ); @@ -2465,15 +2462,10 @@ describe('ChangeStream resumability', function () { } as FailPoint); await collection.insertOne({ city: 'New York City' }); - try { - await changeStreamIterator.next(); - expect.fail( - 'Change stream did not throw unresumable error and did not produce any events' - ); - } catch (error) { - expect(error).to.be.instanceOf(MongoServerError); - expect(aggregateEvents).to.have.lengthOf(1); - } + + const error = await changeStreamIterator.next().catch(e => e); + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); } ); }); From 470ea73a8ae08182c3693f3fc3beb7f3a9f9a681 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 31 Oct 2022 11:10:38 -0400 Subject: [PATCH 16/22] refactor: move unit tests back to int --- src/change_stream.ts | 2 +- test/types/change_stream.test-d.ts | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/change_stream.ts b/src/change_stream.ts index 867badf955..73526f0d46 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -751,7 +751,7 @@ export class ChangeStream< } finally { try { await this.close(); - } catch (error) { + } catch { // we're not concerned with errors from close() } } diff --git a/test/types/change_stream.test-d.ts b/test/types/change_stream.test-d.ts index 5f36d10d99..6358b0a9a3 100644 --- a/test/types/change_stream.test-d.ts +++ b/test/types/change_stream.test-d.ts @@ -214,3 +214,7 @@ expectError(collection.watch()); collection .watch<{ a: number }, { b: boolean }>() .on('change', change => expectType<{ b: boolean }>(change)); + +expectType, void, void>>( + collection.watch()[Symbol.asyncIterator]() +); From c27c15974341e02e883fa1faf7971fd9334852a9 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 31 Oct 2022 11:13:00 -0400 Subject: [PATCH 17/22] test: change assertion format for changestream cursor --- test/integration/change-streams/change_stream.test.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 9e1bf988cc..8968ade6fa 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -367,7 +367,7 @@ describe('Change Streams', function () { // Check the cursor is closed expect(changeStream.closed).to.be.true; - expect(changeStream.cursor.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); done(); }); }); @@ -1016,7 +1016,7 @@ describe('Change Streams', function () { await changeStreamIterator.next(); await changeStreamIterator.return(); expect(changeStream.closed).to.be.true; - expect(changeStream.cursor.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); } ); @@ -1048,7 +1048,7 @@ describe('Change Streams', function () { ); } catch (error) { expect(changeStream.closed).to.be.true; - expect(changeStream.cursor.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); } } ); From 876f8f88db12f3b1af52e803e737a1c8ca8e61d5 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 31 Oct 2022 12:05:27 -0400 Subject: [PATCH 18/22] test: use for-await syntax instead of manual iteration --- .../change-streams/change_stream.test.ts | 93 +++++++++++++++---- 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 8968ade6fa..975abb9e51 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -989,16 +989,20 @@ describe('Change Streams', function () { async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; await collection.insertMany(docs); - for (const doc of docs) { - const change = await changeStreamIterator.next(); - const { fullDocument } = change.value; - expect(fullDocument.city).to.equal(doc.city); + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + if (docs.length === 0) { + break; + } } + + expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); } ); @@ -1046,7 +1050,7 @@ describe('Change Streams', function () { expect.fail( 'Change stream did not throw unresumable error and did not produce any events' ); - } catch (error) { + } catch { expect(changeStream.closed).to.be.true; expect(changeStream.cursor).property('closed', true); } @@ -1099,7 +1103,7 @@ describe('Change Streams', function () { const { fullDocument } = change.value; expect(fullDocument.city).to.equal(docs[1].city); - } catch (error) { + } catch { expect.fail('Async could not be used with raw iterator API'); } } @@ -1117,11 +1121,36 @@ describe('Change Streams', function () { try { await changeStreamIterator.return(); - } catch (error) { + } catch { expect.fail('Async iterator threw an error on close'); } } ); + + it( + 'cannot be resumed from partial iteration', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + break; + } + + for await (const change of changeStream) { + expect.fail('Change stream was resumed after partial iteration'); + } + + expect(docs).to.have.length(2, 'expected to find remaining docs after partial iteration'); + } + ); }); }); @@ -2344,7 +2373,7 @@ describe('ChangeStream resumability', function () { }); }); - context('#asyncIterator', function () { + context.only('#asyncIterator', function () { for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, @@ -2352,7 +2381,9 @@ describe('ChangeStream resumability', function () { async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); await client.db('admin').command({ configureFailPoint: is4_2Server(this.configuration.version) @@ -2366,9 +2397,16 @@ describe('ChangeStream resumability', function () { } } as FailPoint); - await collection.insertOne({ city: 'New York City' }); - await changeStreamIterator.next(); + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + if (docs.length === 0) { + break; + } + } + expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); expect(aggregateEvents).to.have.lengthOf(2); } ); @@ -2399,9 +2437,19 @@ describe('ChangeStream resumability', function () { callback(error); }); - await collection.insertOne({ city: 'New York City' }); - await changeStreamIterator.next(); + const docs = [{ city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + if (docs.length === 0) { + break; + } + } + + expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); expect(aggregateEvents).to.have.lengthOf(2); } ); @@ -2447,7 +2495,9 @@ describe('ChangeStream resumability', function () { async function () { changeStream = collection.watch([]); await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); + + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); const unresumableErrorCode = 1000; await client.db('admin').command({ @@ -2461,11 +2511,14 @@ describe('ChangeStream resumability', function () { } } as FailPoint); - await collection.insertOne({ city: 'New York City' }); - - const error = await changeStreamIterator.next().catch(e => e); - expect(error).to.be.instanceOf(MongoServerError); - expect(aggregateEvents).to.have.lengthOf(1); + try { + for await (const change of changeStream) { + expect.fail('Change stream produced events on an unresumable error'); + } + } catch (error) { + expect(error).to.be.instanceOf(MongoServerError); + expect(aggregateEvents).to.have.lengthOf(1); + } } ); }); From 477c3b7ef4026f2b60a066cd5c305fa30297fe2d Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 31 Oct 2022 12:17:33 -0400 Subject: [PATCH 19/22] fix: remove only from tests --- test/integration/change-streams/change_stream.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 975abb9e51..a0c5cdbc48 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -2373,7 +2373,7 @@ describe('ChangeStream resumability', function () { }); }); - context.only('#asyncIterator', function () { + context('#asyncIterator', function () { for (const { error, code, message } of resumableErrorCodes) { it( `resumes on error code ${code} (${error})`, From 9a3c76c78857e1f0cb7ca18a6d1aa941b5ceb7f3 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 31 Oct 2022 12:51:24 -0400 Subject: [PATCH 20/22] fix: disable no unused vars --- test/integration/change-streams/change_stream.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index a0c5cdbc48..6cea255682 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1143,7 +1143,7 @@ describe('Change Streams', function () { expect(fullDocument.city).to.equal(expectedDoc.city); break; } - + // eslint-disable-next-line no-unused-vars for await (const change of changeStream) { expect.fail('Change stream was resumed after partial iteration'); } @@ -2512,6 +2512,7 @@ describe('ChangeStream resumability', function () { } as FailPoint); try { + // eslint-disable-next-line no-unused-vars for await (const change of changeStream) { expect.fail('Change stream produced events on an unresumable error'); } From d5fd6a0bc07a793741f799b66d9e2a2593c20fc6 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Mon, 31 Oct 2022 13:46:56 -0400 Subject: [PATCH 21/22] fix: add eslint disable line --- test/integration/change-streams/change_stream.test.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index 6cea255682..e9f8f15894 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -1143,7 +1143,7 @@ describe('Change Streams', function () { expect(fullDocument.city).to.equal(expectedDoc.city); break; } - // eslint-disable-next-line no-unused-vars + // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const change of changeStream) { expect.fail('Change stream was resumed after partial iteration'); } @@ -2512,7 +2512,7 @@ describe('ChangeStream resumability', function () { } as FailPoint); try { - // eslint-disable-next-line no-unused-vars + // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const change of changeStream) { expect.fail('Change stream produced events on an unresumable error'); } From f56753be52ba22455c15a0a53484a959a40964c0 Mon Sep 17 00:00:00 2001 From: Andy Mina Date: Tue, 1 Nov 2022 14:34:08 -0400 Subject: [PATCH 22/22] test: organize tests better --- .../change-streams/change_stream.test.ts | 292 ++++++++++-------- 1 file changed, 155 insertions(+), 137 deletions(-) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index e9f8f15894..81e559a89d 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -983,174 +983,192 @@ describe('Change Streams', function () { }); describe('#asyncIterator', function () { - it( - 'can iterate through changes', - { requires: { topology: '!single', mongodb: '>=4.2' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); + describe('for-await iteration', function () { + it( + 'can iterate through changes', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); - const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; - await collection.insertMany(docs); + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); - for await (const change of changeStream) { - const { fullDocument } = change; - const expectedDoc = docs.shift(); - expect(fullDocument.city).to.equal(expectedDoc.city); - if (docs.length === 0) { - break; + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + if (docs.length === 0) { + break; + } } - } - - expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); - } - ); - it( - 'should close the change stream when return is called', - { requires: { topology: '!single' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); - - const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; - await collection.insertMany(docs); + expect(docs).to.have.length(0, 'expected to find all docs before exiting loop'); + } + ); - await changeStreamIterator.next(); - await changeStreamIterator.return(); - expect(changeStream.closed).to.be.true; - expect(changeStream.cursor).property('closed', true); - } - ); + it( + 'cannot be resumed from partial iteration', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); - it( - 'should close the change stream when an error is thrown', - { requires: { topology: '!single', mongodb: '>=4.2' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); - const unresumableErrorCode = 1000; - await client.db('admin').command({ - configureFailPoint: is4_2Server(this.configuration.version) - ? 'failCommand' - : 'failGetMoreAfterCursorCheckout', - mode: { times: 1 }, - data: { - failCommands: ['getMore'], - errorCode: unresumableErrorCode + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); + break; + } + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + expect.fail('Change stream was resumed after partial iteration'); } - } as FailPoint); - await collection.insertOne({ city: 'New York City' }); - try { - await changeStreamIterator.next(); - expect.fail( - 'Change stream did not throw unresumable error and did not produce any events' + expect(docs).to.have.length( + 2, + 'expected to find remaining docs after partial iteration' ); - } catch { - expect(changeStream.closed).to.be.true; - expect(changeStream.cursor).property('closed', true); } - } - ); + ); - it( - 'should not produce events on closed stream', - { requires: { topology: '!single' } }, - async function () { - changeStream = collection.watch([]); - changeStream.close(); + it( + 'cannot be used with emitter-based iteration', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + changeStream.on('change', sinon.stub()); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); - const change = await changeStreamIterator.next(); + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const change of changeStream) { + expect.fail('Async iterator was used with emitter-based iteration'); + } + } catch (error) { + expect(error).to.be.instanceOf(MongoAPIError); + } + } + ); - expect(change.value).to.be.undefined; - } - ); + it( + 'can be used with raw iterator API', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); - it( - 'cannot be used with emitter-based iteration', - { requires: { topology: '!single' } }, - async function () { - changeStream = collection.watch([]); - changeStream.on('change', sinon.stub()); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); + const docs = [{ city: 'Los Angeles' }, { city: 'Miami' }]; + await collection.insertMany(docs); - const error = await changeStreamIterator.next().catch(e => e); - expect(error).to.be.instanceOf(MongoAPIError); - } - ); + await changeStream.next(); + docs.shift(); - it( - 'can be used with raw iterator API', - { requires: { topology: '!single' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); + try { + for await (const change of changeStream) { + const { fullDocument } = change; + const expectedDoc = docs.shift(); + expect(fullDocument.city).to.equal(expectedDoc.city); - const docs = [{ city: 'Los Angeles' }, { city: 'Miami' }]; - await collection.insertMany(docs); + if (docs.length === 0) { + break; + } + } + } catch { + expect.fail('Async could not be used with raw iterator API'); + } + } + ); + }); - await changeStream.next(); + describe('#return', function () { + it( + 'should close the change stream when return is called', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); - try { - const change = await changeStreamIterator.next(); - expect(change.value).to.not.be.undefined; + const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; + await collection.insertMany(docs); - const { fullDocument } = change.value; - expect(fullDocument.city).to.equal(docs[1].city); - } catch { - expect.fail('Async could not be used with raw iterator API'); + await changeStreamIterator.next(); + await changeStreamIterator.return(); + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); } - } - ); + ); - it( - 'ignores errors thrown from close', - { requires: { topology: '!single' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); - const changeStreamIterator = changeStream[Symbol.asyncIterator](); + it( + 'ignores errors thrown from close', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); - sinon.stub(changeStream.cursor, 'close').throws(new MongoAPIError('testing')); + sinon.stub(changeStream.cursor, 'close').throws(new MongoAPIError('testing')); - try { - await changeStreamIterator.return(); - } catch { - expect.fail('Async iterator threw an error on close'); + try { + await changeStreamIterator.return(); + } catch { + expect.fail('Async iterator threw an error on close'); + } } - } - ); + ); + }); - it( - 'cannot be resumed from partial iteration', - { requires: { topology: '!single' } }, - async function () { - changeStream = collection.watch([]); - await initIteratorMode(changeStream); + describe('#next', function () { + it( + 'should close the change stream when an error is thrown', + { requires: { topology: '!single', mongodb: '>=4.2' } }, + async function () { + changeStream = collection.watch([]); + await initIteratorMode(changeStream); + const changeStreamIterator = changeStream[Symbol.asyncIterator](); - const docs = [{ city: 'New York City' }, { city: 'Seattle' }, { city: 'Boston' }]; - await collection.insertMany(docs); + const unresumableErrorCode = 1000; + await client.db('admin').command({ + configureFailPoint: is4_2Server(this.configuration.version) + ? 'failCommand' + : 'failGetMoreAfterCursorCheckout', + mode: { times: 1 }, + data: { + failCommands: ['getMore'], + errorCode: unresumableErrorCode + } + } as FailPoint); - for await (const change of changeStream) { - const { fullDocument } = change; - const expectedDoc = docs.shift(); - expect(fullDocument.city).to.equal(expectedDoc.city); - break; - } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - for await (const change of changeStream) { - expect.fail('Change stream was resumed after partial iteration'); + await collection.insertOne({ city: 'New York City' }); + try { + await changeStreamIterator.next(); + expect.fail( + 'Change stream did not throw unresumable error and did not produce any events' + ); + } catch { + expect(changeStream.closed).to.be.true; + expect(changeStream.cursor).property('closed', true); + } } + ); - expect(docs).to.have.length(2, 'expected to find remaining docs after partial iteration'); - } - ); + it( + 'should not produce events on closed stream', + { requires: { topology: '!single' } }, + async function () { + changeStream = collection.watch([]); + changeStream.close(); + + const changeStreamIterator = changeStream[Symbol.asyncIterator](); + const change = await changeStreamIterator.next(); + + expect(change.value).to.be.undefined; + } + ); + }); }); });