Skip to content

Commit

Permalink
adding support for worker pools.
Browse files Browse the repository at this point in the history
  • Loading branch information
Joachim Kainz committed Oct 3, 2011
1 parent 9945953 commit 3da4b36
Showing 1 changed file with 130 additions and 44 deletions.
174 changes: 130 additions & 44 deletions lib/backgrounder.js
Expand Up @@ -3,27 +3,73 @@ var cp = require("child_process");
var events = require("events"); var events = require("events");
var util = require("util"); var util = require("util");
// //
// Emmit a message or, if a callback was provided, call the callback // Emit a message or, if a callback was provided, call the callback
// //
function emitMessage(child, message) { function emitMessage(child, message) {
var id = message.id; var id = message.id;


if (!id) { if (!id) {
child.emitter.emit(message.type, message.content); child.manager.emitter.emit(message.type, message.content);
return; return;
} }


var callback = child.callbacks[id]; var callback = child.callbacks[id];


if (!callback) { if (!callback) {
console.error("more than one callabck for %s, igoring message %j", id, message); console.error("callback already called for %s, igoring message %j", id, message);
return; return;
} }


delete child.callbacks[id]; delete child.callbacks[id];
callback.apply(null, message.content); callback.apply(null, message.content);
} }
// //
// Send a message to the client (do the actual work: stringify, write to stdin, increment the pending counter)
//
function sendMessage(child, message, callback) {
if (callback) {
var id = ++ child.requestCount;
child.callbacks[id] = callback;
message.id = id;
}

var json = JSON.stringify(message);

child.pending ++;
child.process.stdin.write(json + '\n');
}
//
// Pop the message of the queue
//
function popMessage(pending) {
for(var idx in pending) {
var first = pending[idx];

delete pending[idx];

return first;
}

return undefined;
}
//
// Inform listeneras that the process completed
//
function processCompleted(child, message) {
var manager = child.manager;
manager.emitter.emit(message.type, message.content);
if (0 !== --child.pending) {
return;
}
var next = popMessage(manager.pending);
if (next) {
sendMessage(child, next.message, next.callback);
}
else {
child.manager.emitter.emit('idle', message.content);
}
}
//
// Process a message from the child. Message supported are of type 'console', 'message', and 'idle'. // Process a message from the child. Message supported are of type 'console', 'message', and 'idle'.
// 'idlle' messages decrement the message pending count of the child. // 'idlle' messages decrement the message pending count of the child.
// //
Expand All @@ -35,31 +81,13 @@ function processMessage(child, message) {
emitMessage(child, message); emitMessage(child, message);
} }
else if (message.type === 'completed') { else if (message.type === 'completed') {
child.emitter.emit(message.type, message.content); processCompleted(child, message);
if (0 === --child.pending) {
child.emitter.emit('idle', message.content);
}
} }
else { else {
console.error("unexpected message %s", util.inspect(message)); console.error("unexpected message %s", util.inspect(message));
} }
} }
// //
// Send a message to the client (do the actual work: stringify, write to stdin, increment the pending counter)
//
function sendMessage(child, message, callback) {
if (callback) {
var id = ++ child.requestCount;
child.callbacks[id] = callback;
message.id = id;
}

var json = JSON.stringify(message);

child.pending ++;
child.process.stdin.write(json + '\n');
};
//
// Process the content of self.buffer. Find all message in the buffer and process the messages. // Process the content of self.buffer. Find all message in the buffer and process the messages.
// //
function processBuffer(self) { function processBuffer(self) {
Expand All @@ -73,17 +101,26 @@ function processBuffer(self) {
}); });
} }
// //
// Create a new child process passing the file to be executed by the launched process. // Find the counter for the processes
//
function getCount(config) {
if (!config) {
return 1;
}
return config["children-count"] ? config["children-count"] : 1;
}
//
// Create a new child
// //
function Child(module) { function Child(manager, module, config, id) {
var self = this; var self = this;
this.busy = false; var path = __dirname + '/backgrounder-launcher.js';

this.buffer = ""; this.buffer = "";
this.pending = 0; this.pending = 0;
this.callbacks = []; this.callbacks = [];
this.requestCount = 0; this.requestCount = 0;
var path = __dirname + '/backgrounder-launcher.js'; this.manager = manager;
this.emitter = new events.EventEmitter();
this.process = cp.spawn('node', [path, module]); this.process = cp.spawn('node', [path, module]);
this.process.stdout.on('data', function(data) { this.process.stdout.on('data', function(data) {
self.buffer += data.toString(); self.buffer += data.toString();
Expand All @@ -100,41 +137,90 @@ function Child(module) {
}); });
} }
// //
// Create a new child process passing the file to be executed by the launched process.
//
function Manager(module, config) {
this.emitter = new events.EventEmitter();
this.children = [];
this.pending = [];

var self = this;
var count = getCount(config);

for(var idx=0; idx<count; idx++) {
var child = new Child(this, module, config, idx);

this.children.push(child);
}
}
//
// Allow users to register from messages from the client // Allow users to register from messages from the client
// //
Child.prototype.on = function(event, listener) { Manager.prototype.on = function(event, listener) {
this.emitter.on(event, listener); this.emitter.on(event, listener);
}; };
// //
// Pick a child that does not have any pending requests
//
function getAvailableChild(children) {
for(var idx in children) {
var child = children[idx];

if (!child.pending) {
return child;
}
}
return undefined;
}
//
// Send a user-defined message to the client // Send a user-defined message to the client
// //
Child.prototype.send = function(message, callback) { Manager.prototype.send = function(message, callback) {
sendMessage(this, { var _message = {
"type": "message", "type": "message",
"content": message "content": message
}, callback); };

var child = getAvailableChild(this.children);

if (child) {
sendMessage(child, _message, callback);
}
else {
this.pending.push({
"message" : _message,
"callback" : callback
});
}
}; };
// //
// Tell the client to shut down // Tell the client to shut down
// //
Child.prototype.terminate = function() { Manager.prototype.terminate = function() {
sendMessage(this, { _.each(this.children, function(child){
"type": "terminate" sendMessage(child, {
}); "type": "terminate"
});
})
}; };
// export the spwan method, which creates the client object. // export the spwan method, which creates the client object.
module.exports.spawn = function(module, config, callback) { module.exports.spawn = function(module, config, callback) {
var child = new Child(module); var manager = new Manager(module, config);


if (!config) { if (!config) {
return child; return manager;
} }
sendMessage(child, { var counter = manager.children.length;
"type": "config", _.each(manager.children, function(child){
"content": config sendMessage(child, {
}, function(){ "type": "config",
callback(child); "content": config
}); }, function(){
if (-- counter === 0 && callback) {
callback(manager);
}
});
})


return child; return manager;
}; };

0 comments on commit 3da4b36

Please sign in to comment.