Skip to content

Commit

Permalink
[CONJS-41] cluster promise implementation - part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Sep 26, 2018
1 parent 13fcb8c commit 337e50d
Show file tree
Hide file tree
Showing 5 changed files with 603 additions and 29 deletions.
15 changes: 11 additions & 4 deletions lib/config/pool-cluster-options.js
Expand Up @@ -2,10 +2,17 @@

class PoolClusterOptions {
constructor(opts) {
this.canRetry = opts.canRetry === undefined ? true : opts.canRetry;
this.removeNodeErrorCount = opts.removeNodeErrorCount || 5;
this.restoreNodeTimeout = opts.restoreNodeTimeout || 0;
this.defaultSelector = opts.defaultSelector || "RR";
if (opts) {
this.canRetry = opts.canRetry === undefined ? true : opts.canRetry;
this.removeNodeErrorCount = opts.removeNodeErrorCount || 5;
this.restoreNodeTimeout = opts.restoreNodeTimeout || 0;
this.defaultSelector = opts.defaultSelector || "RR";
} else {
this.canRetry = true;
this.removeNodeErrorCount = 5;
this.restoreNodeTimeout = 0;
this.defaultSelector = "RR";
}
}
}

Expand Down
150 changes: 142 additions & 8 deletions lib/pool-cluster.js
@@ -1,29 +1,163 @@
"use strict";

const PoolClusterOptions = require("./config/pool-cluster-options.js");
const PoolClusterOptions = require("./config/pool-cluster-options");
const PoolOptions = require("./config/pool-options");
const Pool = require("./pool");

