Skip to content

Commit

Permalink
test: restrict new change stream tests to not run on standalone servers
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson committed Jun 15, 2022
1 parent 4297c64 commit 875b1c3
Showing 1 changed file with 130 additions and 104 deletions.
234 changes: 130 additions & 104 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1496,12 +1496,14 @@ describe('Change Streams', { sessions: { skipLeakTests: true } }, function () {
});
});

describe('ChangeStream resumability', { requires: { topology: '!single' } }, function () {
describe.only('ChangeStream resumability', function () {
let client: MongoClient;
let collection: Collection;
let changeStream: ChangeStream;
let aggregateEvents: CommandStartedEvent[] = [];

const baseRequirements = { topology: '!single' };

const changeStreamResumeOptions: ChangeStreamOptions = {
fullDocument: 'updateLookup',
collation: { locale: 'en', maxVariable: 'punct' },
Expand Down Expand Up @@ -1554,7 +1556,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
for (const { error, code } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1582,7 +1584,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
for (const { error, code } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { mongodb: '<4.2' } },
{ requires: { topology: '!single', mongodb: '<4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1616,7 +1618,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun

it(
'maintains change stream options on resume',
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1647,38 +1649,42 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
);

context('when the error is not a resumable error', function () {
it('does not resume', { requires: { mongodb: '>=4.2' } }, async function () {
changeStream = collection.watch([]);
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});
const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});

await initIteratorMode(changeStream);
await initIteratorMode(changeStream);

await collection.insertOne({ name: 'bailey' });
await collection.insertOne({ name: 'bailey' });

const error = await changeStream.next().catch(err => err);
const error = await changeStream.next().catch(err => err);

expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
});
expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
}
);
});
});

context('#hasNext', function () {
for (const { error, code } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1707,7 +1713,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
for (const { error, code } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { mongodb: '<4.2' } },
{ requires: { topology: '!single', mongodb: '<4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1741,7 +1747,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun

it(
'maintains change stream options on resume',
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1772,38 +1778,42 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
);

context('when the error is not a resumable error', function () {
it('does not resume', { requires: { mongodb: '>=4.2' } }, async function () {
changeStream = collection.watch([]);
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});
const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});

await initIteratorMode(changeStream);
await initIteratorMode(changeStream);

await collection.insertOne({ name: 'bailey' });
await collection.insertOne({ name: 'bailey' });

const error = await changeStream.hasNext().catch(err => err);
const error = await changeStream.hasNext().catch(err => err);

expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
});
expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
}
);
});
});

context('#tryNext', function () {
for (const { error, code } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1831,7 +1841,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
for (const { error, code } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { mongodb: '<4.2' } },
{ requires: { topology: '!single', mongodb: '<4.2' } },
async function () {
changeStream = collection.watch([]);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1865,7 +1875,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun

it(
'maintains change stream options on resume',
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);
Expand Down Expand Up @@ -1896,30 +1906,34 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
);

context('when the error is not a resumable error', function () {
it('does not resume', { requires: { mongodb: '>=4.2' } }, async function () {
changeStream = collection.watch([]);
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});
const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});

await initIteratorMode(changeStream);
await initIteratorMode(changeStream);

await collection.insertOne({ name: 'bailey' });
await collection.insertOne({ name: 'bailey' });

const error = await changeStream.tryNext().catch(err => err);
const error = await changeStream.tryNext().catch(err => err);

expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
});
expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
}
);
});
});
});
Expand All @@ -1928,7 +1942,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
for (const { error, code } of resumableErrorCodes) {
it(
`resumes on error code ${code} (${error})`,
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

Expand Down Expand Up @@ -1958,7 +1972,7 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun

it(
'maintains the change stream options on resume',
{ requires: { mongodb: '>=4.2' } },
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([], changeStreamResumeOptions);

Expand Down Expand Up @@ -1991,52 +2005,64 @@ describe('ChangeStream resumability', { requires: { topology: '!single' } }, fun
);

context('when the error is not a resumable error', function () {
it('does not resume', { requires: { mongodb: '>=4.2' } }, async function () {
changeStream = collection.watch([]);
it(
'does not resume',
{ requires: { topology: '!single', mongodb: '>=4.2' } },
async function () {
changeStream = collection.watch([]);

const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});
const unresumableErrorCode = 1000;
await client.db('admin').command(<FailPoint>{
configureFailPoint: is4_2Server(this.configuration.version)
? 'failCommand'
: 'failGetMoreAfterCursorCheckout',
mode: { times: 1 },
data: {
failCommands: ['getMore'],
errorCode: unresumableErrorCode
}
});

const willBeError = once(changeStream, 'change').catch(error => error);
await once(changeStream.cursor, 'init');
await collection.insertOne({ name: 'bailey' });
const willBeError = once(changeStream, 'change').catch(error => error);
await once(changeStream.cursor, 'init');
await collection.insertOne({ name: 'bailey' });

const error = await willBeError;
const error = await willBeError;

expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
});
expect(error).to.be.instanceOf(MongoServerError);
expect(aggregateEvents).to.have.lengthOf(1);
}
);
});
});

it('caches the server version after the initial aggregate call', async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);
it(
'caches the server version after the initial aggregate call',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);

expect(changeStream.cursor.maxWireVersion).not.to.be.undefined;
});
expect(changeStream.cursor.maxWireVersion).not.to.be.undefined;
}
);

it('caches the server version after each getMore call', async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);
it(
'caches the server version after each getMore call',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([], changeStreamResumeOptions);
await initIteratorMode(changeStream);

const maxWireVersion = changeStream.cursor.maxWireVersion;
console.error(maxWireVersion);
changeStream.cursor.maxWireVersion = 20;
const maxWireVersion = changeStream.cursor.maxWireVersion;
console.error(maxWireVersion);
changeStream.cursor.maxWireVersion = 20;

await collection.insertOne({ name: 'bailey' });
await collection.insertOne({ name: 'bailey' });

await changeStream.next();
await changeStream.next();

expect(changeStream.cursor.maxWireVersion).equal(maxWireVersion);
});
expect(changeStream.cursor.maxWireVersion).equal(maxWireVersion);
}
);
});

0 comments on commit 875b1c3

Please sign in to comment.