Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 61 additions & 86 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { strict as assert } from 'assert';
import { UUID } from 'bson';
import { Long, UUID } from 'bson';
import { expect } from 'chai';
import { on, once } from 'events';
import { gte, lt } from 'semver';
Expand All @@ -11,19 +11,16 @@ import {
type ChangeStream,
type ChangeStreamDocument,
type ChangeStreamOptions,
type Collection,
type CommandStartedEvent,
type Db,
isHello,
LEGACY_HELLO_COMMAND,
Long,
MongoAPIError,
MongoChangeStreamError,
type MongoClient,
MongoServerError,
ReadPreference,
type ResumeToken
} from '../../mongodb';
} from '../../../src/change_stream';
import { type CommandStartedEvent } from '../../../src/cmap/command_monitoring_events';
import { type Collection } from '../../../src/collection';
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
import { type Db } from '../../../src/db';
import { MongoAPIError, MongoChangeStreamError, MongoServerError } from '../../../src/error';
import { type MongoClient } from '../../../src/mongo_client';
import { ReadPreference } from '../../../src/read_preference';
import { isHello } from '../../../src/utils';
import * as mock from '../../tools/mongodb-mock/index';
import { TestBuilder, UnifiedTestSuiteBuilder } from '../../tools/unified_suite_builder';
import { type FailCommandFailPoint, sleep } from '../../tools/utils';
Expand Down Expand Up @@ -323,30 +320,25 @@ describe('Change Streams', function () {
it('should properly close ChangeStream cursor', {
metadata: { requires: { topology: 'replicaset' } },

test: function (done) {
test: async function () {
const configuration = this.configuration;
const client = configuration.newClient();

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());
await client.connect();
const database = client.db('integration_tests');
const changeStream = database.collection('changeStreamCloseTest').watch(pipeline);

const database = client.db('integration_tests');
const changeStream = database.collection('changeStreamCloseTest').watch(pipeline);
this.defer(() => changeStream.close());
assert.equal(changeStream.closed, false);
assert.equal(changeStream.cursor.closed, false);

assert.equal(changeStream.closed, false);
assert.equal(changeStream.cursor.closed, false);
await changeStream.close();

changeStream.close(err => {
expect(err).to.not.exist;
// Check the cursor is closed
expect(changeStream.closed).to.be.true;
expect(changeStream.cursor).property('closed', true);

// Check the cursor is closed
expect(changeStream.closed).to.be.true;
expect(changeStream.cursor).property('closed', true);
done();
});
});
await changeStream.close();
await client.close();
}
});