function PoolCluster(args) {
const opts = new PoolClusterOptions(args);
const nodes = {};
let cachedPatterns = {};
let nodeCounter = 0;

this.add = (id, config) => {
//TODO
let identifier;
if (typeof id === "string" || id instanceof String) {
identifier = id;
if (nodes[identifier])
throw new Error("Node identifier '" + identifier + "' already exist !");
} else {
identifier = "PoolNode-" + nodeCounter++;
}
const options = new PoolOptions(config);
const pool = new Pool(options, false);
pool.activatePool();
nodes[identifier] = pool;
};

this.end = callback => {
//TODO
this.end = () => {
cachedPatterns = {};
const poolEndPromise = [];
Object.keys(nodes).forEach(pool => {
poolEndPromise.push(nodes[pool].end());
delete nodes[pool];
});

return Promise.all(poolEndPromise);
};

this.of = (pattern, selector) => {
//TODO
//TODO ?
};

this.remove = pattern => {
//TODO
if (!pattern)
return Promise.reject(
new Error("pattern parameter in Cluster.remove(pattern) is mandatory")
);
const regex = RegExp(pattern);
Object.keys(nodes).forEach(key => {
if (regex.test(key)) {
nodes[key].end();
delete nodes[key];
}
});
cachedPatterns = {};
};

this.getConnection = (pattern, selector, avoidNodeKey) => {
let retry = this.getConnection.bind(this, pattern, selector);
if (!pattern)
return Promise.reject(
new Error("pattern parameter in Cluster.getConnection(pattern, selector) is mandatory")
);
if (cachedPatterns[pattern]) {
return _getConnection(cachedPatterns[pattern], selector, retry, avoidNodeKey);
}
const regex = RegExp(pattern);
const matchingNodeList = [];
Object.keys(nodes).forEach(key => {
if (regex.test(key)) {
if (!nodes[key].timeout || (nodes[key].timeout && nodes[key].timeout < Date.now())) {
matchingNodeList.push(key);
}
}
});

if (matchingNodeList.length === 0) {
return Promise.reject(new Error("No node found for pattern '" + pattern + "'"));
}

cachedPatterns[pattern] = matchingNodeList;
return _getConnection(matchingNodeList, selector, retry, avoidNodeKey);
};

const _getConnection = (nodeList, selectorParam, retryFct, avoidNodeKey) => {
const selector = selectorParam || opts.defaultSelector;
switch (selector) {
case "RR":
let lastRoundRobin = nodeList.lastRrIdx;
if (lastRoundRobin === undefined) lastRoundRobin = -1;
if (++lastRoundRobin >= nodeList.length) lastRoundRobin = 0;
let nodeKey = nodeList[lastRoundRobin];
if (avoidNodeKey === nodeKey) {
if (++lastRoundRobin >= nodeList.length) lastRoundRobin = 0;
nodeKey = nodeList[lastRoundRobin];
}
nodeList.lastRrIdx = lastRoundRobin;
return _handleConnectionError(nodeList, nodeKey, retryFct, avoidNodeKey);

case "RANDOM":
let randomIdx = Math.floor(Math.random() * nodeList.length);
let randomNodeKey = nodeList[randomIdx];
if (avoidNodeKey === randomNodeKey) {
if (++randomIdx >= nodeList.length) randomIdx = 0;
randomNodeKey = nodeList[randomNodeKey];
}
return _handleConnectionError(nodeList, randomNodeKey, retryFct, avoidNodeKey);

case "ORDER":
let orderIdx = 0;
if (avoidNodeKey === nodeList[0] && nodeList.length > 1) orderIdx = 1;
return _handleConnectionError(nodeList, nodeList[orderIdx], retryFct, avoidNodeKey);
}
return Promise.reject(
new Error(
"Wrong selector value '" + selector + "'. Possible values are 'RR','RANDOM' or 'ORDER'"
)
);
};

this.getConnection = (pattern, selector, cb) => {
//TODO
const _handleConnectionError = (nodeList, nodeKey, retryFct, avoidNodeKey) => {
const node = nodes[nodeKey];
return node
.getConnection()
.then(conn => {
node.errorCount = 0;
return Promise.resolve(conn);
})
.catch(err => {
node.errorCount = node.errorCount ? node.errorCount + 1 : 1;
if (node.errorCount >= opts.removeNodeErrorCount && nodes[nodeKey]) {
if (opts.restoreNodeTimeout === 0) {
delete nodes[nodeKey];
cachedPatterns = {};
delete nodeList.lastRrIdx;
//remove node from configuration if not already removed
node.end().catch(err => {
// dismiss error
});
} else {
node.timeout = Date.now() + opts.restoreNodeTimeout;
}
if (nodeList.length === 0) return Promise.reject(err);
}

if (opts.canRetry) return retryFct(nodeKey);
return Promise.reject(err);
});
};

//*****************************************************************
// internal public testing methods
//*****************************************************************

function TestMethods() {}
TestMethods.prototype.getNodes = () => {
return nodes;
};

this.__tests = new TestMethods();
}

module.exports = PoolCluster;
45 changes: 31 additions & 14 deletions lib/pool.js
Expand Up @@ -38,7 +38,6 @@ function Pool(options, useCallback) {
* @return Promise
*/
this.end = function() {
if (firstTaskTimeout) clearTimeout(firstTaskTimeout);
if (closed) {
return Promise.reject(
Errors.createError(
Expand All @@ -52,6 +51,7 @@ function Pool(options, useCallback) {
)
);
}
closed = true;

//close unused connections
const idleConnectionsEndings = [];
Expand All @@ -60,8 +60,24 @@ function Pool(options, useCallback) {
idleConnectionsEndings.push(conn.end());
}

closed = true;
taskQueue.clear();
//reject all waiting task
if (firstTaskTimeout) {
clearTimeout(firstTaskTimeout);
let task;
const err = Errors.createError(
"retrieve connection from pool timeout",
false,
null,
"HY000",
Errors.ER_GET_CONNECTION_TIMEOUT,
undefined,
false
);
while ((task = taskQueue.shift())) {
process.nextTick(task.reject, err);
}
}

return Promise.all(idleConnectionsEndings);
};

Expand Down Expand Up @@ -233,16 +249,17 @@ function Pool(options, useCallback) {
firstTaskTimeout = null;
if (task === taskQueue.peekFront()) {
taskQueue.shift();
const err = Errors.createError(
"retrieve connection from pool timeout",
false,
null,
"HY000",
Errors.ER_GET_CONNECTION_TIMEOUT,
undefined,
false
task.reject(
Errors.createError(
"retrieve connection from pool timeout",
false,
null,
"HY000",
Errors.ER_GET_CONNECTION_TIMEOUT,
undefined,
false
)
);
process.nextTick(task.reject, err);
} else {
throw new Error("Rejection by timeout without task !!!");
}
Expand Down Expand Up @@ -411,7 +428,7 @@ function Pool(options, useCallback) {
* Grow pool connections until reaching connection limit.
*/
const checkPoolSize = function() {
if (!connectionInCreation && this.totalConnections() < opts.connectionLimit) {
if (!connectionInCreation && this.totalConnections() < opts.connectionLimit && !closed) {
connectionInCreation = true;
process.nextTick(addConnectionToPool, this);
}
Expand Down Expand Up @@ -470,7 +487,7 @@ function Pool(options, useCallback) {
rejectTimeout(nextTask);
} else {
firstTaskTimeout = setTimeout(rejectAndResetTimeout, nextTask.timeout - currTime, nextTask);
break;
return;
}
}
};
Expand Down
10 changes: 7 additions & 3 deletions promise.js
Expand Up @@ -2,9 +2,11 @@

let Connection = require("./lib/connection");
let Pool = require("./lib/pool");
let PoolCluster = require("./lib/pool-cluster");

let ConnOptions = require("./lib/config/connection-options");
let PoolOptions = require("./lib/config/pool-options");
let PoolClusterOptions = require("./lib/config/pool-cluster-options");

module.exports.createConnection = function createConnection(opts) {
try {
Expand All @@ -22,6 +24,8 @@ module.exports.createPool = function createPool(opts) {
return pool;
};

// exports.createPoolCluster = function createPoolCluster(config) {
// //TODO
// };

module.exports.createPoolCluster = function createPoolCluster(opts) {
const options = new PoolClusterOptions(opts);
return new PoolCluster(options);
};

0 comments on commit 337e50d

Please sign in to comment.