Skip to content

Commit

Permalink
test(NODE-3293): clean up change stream tests setImmediate usage (#2956)
Browse files Browse the repository at this point in the history
Co-authored-by: Daria Pardue <81593090+dariakp@users.noreply.github.com>
Co-authored-by: Eric Adum <eric.adum@mongodb.com>
  • Loading branch information
3 people committed Sep 1, 2021
1 parent 78ec0dd commit 319f8ae
Showing 1 changed file with 100 additions and 129 deletions.
229 changes: 100 additions & 129 deletions test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
'use strict';
const assert = require('assert');
const { Transform, PassThrough } = require('stream');
const { MongoNetworkError, MongoDriverError } = require('../../src/error');
const { MongoNetworkError } = require('../../src/error');
const { delay, setupDatabase, withClient, withCursor } = require('./shared');
const co = require('co');
const mock = require('../tools/mock');
const { EventCollector } = require('../tools/utils');
const { EventCollector, getSymbolFrom } = require('../tools/utils');
const chai = require('chai');
const expect = chai.expect;
const sinon = require('sinon');
Expand Down Expand Up @@ -99,12 +99,7 @@ function triggerResumableError(changeStream, delay, onClose) {
* @param {Function} callback
*/
function waitForStarted(changeStream, callback) {
const timeout = setTimeout(() => {
expect.fail('Change stream never started');
}, 2000);

changeStream.cursor.once('init', () => {
clearTimeout(timeout);
callback();
});
}
Expand Down Expand Up @@ -176,26 +171,25 @@ const pipeline = [
];

describe('Change Streams', function () {
before(function () {
return setupDatabase(this.configuration, ['integration_tests']);
before(async function () {
return await setupDatabase(this.configuration, ['integration_tests']);
});

beforeEach(function () {
beforeEach(async function () {
const configuration = this.configuration;
const client = configuration.newClient();

return client
.connect()
.then(() => {
const db = client.db('integration_tests');
return db.createCollection('test');
})
.then(
() => client.close(),
() => client.close()
);
await client.connect();
const db = client.db('integration_tests');
try {
await db.createCollection('test');
} catch {
// ns already exists, don't care
} finally {
await client.close();
}
});
afterEach(() => mock.cleanup());
afterEach(async () => await mock.cleanup());

it('should close the listeners after the cursor is closed', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
Expand Down Expand Up @@ -580,39 +574,39 @@ describe('Change Streams', function () {
}
});

it(
'should error if resume token projected out of change stream document using imperative callback form',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
it('should error if resume token projected out of change stream document using iterator', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test(done) {
const configuration = this.configuration;
const client = configuration.newClient();

test: function (done) {
const configuration = this.configuration;
const client = configuration.newClient();
client.connect((err, client) => {
expect(err).to.not.exist;

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());
const database = client.db('integration_tests');
const collection = database.collection('resumetokenProjectedOutCallback');
const changeStream = collection.watch([{ $project: { _id: false } }]);

const database = client.db('integration_tests');
const changeStream = database
.collection('resumetokenProjectedOutCallback')
.watch([{ $project: { _id: false } }]);
this.defer(() => changeStream.close());
changeStream.hasNext(() => {}); // trigger initialize

// Trigger the first database event
waitForStarted(changeStream, () => {
this.defer(database.collection('resumetokenProjectedOutCallback').insert({ b: 2 }));
});
changeStream.cursor.on('init', () => {
collection.insertOne({ b: 2 }, (err, res) => {
expect(err).to.be.undefined;
expect(res).to.exist;

// Fetch the change notification
changeStream.next(err => {
expect(err).to.exist;
done();
changeStream.next(err => {
expect(err).to.exist;
changeStream.close(() => {
client.close(() => {
done();
});
});
});
});
});
}
});
}
);
});

it('should error if resume token projected out of change stream document using event listeners', {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
Expand Down Expand Up @@ -1792,109 +1786,86 @@ describe('Change Streams', function () {
}
});

// FIXME: NODE-1797
describe('should error when used as iterator and emitter concurrently', function () {
let client, coll, changeStream, repeatInsert, val;
val = 0;
let client, coll, changeStream, kMode, initPromise;

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect().catch(() => expect.fail('Failed to connect to client'));
await client.connect();

coll = client.db(this.configuration.db).collection('tester');
changeStream = coll.watch();

repeatInsert = setInterval(async function () {
await coll.insertOne({ c: val }).catch('Failed to insert document');
val++;
}, 75);
kMode = getSymbolFrom(changeStream, 'mode');
initPromise = new Promise(resolve => waitForStarted(changeStream, resolve));
});

afterEach(async function () {
if (repeatInsert) {
clearInterval(repeatInsert);
}
let err;
if (changeStream) {
await changeStream.close();
try {
if (changeStream[kMode] === 'emitter') {
// shutting down the client will end the session, if this happens before
// the stream initialization aggregate operation is processed, it will throw
// a session ended error, which can't be caught if we end the stream, so
// we need to wait for the stream to initialize before closing all the things
await initPromise;
}
await changeStream.close();
} catch (error) {
// don't throw before closing the client
err = error;
}
}

await mock.cleanup();
if (client) {
await client.close();
}
});

it(
'should throw MongoDriverError when set as an emitter with "on" and used as an iterator with "hasNext"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.on('change', resolve));
try {
await changeStream.hasNext().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
if (err) {
throw err;
}
);

it(
'should throw MongoDriverError when set as an iterator with "hasNext" and used as an emitter with "on"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.hasNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
}
);

it(
'should throw MongoDriverError when set as an emitter with "once" and used as an iterator with "next"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await new Promise(resolve => changeStream.once('change', resolve));
try {
await changeStream.next().catch(err => {
expect.fail(err.message);
});
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
});

it(`should throw when mixing event listeners with iterator methods`, {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
async test() {
expect(changeStream).to.have.property(kMode, false);
// ChangeStream detects emitter usage via 'newListener' event
// so this covers all emitter methods
changeStream.on('change', () => {});
expect(changeStream).to.have.property(kMode, 'emitter');

const errRegex = /ChangeStream cannot be used as an iterator/;

// These all throw synchronously so it should be safe to not await the results
expect(() => {
changeStream.next();
}).to.throw(errRegex);
expect(() => {
changeStream.hasNext();
}).to.throw(errRegex);
expect(() => {
changeStream.tryNext();
}).to.throw(errRegex);
}
);

it(
'should throw MongoDriverError when set as an iterator with "tryNext" and used as an emitter with "on"',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
test: async function () {
await changeStream
.tryNext()
.catch(() => expect.fail('Failed to set changeStream to iterator'));
try {
await new Promise(resolve => changeStream.on('change', resolve));
} catch (error) {
return expect(error).to.be.instanceof(MongoDriverError);
}
return expect.fail('Should not reach here');
}
});

it(`should throw when mixing iterator methods with event listeners`, {
metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } },
async test() {
expect(changeStream).to.have.property(kMode, false);
const res = await changeStream.tryNext();
expect(res).to.not.exist;
expect(changeStream).to.have.property(kMode, 'iterator');

// This does throw synchronously
// the newListener event is called sync
// which calls streamEvents, which calls setIsEmitter, which will throw
expect(() => {
changeStream.on('change', () => {});
}).to.throw(/ChangeStream cannot be used as an EventEmitter/);
}
);
});
});

describe('should properly handle a changeStream event being processed mid-close', function () {
Expand Down

0 comments on commit 319f8ae

Please sign in to comment.