Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

adding support for restarting child processes

  • Loading branch information...
commit 4a5425b8dd3e80d5f68ed3afb8a9133841466b32 1 parent 84f1a49
Joachim Kainz authored
View
25 examples/restarting-multi-master.js
@@ -0,0 +1,25 @@
+var backgrounder = require("../lib/backgrounder");
+var worker = backgrounder.spawn(__dirname + "/restarting-multi-worker.js", {
+ "children-count" : 5,
+ "auto-restart" : true
+}, function(){
+ console.log("Master: Config finished,");
+ var counter = 0;
+
+ var sender = function() {
+ for(var idx=0; idx<5; idx++) {
+ worker.send({}, function(arg1, arg2, arg3, arg4, arg5) {
+ console.log("Master: client called the callback with %s arguments:",
+ arguments.length, arg1, arg2, arg3, arg4, arg5);
+ if (++counter === 100) {
+ worker.terminate();
+ } else {
+ sender();
+ }
+
+ });
+ }
+ };
+ sender();
+});
+
View
35 examples/restarting-multi-worker.js
@@ -0,0 +1,35 @@
+var http = require('http');
+var id = process.pid;
+var loaded = 0;
+
+/**
+ * There really is no configuration here, but it is essential that we are calling the callback functtion.
+ * Failing to call the callback function will prevent the master from confinuing
+ */
+process.on('config', function(message, callback) {
+ callback();
+});
+
+process.on('message', function(message, callback) {
+ var options = {
+ host: 'jolira.github.com',
+ port: 80,
+ path: '/backgrounder/'
+ };
+
+ http.get(options, function(res) {
+ callback(id, "status-code:", res.statusCode, "loaded", ++loaded);
+
+ if (loaded > 2) {
+ process.exit(0);
+ }
+ }).on('error', function(e) {
+ callback(terminate, "error:", e);
+ });
+});
+
+process.on('terminate', function(message, callback) {
+ console.log("Worker: %s Loaded", id, loaded);
+});
+
+console.log('Worker: Started %s!', id);
View
217 lib/backgrounder.js
@@ -3,6 +3,28 @@ var cp = require("child_process");
var events = require("events");
var util = require("util");
//
+// Pick a child that does not have any pending requests
+//
+function getAvailableChild(manager) {
+ for(var idx = manager.lastUsed+1; idx < manager.children.length; idx++) {
+ var child = manager.children[idx];
+
+ if (!child.pending) {
+ return manager.lastUsed = idx;
+ }
+ }
+
+ for(var idx = .0; idx < manager.children.length; idx++) {
+ var child = manager.children[idx];
+
+ if (!child.pending) {
+ return manager.lastUsed = idx;
+ }
+ }
+
+ return manager.lastUsed = -1;
+}
+//
// Emit a message or, if a callback was provided, call the callback
//
function emitMessage(child, message) {
@@ -24,23 +46,17 @@ function emitMessage(child, message) {
callback.apply(null, message.content);
}
//
-// Remove a child
+// Send a message to the client (do the actual work: stringify, write to stdin, increment the pending counter)
//
-function removeChild(child) {
- var children = child.manager.children;
+function sendMessage(manager, id, message, callback) {
+ console.log("send message %s", id);
+ var child = manager.children[id];
- for(var idx in children) {
- if (child === children[idx]) {
- delete children[idx];
- break;
+ if (!child) {
+ console.error("child is undefined", id, message);
+ return;
}
- }
-}
-//
-// Send a message to the client (do the actual work: stringify, write to stdin, increment the pending counter)
-//
-function sendMessage(child, message, callback) {
if (callback) {
var id = ++ child.requestCount;
child.callbacks[id] = callback;
@@ -55,40 +71,56 @@ function sendMessage(child, message, callback) {
child.process.stdin.write(json + '\n');
}
catch (e) {
- removeChild(child);
- throw e;
+ child.restart();
+ sendMessage(manager, id, message, callback);
}
}
//
-// Pop the message of the queue
+// Return the first index on the queue
//
-function popMessage(pending) {
- for(var idx in pending) {
- var first = pending[idx];
+function firstPendingMessage(manager) {
+ for(var idx in manager.pending) {
+ return idx;
+ }
- delete pending[idx];
+ manager.pending = [];
+
+ return undefined;
+}
+//
+// If there is a pending message, send it with the next available child
+//
+function sendPendingMessage(manager, content) {
+ var idx = firstPendingMessage(manager);
- return first;
+ if (!idx) {
+ manager.emitter.emit('idle', content);
+ return;
}
- return undefined;
+ var childIdx = getAvailableChild(manager);
+
+ if (childIdx == -1) {
+ return; // no client available
+ }
+
+ var pending = manager.pending[idx];
+ delete manager.pending[idx];
+ sendMessage(manager, childIdx, pending.message, pending.callback);
}
//
// Inform listeneras that the process completed
//
function processCompleted(child, message) {
+ var id = child.id;
var manager = child.manager;
manager.emitter.emit(message.type, message.content);
if (0 !== --child.pending) {
return;
}
- var next = popMessage(manager.pending);
- if (next) {
- sendMessage(child, next.message, next.callback);
- }
- else {
- child.manager.emitter.emit('idle', message.content);
- }
+ process.nextTick(function() {
+ sendPendingMessage(child.manager, message.content);
+ });
}
//
// Process a message from the child. Message supported are of type 'console', 'message', and 'idle'.
@@ -133,7 +165,7 @@ function getCount(config) {
//
// Create a new child
//
-function Child(manager, module, config, id) {
+function Child(manager, module) {
var self = this;
var path = __dirname + '/backgrounder-launcher.js';
@@ -156,60 +188,77 @@ function Child(manager, module, config, id) {
}
console.error(_data);
});
- this.process.on('exit', function(code, signal) {
- removeChild(self);
- });
}
//
-// Create a new child process passing the file to be executed by the launched process.
+// start or restart a child
//
-function Manager(module, config) {
- this.emitter = new events.EventEmitter();
- this.children = [];
- this.pending = [];
- this.lastUsed = -1;
-
- var self = this;
+function startChild(manager, module, config, idx, callback) {
+ var starter = function(callback) {
+ if (this) {
+ if (manager.terminated) {
+ if (manager.children[idx] == this) {
+ delete manager.children[idx];
+ }
+ return;
+ }
+ if (manager.children[idx] && manager.children[idx] != this) {
+ console.log("already restarted");
+ return;
+ }
+ }
+ var child = new Child(manager, module);
+ child.restart = config["auto-restart"] ? starter : function() {
+ if (manager.children[idx] == child) {
+ delete manager.children[idx];
+ }
+ };
+ console.log("restarted %s", idx);
+ manager.children[idx] = child;
+ child.process.stdin.on('close', function(code, signal) {
+ delete child.process;
+ console.log("stdin %s closed!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!", idx);
+ child.restart();
+ });
+ if (config) {
+ sendMessage(manager, idx, {
+ "type": "config",
+ "content": config
+ }, function(){
+ if (callback) {
+ callback.apply(null, arguments);
+ }
+ });
+ }
+ return child;
+ };
+ starter(callback);
+}
+//
+// Start a manager
+//
+function startManager(module, manager, config, callback) {
var count = getCount(config);
+ manager.children = [];
for(var idx=0; idx<count; idx++) {
- var child = new Child(this, module, config, idx);
-
- this.children.push(child);
+ startChild(manager, module, config || {}, idx, callback);
}
}
//
+// Create a new child process passing the file to be executed by the launched process.
+//
+function Manager(module) {
+ this.emitter = new events.EventEmitter();
+ this.pending = [];
+ this.lastUsed = -1;
+}
+//
// Allow users to register from messages from the client
//
Manager.prototype.on = function(event, listener) {
this.emitter.on(event, listener);
};
//
-// Pick a child that does not have any pending requests
-//
-function getAvailableChild(manager) {
- for(var idx = manager.lastUsed+1; idx < manager.children.length; idx++) {
- var child = manager.children[idx];
-
- if (!child.pending) {
- manager.lastUsed = idx;
- return child;
- }
- }
-
- for(var idx = .0; idx < manager.children.length; idx++) {
- var child = manager.children[idx];
-
- if (!child.pending) {
- manager.lastUsed = idx;
- return child;
- }
- }
-
- manager.lastUsed = -1;
- return undefined;
-}
-//
// Send a user-defined message to the client
//
Manager.prototype.send = function(message, callback) {
@@ -218,10 +267,10 @@ Manager.prototype.send = function(message, callback) {
"content": message
};
- var child = getAvailableChild(this);
+ var idx = getAvailableChild(this);
- if (child) {
- sendMessage(child, _message, callback);
+ if (idx != -1) {
+ sendMessage(this, idx, _message, callback);
}
else {
this.pending.push({
@@ -234,8 +283,10 @@ Manager.prototype.send = function(message, callback) {
// Tell the client to shut down
//
Manager.prototype.terminate = function() {
- _.each(this.children, function(child){
- sendMessage(child, {
+ var self = this;
+ this.terminated = true;
+ _.each(this.children, function(child, idx){
+ sendMessage(self, idx, {
"type": "terminate"
});
})
@@ -243,21 +294,15 @@ Manager.prototype.terminate = function() {
// export the spwan method, which creates the client object.
module.exports.spawn = function(module, config, callback) {
var manager = new Manager(module, config);
+ var counter = 0;
- if (!config) {
- return manager;
- }
- var counter = manager.children.length;
- _.each(manager.children, function(child){
- sendMessage(child, {
- "type": "config",
- "content": config
- }, function(){
- if (-- counter === 0 && callback) {
+ startManager(module, manager, config, function(child){
+ if (++ counter >= manager.children.length) {
+ if (callback) {
callback(manager);
}
- });
- })
+ }
+ });
return manager;
};
Please sign in to comment.
Something went wrong with that request. Please try again.