Skip to content

Commit

Permalink
feat: add a withConnection helper to the connection pool
Browse files Browse the repository at this point in the history
This helper allows you to easily manage the "checkedoutness" of a
connection. It will automatically check out a connection, pass it
to the function you provide, and check it back in for you when
your function calls back.
  • Loading branch information
mbroadst committed Dec 18, 2019
1 parent 7b62d79 commit d59dced
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 2 deletions.
2 changes: 1 addition & 1 deletion lib/cmap/connection.js
Expand Up @@ -40,7 +40,7 @@ class Connection extends EventEmitter {
});

stream.on('close', () => {
this[kQueue].forEach(op => op.callback(new MongoError('Connection closed')));
this[kQueue].forEach(op => op.cb(new MongoError('Connection closed')));
this[kQueue].clear();

this.emit('close');
Expand Down
41 changes: 40 additions & 1 deletion lib/cmap/connection_pool.js
Expand Up @@ -177,7 +177,9 @@ class ConnectionPool extends EventEmitter {
destroyConnection(this, connection, reason);
}

callback(null);
if (typeof callback === 'function') {
callback();
}
}

clear(callback) {
Expand Down Expand Up @@ -227,6 +229,34 @@ class ConnectionPool extends EventEmitter {
);
}

/**
* Runs a lambda with an implicitly checked out connection, checking that connection back in when the lambda
* has completed by calling back.
*
* NOTE: please note the required signature of `fn`
*
* @param {ConnectionPool~withConnectionCallback} fn A function which operates on a managed connection
* @param {Function} callback The original callback
* @return {Promise}
*/
withConnection(fn, callback) {
this.checkOut((err, conn) => {
// don't callback with `err` here, we might want to act upon it inside `fn`

fn(err, conn, (fnErr, result) => {
if (fnErr) {
callback(fnErr);
} else {
callback(undefined, result);
}

if (conn) {
this.checkIn(conn);
}
});
});
}

get totalConnectionCount() {
return this[kConnections].length + (this.options.maxPoolSize - this[kPermits]);
}
Expand Down Expand Up @@ -302,6 +332,15 @@ function destroyConnection(pool, connection, reason) {
process.nextTick(() => connection.destroy());
}

/**
* A callback provided to `withConnection`
*
* @callback ConnectionPool~withConnectionCallback
* @param {MongoError} error An error instance representing the error during the execution.
* @param {Connection} connection The managed connection which was checked out of the pool.
* @param {Function} callback A function to call back after connection management is complete
*/

module.exports = {
ConnectionPool
};
74 changes: 74 additions & 0 deletions test/unit/cmap/connection_pool.test.js
Expand Up @@ -45,6 +45,80 @@ describe('Connection Pool', function() {
mock.createServer().then(s => (server = s));
});

describe('withConnection', function() {
it('should manage a connection for a successful operation', function(done) {
server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(mock.DEFAULT_ISMASTER_36);
}
});

const pool = new ConnectionPool(Object.assign({ bson: new BSON() }, server.address()));
const callback = (err, result) => {
expect(err).to.not.exist;
expect(result).to.exist;
pool.close(done);
};

pool.withConnection((err, conn, cb) => {
expect(err).to.not.exist;

conn.command('$admin.cmd', { ismaster: 1 }, (cmdErr, ismaster) => {
expect(cmdErr).to.not.exist;
cb(undefined, ismaster);
});
}, callback);
});

it('should allow user interaction with an error', function(done) {
server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.connection.destroy();
}
});

const pool = new ConnectionPool(
Object.assign({ bson: new BSON(), waitQueueTimeoutMS: 250 }, server.address())
);

const callback = err => {
expect(err).to.exist;
expect(err).to.match(/Timed out/);
pool.close(done);
};

pool.withConnection((err, conn, cb) => {
expect(err).to.exist;
expect(err).to.match(/Timed out/);
cb(err);
}, callback);
});

it('should return an error to the original callback', function(done) {
server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(mock.DEFAULT_ISMASTER_36);
}
});

const pool = new ConnectionPool(Object.assign({ bson: new BSON() }, server.address()));
const callback = (err, result) => {
expect(err).to.exist;
expect(result).to.not.exist;
expect(err).to.match(/my great error/);
pool.close(done);
};

pool.withConnection((err, conn, cb) => {
expect(err).to.not.exist;
cb(new Error('my great error'));
}, callback);
});
});

describe('spec tests', function() {
const threads = new Map();
const connections = new Map();
Expand Down

0 comments on commit d59dced

Please sign in to comment.