Skip to content

Commit

Permalink
[CONJS-41] cluster promise implementation - part 2
Browse files Browse the repository at this point in the history
- implementation of PoolCluster.of
- failover handling
  • Loading branch information
rusher committed Sep 27, 2018
1 parent 337e50d commit 6769da5
Show file tree
Hide file tree
Showing 6 changed files with 629 additions and 290 deletions.
53 changes: 53 additions & 0 deletions lib/filtered-pool-cluster.js
@@ -0,0 +1,53 @@
/**
* Similar to pool cluster with pre-set pattern and selector.
* Additional method query
*
* @param poolCluster cluster
* @param patternArg pre-set pattern
* @param selectorArg pre-set selector
* @constructor
*/
function FilteredPoolCluster(poolCluster, patternArg, selectorArg) {
const cluster = poolCluster;
const pattern = patternArg;
const selector = selectorArg;

/**
* Get a connection according to previously indicated pattern and selector.
*
* @return {Promise}
*/
this.getConnection = () => {
return cluster.getConnection(pattern, selector);
};

/**
* Execute a query on one connection from available pools matching pattern
* in cluster.
*
* @param sql sql command
* @param value parameter value of sql command (not mandatory)
* @return {Promise}
*/
this.query = function(sql, value) {
return cluster
.getConnection(pattern, selector)
.then(conn => {
return conn
.query(sql, value)
.then(res => {
conn.end();
return res;
})
.catch(err => {
conn.end();
return err;
});
})
.catch(err => {
return err;
});
};
}

module.exports = FilteredPoolCluster;
210 changes: 162 additions & 48 deletions lib/pool-cluster.js
Expand Up @@ -3,13 +3,29 @@
const PoolClusterOptions = require("./config/pool-cluster-options");
const PoolOptions = require("./config/pool-options");
const Pool = require("./pool");
const FilteredPoolCluster = require("./filtered-pool-cluster");

