Skip to content

Commit

Permalink
fix: ensure implicit sessions are ended consistently
Browse files Browse the repository at this point in the history
This is a fix and a refator, which ensures that implicit sessions
are ended as soon as possible when cursor commands are executed.
The current code path calls `_endSessions` in a confusing number of
locations, and in some cases can accidentally call the method twice
in a row. Reducing the number of calls to the method, and ensuring
its called only after server command responses and the `close`
method should prevent these errors

NODE-2630
  • Loading branch information
mbroadst committed May 27, 2020
1 parent 7e942ba commit 1a443e7
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 53 deletions.
31 changes: 16 additions & 15 deletions lib/cursor/core_cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,15 @@ class CoreCursor extends Readable {
batchSize = this.cursorState.limit - this.cursorState.currentLimit;
}

this.server.getMore(this.ns, this.cursorState, batchSize, this.options, callback);
const cursorState = this.cursorState;
this.server.getMore(this.ns, cursorState, batchSize, this.options, (err, result, conn) => {
// NOTE: `getMore` modifies `cursorState`, would be very ideal not to do so in the future
if (err || (cursorState.cursorId && cursorState.cursorId.isZero())) {
this._endSession();
}

callback(err, result, conn);
});
}

_initializeCursor(callback) {
Expand All @@ -434,18 +442,15 @@ class CoreCursor extends Readable {
}

function done(err, result) {
if (
cursor.cursorState.cursorId &&
cursor.cursorState.cursorId.isZero() &&
cursor._endSession
) {
const cursorState = cursor.cursorState;
if (err || (cursorState.cursorId && cursorState.cursorId.isZero())) {
cursor._endSession();
}

if (
cursor.cursorState.documents.length === 0 &&
cursor.cursorState.cursorId &&
cursor.cursorState.cursorId.isZero() &&
cursorState.documents.length === 0 &&
cursorState.cursorId &&
cursorState.cursorId.isZero() &&
!cursor.cmd.tailable &&
!cursor.cmd.awaitData
) {
Expand Down Expand Up @@ -705,8 +710,8 @@ function _setCursorNotifiedImpl(self, callback) {
self.cursorState.documents = [];
self.cursorState.cursorIndex = 0;

if (self._endSession) {
self._endSession(undefined, () => callback());
if (self.cursorState.session) {
self._endSession(callback);
return;
}

Expand Down Expand Up @@ -792,10 +797,6 @@ function nextFunction(self, callback) {
return handleCallback(callback, err);
}

if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) {
self._endSession();
}

// Save the returned connection to ensure all getMore's fire over the same connection
self.connection = connection;

Expand Down
44 changes: 10 additions & 34 deletions lib/cursor/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -852,9 +852,7 @@ class Cursor extends CoreCursor {
const fetchDocs = () => {
cursor._next((err, doc) => {
if (err) {
return cursor._endSession
? cursor._endSession(() => handleCallback(cb, err))
: handleCallback(cb, err);
return handleCallback(cb, err);
}

if (doc == null) {
Expand Down Expand Up @@ -934,43 +932,21 @@ class Cursor extends CoreCursor {
* @returns {Promise} returns Promise if no callback passed
*/
close(options, callback) {
const Promise = PromiseProvider.get();

if (typeof options === 'function') (callback = options), (options = {});
options = Object.assign({}, { skipKillCursors: false }, options);

this.s.state = CursorState.CLOSED;
if (!options.skipKillCursors) {
// Kill the cursor
this.kill();
}

const completeClose = () => {
// Emit the close event for the cursor
this.emit('close');

// Callback if provided
if (typeof callback === 'function') {
return handleCallback(callback, null, this);
}

// Return a Promise
return new Promise(resolve => {
resolve();
});
};

if (this.cursorState.session) {
if (typeof callback === 'function') {
return this._endSession(() => completeClose());
return maybePromise(callback, cb => {
this.s.state = CursorState.CLOSED;
if (!options.skipKillCursors) {
// Kill the cursor
this.kill();
}

return new Promise(resolve => {
this._endSession(() => completeClose().then(resolve));
this._endSession(() => {
this.emit('close');
cb(null, this);
});
}

return completeClose();
});
}

/**
Expand Down
5 changes: 2 additions & 3 deletions lib/operations/cursor_ops.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,9 @@ function toArray(cursor, callback) {
const fetchDocs = () => {
cursor._next((err, doc) => {
if (err) {
return cursor._endSession
? cursor._endSession(() => handleCallback(callback, err))
: handleCallback(callback, err);
return handleCallback(callback, err);
}

if (doc == null) {
return cursor.close({ skipKillCursors: true }, () => handleCallback(callback, null, items));
}
Expand Down
2 changes: 1 addition & 1 deletion test/unit/cmap/connection_pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ describe('Connection Pool', function() {
});

const pool = new ConnectionPool(
Object.assign({ bson: new BSON(), maxPoolSize: 1, waitQueueTimeoutMS: 200 }, server.address())
Object.assign({ maxPoolSize: 1, waitQueueTimeoutMS: 200 }, server.address())
);

pool.checkOut((err, conn) => {
Expand Down

0 comments on commit 1a443e7

Please sign in to comment.