Permalink
Browse files

removed reinitialize logic, added heartbeat logic

  • Loading branch information...
1 parent 0ef1790 commit 115c53f597b720a5123f5efad6efc07c323d2421 @mbrevoort committed Apr 10, 2013
Showing with 39 additions and 35 deletions.
  1. +3 −4 index.js
  2. +27 −26 lib/seaport.js
  3. +4 −0 readme.markdown
  4. +5 −5 test/flaky_server.js
View
@@ -48,9 +48,6 @@ exports.connect = function () {
var active = true;
c.on('connect', s.emit.bind(s, 'connect'));
- c.on('connect', function () {
- s.reinitialize();
- })
c.on('end', onend);
c.on('error', onend);
@@ -81,6 +78,8 @@ exports.connect = function () {
};
exports.createServer = function (opts) {
+ if (!opts) opts = {};
+ opts.isServer = true;
var s = seaport(opts);
s.server = net.createServer(function (c) {
@@ -114,7 +113,7 @@ exports.createServer = function (opts) {
s.on('close', function () {
s.server.close();
});
-
+
s.server.on('listening', s.emit.bind(s, 'listening'));
s.server.on('connection', s.emit.bind(s, 'connection'));
View
@@ -14,6 +14,7 @@ function Seaport (opts) {
var self = this;
if (!(self instanceof Seaport)) return new Seaport(opts);
if (!opts) opts = {};
+ opts.heartbeatInterval = parseInt(opts.heartbeatInterval) || 2000;
self._authorized = {};
function lookupKey () {
@@ -78,6 +79,9 @@ function Seaport (opts) {
self.services = self.doc.createSet('type', 'service');
self.addresses = self.doc.createSet('type', 'address');
self.authorized = self.doc.createSet('type', 'authorize');
+ self.myservices = self.doc.createSet(function (state) {
+ return state.type === 'service' && state._node === self.doc.id;
+ });
self.ports = {};
self.doc.on('create', function (row) {
@@ -107,6 +111,26 @@ function Seaport (opts) {
self.authorize(String(key));
});
}
+
+ function heartbeat() {
+ self.myservices.forEach(function (row) {
+ row.set('_heartbeat', Date.now());
+ })
+ self._heartbeatTimer = setTimeout(heartbeat, opts.heartbeatInterval);
+ }
+
+ function heartbeatChecker() {
+ var staleTime = Date.now() - opts.heartbeatInterval*2;
+ self.services.forEach(function (row) {
+ if ((row.get('_heartbeat') || 0) < staleTime) {
+ self.doc.rm(row.id);
+ }
+ });
+ self._heartbeatCheckerTimer = setTimeout(heartbeatChecker, opts.heartbeatInterval*2);
+ }
+
+ heartbeat();
+ if (opts.isServer) heartbeatChecker();
}
inherits(Seaport, EventEmitter);
@@ -223,6 +247,7 @@ Seaport.prototype.registerMeta = function (role, opts) {
meta.type = 'service';
meta._node = self.doc.id;
+ meta._heartbeat = Date.now();
var id = meta.id = createId();
@@ -243,32 +268,6 @@ Seaport.prototype.register = function (role, opts) {
return this.registerMeta(role, opts).port;
}
-Seaport.prototype.reinitialize = function () {
- var self = this;
- //console.error("REINIT+++\n\n", self.doc, '\n\n')
- // delete this clients copies of records of any
- // other services but itself. Not using self.doc.rm(row.id);
- // because that would cause change events that would ripple
- // through once reconnected. Clear out the history as well.
- for (var id in self.doc.rows) {
- if(self.doc.rows.hasOwnProperty(id)) {
- var row = self.doc.rows[id];
- if (row.state._node !== self.doc.id) {
- delete self.services.rows[id]
- self.services._array = self.services._array.filter(function (it) { return it.id !== id });
- delete self.addresses.rows[id]
- self.addresses._array = self.addresses._array.filter(function (it) { return it.id !== id });
- delete self.doc.rows[id]
- delete self.doc.hist[id]
- // self.doc.rm(row.id);
- }
- }
- }
- //console.error("REINIT---\n\n", self.doc, '\n\n')
- //console.error('!!!!!!\n\n', this.query(""), '\n\n!!!!')
- //console.error(self.services.toJSON())
-}
-
Seaport.matches = matches;
function matches (rv, service) {
if (!rv) return true;
@@ -290,6 +289,8 @@ Seaport.prototype.query = function (rv) {
Seaport.prototype.close = function () {
this.closed = true;
+ clearTimeout(this._heartbeatCheckerTimer);
+ clearTimeout(this._heartbeatTimer);
this.emit('close');
};
View
@@ -247,6 +247,10 @@ Create a seaport instance with an attached tcp server with `.listen()` and
`.address()` methods that will set up streams in "server mode" for incoming tcp
connections.
+Internally seaport uses a heartbeat to help identify stale or orphaned service
+registrations. Use `opts.heartbeatInterval`, in milliseconds, to set the interval
+that clients should update their heartbeat. The default heartbeatInterval is `2000`.
+
## s.createStream(host)
Create a duplex stream of the underlying
View
@@ -5,9 +5,9 @@ var destroyer = require('destroyer');
test('flaky-server', function (t) {
t.plan(9);
-
- var server0 = seaport.createServer();
- var server1 = seaport.createServer();
+ var opts = { heartbeatInterval: 100 };
+ var server0 = seaport.createServer(opts);
+ var server1 = seaport.createServer(opts);
var destroy0 = destroyer(server0);
var destroy1 = destroyer(server1);
@@ -16,8 +16,8 @@ test('flaky-server', function (t) {
var port = server0.address().port;
- var ports0 = seaport.connect(port);
- var ports1 = seaport.connect(port);
+ var ports0 = seaport.connect(port, opts);
+ var ports1 = seaport.connect(port, opts);
var wport = ports0.register('woo');

0 comments on commit 115c53f

Please sign in to comment.