Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5052): prevent cursor and changestream close logic from running more than once #3562

Merged
merged 30 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
cbebd10
fix(NODE-5052): prevent cursor and changestream close logic from runn…
nbbeeken Feb 9, 2023
fc3ec7b
fix: keep cleaning up session, check for unhandled errors
nbbeeken Feb 9, 2023
3f02916
move flag setting to after clean up
nbbeeken Feb 9, 2023
0a3bd27
prevent mocha from ending early
nbbeeken Feb 9, 2023
c9db9ef
use uncaughtExceptionMonitor
nbbeeken Feb 9, 2023
c24d62d
do _something_ for unknown origin
nbbeeken Feb 9, 2023
465c058
reset arrays in after hook
nbbeeken Feb 9, 2023
b2238f6
removeAllListeners should be called whenever cursor closes
nbbeeken Feb 9, 2023
4404f73
fix up legacy tests
nbbeeken Feb 9, 2023
3ae17cf
test cleanup
nbbeeken Feb 10, 2023
aecb077
fix lint
nbbeeken Feb 10, 2023
28d6ee5
rollup cleanup into helpers
nbbeeken Feb 10, 2023
53c1314
fix tests
nbbeeken Feb 10, 2023
6387545
reset abstract_cursor
nbbeeken Feb 10, 2023
796ada5
gate on killed/closed
nbbeeken Feb 10, 2023
05acf5d
gate on killed/closed round 2
nbbeeken Feb 10, 2023
adaa40e
do not try to use a session that hasEnded
nbbeeken Feb 10, 2023
eb34100
cleanup test
nbbeeken Feb 10, 2023
3a52ff6
unknown array
nbbeeken Feb 10, 2023
c3c7cdb
clean up find_cursor tests
nbbeeken Feb 10, 2023
2ac087c
drop in crud_api tests
nbbeeken Feb 10, 2023
f1f8fd3
do not set kClosed early
nbbeeken Feb 10, 2023
430e8aa
pass iterable to .race
nbbeeken Feb 10, 2023
16e582f
fix if stmt
nbbeeken Feb 13, 2023
8cbe891
remove gating
nbbeeken Feb 13, 2023
c12b8bb
Merge branch 'main' into NODE-5052-close-change-stream
nbbeeken Feb 13, 2023
feefb02
test: update title
nbbeeken Feb 13, 2023
08037f8
fix: test name org
nbbeeken Feb 13, 2023
4babaee
fix: test title
nbbeeken Feb 13, 2023
d1fc282
improve readability
nbbeeken Feb 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
"require": [
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai-addons.js"
"test/tools/runner/chai-addons.js",
"test/tools/runner/hooks/unhandled_checker.ts"
],
"extension": [
"js",
"ts"
],
"extension": ["js", "ts"],
"recursive": true,
"timeout": 60000,
"failZero": true,
Expand Down
65 changes: 27 additions & 38 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,8 @@ export abstract class AbstractCursor<

async close(): Promise<void> {
const needsToEmitClosed = !this[kClosed];
this[kClosed] = true;
await cleanupCursorAsync(this, { needsToEmitClosed });
this[kClosed] = true;
}

