Skip to content

Commit

Permalink
fix: destroy connections marked as closed on checkIn / checkOut
Browse files Browse the repository at this point in the history
Connections which receive a `close` event are now marked as being
in a `closed` state. The ConnectionPool will now check for this
state, and destroy the connections accordingly.
  • Loading branch information
mbroadst committed Dec 18, 2019
1 parent 56aeb52 commit 2bd17a6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 6 deletions.
7 changes: 6 additions & 1 deletion lib/cmap/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const EventEmitter = require('events');
const MessageStream = require('./message_stream');
const MongoError = require('../core/error').MongoError;
const MongoNetworkError = require('../core/error').MongoNetworkError;
const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
const wp = require('../core/wireprotocol');
const apm = require('../core/connection/apm');
Expand All @@ -26,6 +27,7 @@ class Connection extends EventEmitter {
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
this.monitorCommands =
typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
this.closed = false;

this[kGeneration] = options.generation;
this[kLastUseTime] = Date.now();
Expand All @@ -40,7 +42,10 @@ class Connection extends EventEmitter {
});

stream.on('close', () => {
this[kQueue].forEach(op => op.cb(new MongoError('Connection closed')));
this.closed = true;
this[kQueue].forEach(op =>
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`))
);
this[kQueue].clear();

this.emit('close');
Expand Down
12 changes: 7 additions & 5 deletions lib/cmap/connection_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ class ConnectionPool extends EventEmitter {
const connection = pool[kConnections].pop();
const isStale = connectionIsStale(pool, connection);
const isIdle = connectionIsIdle(pool, connection);
if (!isStale && !isIdle) {
if (!isStale && !isIdle && !connection.closed) {
pool.emit('connectionCheckedOut', new ConnectionCheckedOutEvent(pool, connection));
callback(null, connection);
return;
}

destroyConnection(pool, connection, isStale ? 'stale' : 'idle');
const reason = connection.closed ? 'error' : isStale ? 'stale' : 'idle';
destroyConnection(pool, connection, reason);
}

if (maxPoolSize <= 0 || pool.totalConnectionCount < maxPoolSize) {
Expand Down Expand Up @@ -208,9 +209,9 @@ class ConnectionPool extends EventEmitter {
* @param {Connection} connection The connection to check in
*/
checkIn(connection) {
const closed = this.closed;
const poolClosed = this.closed;
const stale = connectionIsStale(this, connection);
const willDestroy = !!(closed || stale);
const willDestroy = !!(poolClosed || stale || connection.closed);

// Properly adjust state of connection
if (!willDestroy) {
Expand All @@ -221,7 +222,8 @@ class ConnectionPool extends EventEmitter {
this.emit('connectionCheckedIn', new ConnectionCheckedInEvent(this, connection));

if (willDestroy) {
destroyConnection(this, connection, closed ? 'poolClosed' : 'stale');
const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
destroyConnection(this, connection, reason);
}
}

Expand Down
43 changes: 43 additions & 0 deletions test/unit/cmap/connection_pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,49 @@ describe('Connection Pool', function() {
mock.createServer().then(s => (server = s));
});

it('should destroy connections which have been closed', function(done) {
server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(mock.DEFAULT_ISMASTER_36);
} else {
// destroy on any other command
request.connection.destroy();
}
});

const pool = new ConnectionPool(
Object.assign({ bson: new BSON(), maxPoolSize: 1 }, server.address())
);

const events = [];
pool.on('connectionClosed', event => events.push(event));

pool.checkOut((err, conn) => {
expect(err).to.not.exist;

conn.command('admin.$cmd', { ping: 1 }, (err, result) => {
expect(err).to.exist;
expect(result).to.not.exist;

pool.checkIn(conn);

expect(events).to.have.length(1);
const closeEvent = events[0];
expect(closeEvent)
.have.property('reason')
.equal('error');

pool.close(done);
});
});

pool.withConnection((err, conn, cb) => {
expect(err).to.not.exist;
cb();
});
});

describe('withConnection', function() {
it('should manage a connection for a successful operation', function(done) {
server.setMessageHandler(request => {
Expand Down

0 comments on commit 2bd17a6

Please sign in to comment.