Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

first pass auto auto discovery

  • Loading branch information...
commit 39465b2069428bdd55888183f049b6ae94a7e997 1 parent ff3e10b
@devdazed devdazed authored
Showing with 166 additions and 6 deletions.
  1. +166 −6 lib/pool.js
View
172 lib/pool.js
@@ -18,6 +18,40 @@ var replyNotAvailable = function (callback) {
}
};
+/**
+ * Normalizes (flattens) the rules
+ */
+function normalizeRules(denormrules){
+ //flatten rules to simplify matching
+ var rules = [];
+ denormrules.forEach(function(rule){
+ if(rule.datacenter){
+ if(rule.racks){
+ rule.racks.forEach(function(rack){
+ if(rule.hosts){
+ rule.hosts.forEach(function(host){
+ rules.push({ datacenter: rule.datacenter, rack: rack, host: host});
+ });
+ } else {
+ rules.push({ datacenter: rule.datacenter, rack: rack});
+ }
+ });
+ } else if(rule.hosts){
+ rule.hosts.forEach(function(host){
+ rules.push({ host: host });
+ });
+ } else {
+ rules.push({ datacenter: rule.datacenter });
+ }
+ } else if (rule.hosts) {
+ rule.hosts.forEach(function(host){
+ rules.push({ host: host });
+ });
+ }
+ });
+ return rules;
+}
+
/**
* Creates a connection to a keyspace for each of the servers in the pool;
@@ -32,7 +66,14 @@ var replyNotAvailable = function (callback) {
* pass : 'qwerty',
* timeout : 30000,
* cqlVersion : '3.0.0',
- * hostPoolSize : 1
+ * hostPoolSize : 1,
+ * autoDiscover: true, *
+ * whitelist: [
+ * { datacenter: 'cassandra-east', 'racks': ['rack1'], hosts:['10.10.10.10'] }
+ * ],
+ * blacklist: [
+ * { datacenter: 'cassandra-east', 'racks': ['rack1'], hosts:['10.10.10.10'] }
+ * ]
* });
*
* @constructor
@@ -49,6 +90,25 @@ var Pool = function(options){
this.hostPoolSize = options.hostPoolSize ? options.hostPoolSize : 1;
this.retryInterval = null;
this.closing = false;
+ if (options.autoDiscover){
+ this.autoDiscover = true;
+ if(options.whitelist && options.blacklist){
+ throw errors.create({ why : 'Please either either whitelist OR blacklist, not both',
+ name : 'InvalidOptionsException' });
+ }
+
+ if (Array.isArray(options.whitelist)){
+ this.rules = options.whitelist;
+ this.ruleType = 'whitelist';
+ } else if (Array.isArray(options.blacklist)){
+ this.rules = options.blacklist;
+ this.ruleType = 'blacklist';
+ } else {
+ this.rules = [];
+ this.ruleType = 'all';
+ }
+ }
+
if(!options.hosts && options.host){
this.hosts = [options.host];
@@ -56,6 +116,10 @@ var Pool = function(options){
this.hosts = options.hosts;
}
+ if (!this.hosts){
+ this.hosts = ['localhost'];
+ }
+
if(!Array.isArray(this.hosts)){
throw(new Error('HelenusError: Invalid hosts supplied for connection pool'));
}
@@ -73,6 +137,8 @@ Pool.prototype.connect = function(callback){
len = this.hosts.length * this.hostPoolSize,
connected = 0;
+ callback = callback || NOOP;
+
function onConnect(err, connection, keyspace, host){
finished += 1;
@@ -97,6 +163,7 @@ Pool.prototype.connect = function(callback){
if(connected === 0){
replyNotAvailable(callback);
}
+
//now that we have a connection, lets start monitoring
self.monitorConnections();
}
@@ -129,6 +196,65 @@ Pool.prototype.connect = function(callback){
};
/**
+ * After connecting to a seed host, auto-detect the cluster and connect to the remaining nodes
+ */
+Pool.prototype.discover = function(callback){
+ var self = this,
+ rules = normalizeRules(this.rules);
+
+ this.execute('describe_ring', this.keyspace, function(err, details){
+ if(err){
+ return callback(err);
+ }
+
+ //use an object to ensure unique values
+ var hosts = {};
+
+ details.forEach(function(tokenRange){
+ tokenRange.endpoint_details.forEach(function(host){
+ if (self.ruleType !== 'all'){
+ var match = false, i = 0, rule;
+
+ for (; i < rules.length; i += 1){
+ rule = rules[i];
+ if(rule.host){
+ if(rule.host === host.host){
+ match = true;
+ break;
+ }
+ } else {
+ if(rule.datacenter && host.datacenter === rule.datacenter){
+ if(rule.rack){
+ if(rule.rack === host.rack){
+ match = true;
+ break;
+ }
+ } else {
+ match = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if(match && self.ruleType === 'whitelist'){
+ hosts[host.host] = '';
+ }
+
+ if (!match && self.ruleType === 'blacklist'){
+ hosts[host.host] = '';
+ }
+ } else {
+ hosts[host.host] = '';
+ }
+ });
+ });
+
+ callback(null, Object.keys(hosts));
+ });
+};
+
+/**
* Changes the current keyspace for the connection
*/
Pool.prototype.use = function(keyspace, callback){
@@ -300,15 +426,49 @@ Pool.prototype.monitorConnections = function(){
*/
function checkDead(){
- if(self.closing) return;
+ if(self.closing) {
+ return;
+ }
- if(self.dead.length > 0){
- connect(self.dead.pop());
- checkDead();
+ if (self.autoDiscover){
+ self.discover(function(err, hosts){
+ if(err){
+ self.emit('error', err);
+ }
+
+ //add new hosts
+ hosts.forEach(function(host){
+ if(!(host in self.hosts) && !(host in self.dead)){
+ connect(host);
+ }
+ });
+
+ //remove hosts that are no longer in the ring
+ //they should be dead, so no need to go over the live hosts
+ var validDead = [];
+ self.dead.forEach(function(host){
+ if(host in hosts){
+ validDead.push(host);
+ }
+ });
+ self.dead = validDead;
+
+ if(self.dead.length > 0){
+ connect(self.dead.pop());
+ checkDead();
+ }
+ });
+ } else {
+ if(self.dead.length > 0){
+ connect(self.dead.pop());
+ checkDead();
+ }
}
}
- this.retryInterval = setInterval(checkDead, 5000);
+ this.retryInterval = setInterval(checkDead, 15000);
+ //do it once at the beginning
+ checkDead();
};
/**
Please sign in to comment.
Something went wrong with that request. Please try again.