/**
Expand Down Expand Up @@ -778,62 +778,51 @@ function cleanupCursor(
const error = options?.error;
const needsToEmitClosed = options?.needsToEmitClosed ?? cursor[kDocuments].length === 0;
baileympearson marked this conversation as resolved.
Show resolved Hide resolved

if (error) {
if (cursor.loadBalanced && error instanceof MongoNetworkError) {
return completeCleanup();
}
}

if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) {
const cleanupListeners = () => {
if (needsToEmitClosed) {
cursor[kClosed] = true;
cursor[kId] = Long.ZERO;
cursor.emit(AbstractCursor.CLOSE);
}
cursor.removeAllListeners();
};

if (session) {
if (session.owner === cursor) {
session.endSession({ error }).finally(() => {
callback();
});
return;
}

if (!session.inTransaction()) {
maybeClearPinnedConnection(session, { error });
}
const cleanupSession = () => {
if (session?.owner === cursor && !session?.hasEnded) {
session.endSession({ error }).finally(callback);
return;
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
}
process.nextTick(callback);
};

return callback();
}
const completeCleanup = () => {
cleanupListeners();
return cleanupSession();
};

function completeCleanup() {
if (session) {
if (session.owner === cursor) {
session.endSession({ error }).finally(() => {
cursor.emit(AbstractCursor.CLOSE);
callback();
});
return;
}
if (cursor[kClosed]) {
return completeCleanup();
}

if (!session.inTransaction()) {
maybeClearPinnedConnection(session, { error });
}
if (error) {
if (cursor.loadBalanced && error instanceof MongoNetworkError) {
return completeCleanup();
}
}

cursor.emit(AbstractCursor.CLOSE);
return callback();
if (cursorId == null || server == null || cursorId.isZero() || cursorNs == null) {
return completeCleanup();
}

cursor[kKilled] = true;

executeOperation(
cursor[kClient],
new KillCursorsOperation(cursorId, cursorNs, server, { session })
).finally(() => {
completeCleanup();
});
).finally(completeCleanup);
return;
}

Expand Down
43 changes: 43 additions & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2699,4 +2699,47 @@ describe('ChangeStream resumability', function () {
expect(changeStream.cursor.maxWireVersion).equal(maxWireVersion);
}
);

it(
'does not raise unhandled rejection errors',
{ requires: { topology: '!single' } },
async function () {
if (globalThis.AbortSignal?.timeout == null) {
this.skipReason = 'test requires AbortSignal.timeout';
this.skip();
}

const unhandledRejections: AsyncIterableIterator<[reason: Error, promise: Promise<any>]> = on(
process,
dariakp marked this conversation as resolved.
Show resolved Hide resolved
'unhandledRejection',
{ signal: AbortSignal.timeout(2000) }
);

changeStream = collection.watch();

const shouldErrorLoop = (async function () {
try {
dariakp marked this conversation as resolved.
Show resolved Hide resolved
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _change of changeStream) {
// ignore
}
return null;
} catch (error) {
return error;
}
})();

await sleep(200);
const closeResult = changeStream.close().catch(error => error);
expect(closeResult).to.not.be.instanceOf(Error);

const result = await shouldErrorLoop;
expect(result).to.be.instanceOf(MongoAPIError);
expect(result.message).to.match(/ChangeStream is closed/i);

const noUnhandledPromiseRejections = await unhandledRejections.next().catch(error => error);
dariakp marked this conversation as resolved.
Show resolved Hide resolved
dariakp marked this conversation as resolved.
Show resolved Hide resolved
expect(noUnhandledPromiseRejections).to.be.instanceOf(Error);
expect(noUnhandledPromiseRejections).to.have.nested.property('cause.name', 'TimeoutError');
}
);
});
178 changes: 69 additions & 109 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { expect } from 'chai';
import { on } from 'events';

import { MongoClient, MongoError, ObjectId, ReturnDocument } from '../../mongodb';
import { assert as test } from '../shared';
Expand Down Expand Up @@ -60,130 +61,89 @@ describe('CRUD API', function () {
await client.close();
});

it('should correctly execute find method using crud api', function (done) {
const db = client.db();

db.collection('t').insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }], function (err) {
expect(err).to.not.exist;

//
// Cursor
// --------------------------------------------------
const makeCursor = () => {
// Possible methods on the the cursor instance
return db
.collection('t')
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(2)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};
context('when creating a cursor with find', () => {
let db;

//
// Exercise count method
// -------------------------------------------------
const countMethod = function () {
// Execute the different methods supported by the cursor
const cursor = makeCursor();
cursor.count(function (err, count) {
expect(err).to.not.exist;
test.equal(2, count);
eachMethod();
});
};

//
// Exercise legacy method each
// -------------------------------------------------
const eachMethod = function () {
let count = 0;
beforeEach(async () => {
db = client.db();
await db.collection('t').deleteMany({});
dariakp marked this conversation as resolved.
Show resolved Hide resolved
await db.collection('t').insertMany([{ a: 1 }, { a: 1 }, { a: 1 }, { a: 1 }]);
});

const makeCursor = () => {
// Possible methods on the the cursor instance
return db
.collection('t')
.find({})
.filter({ a: 1 })
.addCursorFlag('noCursorTimeout', true)
.addQueryModifier('$comment', 'some comment')
.batchSize(1)
.comment('some comment 2')
.limit(2)
.maxTimeMS(50)
.project({ a: 1 })
.skip(0)
.sort({ a: 1 });
};

describe('#count()', () => {
it('returns the number of documents', async () => {
const cursor = makeCursor();
cursor.forEach(
() => {
count = count + 1;
},
err => {
expect(err).to.not.exist;
test.equal(2, count);
toArrayMethod();
}
);
};
const res = await cursor.count();
expect(res).to.equal(2);
});
});

//
// Exercise toArray
// -------------------------------------------------
const toArrayMethod = function () {
describe('#forEach()', () => {
it('iterates all the documents', async () => {
const cursor = makeCursor();
cursor.toArray(function (err, docs) {
expect(err).to.not.exist;
test.equal(2, docs.length);
nextMethod();
let count = 0;
await cursor.forEach(() => {
count += 1;
});
};
expect(count).to.equal(2);
});
});

//
// Exercise next method
// -------------------------------------------------
const nextMethod = function () {
describe('#toArray()', () => {
it('returns an array with all documents', async () => {
const cursor = makeCursor();
cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);

cursor.next(function (err, doc) {
expect(err).to.not.exist;
test.ok(doc != null);
const res = await cursor.toArray();
expect(res).to.have.lengthOf(2);
});
});

cursor.next(function (err, doc) {
expect(err).to.not.exist;
expect(doc).to.not.exist;
streamMethod();
});
});
});
};
describe('#next()', () => {
it('is callable without blocking', async () => {
const cursor = makeCursor();
const doc0 = await cursor.next();
expect(doc0).to.exist;
const doc1 = await cursor.next();
expect(doc1).to.exist;
const doc2 = await cursor.next();
expect(doc2).to.not.exist;
});
});

//
// Exercise stream
// -------------------------------------------------
const streamMethod = function () {
let count = 0;
describe('#stream()', () => {
it('creates a node stream that emits data events', async () => {
const count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
stream.on('data', function () {
count = count + 1;
});

on(stream, 'data');
cursor.once('close', function () {
test.equal(2, count);
explainMethod();
});
};

//
// Explain method
// -------------------------------------------------
const explainMethod = function () {
const cursor = makeCursor();
cursor.explain(function (err, result) {
expect(err).to.not.exist;
test.ok(result != null);

client.close(done);
expect(count).to.equal(2);
});
};
});
});
});

// Execute all the methods
countMethod();
describe('#explain()', () => {
it('returns an explain document', async () => {
const cursor = makeCursor();
const result = await cursor.explain();
expect(result).to.exist;
});
});

Expand Down
Loading