Permalink
Browse files

Merge pull request #45 from nodejitsu/client-reconnect

Client reconnect
  • Loading branch information...
jcrugzz committed Mar 29, 2013
2 parents fac825c + 4a87801 commit dc3429727a002e924eae5fd42707aa6c094f3db2
Showing with 227 additions and 43 deletions.
  1. +59 −22 lib/godot/net/client.js
  2. +2 −1 package.json
  3. +49 −20 test/macros/net.js
  4. +117 −0 test/net/client-reconnect-test.js
View
@@ -6,7 +6,10 @@
*/
var dgram = require('dgram'),
- net = require('net');
+ net = require('net'),
+ util = require('util'),
+ backoff = require('backoff'),
+ EventEmitter = require('events').EventEmitter;
//
// ### function Server (options)
@@ -19,6 +22,8 @@ var dgram = require('dgram'),
// Producers attached to a TCP or UDP client.
//
var Client = module.exports = function Client(options) {
+ EventEmitter.call(this);
+
if (!options || !options.type
|| !~['tcp', 'udp', 'unix'].indexOf(options.type)) {
throw new Error('Cannot create client without type: udp, tcp, unix');
@@ -30,6 +35,7 @@ var Client = module.exports = function Client(options) {
this.host = options.host;
this.port = options.port;
this.path = options.path;
+ this.reconnect = options.reconnect;
this.producers = {};
this.handlers = {
data: {},
@@ -42,6 +48,7 @@ var Client = module.exports = function Client(options) {
});
}
};
+util.inherits(Client, EventEmitter);
//
// ### function add (producer)
@@ -117,7 +124,28 @@ Client.prototype.write = function (data) {
// Opens the underlying network connection for this client.
//
Client.prototype.connect = function (port, host, callback) {
- var err;
+ var self = this,
+ connectBackoff, backoffType;
+
+ if (this.reconnect) {
+ if (typeof this.reconnect === 'object') {
+ backoffType = this.reconnect.type || 'exponential';
+ connectBackoff = backoff[backoffType](this.reconnect);
+ connectBackoff.failAfter(this.reconnect.maxTries || 10);
+ }
+ else {
+ connectBackoff = backoff.exponential();
+ connectBackoff.failAfter(10);
+ }
+
+ connectBackoff.on('fail', function (err) {
+ self.emit('error', err);
+ });
+
+ connectBackoff.on('ready', function () {
+ connect();
+ });
+ }
//
// Do some fancy arguments parsing to support everything
@@ -137,12 +165,36 @@ Client.prototype.connect = function (port, host, callback) {
});
function error (arg) {
- err = new Error(arg + ' required to connect');
- if (callback) {
- return callback(err);
+ var err = new Error(arg + ' required to connect');
+ return callback
+ ? callback(err)
+ : self.emit('error', err) ;
+ }
+
+ function onError(err) {
+ return connectBackoff ? connectBackoff.backoff(err) : self.emit('error', err);
+ }
+
+ function connect() {
+ if (self.type === 'tcp') {
+ self.socket = net.connect({ port: self.port, host: self.host }, callback);
+ }
+ else if (self.type === 'udp') {
+ self.socket = dgram.createSocket('udp4');
+ if (callback) {
+ process.nextTick(callback);
+ }
+ }
+ else if (self.type === 'unix') {
+ self.socket = net.connect({ path: self.path }, callback);
}
- throw err;
+ self.socket.on('error', onError);
+ self.socket.on('connect', function () {
+ if (connectBackoff) {
+ connectBackoff.reset();
+ }
+ });
}
// Split cases due to unix using `this.path`
@@ -162,22 +214,7 @@ Client.prototype.connect = function (port, host, callback) {
}
}
- if (this.type === 'tcp') {
- this.socket = new net.Socket({ type: 'tcp4' });
- this.socket.setEncoding('utf8');
- this.socket.connect(this.port, this.host, callback);
- }
- else if (this.type === 'udp') {
- this.socket = dgram.createSocket('udp4');
- if (callback) {
- callback();
- }
- }
- else if (this.type === 'unix') {
- this.socket = new net.Socket({type: 'unix' });
- this.socket.setEncoding('utf8');
- this.socket.connect(this.path, callback);
- }
+ connect();
};
//
View
@@ -21,7 +21,8 @@
"sendgrid-web": "0.0.2",
"telenode": "0.0.3",
"utile": "0.1.7",
- "window-stream": "~0.3.1"
+ "window-stream": "~0.3.1",
+ "backoff": "2.1.x"
},
"devDependencies": {
"optimist": "0.3.4",
View
@@ -12,6 +12,46 @@ var assert = require('assert'),
fs = require('fs'),
godot = require('../../lib/godot');
+//
+// ### function shouldStartServer(options, nested)
+// #### @options {Options} Options to setup communication
+// #### @options.type {udp|tcp} Network protocol.
+// #### @options.port {number} Port to communicate over.
+// #### @nested {Object} Vows context to use once server is started.
+// Starts the server with specified options.
+//
+exports.shouldStartServer = function (options, nested) {
+ var context = {
+ topic: function () {
+ var self = this;
+
+ function create() {
+ mocks.net.createServer(options, function (err, server) {
+ if (err) {
+ console.log('Error creating mock server');
+ console.dir(err);
+ process.exit(1);
+ }
+
+ self.server = server;
+
+ self.callback(null, server);
+ });
+ }
+
+ options.type === 'unix'
+ ? fs.unlink('unix.sock', create)
+ : create();
+ }
+ };
+
+ if (nested) {
+ context['after the server is created'] = nested;
+ }
+
+ return context;
+};
+
//
// ### function shouldSendData (options, nested)
// #### @options {Options} Options to setup communication
@@ -31,26 +71,14 @@ exports.shouldSendData = function (options, nested) {
}
var context = {
- topic: function () {
- var callback = this.callback;
- var that = this;
- // Clears the socket for the unix sockets case
- fs.unlink('unix.sock', function () {
- async.series({
- server: async.apply(mocks.net.createServer, options),
- client: async.apply(helpers.net.createClient, options)
- }, function (err, results) {
- if (err) {
- console.log('Error creating mock server');
- console.dir(err);
- process.exit(1);
- }
- that.server = results.server;
- that.client = results.client;
+ topic: function (server) {
+ var self = this;
+
+ helpers.net.createClient(options, function (err, client) {
+ self.client = client;
- results.server.once('data', function (data) {
- callback(null, data);
- });
+ server.once('data', function (data) {
+ self.callback(null, data);
});
});
},
@@ -65,6 +93,7 @@ exports.shouldSendData = function (options, nested) {
}
};
+
if (nested) {
Object.keys(nested).forEach(function (vow) {
if (!context.hasOwnProperty('after data is sent')) {
@@ -74,7 +103,7 @@ exports.shouldSendData = function (options, nested) {
});
}
- return context;
+ return exports.shouldStartServer(options, context);
};
//
@@ -0,0 +1,117 @@
+/*
+ * client-reconnect-test.js: Basic tests for the reconnection of net client.
+ *
+ * (C) 2013, Nodejitsu Inc.
+ *
+ */
+
+var assert = require('assert'),
+ vows = require('vows'),
+ async = require('utile').async,
+ godot = require('../../lib/godot'),
+ helpers = require('../helpers'),
+ macros = require('../macros'),
+ mocks = require('../mocks');
+
+vows.describe('godot/net/client').addBatch({
+ "Godot client": {
+ "with no backoff and no server": {
+ topic: function () {
+ var callback = this.callback,
+ port = helpers.nextPort;
+
+ var client = godot.createClient({
+ type: 'tcp',
+ producers: [
+ godot.producer(helpers.fixtures['producer-test'])
+ ]
+ });
+
+ client.connect(port);
+ client.on('error', function (err) {
+ callback(null, err);
+ });
+ },
+ "should emit an error": function (_, err) {
+ assert(err);
+ assert.instanceOf(err, Error);
+ }
+ },
+ "with backoff and no server": {
+ topic: function () {
+ var callback = this.callback,
+ port = helpers.nextPort,
+ d = new Date();
+
+ var client = godot.createClient({
+ type: 'tcp',
+ producers: [
+ godot.producer(helpers.fixtures['producer-test'])
+ ],
+ reconnect: {
+ type: 'exponential',
+ maxTries: 2,
+ initialDelay: 100,
+ maxDelay: 300
+ }
+ });
+
+ client.connect(port);
+ client.on('error', function (err) {
+ callback(null, err, (new Date() - d));
+ });
+ },
+ "should emit an error": function (_, err) {
+ assert(err);
+ assert.instanceOf(err, Error);
+ },
+ "should take appropiate amount of time": function (_, err, t) {
+ assert(t >= 300);
+ }
+ },
+ "with backoff and server eventually coming up": {
+ topic: function () {
+ var callback = this.callback,
+ port = helpers.nextPort,
+ d = new Date();
+
+ var client = godot.createClient({
+ type: 'tcp',
+ producers: [
+ godot.producer(helpers.fixtures['producer-test'])
+ ],
+ reconnect: {
+ type: 'exponential',
+ maxTries: 2,
+ initialDelay: 100,
+ maxDelay: 300
+ }
+ });
+
+ client.connect(port);
+ client.on('error', function (err) {
+ throw err;
+ });
+
+ setTimeout(function () {
+ mocks.net.createServer({ type: 'tcp', port: port }, function (err, server) {
+ if (err) {
+ throw err;
+ }
+
+ server.once('data', function (data) {
+ callback(null, data, (new Date()) - d);
+ });
+ });
+ }, 300);
+ },
+ "should send data": function (err, data) {
+ assert(!err);
+ assert(data);
+ },
+ "should take appropiate amount of time": function (_, err, t) {
+ assert(t >= 200);
+ }
+ }
+ }
+}).export(module);

0 comments on commit dc34297

Please sign in to comment.