Skip to content

Commit

Permalink
fix: always clear cancelled wait queue members during processing
Browse files Browse the repository at this point in the history
Each time the wait queue in the connection pool is processed it is
currently gated by having available connections. This can result in
a memory leak where many wait queue members are cancelled, but the
pool is unable to clear them out before all connections are used
for new operations.

NODE-2413
  • Loading branch information
mbroadst committed May 27, 2020
1 parent 0759a0e commit 7e942ba
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
14 changes: 11 additions & 3 deletions lib/cmap/connection_pool.js
Expand Up @@ -190,6 +190,10 @@ class ConnectionPool extends EventEmitter {
return this[kConnections].length;
}

get waitQueueSize() {
return this[kWaitQueue].length;
}

/**
* Check a connection out of this pool. The connection will continue to be tracked, but no reference to it
* will be held by the pool. This means that if a connection is checked out it MUST be checked back in or
Expand Down Expand Up @@ -287,7 +291,7 @@ class ConnectionPool extends EventEmitter {
this[kCancellationToken].emit('cancel');

// drain the wait queue
while (this[kWaitQueue].length) {
while (this.waitQueueSize) {
const waitQueueMember = this[kWaitQueue].pop();
clearTimeout(waitQueueMember.timer);
if (!waitQueueMember[kCancelled]) {
Expand Down Expand Up @@ -441,13 +445,17 @@ function processWaitQueue(pool) {
return;
}

while (pool[kWaitQueue].length && pool.availableConnectionCount) {
while (pool.waitQueueSize) {
const waitQueueMember = pool[kWaitQueue].peekFront();
if (waitQueueMember[kCancelled]) {
pool[kWaitQueue].shift();
continue;
}

if (!pool.availableConnectionCount) {
break;
}

const connection = pool[kConnections].shift();
const isStale = connectionIsStale(pool, connection);
const isIdle = connectionIsIdle(pool, connection);
Expand All @@ -464,7 +472,7 @@ function processWaitQueue(pool) {
}

const maxPoolSize = pool.options.maxPoolSize;
if (pool[kWaitQueue].length && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
if (pool.waitQueueSize && (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize)) {
createConnection(pool, (err, connection) => {
const waitQueueMember = pool[kWaitQueue].shift();
if (waitQueueMember == null) {
Expand Down
39 changes: 38 additions & 1 deletion test/unit/cmap/connection_pool.test.js
Expand Up @@ -3,10 +3,11 @@
const util = require('util');
const loadSpecTests = require('../../spec').loadSpecTests;
const ConnectionPool = require('../../../lib/cmap/connection_pool').ConnectionPool;
const WaitQueueTimeoutError = require('../../../lib/cmap/errors').WaitQueueTimeoutError;
const EventEmitter = require('events').EventEmitter;
const mock = require('mongodb-mock-server');
const cmapEvents = require('../../../lib/cmap/events');

const sinon = require('sinon');
const chai = require('chai');
chai.use(require('../../functional/spec-runner/matcher').default);
const expect = chai.expect;
Expand Down Expand Up @@ -113,6 +114,42 @@ describe('Connection Pool', function() {
);
});

it('should clear timed out wait queue members if no connections are available', function(done) {
server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(mock.DEFAULT_ISMASTER_36);
}
});

const pool = new ConnectionPool(
Object.assign({ bson: new BSON(), maxPoolSize: 1, waitQueueTimeoutMS: 200 }, server.address())
);

pool.checkOut((err, conn) => {
expect(err).to.not.exist;
expect(conn).to.exist;

pool.checkOut(err => {
expect(err).to.exist.and.be.instanceOf(WaitQueueTimeoutError);

// We can only process the wait queue with `checkIn` and `checkOut`, so we
// force the pool here to think there are no available connections, even though
// we are checking the connection back in. This simulates a slow leak where
// incoming requests outpace the ability of the queue to fully process cancelled
// wait queue members
sinon.stub(pool, 'availableConnectionCount').get(() => 0);
pool.checkIn(conn);

expect(pool)
.property('waitQueueSize')
.to.equal(0);

done();
});
});
});

describe('withConnection', function() {
it('should manage a connection for a successful operation', function(done) {
server.setMessageHandler(request => {
Expand Down

0 comments on commit 7e942ba

Please sign in to comment.