Expand All @@ -355,32 +347,28 @@ describe('Change Streams', function () {
{
metadata: { requires: { topology: 'replicaset' } },

test: function (done) {
test: async function () {
const configuration = this.configuration;
const client = configuration.newClient();

client.connect((err, client) => {
expect(err).to.not.exist;
this.defer(() => client.close());
await client.connect();

const forbiddenStage = {};
const forbiddenStageName = '$alksdjfhlaskdfjh';
forbiddenStage[forbiddenStageName] = 2;
const forbiddenStage = {};
const forbiddenStageName = '$alksdjfhlaskdfjh';
forbiddenStage[forbiddenStageName] = 2;

const database = client.db('integration_tests');
const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]);
this.defer(() => changeStream.close());
const database = client.db('integration_tests');
const changeStream = database.collection('forbiddenStageTest').watch([forbiddenStage]);

changeStream.next(err => {
assert.ok(err);
assert.ok(err.message);
assert.ok(
err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1
);
const err = await changeStream.next().catch(e => e);
assert.ok(err);
assert.ok(err.message);
assert.ok(
err.message.indexOf(`Unrecognized pipeline stage name: '${forbiddenStageName}'`) > -1
);

done();
});
});
await changeStream.close();
await client.close();
}
}
);
Expand Down Expand Up @@ -459,37 +447,25 @@ describe('Change Streams', function () {

it('should error if resume token projected out of change stream document using iterator', {
metadata: { requires: { topology: 'replicaset' } },
test(done) {
async test() {
const configuration = this.configuration;
const client = configuration.newClient();

client.connect((err, client) => {
expect(err).to.not.exist;
await client.connect();

const database = client.db('integration_tests');
const collection = database.collection('resumetokenProjectedOutCallback');
const changeStream = collection.watch([{ $project: { _id: false } }]);
const database = client.db('integration_tests');
const collection = database.collection('resumetokenProjectedOutCallback');
const changeStream = collection.watch([{ $project: { _id: false } }]);

changeStream.hasNext(() => {
// trigger initialize
});
await initIteratorMode(changeStream);

changeStream.cursor.on('init', () => {
collection.insertOne({ b: 2 }, (err, res) => {
expect(err).to.be.undefined;
expect(res).to.exist;

changeStream.next(err => {
expect(err).to.exist;
changeStream.close(() => {
client.close(() => {
done();
});
});
});
});
});
});
const res = await collection.insertOne({ b: 2 });
expect(res).to.exist;

const err = await changeStream.next().catch(e => e);
expect(err).to.exist;
await changeStream.close();
await client.close();
}
});

Expand Down Expand Up @@ -1291,7 +1267,7 @@ describe('Change Streams', function () {
await mock.cleanup();
});

it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function (done) {
it('changeStream should close if cursor id for initial aggregate is Long.ZERO', async function () {
mockServer.setMessageHandler(req => {
const doc = req.document;
if (isHello(doc)) {
Expand Down Expand Up @@ -1320,17 +1296,16 @@ describe('Change Streams', function () {
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, {
serverApi: null // TODO(NODE-3807): remove resetting serverApi when the usage of mongodb mock server is removed
});
client.connect(err => {
expect(err).to.not.exist;
const collection = client.db('cs').collection('test');
const changeStream = collection.watch();
changeStream.next((err, doc) => {
expect(err).to.exist;
expect(doc).to.not.exist;
expect(err?.message).to.equal('ChangeStream is closed');
changeStream.close(() => client.close(done));
});
});
await client.connect();
const collection = client.db('cs').collection('test');
const changeStream = collection.watch();

const err = await changeStream.next().catch(e => e);
expect(err).to.exist;
expect(err?.message).to.equal('ChangeStream is closed');

await changeStream.close();
await client.close();
});
});
});
Expand Down
41 changes: 16 additions & 25 deletions test/integration/change-streams/change_streams.prose.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect } from 'chai';
import { once } from 'events';
import { on, once } from 'events';
import * as sinon from 'sinon';
import { setTimeout } from 'timers';

Expand Down Expand Up @@ -57,9 +57,7 @@ function triggerResumableError(
nextStub.restore();
});

changeStream.next(() => {
// ignore
});
changeStream.next();
}

if (typeof delay === 'number') {
Expand All @@ -78,12 +76,6 @@ const initIteratorMode = async (cs: ChangeStream) => {
return;
};

/** Waits for a change stream to start */
async function waitForStarted(changeStream, callback) {
await once(changeStream.cursor, 'init');
await callback();
}

describe('Change Stream prose tests', function () {
before(async function () {
return await setupDatabase(this.configuration, ['integration_tests']);
Expand Down Expand Up @@ -643,12 +635,10 @@ describe('Change Stream prose tests', function () {
// when resuming a change stream.
it('$changeStream with results must include resumeAfter and not startAfter', {
metadata: { requires: { topology: 'replicaset' } },
test: function (done) {
test: async function () {
let events = [];
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });
changeStream.on('error', done);
this.defer(() => changeStream.close());

changeStream.on('change', change => {
events.push({ change: { insert: { x: change.fullDocument.x } } });
Expand All @@ -658,21 +648,22 @@ describe('Change Stream prose tests', function () {
events = [];
triggerResumableError(changeStream, () => events.push('error'));
break;
case 3:
expect(events).to.be.an('array').with.lengthOf(3);
expect(events[0]).to.equal('error');
expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist;
expect(events[2]).to.eql({ change: { insert: { x: 3 } } });
done();
break;
}
});

waitForStarted(changeStream, () =>
coll
.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } })
.then(() => coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } }))
);
await once(changeStream.cursor, 'init');
const changes = on(changeStream, 'change');
await coll.insertOne({ x: 2 }, { writeConcern: { w: 'majority', j: true } });
await changes.next();
await coll.insertOne({ x: 3 }, { writeConcern: { w: 'majority', j: true } });
await changes.next();

expect(events).to.be.an('array').with.lengthOf(3);
expect(events[0]).to.equal('error');
expect(events[1]).nested.property('$changeStream.resumeAfter').to.exist;
expect(events[2]).to.eql({ change: { insert: { x: 3 } } });

await changeStream.close();
}
});
});
Expand Down