Permalink
Browse files

child_process.spawnNode

For making easy worker processes.
  • Loading branch information...
1 parent 205b9be commit 9e26dab150e15cdbc7fbec76dcadeafed7d85646 @ry ry committed May 11, 2011
@@ -179,6 +179,39 @@ amount of data allowed on stdout or stderr - if this value is exceeded then
the child process is killed.
+### child_process.spawnNode(modulePath, arguments, options)
+
+This is a special case of the `spawn()` functionality for spawning Node
+processes. In addition to having all the methods in a normal ChildProcess
+instance, the returned object has a communication channel built-in. The
+channel is written to with `child.send(message)` and messages are received
+by a `'message'` event on the child.
+
+For example:
+
+ var n = spawnNode(__dirname + '/sub.js');
+
+ n.on('message', function(m) {
+ console.log('PARENT got message:', m);
+ });
+
+ n.send({ hello: 'world' });
+
+And then the child script, `'sub.js'`, might look like this:
+
+ process.on('message', function(m) {
+ console.log('CHILD got message:', m);
+ });
+
+ process.send({ foo: 'bar' });
+
+In the child the `process` object will have a `send()` method, and `process`
+will emit objects each time it receives a message on its channel.
+
+By default the spawned Node process will have its stdin, stdout, and stderr
+associated with the parent's. This can be overridden by using the `customFds` option.
+
+
### child.kill(signal='SIGTERM')
Send a signal to the child process. If no argument is given, the process will
View
@@ -32,6 +32,60 @@ var spawn = exports.spawn = function(path, args /*, options, customFds */) {
return child;
};
+
+function setupChannel(target, fd) {
+ target._channel = new Stream(fd);
+ target._channel.writable = true;
+ target._channel.readable = true;
+
+ target._channel.resume();
+ target._channel.setEncoding('ascii');
+
+ var buffer = '';
+ target._channel.on('data', function(d) {
+ buffer += d;
+ var i;
+ while ((i = buffer.indexOf('\n')) >= 0) {
+ var json = buffer.slice(0, i);
+ buffer = buffer.slice(i + 1);
+ var m = JSON.parse(json);
+ target.emit('message', m);
+ }
+ });
+
+ target.send = function(m) {
+ target._channel.write(JSON.stringify(m) + '\n');
+ };
+}
+
+
+exports.spawnNode = function(modulePath, args, options) {
+ if (!options) options = {};
+ options.wantChannel = true;
+
+ if (!args) args = [];
+ args.unshift(modulePath);
+
+ // Unless they gave up customFds, just use the parent process
+ if (!options.customFds) options.customFds = [0, 1, 2];
+
+ var child = spawn(process.execPath, args, options);
+
+ setupChannel(child, child.fds[3]);
+
+ child.on('exit', function() {
+ child._channel.destroy();
+ });
+
+ return child;
+};
+
+
+exports._spawnNodeChild = function(fd) {
+ setupChannel(process, fd);
+};
+
+
exports.exec = function(command /*, options, callback */) {
var _slice = Array.prototype.slice;
var args = ['/bin/sh', ['-c', command]].concat(_slice.call(arguments, 1));
@@ -240,6 +294,12 @@ ChildProcess.prototype.spawn = function(path, args, options, customFds) {
envPairs.push(key + '=' + env[key]);
}
+ if (options && options.wantChannel) {
+ // The FILLMEIN will be replaced in C land with an integer!
+ // AWFUL! :D
+ envPairs.push('NODE_CHANNEL_FD=FILLMEIN');
+ }
+
var fds = this._internal.spawn(path,
args,
cwd,
View
@@ -38,6 +38,8 @@
startup.processKillAndExit();
startup.processSignalHandlers();
+ startup.processChannel();
+
startup.removedMethods();
startup.resolveArgv0();
@@ -307,6 +309,19 @@
};
};
+
+ startup.processChannel = function() {
+ // If we were spawned with env NODE_CHANNEL_FD then load that up and
+ // start parsing data from that stream.
+ if (process.env.NODE_CHANNEL_FD) {
+ var fd = parseInt(process.env.NODE_CHANNEL_FD);
+ assert(fd >= 0);
+ var cp = NativeModule.require('child_process');
+ cp._spawnNodeChild(fd);
+ assert(process.send);
+ }
+ }
+
startup._removedProcessMethods = {
'assert': 'process.assert() use require("assert").ok() instead',
'debug': 'process.debug() use console.error() instead',
View
@@ -35,6 +35,9 @@
#include <sys/wait.h>
#endif
+#include <sys/socket.h> /* socketpair */
+#include <sys/un.h>
+
# ifdef __APPLE__
# include <crt_externs.h>
# define environ (*_NSGetEnviron())
@@ -153,7 +156,7 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
// Copy fourth argument, args[3], into a c-string array called env.
Local<Array> env_handle = Local<Array>::Cast(args[3]);
int envc = env_handle->Length();
- char **env = new char*[envc+1]; // heap allocated to detect errors
+ char **env = new char*[envc + 1]; // heap allocated to detect errors
env[envc] = NULL;
for (int i = 0; i < envc; i++) {
String::Utf8Value pair(env_handle->Get(Integer::New(i))->ToString());
@@ -206,7 +209,7 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
String::New("setgid argument must be a number or a string")));
}
-
+ int channel_fd = -1;
int r = child->Spawn(argv[0],
argv,
@@ -218,7 +221,8 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
custom_uid,
custom_uname,
custom_gid,
- custom_gname);
+ custom_gname,
+ &channel_fd);
if (custom_uname != NULL) free(custom_uname);
if (custom_gname != NULL) free(custom_gname);
@@ -235,7 +239,8 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
return ThrowException(Exception::Error(String::New("Error spawning")));
}
- Local<Array> a = Array::New(3);
+
+ Local<Array> a = Array::New(channel_fd >= 0 ? 4 : 3);
assert(fds[0] >= 0);
a->Set(0, Integer::New(fds[0])); // stdin
@@ -244,6 +249,10 @@ Handle<Value> ChildProcess::Spawn(const Arguments& args) {
assert(fds[2] >= 0);
a->Set(2, Integer::New(fds[2])); // stderr
+ if (channel_fd >= 0) {
+ a->Set(3, Integer::New(channel_fd));
+ }
+
return scope.Close(a);
}
@@ -291,6 +300,8 @@ void ChildProcess::Stop() {
// Note that args[0] must be the same as the "file" param. This is an
// execvp() requirement.
//
+// TODO: The arguments are ridiculously long. Needs to be put into a struct.
+//
int ChildProcess::Spawn(const char *file,
char *const args[],
const char *cwd,
@@ -301,7 +312,8 @@ int ChildProcess::Spawn(const char *file,
int custom_uid,
char *custom_uname,
int custom_gid,
- char *custom_gname) {
+ char *custom_gname,
+ int* channel) {
HandleScope scope;
assert(pid_ == -1);
assert(!ev_is_active(&child_watcher_));
@@ -332,11 +344,37 @@ int ChildProcess::Spawn(const char *file,
SetCloseOnExec(stderr_pipe[1]);
}
+
+ // The channel will be used by spawnNode() for a little JSON channel.
+ // The pointer is used to pass one end of the socket pair back to the
+ // parent.
+ // channel_fds[0] is for the parent
+ // channel_fds[1] is for the child
+ int channel_fds[2] = { -1, -1 };
+
+#define NODE_CHANNEL_FD "NODE_CHANNEL_FD"
+
+ for (int i = 0; env[i]; i++) {
+ if (!strncmp(env[i], NODE_CHANNEL_FD, sizeof NODE_CHANNEL_FD - 1)) {
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, channel_fds)) {
+ perror("socketpair()");
+ return -1;
+ }
+
+ assert(channel_fds[0] >= 0 && channel_fds[1] >= 0);
+
+ SetNonBlocking(channel_fds[0]);
+ SetNonBlocking(channel_fds[1]);
+ // Write over the FILLMEIN :D
+ sprintf(env[i], NODE_CHANNEL_FD "=%d", channel_fds[1]);
+ }
+ }
+
// Save environ in the case that we get it clobbered
// by the child process.
char **save_our_env = environ;
- switch (pid_ = vfork()) {
+ switch (pid_ = fork()) {
case -1: // Error.
Stop();
return -4;
@@ -429,7 +467,11 @@ int ChildProcess::Spawn(const char *file,
_exit(127);
}
-
+ // Close the parent's end of the channel.
+ if (channel_fds[0] >= 0) {
+ close(channel_fds[0]);
+ channel_fds[0] = -1;
+ }
environ = env;
@@ -472,6 +514,17 @@ int ChildProcess::Spawn(const char *file,
stdio_fds[2] = custom_fds[2];
}
+ // Close the child's end of the channel.
+ if (channel_fds[1] >= 0) {
+ close(channel_fds[1]);
+ channel_fds[1] = -1;
+ assert(channel_fds[0] >= 0);
+ assert(channel);
+ *channel = channel_fds[0];
+ } else {
+ *channel = -1;
+ }
+
return 0;
}
View
@@ -89,7 +89,8 @@ class ChildProcess : ObjectWrap {
int custom_uid,
char *custom_uname,
int custom_gid,
- char *custom_gname);
+ char *custom_gname,
+ int* channel);
// Simple syscall wrapper. Does not disable the watcher. onexit will be
// called still.
@@ -0,0 +1,9 @@
+var assert = require('assert');
+
+console.log("NODE_CHANNEL_FD", process.env.NODE_CHANNEL_FD);
+assert.ok(process.env.NODE_CHANNEL_FD);
+
+var fd = parseInt(process.env.NODE_CHANNEL_FD);
+assert.ok(fd >= 0);
+
+process.exit(0);
@@ -0,0 +1,10 @@
+var assert = require('assert');
+
+process.on('message', function(m) {
+ console.log('CHILD got message:', m);
+ assert.ok(m.hello);
+ // Note that we have to force exit.
+ process.exit();
+});
+
+process.send({ foo: 'bar' });
@@ -0,0 +1,25 @@
+var assert = require('assert');
+var spawn = require('child_process').spawn;
+var common = require('../common');
+
+var sub = common.fixturesDir + '/child-process-channel.js';
+
+var child = spawn(process.execPath, [ sub ], {
+ customFds: [0, 1, 2],
+ wantChannel: true
+});
+
+console.log("fds", child.fds);
+
+assert.ok(child.fds.length == 4);
+assert.ok(child.fds[3] >= 0);
+
+var childExitCode = -1;
+
+child.on('exit', function(code) {
+ childExitCode = code;
+});
+
+process.on('exit', function() {
+ assert.ok(childExitCode == 0);
+});
@@ -0,0 +1,24 @@
+var assert = require('assert');
+var common = require('../common');
+var spawnNode = require('child_process').spawnNode;
+
+var n = spawnNode(common.fixturesDir + '/child-process-spawn-node.js');
+
+var messageCount = 0;
+
+n.on('message', function(m) {
+ console.log('PARENT got message:', m);
+ assert.ok(m.foo);
+ messageCount++;
+});
+
+n.send({ hello: 'world' });
+
+var childExitCode = -1;
+n.on('exit', function(c) {
+ childExitCode = c;
+});
+
+process.on('exit', function() {
+ assert.ok(childExitCode == 0);
+});

19 comments on commit 9e26dab

ry commented on 9e26dab May 11, 2011

Remember these are big fat processes. Assume 30ms startup time and minimum 10mb memory for each new Node.

You don't want to be doing this all the time.

The more I think about this, the more isolated shared-nothing threads seem reasonable.

Remember these are big fat processes. Assume 30ms startup time and minimum 10mb memory for each new Node. You don't want to be doing this all the time.

Would be a good idea to add this as a note at the bottom of the docs?

tj replied May 11, 2011

woot

agreed isaacs, combined with the fact you can 'send data' across threads without having to serialize it, would be a lot faster.

tj replied May 11, 2011

for some use-cases processes are still a better choice, cluster being the obvious example

@aikar You'd still need to serialize it, or at the very least freeze it and be sure to prevent any getter/setter monkeybusiness, which is also not free. If you don't do that, you're back in a shared-state world. Doesn't v8 have some new snazzy thing for this?

tj replied May 11, 2011

@isaacs the isolates? might be worth looking into

ry replied May 11, 2011

@isaacs, @aikar - why?

concerns about serialization overhead can be redirected to /dev/null

@ry Because then it would reduce the process overhead.

(For the record, I also don't believe that serialization overhead is a real problem, whereas shared-memory concurrency introduces many very real problems.)

tj replied May 11, 2011

IMO people should offload heavy tasks to a job queue anyway so the spawn time doesn't really matter much

ry replied May 11, 2011

process overhead? the 30ms startup and 10mb memory I mentioned is starting up V8 (same for a V8 isolate). fork/exec happens in zero amount of time (relative to V8 time scale). You save nothing by doing this in a thread.

http://bulk.fefe.de/scalability/ (very old - but even then fork was ~400 microseconds)

Ah, I see. I was confused by your comment about them being "fat processes", but yes, that makes sense.

@isaacs
I meant that you could do a direct memcpy of the data to the new context (if that's possible)

but if the majority of everything is in V8, then I guess threads are almost useless.

ry, is the 30ms the time to create a new v8 context (which would apply to vm.runInNewContext)? or some more full copy of v8?

if runInNewContext also suffers a 10mb/30ms creation time, that's an interesting fact to know.

is there a reason that encoding is ascii? shouldn't it be UTF8 for JSON? also, would it be a big deal to add methods for sending binary buffers between processes too? i'd imagine this would be useful for offloading parsing/processing of binary data. maybe something like:

target._channel.on('data', function(d, encoding) {
    switch(encoding) {
        case "binary":
            target.emit("message", d, "binary");
            break;
        default:
            // process json as above
            break;
    }
}

target.send = function(m, encoding) {
    switch(encoding) {
        case "binary":
            target._channel.write(m);
            break;
        default:
            target._channel.write(JSON.stringify(m) + '\n');
            break;
    }
}

ry replied May 17, 2011

@billywhizz JSON.stringify only returns ascii. no binary. people can open alternate channels for that.

@ry No, it can return UTF characters.

> console.error(JSON.stringify("\ufaff"))
"﫿"

@ry Or, for one that works in a browser:

> console.error(JSON.stringify("δ"))
"δ"

isaac, it looks like ry has already made the change to use utf8 encoding: 0271b78

Please sign in to comment.