Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5052): prevent cursor and changestream close logic from running more than once #3562

Merged
merged 30 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cbebd10
fix(NODE-5052): prevent cursor and changestream close logic from runn…
nbbeeken Feb 9, 2023
fc3ec7b
fix: keep cleaning up session, check for unhandled errors
nbbeeken Feb 9, 2023
3f02916
move flag setting to after clean up
nbbeeken Feb 9, 2023
0a3bd27
prevent mocha from ending early
nbbeeken Feb 9, 2023
c9db9ef
use uncaughtExceptionMonitor
nbbeeken Feb 9, 2023
c24d62d
do _something_ for unknown origin
nbbeeken Feb 9, 2023
465c058
reset arrays in after hook
nbbeeken Feb 9, 2023
b2238f6
removeAllListeners should be called whenever cursor closes
nbbeeken Feb 9, 2023
4404f73
fix up legacy tests
nbbeeken Feb 9, 2023
3ae17cf
test cleanup
nbbeeken Feb 10, 2023
aecb077
fix lint
nbbeeken Feb 10, 2023
28d6ee5
rollup cleanup into helpers
nbbeeken Feb 10, 2023
53c1314
fix tests
nbbeeken Feb 10, 2023
6387545
reset abstract_cursor
nbbeeken Feb 10, 2023
796ada5
gate on killed/closed
nbbeeken Feb 10, 2023
05acf5d
gate on killed/closed round 2
nbbeeken Feb 10, 2023
adaa40e
do not try to use a session that hasEnded
nbbeeken Feb 10, 2023
eb34100
cleanup test
nbbeeken Feb 10, 2023
3a52ff6
unknown array
nbbeeken Feb 10, 2023
c3c7cdb
clean up find_cursor tests
nbbeeken Feb 10, 2023
2ac087c
drop in crud_api tests
nbbeeken Feb 10, 2023
f1f8fd3
do not set kClosed early
nbbeeken Feb 10, 2023
430e8aa
pass iterable to .race
nbbeeken Feb 10, 2023
16e582f
fix if stmt
nbbeeken Feb 13, 2023
8cbe891
remove gating
nbbeeken Feb 13, 2023
c12b8bb
Merge branch 'main' into NODE-5052-close-change-stream
nbbeeken Feb 13, 2023
feefb02
test: update title
nbbeeken Feb 13, 2023
08037f8
fix: test name org
nbbeeken Feb 13, 2023
4babaee
fix: test title
nbbeeken Feb 13, 2023
d1fc282
improve readability
nbbeeken Feb 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
"require": [
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai-addons.js"
"test/tools/runner/chai-addons.js",
"test/tools/runner/hooks/unhandled_checker.ts"
],
"extension": [
"js",
"ts"
],
"extension": ["js", "ts"],
"recursive": true,
"timeout": 60000,
"failZero": true,
Expand Down
11 changes: 7 additions & 4 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -828,13 +828,16 @@ function cleanupCursor(

cursor[kKilled] = true;

if (session.hasEnded) {
return completeCleanup();
}

executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session })
).finally(() => {
completeCleanup();
});
return;
)
.catch(() => null)
.finally(completeCleanup);
}

/** @internal */
Expand Down
24 changes: 24 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,30 @@ describe('Change Streams', function () {
}
}
);

it(
'when closed throws "ChangeStream is closed"',
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch();

const loop = (async function () {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _change of changeStream) {
return 'loop entered'; // loop should never be entered
}
return 'loop ended without error'; // loop should not finish without error
})();

await sleep(1);
const closeResult = changeStream.close().catch(error => error);
expect(closeResult).to.not.be.instanceOf(Error);

const result = await loop.catch(error => error);
expect(result).to.be.instanceOf(MongoAPIError);
expect(result.message).to.match(/ChangeStream is closed/i);
}
);
});

