Skip to content

Commit

Permalink
feat(eachAsync): dedupe async iteration with a common helper
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent 2d1ff40 commit c296f3a
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 70 deletions.
128 changes: 59 additions & 69 deletions lib/core/connection/pool.js
Expand Up @@ -19,6 +19,7 @@ const apm = require('./apm');
const Buffer = require('safe-buffer').Buffer;
const connect = require('./connect');
const updateSessionFromResponse = require('../sessions').updateSessionFromResponse;
const eachAsync = require('../utils').eachAsync;

var DISCONNECTED = 'disconnected';
var CONNECTING = 'connecting';
Expand Down Expand Up @@ -635,42 +636,35 @@ Pool.prototype.unref = function() {

// Destroy the connections
function destroy(self, connections, options, callback) {
let connectionCount = connections.length;
function connectionDestroyed() {
connectionCount--;
if (connectionCount > 0) {
return;
}

// clear all pool state
self.inUseConnections = [];
self.availableConnections = [];
self.connectingConnections = 0;
self.executing = false;
self.queue = [];
self.reconnectConnection = null;
self.numberOfConsecutiveTimeouts = 0;
self.connectionIndex = 0;
self.retriesLeft = self.options.reconnectTries;
self.reconnectId = null;

// Set state to destroyed
stateTransition(self, DESTROYED);
if (typeof callback === 'function') {
callback(null, null);
}
}
eachAsync(
connections,
(conn, cb) => {
CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName));
conn.destroy(options, cb);
},
err => {
if (err) {
if (typeof callback === 'function') callback(err, null);
return;
}

if (connectionCount === 0) {
connectionDestroyed();
return;
}
// clear all pool state
self.inUseConnections = [];
self.availableConnections = [];
self.connectingConnections = 0;
self.executing = false;
self.queue = [];
self.reconnectConnection = null;
self.numberOfConsecutiveTimeouts = 0;
self.connectionIndex = 0;
self.retriesLeft = self.options.reconnectTries;
self.reconnectId = null;

// Destroy all connections
connections.forEach(conn => {
CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName));
conn.destroy(options, connectionDestroyed);
});
// Set state to destroyed
stateTransition(self, DESTROYED);
if (typeof callback === 'function') callback(null, null);
}
);
}

/**
Expand Down Expand Up @@ -755,43 +749,39 @@ Pool.prototype.destroy = function(force, callback) {
*/
Pool.prototype.reset = function(callback) {
const connections = this.availableConnections.concat(this.inUseConnections);
let connectionCount = connections.length;
const connectionDestroyed = () => {
connectionCount--;
if (connectionCount > 0) {
return;
}
eachAsync(
connections,
(conn, cb) => {
CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName));
conn.destroy({ force: true }, cb);
},
err => {
if (err) {
if (typeof callback === 'function') {
callback(err, null);
return;
}
}

// clear all pool state
this.inUseConnections = [];
this.availableConnections = [];
this.connectingConnections = 0;
this.executing = false;
this.reconnectConnection = null;
this.numberOfConsecutiveTimeouts = 0;
this.connectionIndex = 0;
this.retriesLeft = this.options.reconnectTries;
this.reconnectId = null;

// create an initial connection, and kick off execution again
_createConnection(this);

if (typeof callback === 'function') {
callback(null, null);
// clear all pool state
this.inUseConnections = [];
this.availableConnections = [];
this.connectingConnections = 0;
this.executing = false;
this.reconnectConnection = null;
this.numberOfConsecutiveTimeouts = 0;
this.connectionIndex = 0;
this.retriesLeft = this.options.reconnectTries;
this.reconnectId = null;

// create an initial connection, and kick off execution again
_createConnection(this);

if (typeof callback === 'function') {
callback(null, null);
}
}
};

// if we already have no connections, just reset state and callback
if (connectionCount === 0) {
connectionDestroyed();
return;
}

// destroy all connections
connections.forEach(conn => {
CONNECTION_EVENTS.forEach(eventName => conn.removeAllListeners(eventName));
conn.destroy({ force: true }, connectionDestroyed);
});
);
};

// Prepare the buffer that Pool.prototype.write() uses to send to the server
Expand Down
34 changes: 33 additions & 1 deletion lib/core/utils.js
Expand Up @@ -116,6 +116,37 @@ function isPromiseLike(maybePromise) {
return maybePromise && typeof maybePromise.then === 'function';
}

/**
* Applies the function `eachFn` to each item in `arr`, in parallel.
*
* @param {array} arr an array of items to asynchronusly iterate over
* @param {function} eachFn A function to call on each item of the array. The callback signature is `(item, callback)`, where the callback indicates iteration is complete.
* @param {function} callback The callback called after every item has been iterated
*/
function eachAsync(arr, eachFn, callback) {
if (arr.length === 0) {
callback(null);
return;
}

const length = arr.length;
let completed = 0;
function eachCallback(err) {
if (err) {
callback(err, null);
return;
}

if (++completed === length) {
callback(null);
}
}

for (let idx = 0; idx < length; ++idx) {
eachFn(arr[idx], eachCallback);
}
}

module.exports = {
uuidV4,
calculateDurationInMs,
Expand All @@ -124,5 +155,6 @@ module.exports = {
retrieveEJSON,
retrieveKerberos,
maxWireVersion,
isPromiseLike
isPromiseLike,
eachAsync
};

0 comments on commit c296f3a

Please sign in to comment.