Permalink
Browse files

Merge pull request #72 from calvinfo/master

Adding callback on nodes unavailable exception
  • Loading branch information...
2 parents 73e512a + b01c0fb commit b513d501986ead7b793adf738f3858e21d9467a6 @devdazed devdazed committed Sep 27, 2012
Showing with 127 additions and 38 deletions.
  1. +14 −27 lib/connection.js
  2. +12 −0 lib/errors.js
  3. +48 −9 lib/pool.js
  4. +15 −1 test/cql2.js
  5. +15 −1 test/cql3.js
  6. +6 −0 test/helpers/bad_connection.json
  7. +17 −0 test/thrift.js
View
@@ -5,7 +5,8 @@ var util = require('util'),
ttype = require('./cassandra/cassandra_types'),
Row = require('./row'),
zlib = require('zlib'),
- Keyspace = require('./keyspace');
+ Keyspace = require('./keyspace'),
+ errors = require('./errors');
/**
* A No-Operation function for default callbacks
@@ -40,20 +41,6 @@ var DEFAULT_HOST = 'localhost';
var DEFAULT_TIMEOUT = 4000;
/**
- * Creates a JS Error from a Cassndra Error
- * @param {Object} err The cassandra error eg: { name:'Exception', why:'Some Reason' }
- * @private
- * @memberOf Connection
- */
-function createError(err){
- //sometimes the message comes back as the field why and sometimes as the field message
- var error = new Error(err.why || err.message);
- error.name = 'Helenus' + err.name;
- Error.captureStackTrace(error, createError);
- return error;
-}
-
-/**
* Formats CQL properly, paradigm borrowed from node-mysql:
* https://github.com/felixge/node-mysql/blob/master/lib/client.js#L145-199
*
@@ -72,14 +59,14 @@ function formatCQL(cql, params){
//escape the params and format the CQL string
cql = cql.replace(/\?|%[sjd]/g, function() {
if (params.length === 0) {
- throw createError(new Error('Too Few Parameters Given'));
+ throw errors.create(new Error('Too Few Parameters Given'));
}
return escapeCQL(params.shift());
});
if (params.length) {
- throw createError(new Error('Too Many Parameters Given'));
+ throw errors.create(new Error('Too Many Parameters Given'));
}
return cql;
@@ -255,15 +242,15 @@ Connection.prototype.connect = function(callback){
if (err){
self._connection.connection.destroy();
- callback(createError(err));
+ callback(errors.create(err));
return;
}
if (self.cqlVersion) {
self._client.set_cql_version(self.cqlVersion, function(err) {
if (err) {
self._connection.connection.destroy();
- callback(createError(err));
+ callback(errors.create(err));
return;
}
onCqlVersionSelected();
@@ -289,7 +276,7 @@ Connection.prototype.connect = function(callback){
});
timer = setTimeout(function(){
- callback(createError({ name: 'TimeoutException', why: 'Connection Timed Out'}));
+ callback(errors.create({ name: 'TimeoutException', why: 'Connection Timed Out'}));
self._connection.connection.destroy();
}, this.timeout);
};
@@ -305,13 +292,13 @@ Connection.prototype.use = function(keyspace, callback){
function onDescribe(err, definition){
if (err) {
- callback(createError(err));
+ callback(errors.create(err));
return;
}
self._client.set_keyspace(keyspace, function(err){
if(err){
- callback(createError(err));
+ callback(errors.create(err));
} else {
callback(null, new Keyspace(self, definition));
}
@@ -334,7 +321,7 @@ Connection.prototype.authenticate = function(callback){
self._client.login(authRequest, function(err){
if (err){
- callback(createError(err));
+ callback(errors.create(err));
} else {
callback(null);
}
@@ -365,7 +352,7 @@ Connection.prototype.execute = function(){
*/
function onReturn(err, results){
if(err){
- callback(createError(err));
+ callback(errors.create(err));
} else {
callback(null, results);
}
@@ -466,7 +453,7 @@ Connection.prototype.createKeyspace = function(keyspace, options, callback){
options = options || {};
if(!keyspace){
- callback(createError({name:'InvalidNameError', why:'Keyspace name not specified'}));
+ callback(errors.create({name:'InvalidNameError', why:'Keyspace name not specified'}));
return;
}
@@ -489,7 +476,7 @@ Connection.prototype.createKeyspace = function(keyspace, options, callback){
*/
function onComplete(err, response){
if(err){
- callback(createError(err));
+ callback(errors.create(err));
} else {
callback(null, response);
}
@@ -511,7 +498,7 @@ Connection.prototype.dropKeyspace = function(keyspace, callback){
function onComplete(err, response){
if(err){
- callback(createError(err));
+ callback(errors.create(err));
} else {
callback(null, response);
}
View
@@ -0,0 +1,12 @@
+
+/**
+ * Creates a JS Error from a Cassndra Error
+ * @param {Object} err The cassandra error eg: { name:'Exception', why:'Some Reason' }
+ */
+exports.create = function createError(err){
+ //sometimes the message comes back as the field why and sometimes as the field message
+ var error = new Error(err.why || err.message);
+ error.name = 'Helenus' + err.name;
+ Error.captureStackTrace(error, createError);
+ return error;
+};
View
@@ -1,5 +1,6 @@
var Connection = require('./connection'),
- util = require('util');
+ util = require('util'),
+ errors = require('./errors');
/**
* A No-Operation default for empty callbacks
@@ -8,6 +9,16 @@ var Connection = require('./connection'),
*/
var NOOP = function(){};
+
+var replyNotAvailable = function (callback) {
+
+ if(typeof callback === 'function'){
+ callback(errors.create({ why : 'Could Not Connect To Any Nodes',
+ name : 'NoAvailableNodesException' }));
+ }
+};
+
+
/**
* Creates a connection to a keyspace for each of the servers in the pool;
* @param {Object} options The options for the connection
@@ -83,7 +94,7 @@ Pool.prototype.connect = function(callback){
if(finished === len){
if(self.clients.length === 0){
- callback(new Error('Could Not Connect To Any Nodes'));
+ replyNotAvailable(callback);
}
}
}
@@ -146,9 +157,16 @@ Pool.prototype.use = function(keyspace, callback){
*/
Pool.prototype.execute = function(command){
var args = Array.prototype.slice.apply(arguments),
- conn = this.getConnection();
+ conn = this.getConnection(),
+ callback;
+
+ if (!conn) {
+ callback = args.pop();
+ replyNotAvailable(callback);
+ } else {
+ conn.execute.apply(conn, args);
+ }
- conn.execute.apply(conn, args);
};
/**
@@ -159,9 +177,15 @@ Pool.prototype.execute = function(command){
*/
Pool.prototype.cql = function(){
var args = Array.prototype.slice.apply(arguments),
- conn = this.getConnection();
+ conn = this.getConnection(),
+ callback;
- conn.cql.apply(conn, args);
+ if(!conn) {
+ callback = args.pop();
+ replyNotAvailable(callback);
+ } else {
+ conn.cql.apply(conn, args);
+ }
};
/**
@@ -173,7 +197,9 @@ Pool.prototype.getConnection = function(){
host = this.clients[rnd];
if (!host){
- this.emit('error', new Error('No Available Connections'));
+ this.emit('error', errors.create({ why : 'No Available Connections',
+ name : 'NoAvailableNodesException' }));
+ return;
}
if (host.ready){
@@ -202,15 +228,28 @@ Pool.prototype.getConnection = function(){
*/
Pool.prototype.createKeyspace = function(name, options, callback){
var conn = this.getConnection();
- conn.createKeyspace.call(conn, name, options, callback);
+
+ if (!conn) {
+ if (typeof options === 'function') {
+ callback = options;
+ }
+ replyNotAvailable(callback);
+ } else {
+ conn.createKeyspace.call(conn, name, options, callback);
+ }
};
/**
* Creates a keyspace see Connection.prototype.createKeyspace
*/
Pool.prototype.dropKeyspace = function(name, callback){
var conn = this.getConnection();
- conn.dropKeyspace.call(conn, name, callback);
+
+ if(!conn){
+ replyNotAvailable(callback);
+ } else {
+ conn.dropKeyspace.call(conn, name, callback);
+ }
};
/**
View
@@ -1,4 +1,5 @@
-var poolConfig = require('./helpers/connection'), Helenus, conn,
+var poolConfig = require('./helpers/connection'),
+ badConfig = require('./helpers/bad_connection'), Helenus, conn,
config = require('./helpers/cql2'),
canSelectCqlVersion = require('./helpers/can_select_cql_version');
@@ -24,6 +25,19 @@ module.exports = {
});
},
+ 'test a bad connection will return an error':function(test, assert){
+ var badConn = new Helenus.ConnectionPool(badConfig);
+ // Add error handler to avoid uncaught exception.
+ badConn.on('error', function (err) { assert.isDefined(err); });
+ badConn.connect(function(err) {
+ assert.isDefined(err);
+ badConn.cql(config['create_ks#cql'], function(err, res){
+ assert.isDefined(err);
+ test.finish();
+ });
+ });
+ },
+
'test cql create keyspace':function(test, assert){
conn.cql(config['create_ks#cql'], function(err, res){
assert.ifError(err);
View
@@ -1,4 +1,5 @@
-var poolConfig = require('./helpers/connection'), Helenus, conn,
+var poolConfig = require('./helpers/connection'),
+ badConfig = require('./helpers/bad_connection'), Helenus, conn,
config = require('./helpers/cql3'),
canSelectCqlVersion = require('./helpers/can_select_cql_version');
@@ -51,6 +52,19 @@ module.exports = {
});
},
+ 'test a bad connection will return an error':function(test, assert){
+ var badConn = new Helenus.ConnectionPool(badConfig);
+ // Add error handler to avoid uncaught exception.
+ badConn.on('error', function (err) { assert.isDefined(err); });
+ badConn.connect(function(err) {
+ assert.isDefined(err);
+ badConn.cql(config['create_ks#cql'], function(err, res){
+ assert.isDefined(err);
+ test.finish();
+ });
+ });
+ },
+
'test cql create keyspace':testResultless(config['create_ks#cql']),
'test cql use keyspace':testResultless(config['use#cql']),
@@ -0,0 +1,6 @@
+{
+ "hosts" : ["unknown"],
+ "user" : "test",
+ "password" : "test1233",
+ "timeout" : 3000
+}
View
@@ -1,5 +1,6 @@
var config = require('./helpers/thrift'),
system = require('./helpers/connection'),
+ badSystem = require('./helpers/bad_connection'),
Helenus, conn, ks, cf_standard, row_standard, cf_composite, cf_counter;
module.exports = {
@@ -36,6 +37,22 @@ module.exports = {
});
},
+ 'test bad pool connect':function(test, assert) {
+ var badConn = new Helenus.ConnectionPool(badSystem);
+ // Add error handler to avoid uncaught exception.
+ badConn.on('error', function (err) { assert.isDefined(err); });
+ badConn.connect(function(err, keyspace) {
+ assert.isDefined(err);
+ badConn.createKeyspace(config.keyspace, function(err){
+ assert.isDefined(err);
+ badConn.dropKeyspace(config.keyspace, function(err){
+ assert.isDefined(err);
+ test.finish();
+ });
+ });
+ });
+ },
+
'test pool.createKeyspace':function(test, assert){
conn.createKeyspace(config.keyspace, function(err){
assert.ifError(err);

0 comments on commit b513d50

Please sign in to comment.