/**
* Create a new Cluster.
* Cluster handle pools with patterns and handle failover / distributed load
* according to selectors (round robin / random / ordered )
*
* @param args cluster argurments. see pool-cluster-options.
* @constructor
*/
function PoolCluster(args) {

const opts = new PoolClusterOptions(args);
const nodes = {};
let cachedPatterns = {};
let nodeCounter = 0;

/**
* Add a new pool node to cluster.
*
* @param id identifier
* @param config pool configuration
*/
this.add = (id, config) => {
let identifier;
if (typeof id === "string" || id instanceof String) {
Expand All @@ -25,6 +41,11 @@ function PoolCluster(args) {
nodes[identifier] = pool;
};

/**
* End cluster (and underlying pools).
*
* @return {Promise<any[]>}
*/
this.end = () => {
cachedPatterns = {};
const poolEndPromise = [];
Expand All @@ -37,88 +58,181 @@ function PoolCluster(args) {
};

this.of = (pattern, selector) => {
//TODO ?
return new FilteredPoolCluster(this, pattern, selector);
};

/**
* Remove nodes according to pattern.
*
* @param pattern pattern
*/
this.remove = pattern => {
if (!pattern)
return Promise.reject(
new Error("pattern parameter in Cluster.remove(pattern) is mandatory")
);
throw new Error("pattern parameter in Cluster.remove(pattern) is mandatory");

const regex = RegExp(pattern);
Object.keys(nodes).forEach(key => {
if (regex.test(key)) {
nodes[key].end();
delete nodes[key];
cachedPatterns = {};
}
});
cachedPatterns = {};
};

this.getConnection = (pattern, selector, avoidNodeKey) => {
let retry = this.getConnection.bind(this, pattern, selector);
if (!pattern)
return Promise.reject(
new Error("pattern parameter in Cluster.getConnection(pattern, selector) is mandatory")
);
if (cachedPatterns[pattern]) {
return _getConnection(cachedPatterns[pattern], selector, retry, avoidNodeKey);
/**
* Get connection from available pools matching pattern, according to selector
*
* @param pattern pattern filter (not mandatory)
* @param selector node selector ('RR','RANDOM' or 'ORDER')
* @return {Promise}
*/
this.getConnection = (pattern, selector) => {
return _getConnection(pattern, selector);
};

/**
* Get connection from available pools matching pattern, according to selector
* with additional parameter to avoid reusing failing node
*
* @param pattern pattern filter (not mandatory)
* @param selector node selector ('RR','RANDOM' or 'ORDER')
* @param avoidNodeKey failing node
* @return {Promise}
* @private
*/
const _getConnection = (pattern, selector, avoidNodeKey) => {
const matchingNodeList = _matchingNodes(pattern || /^/);

if (matchingNodeList.length === 0) {
if (Object.keys(nodes).length === 0)
return Promise.reject(
new Error(
"No node have been added to cluster or nodes have been removed due to too much connection error"
)
);
return Promise.reject(new Error("No node found for pattern '" + pattern + "'"));
}

const retry = _getConnection.bind(this, pattern, selector);
return _selectPool(matchingNodeList, selector, retry, avoidNodeKey);
};

/**
* Selecting nodes according to pattern.
*
* @param pattern pattern
* @return {Json}
* @private
*/
const _matchingNodes = pattern => {
if (cachedPatterns[pattern]) return cachedPatterns[pattern];

const regex = RegExp(pattern);
const matchingNodeList = [];
Object.keys(nodes).forEach(key => {
if (regex.test(key)) {
if (!nodes[key].timeout || (nodes[key].timeout && nodes[key].timeout < Date.now())) {
matchingNodeList.push(key);
}
matchingNodeList.push(key);
}
});

if (matchingNodeList.length === 0) {
return Promise.reject(new Error("No node found for pattern '" + pattern + "'"));
}

cachedPatterns[pattern] = matchingNodeList;
return _getConnection(matchingNodeList, selector, retry, avoidNodeKey);
return matchingNodeList;
};

const _getConnection = (nodeList, selectorParam, retryFct, avoidNodeKey) => {
/**
* Select next node to be chosen in nodeList according to selector and failed nodes.
*
* @param nodeList current node list
* @param selectorParam selector
* @param retryFct retry function in case of connection fails
* @param avoidNodeKey last failing node to avoid selecting this one.
* @return {Promise}
* @private
*/
const _selectPool = (nodeList, selectorParam, retryFct, avoidNodeKey) => {
const selector = selectorParam || opts.defaultSelector;
let retry = 0;
let selectorFct;
let nodeKey;
switch (selector) {
case "RR":
let lastRoundRobin = nodeList.lastRrIdx;
if (lastRoundRobin === undefined) lastRoundRobin = -1;
if (++lastRoundRobin >= nodeList.length) lastRoundRobin = 0;
let nodeKey = nodeList[lastRoundRobin];
if (avoidNodeKey === nodeKey) {
if (++lastRoundRobin >= nodeList.length) lastRoundRobin = 0;
nodeKey = nodeList[lastRoundRobin];
}
nodeList.lastRrIdx = lastRoundRobin;
return _handleConnectionError(nodeList, nodeKey, retryFct, avoidNodeKey);
selectorFct = roundRobinSelector;
break;

case "RANDOM":
let randomIdx = Math.floor(Math.random() * nodeList.length);
let randomNodeKey = nodeList[randomIdx];
if (avoidNodeKey === randomNodeKey) {
if (++randomIdx >= nodeList.length) randomIdx = 0;
randomNodeKey = nodeList[randomNodeKey];
}
return _handleConnectionError(nodeList, randomNodeKey, retryFct, avoidNodeKey);
selectorFct = randomSelector;
break;

case "ORDER":
let orderIdx = 0;
if (avoidNodeKey === nodeList[0] && nodeList.length > 1) orderIdx = 1;
return _handleConnectionError(nodeList, nodeList[orderIdx], retryFct, avoidNodeKey);
selectorFct = orderedSelector;
break;

default:
return Promise.reject(
new Error(
"Wrong selector value '" + selector + "'. Possible values are 'RR','RANDOM' or 'ORDER'"
)
);
}

nodeKey = selectorFct(nodeList, retry);
while (
(avoidNodeKey === nodeKey || nodes[nodeKey].timeout > Date.now()) &&
retry < nodeList.length - 1
) {
retry++;
nodeKey = selectorFct(nodeList);
}
return Promise.reject(
new Error(
"Wrong selector value '" + selector + "'. Possible values are 'RR','RANDOM' or 'ORDER'"
)
);
return _handleConnectionError(nodeList, nodeKey, retryFct);
};

/**
* Round robin selector: using nodes one after the other.
*
* @param nodeList node list
* @return {String}
*/
const roundRobinSelector = nodeList => {
let lastRoundRobin = nodeList.lastRrIdx;
if (lastRoundRobin === undefined) lastRoundRobin = -1;
if (++lastRoundRobin >= nodeList.length) lastRoundRobin = 0;
nodeList.lastRrIdx = lastRoundRobin;
return nodeList[lastRoundRobin];
};

/**
* Random selector: use a random node.
*
* @param nodeList node list
* @return {String}
*/
const randomSelector = nodeList => {
let randomIdx = Math.floor(Math.random() * nodeList.length);
return nodeList[randomIdx];
};

/**
* Ordered selector: always use the nodes in sequence, unless failing.
*
* @param nodeList node list
* @param retry sequence number if last node is tagged has failing
* @return {String}
*/
const orderedSelector = (nodeList, retry) => {
return nodeList[retry];
};

const _handleConnectionError = (nodeList, nodeKey, retryFct, avoidNodeKey) => {
/**
* Connect, or if fail handle retry / set timeout error
*
* @param nodeList current node list
* @param nodeKey node name to connect
* @param retryFct retry function
* @return {Promise}
* @private
*/
const _handleConnectionError = (nodeList, nodeKey, retryFct) => {
const node = nodes[nodeKey];
return node
.getConnection()
Expand Down
50 changes: 27 additions & 23 deletions lib/pool.js
Expand Up @@ -445,35 +445,39 @@ function Pool(options, useCallback) {
const task = taskQueue.shift();
if (task) {
const conn = idleConnections.shift();
activeConnections[conn.threadId] = conn;
if (conn) {
activeConnections[conn.threadId] = conn;

resetTimeoutToNextTask();
resetTimeoutToNextTask();

//handle task
if (task.sql) {
if (useCallback) {
conn.query(task.sql, task.values, (err, rows, fields) => {
conn.releaseWithoutError();
if (err) {
task.reject(err);
} else {
task.resolve(rows);
}
});
} else {
conn
.query(task.sql, task.values)
.then(res => {
conn.releaseWithoutError();
task.resolve(res);
})
.catch(err => {
//handle task
if (task.sql) {
if (useCallback) {
conn.query(task.sql, task.values, (err, rows, fields) => {
conn.releaseWithoutError();
task.reject(err);
if (err) {
task.reject(err);
} else {
task.resolve(rows);
}
});
} else {
conn
.query(task.sql, task.values)
.then(res => {
conn.releaseWithoutError();
task.resolve(res);
})
.catch(err => {
conn.releaseWithoutError();
task.reject(err);
});
}
} else {
task.resolve(conn);
}
} else {
task.resolve(conn);
taskQueue.unshift(task);
}
}
};
Expand Down

0 comments on commit 6769da5

Please sign in to comment.