diff --git a/lib/cmap/connection.js b/lib/cmap/connection.js index ce108beea9..e86b6f342f 100644 --- a/lib/cmap/connection.js +++ b/lib/cmap/connection.js @@ -40,7 +40,7 @@ class Connection extends EventEmitter { }); stream.on('close', () => { - this[kQueue].forEach(op => op.callback(new MongoError('Connection closed'))); + this[kQueue].forEach(op => op.cb(new MongoError('Connection closed'))); this[kQueue].clear(); this.emit('close'); diff --git a/lib/cmap/connection_pool.js b/lib/cmap/connection_pool.js index 973791a2f4..6c85ffa1be 100644 --- a/lib/cmap/connection_pool.js +++ b/lib/cmap/connection_pool.js @@ -177,7 +177,9 @@ class ConnectionPool extends EventEmitter { destroyConnection(this, connection, reason); } - callback(null); + if (typeof callback === 'function') { + callback(); + } } clear(callback) { @@ -227,6 +229,34 @@ class ConnectionPool extends EventEmitter { ); } + /** + * Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda + * has completed by calling back. + * + * NOTE: please note the required signature of `fn` + * + * @param {ConnectionPool~withConnectionCallback} fn A function which operates on a managed connection + * @param {Function} callback The original callback + * @return {Promise} + */ + withConnection(fn, callback) { + this.checkOut((err, conn) => { + // don't callback with `err` here, we might want to act upon it inside `fn` + + fn(err, conn, (fnErr, result) => { + if (fnErr) { + callback(fnErr); + } else { + callback(undefined, result); + } + + if (conn) { + this.checkIn(conn); + } + }); + }); + } + get totalConnectionCount() { return this[kConnections].length + (this.options.maxPoolSize - this[kPermits]); } @@ -302,6 +332,15 @@ function destroyConnection(pool, connection, reason) { process.nextTick(() => connection.destroy()); } +/** + * A callback provided to `withConnection` + * + * @callback ConnectionPool~withConnectionCallback + * @param {MongoError} error An error instance representing the error during the execution. + * @param {Connection} connection The managed connection which was checked out of the pool. + * @param {Function} callback A function to call back after connection management is complete + */ + module.exports = { ConnectionPool }; diff --git a/test/unit/cmap/connection_pool.test.js b/test/unit/cmap/connection_pool.test.js index e42f4a80c4..8872839cd4 100644 --- a/test/unit/cmap/connection_pool.test.js +++ b/test/unit/cmap/connection_pool.test.js @@ -45,6 +45,80 @@ describe('Connection Pool', function() { mock.createServer().then(s => (server = s)); }); + describe('withConnection', function() { + it('should manage a connection for a successful operation', function(done) { + server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(mock.DEFAULT_ISMASTER_36); + } + }); + + const pool = new ConnectionPool(Object.assign({ bson: new BSON() }, server.address())); + const callback = (err, result) => { + expect(err).to.not.exist; + expect(result).to.exist; + pool.close(done); + }; + + pool.withConnection((err, conn, cb) => { + expect(err).to.not.exist; + + conn.command('$admin.cmd', { ismaster: 1 }, (cmdErr, ismaster) => { + expect(cmdErr).to.not.exist; + cb(undefined, ismaster); + }); + }, callback); + }); + + it('should allow user interaction with an error', function(done) { + server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.connection.destroy(); + } + }); + + const pool = new ConnectionPool( + Object.assign({ bson: new BSON(), waitQueueTimeoutMS: 250 }, server.address()) + ); + + const callback = err => { + expect(err).to.exist; + expect(err).to.match(/Timed out/); + pool.close(done); + }; + + pool.withConnection((err, conn, cb) => { + expect(err).to.exist; + expect(err).to.match(/Timed out/); + cb(err); + }, callback); + }); + + it('should return an error to the original callback', function(done) { + server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(mock.DEFAULT_ISMASTER_36); + } + }); + + const pool = new ConnectionPool(Object.assign({ bson: new BSON() }, server.address())); + const callback = (err, result) => { + expect(err).to.exist; + expect(result).to.not.exist; + expect(err).to.match(/my great error/); + pool.close(done); + }; + + pool.withConnection((err, conn, cb) => { + expect(err).to.not.exist; + cb(new Error('my great error')); + }, callback); + }); + }); + describe('spec tests', function() { const threads = new Map(); const connections = new Map();