Skip to content

Commit

Permalink
moved to zmqrpc for management
Browse files Browse the repository at this point in the history
  • Loading branch information
openmason committed Dec 30, 2012
1 parent 92cb10b commit 71c8cd8
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 69 deletions.
3 changes: 2 additions & 1 deletion History.md
@@ -1,4 +1,5 @@
# release 0.2.2 # release 0.2.3
* switched to zmqrpc for cloud management
* fan-out feature added (check feedconfig.yaml) * fan-out feature added (check feedconfig.yaml)
* javascript modules can be loaded from job config * javascript modules can be loaded from job config
* cloudd with new structures ready on npm * cloudd with new structures ready on npm
Expand Down
35 changes: 14 additions & 21 deletions bin/cloudd
Expand Up @@ -21,36 +21,29 @@ winston.default.transports.console.level = 'info';
var _versionString = 'cloudd v'+cloudd.version + '\nnode.js cloud engine'; var _versionString = 'cloudd v'+cloudd.version + '\nnode.js cloud engine';


var cmdport = 'tcp://127.0.0.1:12345'; var cmdport = 'tcp://127.0.0.1:12345';
var zmq = require('zmq'); var zmqrpc = require('zmqrpc').Client;
console.log(_versionString); console.log(_versionString);
var launch='none'; var launch='none';
if(process.argv.length>2) { if(process.argv.length>2) {
launch = process.argv[2]; launch = process.argv[2];
} }
if(launch=='jobs' || launch=='ps' || launch=='tasks' || launch=='submit' || launch=='cron') { if(launch=='jobs' || launch=='ps' || launch=='tasks' || launch=='submit' || launch=='cron') {
var csocket = zmq.socket('req'); var cloudobj = new zmqrpc(cmdport, function(error, res) {
csocket.identity = 'client' + process.pid; if(error) {
csocket.connect(cmdport); console.log('***** Error: ' + error);
var cmd = {command:launch}; } else {
if(launch=='submit') { console.log(res.result);
cmd['id']=process.argv[3];
cmd['config']=process.argv[4];
} else if(launch=='cron') {
cmd['cron']=process.argv[3];
cmd['id']=process.argv[4];
cmd['config']=process.argv[5];
}
csocket.send(JSON.stringify(cmd));
csocket.on('message', function(data) {
var objs = JSON.parse(data.toString());
switch(launch) {
case 'ps' : _ps(objs); break;
case 'tasks': _tasks(objs); break;
case 'jobs' : _jobs(objs);break;
default : console.log(objs.result); break;
} }
process.exit(0); process.exit(0);
}); });
var cmd = {command:launch};
switch(launch) {
case 'submit': cloudobj.submit(process.argv[3], process.argv[4]); break;
case 'cron': cloudobj.cron(process.argv[3], process.argv[4], process.argv[5]); break;
case 'ps': cloudobj.ps(function(error, result) { _ps(result); process.exit(0); }); break;
case 'tasks': cloudobj.tasks(function(error, result) { _tasks(result); process.exit(0); }); break;
case 'jobs': cloudobj.jobs(function(error, result) { _jobs(result); process.exit(0); }); break;
}
} else if(launch=='server') { } else if(launch=='server') {
cloudd.start(cmdport); cloudd.start(cmdport);
} else { } else {
Expand Down
75 changes: 30 additions & 45 deletions lib/cloudd.js
Expand Up @@ -54,56 +54,41 @@ exports.submit = function (jobid, config) {
}; };


// ---------- management functions // ---------- management functions
var zmq = require('zmq'); var zmqrpc = require('zmqrpc').Server;
var socket = zmq.socket('rep');
socket.identity = 'cloudd' + process.pid; var CloudManagement = {
ps: function() { return _jobsList.taskQ.runningTasks; },
jobs: function() { return _jobsList.toStringObj(); },
tasks: function() { return _jobsList.taskQ.qAsArray(); },
submit:function(id, config) {
var res = {result:'job submitted:'+id};
try {
exports.submit(id, to.load(config));
} catch(err) {
winston.error(_pkgname+err);
res.result=err.message;
}
return res;
},
cron: function(cronConfig, id, config) {
var res = {result:'cron submitted:'+id};
try {
exports.submitAt(cronConfig, id, to.load(config));
} catch(err) {
winston.error(_pkgname+err);
res.result=err.message;
}
return res;
}
};


/* Start the server */ /* Start the server */
exports.start = function(port) { exports.start = function(port) {
var cmdport = port || 'tcp://127.0.0.1:12345'; var cmdport = port || 'tcp://127.0.0.1:12345';
_jobsList = new job.JobList(); _jobsList = new job.JobList();

var service = new zmqrpc(cmdport, true);
socket.bind(cmdport, function(err) { service.context(CloudManagement);
if (err) throw err; service.run();
winston.info(_pkgname+'bound to management port!'+cmdport);
// command port message
socket.on('message', function(data) {
winston.info(_pkgname+'received cmd> ' + data.toString());
var obj = JSON.parse(data.toString());
switch(obj.command) {
case 'ps': socket.send(JSON.stringify(_jobsList.taskQ.runningTasks));break;
case 'jobs': socket.send(JSON.stringify(_jobsList.toStringObj()));break;
case 'tasks': socket.send(JSON.stringify(_jobsList.taskQ.qAsArray()));break;
case 'submit':
var res = {result:'job submitted:'+obj.id};
try {
exports.submit(obj.id, to.load(obj.config));
} catch(err) {
winston.error(_pkgname+err);
res.result=err.message;
}
socket.send(JSON.stringify(res));
break;
case 'cron':
res = {result:'cron submitted:'+obj.id};
try {
exports.submitAt(obj.cron, obj.id, to.load(obj.config));
} catch(err) {
winston.error(_pkgname+err);
res.result=err.message;
}
socket.send(JSON.stringify(res));
break;
default: socket.send('{"result":"command ['+data.toString()+'] not found"}');break;
};
});
// poll for work
/*
setTimeout(function() {
_jobsList.process();
}, 100);
*/
});
}; };


/* returns current version /* returns current version
Expand Down
4 changes: 2 additions & 2 deletions package.json
Expand Up @@ -2,14 +2,14 @@
"name": "cloudd", "name": "cloudd",
"description": "node based job execution engine", "description": "node based job execution engine",
"homepage": "https://github.com/openmason/cloudd", "homepage": "https://github.com/openmason/cloudd",
"version": "0.2.2", "version": "0.2.3",
"author": "el aras <openmason@gmail.com>", "author": "el aras <openmason@gmail.com>",
"dependencies": { "dependencies": {
"underscore": ">= 1.4.3", "underscore": ">= 1.4.3",
"winston" : ">= 0.6.2", "winston" : ">= 0.6.2",
"sprintf" : ">= 0.1.1", "sprintf" : ">= 0.1.1",
"cron" : ">= 1.0.1", "cron" : ">= 1.0.1",
"zmq" : ">= 2.2.0", "zmqrpc" : ">= 0.0.6",
"to" : ">= 0.2.9", "to" : ">= 0.2.9",
"handy" : ">= 0.0.11" "handy" : ">= 0.0.11"
}, },
Expand Down

0 comments on commit 71c8cd8

Please sign in to comment.