Permalink
Browse files

adding code to check if a registered service should be considered ava…

…ilable.
  • Loading branch information...
1 parent 5d8847f commit 78739e59df7310f7ab25c07679b88d0ad2d6b844 @soldair committed Feb 13, 2014
Showing with 103 additions and 8 deletions.
  1. +95 −6 index.js
  2. +2 −1 package.json
  3. +6 −1 test/test.js
View
101 index.js
@@ -5,7 +5,7 @@ var through = require('through');
var ini = require('ini');
var fs = require('fs');
var net = require('net');
-var tls = require('tls');
+var concats = require('concat-stream');
var undef;
module.exports = function(s3key,s3secret,bucket){
@@ -46,22 +46,105 @@ module.exports = function(s3key,s3secret,bucket){
s.write(buf);
s.end();
},
+ get:function(key,cb){
+ client.getFile(key,function(err,res){
+ if(err) return cb(err);
+ res.pipe(concats(function(data){
+ cb(false,json(data),data);
+ }));
+ });
+ },
list:function(service,cb){
opts = {};
if(typeof service == "function"){
cb = service;
service = undef;
}
+
if(service) opts.prefix = service;
client.list(opts,function(err,data){
-
- // todo list data.
- cb(err,data);
+ if(err) return cb(err);
+ if(data.Code){
+ var e = new Error(data.Message);
+ e.data = data;
+ e.code = data.Code;
+ return cb(e);
+ }
+
+ var out = [];
+ if(data.Contents) {
+ data.Contents.forEach(function(o){
+ var parts = o.Key.split('-');
+ o.service = parts[0];
+ o.id = parts[1];
+ o.host = parts[2];
+ o.port = parts[3];
+ out.push(o);
+ });
+ }
+
+ // todo list data.
+ cb(err,out);
+
});
},
- clean:function(check){
+ checkLimit:10,
+ staleTimeout:120000,
+ check:function(list,check,cb){
+ if(arguments.length === 2){
+ cb = check;
+ check = module.exports.tcpCheck;
+ }
// list and delete service id files where services have not checked in in a while and or fail the check.
+ var z = this;
+ z.list(function(err,list){
+
+ if(err) return cb(err);
+
+ var c = z.checkLimit;
+ var timeout = z.staleTimeout;
+
+ var todo = [];
+ list.forEach(function(o){
+ var mtime = +o.LastModified;
+ if(Date.now()-mtime > timeout){
+ o.stale = true;
+ o.offline = true;
+ } else {
+ // make sure its listening on the port.
+ todo.push(o);
+ }
+ });
+
+ var active = 0, started = 0
+ , job = function fn(o){
+ ++started;
+ check(o,function(err, success){
+ o.listening = success;
+ o.offline = !success;
+ if(todo.length) fn(todo.shift());
+ });
+ };
+
+ if(!todo.length){
+ return cb(false,list);
+ }
+
+ while(active < c && todo.length) {
+ ++active;
+ job(todo.shift());
+ }
+
+ });
+ },
+ clean:function(keys,cb){
+ client.deleteMultiple(keys,function(err,res){
+ if(err) return cb(err);
+ res.pipe(concats(function(data){
+ cb(false,data+'');
+ }))
+ });
}
};
@@ -84,7 +167,7 @@ module.exports.tcpCheck = function(info,cb){
console.log('error> ',arguments);
if(!called) {
called = true;
- cb(e);
+ cb(err);
}
});
@@ -106,3 +189,9 @@ module.exports.s3cfg = function(){
return ini.parse(fs.readFileSync(cfg).toString());
}
}
+
+function json(b){
+ try{
+ return JSON.parse(b);
+ } catch(e){}
+}
View
@@ -12,7 +12,8 @@
"ini": "~1.1.0",
"through": "~2.3.4",
"knox": "~0.8.9",
- "udid": "~0.1.4"
+ "udid": "~0.1.4",
+ "concat-stream": "~1.4.1"
},
"devDependencies": {
"tape": "~2.4.2"
View
@@ -15,7 +15,12 @@ test("can foos",function(t){
r.list(function(err,list){
console.log(err,list);
t.ok(list,'should have service file list');
- t.end();
+
+ r.get(list[0].Key,function(err,data){
+ console.log(err,data)
+ //
+ t.end();
+ });
});
});

0 comments on commit 78739e5

Please sign in to comment.