Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJS-83] PoolCluster now emit 'remove' event supporting mysql API
  • Loading branch information
rusher committed Jun 17, 2019
1 parent 800d83e commit 95f6068
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 43 deletions.
18 changes: 18 additions & 0 deletions documentation/callback-api.md
Expand Up @@ -613,6 +613,24 @@ cluster.getConnection("slave*", (err, conn) => {
})
```

## `events`

PoolCluster object inherits from the Node.js [`EventEmitter`](https://nodejs.org/api/events.html).
Emits 'remove' event when a node is removed from configuration if the option `removeNodeErrorCount` is defined
(default to 5) and connector fails to connect more than `removeNodeErrorCount` times.
(if other nodes are present, each attemps will wait for value of the option `restoreNodeTimeout`)

```javascript
const mariadb = require('mariadb/callback');
const cluster = mariadb.createPoolCluster({ removeNodeErrorCount: 20, restoreNodeTimeout: 5000 });
cluster.add("master", { host: 'mydb1.com', user: 'myUser', connectionLimit: 5 });
cluster.add("slave1", { host: 'mydb2.com', user: 'myUser', connectionLimit: 5 });
cluster.add("slave2", { host: 'mydb3.com', user: 'myUser', connectionLimit: 5 });*
cluster.on('remove', node => {
console.log(`node ${node} was removed`);
})
```

## `poolCluster.of(pattern, selector) → FilteredPoolCluster`

> * `pattern`: *string* regex pattern to select pools. Example, `"slave*"`. default `'*'`
Expand Down
18 changes: 18 additions & 0 deletions documentation/promise-api.md
Expand Up @@ -1091,6 +1091,24 @@ cluster.add("slave2", { host: 'mydb3.com', user: 'myUser', connectionLimit: 5 })
cluster.getConnection("slave*")
```

## `events`