describe('#return', function () {
Expand Down
177 changes: 70 additions & 107 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect } from 'chai';
import { on } from 'events';

import { MongoClient, MongoError, ObjectId, ReturnDocument } from '../../mongodb';
import { assert as test } from '../shared';
Expand Down Expand Up @@ -60,130 +61,92 @@ describe('CRUD API', function () {
await client.close();
});

it('should correctly execute find method using crud api', function (done) {
const db = client.db();

db.collection('t').insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }], function (err) {
expect(err).to.not.exist;

//
// Cursor
// --------------------------------------------------
const makeCursor = () => {
// Possible methods on the the cursor instance
return db
.collection('t')
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(2)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};
context('when creating a cursor with find', () => {
let collection;

//
// Exercise count method
// -------------------------------------------------
const countMethod = function () {
// Execute the different methods supported by the cursor
const cursor = makeCursor();
cursor.count(function (err, count) {
expect(err).to.not.exist;
test.equal(2, count);
eachMethod();
});
};
beforeEach(async () => {
collection = client.db().collection('t');
await collection.drop().catch(() => null);
await collection.insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }]);
});

//
// Exercise legacy method each
// -------------------------------------------------
const eachMethod = function () {
let count = 0;
afterEach(async () => {
await collection?.drop().catch(() => null);
});

const makeCursor = () => {
// Possible methods on the the cursor instance
return collection
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(1)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};

describe('#count()', () => {
it('returns the number of documents', async () => {
const cursor = makeCursor();
cursor.forEach(
() => {
count = count + 1;
},
err => {
expect(err).to.not.exist;
test.equal(2, count);
toArrayMethod();
}
);
};
const res = await cursor.count();
expect(res).to.equal(2);
});
});

//
// Exercise toArray
// -------------------------------------------------
const toArrayMethod = function () {
describe('#forEach()', () => {
it('iterates all the documents', async () => {
const cursor = makeCursor();
cursor.toArray(function (err, docs) {
expect(err).to.not.exist;
test.equal(2, docs.length);
nextMethod();
let count = 0;
await cursor.forEach(() => {
count += 1;
});
};
expect(count).to.equal(2);
});
});

//
// Exercise next method
// -------------------------------------------------
const nextMethod = function () {
describe('#toArray()', () => {
it('returns an array with all documents', async () => {
const cursor = makeCursor();
cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);

cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);
const res = await cursor.toArray();
expect(res).to.have.lengthOf(2);
});
});

cursor.next(function (err, doc) {
expect(err).to.not.exist;
expect(doc).to.not.exist;
streamMethod();
});
});
});
};
describe('#next()', () => {
it('is callable without blocking', async () => {
const cursor = makeCursor();
const doc0 = await cursor.next();
expect(doc0).to.exist;
const doc1 = await cursor.next();
expect(doc1).to.exist;
const doc2 = await cursor.next();
expect(doc2).to.not.exist;
});
});

//
// Exercise stream
// -------------------------------------------------
const streamMethod = function () {
let count = 0;
describe('#stream()', () => {
it('creates a node stream that emits data events', async () => {
const count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
stream.on('data', function () {
count = count + 1;
});

on(stream, 'data');
cursor.once('close', function () {
test.equal(2, count);
explainMethod();
expect(count).to.equal(2);
});
};
});
});

//
// Explain method
// -------------------------------------------------
const explainMethod = function () {
describe('#explain()', () => {
it('returns an explain document', async () => {
const cursor = makeCursor();
cursor.explain(function (err, result) {
expect(err).to.not.exist;
test.ok(result != null);

client.close(done);
});
};

// Execute all the methods
countMethod();
const result = await cursor.explain();
expect(result).to.exist;
});
});
});

Expand Down
78 changes: 43 additions & 35 deletions test/integration/crud/find_cursor_methods.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,50 +108,58 @@ describe('Find Cursor', function () {
});
});

context('#close', function () {
it('should send a killCursors command when closed before completely iterated', function (done) {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));
describe('#close', function () {
let collection;

const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.next(err => {
expect(err).to.not.exist;
cursor.close(err => {
expect(err).to.not.exist;
expect(commands).to.have.length(1);
done();
});
});
beforeEach(async function () {
collection = client.db().collection('abstract_cursor');
await collection.drop().catch(() => null);
await collection.insertMany([{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }]);
});

it('should not send a killCursors command when closed after completely iterated', function (done) {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));
afterEach(async function () {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
await collection?.drop().catch(() => null);
});

const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.toArray(err => {
expect(err).to.not.exist;
context('when closed before completely iterated', () => {
it('sends a killCursors command', async () => {
const killCursorsCommands = [];
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));

cursor.close(err => {
expect(err).to.not.exist;
expect(commands).to.have.length(0);
done();
});
const cursor = collection.find({}, { batchSize: 2 });

const doc = await cursor.next();
expect(doc).property('a', 1);

expect(killCursorsCommands).to.have.length(0);
await cursor.close();
dariakp marked this conversation as resolved.
Show resolved Hide resolved
expect(killCursorsCommands).to.have.length(1);
});
});

it('should not send a killCursors command when closed before initialization', function (done) {
const commands = [];
client.on('commandStarted', filterForCommands(['killCursors'], commands));
context('when closed after completely iterated', () => {
it('does not send a killCursors command', async () => {
const killCursorsCommands = [];
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));

const coll = client.db().collection('abstract_cursor');
const cursor = coll.find({}, { batchSize: 2 });
cursor.close(err => {
expect(err).to.not.exist;
expect(commands).to.have.length(0);
done();
const cursor = collection.find();
await cursor.toArray();
expect(killCursorsCommands).to.have.length(0);
await cursor.close();
expect(killCursorsCommands).to.have.length(0);
});
});

context('when closed before initialization', () => {
it('does not send a killCursors command', async () => {
const killCursorsCommands = [];
client.on('commandStarted', filterForCommands(['killCursors'], killCursorsCommands));

const cursor = collection.find();

expect(killCursorsCommands).to.have.length(0);
await cursor.close();
expect(killCursorsCommands).to.have.length(0);
});
});
});
Expand Down
Loading