Skip to content

Commit

Permalink
feat(pool): add support for resetting the connection pool
Browse files Browse the repository at this point in the history
SDAM requires us to be able to rest the connection pool in certain
conditions, but we have traditionally not done this in the node
driver. This implements support for the reset, and unblocks us from
implementing tests like "cursors survive primary stepdown"

NODE-1682
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent 9ea7d95 commit 2d1ff40
Showing 1 changed file with 38 additions and 17 deletions.
55 changes: 38 additions & 17 deletions lib/core/connection/pool.js
Expand Up @@ -26,6 +26,8 @@ var CONNECTED = 'connected';
var DESTROYING = 'destroying';
var DESTROYED = 'destroyed';

const CONNECTION_EVENTS = ['error', 'close', 'timeout', 'parseError', 'connect', 'message'];

var _id = 0;

/**
Expand Down Expand Up @@ -631,9 +633,6 @@ Pool.prototype.unref = function() {
});
};

// Events
var events = ['error', 'close', 'timeout', 'parseError', 'connect', 'message'];

// Destroy the connections
function destroy(self, connections, options, callback) {
let connectionCount = connections.length;
Expand Down Expand Up @@ -669,10 +668,7 @@ function destroy(self, connections, options, callback) {

// Destroy all connections
connections.forEach(conn => {
for (var i = 0; i < events.length; i++) {
conn.removeAllListeners(events[i]);
}

CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName));
conn.destroy(options, connectionDestroyed);
});
}
Expand Down Expand Up @@ -758,19 +754,44 @@ Pool.prototype.destroy = function(force, callback) {
* @param {function} [callback]
*/
Pool.prototype.reset = function(callback) {
// this.destroy(true, err => {
// if (err && typeof callback === 'function') {
// callback(err, null);
// return;
// }
const connections = this.availableConnections.concat(this.inUseConnections);
let connectionCount = connections.length;
const connectionDestroyed = () => {
connectionCount--;
if (connectionCount > 0) {
return;
}

// stateTransition(this, DISCONNECTED);
// this.connect();
// clear all pool state
this.inUseConnections = [];
this.availableConnections = [];
this.connectingConnections = 0;
this.executing = false;
this.reconnectConnection = null;
this.numberOfConsecutiveTimeouts = 0;
this.connectionIndex = 0;
this.retriesLeft = this.options.reconnectTries;
this.reconnectId = null;

// create an initial connection, and kick off execution again
_createConnection(this);

// if (typeof callback === 'function') callback(null, null);
// });
if (typeof callback === 'function') {
callback(null, null);
}
};

// if we already have no connections, just reset state and callback
if (connectionCount === 0) {
connectionDestroyed();
return;
}

if (typeof callback === 'function') callback();
// destroy all connections
connections.forEach(conn => {
CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName));
conn.destroy({ force: true }, connectionDestroyed);
});
};

// Prepare the buffer that Pool.prototype.write() uses to send to the server
Expand Down

0 comments on commit 2d1ff40

Please sign in to comment.