PoolCluster object inherits from the Node.js [`EventEmitter`](https://nodejs.org/api/events.html).
Emits 'remove' event when a node is removed from configuration if the option `removeNodeErrorCount` is defined
(default to 5) and connector fails to connect more than `removeNodeErrorCount` times.
(if other nodes are present, each attemps will wait for value of the option `restoreNodeTimeout`)

```javascript
const mariadb = require('mariadb');
const cluster = mariadb.createPoolCluster({ removeNodeErrorCount: 20, restoreNodeTimeout: 5000 });
cluster.add("master", { host: 'mydb1.com', user: 'myUser', connectionLimit: 5 });
cluster.add("slave1", { host: 'mydb2.com', user: 'myUser', connectionLimit: 5 });
cluster.add("slave2", { host: 'mydb3.com', user: 'myUser', connectionLimit: 5 });*
cluster.on('remove', node => {
console.log(`node ${node} was removed`);
})
```

## `poolCluster.of(pattern, selector) → FilteredPoolCluster`

> * `pattern`: *string* regex pattern to select pools. Example, `"slave*"`. default `'*'`
Expand Down
42 changes: 9 additions & 33 deletions lib/pool-cluster-callback.js
@@ -1,6 +1,7 @@
'use strict';

const PoolCluster = require('./pool-cluster');
const util = require('util');

/**
* Create a new Cluster.
Expand All @@ -11,18 +12,11 @@ const PoolCluster = require('./pool-cluster');
* @constructor
*/
function PoolClusterCallback(args) {
const cluster = new PoolCluster(args);
cluster.setCallback();
PoolCluster.call(this, args);
this.setCallback();

/**
* Add a new pool node to cluster.
*
* @param id identifier
* @param config pool configuration
*/
this.add = (id, config) => {
cluster.add(id, config);
};
const initialGetConnection = this.getConnection.bind(this);
const initialEnd = this.end.bind(this);

/**
* End cluster (and underlying pools).
Expand All @@ -35,27 +29,13 @@ function PoolClusterCallback(args) {
}
const endingFct = callback ? callback : () => {};

cluster
.end()
initialEnd()
.then(() => {
endingFct();
})
.catch(endingFct);
};

this.of = (pattern, selector) => {
return cluster.of(pattern, selector);
};

/**
* Remove nodes according to pattern.
*
* @param pattern pattern
*/
this.remove = pattern => {
cluster.remove(pattern);
};

/**
* Get connection from available pools matching pattern, according to selector
*
Expand All @@ -77,14 +57,10 @@ function PoolClusterCallback(args) {
}
const endingFct = cal ? cal : conn => {};

cluster.getConnection(pat, sel, endingFct);
initialGetConnection(pat, sel, endingFct);
};

//*****************************************************************
// internal public testing methods
//*****************************************************************

this.__tests = cluster.__tests;
}

util.inherits(PoolClusterCallback, PoolCluster);

module.exports = PoolClusterCallback;
31 changes: 25 additions & 6 deletions lib/pool-cluster.js
Expand Up @@ -5,6 +5,8 @@ const PoolOptions = require('./config/pool-options');
const Pool = require('./pool-promise');
const PoolCallback = require('./pool-callback');
const FilteredPoolCluster = require('./filtered-pool-cluster');
const EventEmitter = require('events');
const util = require('util');

/**
* Create a new Cluster.
Expand All @@ -19,6 +21,7 @@ function PoolCluster(args) {
const nodes = {};
let cachedPatterns = {};
let nodeCounter = 0;
EventEmitter.call(this);

/**
* Add a new pool node to cluster.
Expand Down Expand Up @@ -90,14 +93,14 @@ function PoolCluster(args) {
* @return {Promise}
*/
this.getConnection = (pattern, selector) => {
return _getConnection(pattern, selector);
return _getConnection(this, pattern, selector);
};

/**
* Force using callback methods.
*/
this.setCallback = () => {
this.getConnection = _getConnectionCallback;
this.getConnection = _getConnectionCallback.bind(this, this);
_createPool = _createPoolCallback;
};

Expand All @@ -112,7 +115,13 @@ function PoolCluster(args) {
* @return {Promise}
* @private
*/
const _getConnection = (pattern, selector, avoidNodeKey, lastError) => {
const _getConnection = (
cluster,
pattern,
selector,
avoidNodeKey,
lastError
) => {
const matchingNodeList = _matchingNodes(pattern || /^/);

if (matchingNodeList.length === 0) {
Expand All @@ -134,10 +143,10 @@ function PoolCluster(args) {
return Promise.reject(new Error(errMsg));
}

const retry = _getConnection.bind(this, pattern, selector);
const retry = _getConnection.bind(this, this, pattern, selector);
try {
const nodeKey = _selectPool(matchingNodeList, selector, avoidNodeKey);
return _handleConnectionError(matchingNodeList, nodeKey, retry);
return _handleConnectionError(cluster, matchingNodeList, nodeKey, retry);
} catch (e) {
return Promise.reject(e);
}
Expand All @@ -162,6 +171,7 @@ function PoolCluster(args) {
* @private
*/
const _getConnectionCallback = (
cluster,
pattern,
selector,
callback,
Expand All @@ -183,6 +193,7 @@ function PoolCluster(args) {
}

const retry = _getConnectionCallback.bind(
this,
this,
pattern,
selector,
Expand All @@ -191,6 +202,7 @@ function PoolCluster(args) {
try {
const nodeKey = _selectPool(matchingNodeList, selector, avoidNodeKey);
_handleConnectionCallbackError(
this,
matchingNodeList,
nodeKey,
retry,
Expand Down Expand Up @@ -309,13 +321,14 @@ function PoolCluster(args) {
/**
* Connect, or if fail handle retry / set timeout error
*
* @param cluster current cluster
* @param nodeList current node list
* @param nodeKey node name to connect
* @param retryFct retry function
* @return {Promise}
* @private
*/
const _handleConnectionError = (nodeList, nodeKey, retryFct) => {
const _handleConnectionError = (cluster, nodeList, nodeKey, retryFct) => {
const node = nodes[nodeKey];
return node
.getConnection()
Expand All @@ -334,6 +347,7 @@ function PoolCluster(args) {
delete nodes[nodeKey];
cachedPatterns = {};
delete nodeList.lastRrIdx;
process.nextTick(() => cluster.emit('remove', nodeKey));
//remove node from configuration if not already removed
node.end().catch(err => {
// dismiss error
Expand All @@ -349,13 +363,15 @@ function PoolCluster(args) {
/**
* Connect, or if fail handle retry / set timeout error
*
* @param cluster current cluster
* @param nodeList current node list
* @param nodeKey node name to connect
* @param retryFct retry function
* @param callback callback function
* @private
*/
const _handleConnectionCallbackError = (
cluster,
nodeList,
nodeKey,
retryFct,
Expand All @@ -374,6 +390,7 @@ function PoolCluster(args) {
delete nodes[nodeKey];
cachedPatterns = {};
delete nodeList.lastRrIdx;
process.nextTick(() => cluster.emit('remove', nodeKey));
//remove node from configuration if not already removed
node.end(() => {
//dismiss error
Expand Down Expand Up @@ -402,4 +419,6 @@ function PoolCluster(args) {
this.__tests = new TestMethods();
}

util.inherits(PoolCluster, EventEmitter);

module.exports = PoolCluster;

0 comments on commit 95f6068

Please sign in to comment.