Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

support for sharing streams accross isolates

  • Loading branch information...
Igor Zinkovsky
Igor Zinkovsky committed Jan 20, 2012
1 parent 33b7fc2 commit db3c4efd1d95e6c7fc47b9c07216beb7029cf7bc
Showing with 165 additions and 48 deletions.
  1. +16 −26 lib/child_process.js
  2. +23 −0 lib/net.js
  3. +20 −5 src/node.js
  4. +106 −17 src/node_isolate.cc
@@ -68,29 +68,13 @@ function mergeOptions(target, overrides) {


function setupChannel(target, channel) {
var isWindows = process.platform === 'win32';
target._channel = channel;

var jsonBuffer = '';

if (isWindows) {
var setSimultaneousAccepts = function(handle) {
var simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;

if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
}

channel.buffering = false;
channel.onread = function(pool, offset, length, recvHandle) {
if (recvHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(recvHandle);
}
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

if (pool) {
jsonBuffer += pool.toString('ascii', offset, offset + length);
@@ -140,10 +124,8 @@ function setupChannel(target, channel) {

var buffer = Buffer(JSON.stringify(message) + '\n');

if (sendHandle && setSimultaneousAccepts) {
// Update simultaneous accepts on Windows
setSimultaneousAccepts(sendHandle);
}
// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);

@@ -582,9 +564,13 @@ Isolate.prototype.spawn = function(options) {
self._handle = isolates.create(options.args, options.options);
if (!self._handle) throw new Error('Cannot create isolate.');

self._handle.onmessage = function(msg) {
self._handle.onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
self.emit('message', msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

self.emit('message', msg, recvHandle);
};

self._handle.onexit = function() {
@@ -600,10 +586,14 @@ Isolate.prototype.kill = function(sig) {
};


Isolate.prototype.send = function(msg) {
Isolate.prototype.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
if (!this._handle) throw new Error('Isolate not running.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return this._handle.send(msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

return this._handle.send(msg, sendHandle);
};
@@ -942,3 +942,26 @@ exports.isIPv4 = function(input) {
exports.isIPv6 = function(input) {
return exports.isIP(input) === 6;
};


if (process.platform === 'win32') {
var simultaneousAccepts;

exports._setSimultaneousAccepts = function(handle) {
if (typeof handle === 'undefined') {
return;
}

if (typeof simultaneousAccepts === 'undefined') {
simultaneousAccepts = (process.env.NODE_MANY_ACCEPTS &&
process.env.NODE_MANY_ACCEPTS != '0') ? true : false;
}

if (handle._simultaneousAccepts != simultaneousAccepts) {
handle.setSimultaneousAccepts(simultaneousAccepts);
handle._simultaneousAccepts = simultaneousAccepts;
}
}
} else {
exports._setSimultaneousAccepts = function(handle) {}
}
@@ -123,17 +123,27 @@

if (process.tid === 1) return;

var net = NativeModule.require('net');

// isolate initialization
process.send = function(msg) {
process.send = function(msg, sendHandle) {
if (typeof msg === 'undefined') throw new TypeError('Bad argument.');
msg = JSON.stringify(msg);
msg = new Buffer(msg);
return process._send(msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

return process._send(msg, sendHandle);
};

process._onmessage = function(msg) {
process._onmessage = function(msg, recvHandle) {
msg = JSON.parse('' + msg);
process.emit('message', msg);

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(recvHandle);

process.emit('message', msg, recvHandle);
};

process.exit = process._exit;
@@ -439,10 +449,15 @@
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap')
process.binding('tcp_wrap');

cp._forkChild();
assert(process.send);
} else if (process.tid !== 1) {
// Load tcp_wrap to avoid situation where we might immediately receive
// a message.
// FIXME is this really necessary?
process.binding('tcp_wrap');
}
}

@@ -26,6 +26,7 @@
#include <node_isolate.h>
#include <node_internals.h>
#include <node_object_wrap.h>
#include <tcp_wrap.h>

#include <stdlib.h>
#include <string.h>
@@ -34,6 +35,8 @@

#define isolate_debugger_constructor NODE_VAR(isolate_debugger_constructor)

#define ISOLATEMESSAGE_SHARED_STREAM 0x0001


namespace node {

@@ -166,23 +169,35 @@ class Channel {


struct IsolateMessage {
size_t size_;
char* data_;
int flags;
struct {
size_t size_;
char* buffer_;
} data_;
uv_stream_info_t shared_stream_info_;

IsolateMessage(const char* buffer, size_t size,
uv_stream_info_t* shared_stream_info) {
flags = 0;

IsolateMessage(const char* data, size_t size) {
// make a copy for now
size_ = size;
data_ = new char[size];
memcpy(data_, data, size);
data_.size_ = size;
data_.buffer_ = new char[size];
memcpy(data_.buffer_, buffer, size);

if (shared_stream_info) {
flags |= ISOLATEMESSAGE_SHARED_STREAM;
shared_stream_info_ = *shared_stream_info;
}
}

~IsolateMessage() {
delete[] data_;
delete[] data_.buffer_;
}

static void Free(char* data, void* arg) {
IsolateMessage* msg = static_cast<IsolateMessage*>(arg);
assert(data == msg->data_);
assert(data == msg->data_.buffer_);
delete msg;
}
};
@@ -208,7 +223,23 @@ Handle<Value> Isolate::Send(const Arguments& args) {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);

IsolateMessage* msg = new IsolateMessage(data, size);
IsolateMessage* msg;

if (args[1]->IsObject()) {
uv_stream_info_t stream_info;

Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}

isolate->send_channel_->Send(msg);

return Undefined();
@@ -231,9 +262,31 @@ void Isolate::OnMessage(IsolateMessage* msg, void* arg) {
Isolate* self = static_cast<Isolate*>(arg);
NODE_ISOLATE_CHECK(self);

Buffer* buf = Buffer::New(msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(self->globals_.process, "_onmessage", ARRAY_SIZE(argv), argv);
Buffer* buf = Buffer::New(msg->data_.buffer_, msg->data_.size_,
IsolateMessage::Free, msg);

int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};

if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();

// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));

int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);

argv[1] = pending_obj;
argc++;
}

MakeCallback(self->globals_.process, "_onmessage", argc, argv);
}


@@ -442,9 +495,30 @@ struct IsolateWrap: public ObjectWrap {
NODE_ISOLATE_CHECK(parent_isolate_);
HandleScope scope;
Buffer* buf = Buffer::New(
msg->data_, msg->size_, IsolateMessage::Free, msg);
Handle<Value> argv[] = { buf->handle_ };
MakeCallback(handle_, "onmessage", ARRAY_SIZE(argv), argv);
msg->data_.buffer_, msg->data_.size_, IsolateMessage::Free, msg);

int argc = 1;
Handle<Value> argv[2] = {
buf->handle_
};

if (msg->flags & ISOLATEMESSAGE_SHARED_STREAM) {
// Instantiate the client javascript object and handle.
Local<Object> pending_obj = TCPWrap::Instantiate();

// Unwrap the client javascript object.
assert(pending_obj->InternalFieldCount() > 0);
TCPWrap* pending_wrap =
static_cast<TCPWrap*>(pending_obj->GetPointerFromInternalField(0));

int r = uv_import(pending_wrap->GetStream(), &msg->shared_stream_info_);
assert(r == 0);

argv[1] = pending_obj;
argc++;
}

MakeCallback(handle_, "onmessage", argc, argv);
}

// TODO merge with Isolate::Send(), it's almost identical
@@ -457,9 +531,24 @@ struct IsolateWrap: public ObjectWrap {
const char* data = Buffer::Data(obj);
size_t size = Buffer::Length(obj);

IsolateMessage* msg = new IsolateMessage(data, size);
self->send_channel_->Send(msg);
IsolateMessage* msg;

if (args[1]->IsObject()) {
uv_stream_info_t stream_info;

Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
uv_stream_t* send_stream = send_stream_wrap->GetStream();
int r = uv_export(send_stream, &stream_info);
assert(r == 0);
msg = new IsolateMessage(data, size, &stream_info);
} else {
msg = new IsolateMessage(data, size, NULL);
}

self->send_channel_->Send(msg);
return Undefined();
}

3 comments on commit db3c4ef

@AndreasMadsen

This comment has been minimized.

Copy link
Member

replied Feb 1, 2012

I'm not sure about the difference between UDP and TCP handlers on windows, but it seams wrong to put setSimultaneousAccepts intro the net module if it is also required when sending UDP handlers.

I know UDP is not yet supported but will be in cluster anyway.

@bnoordhuis

This comment has been minimized.

Copy link
Member

replied Feb 1, 2012

it seams wrong to put setSimultaneousAccepts intro the net module if it is also required when sending UDP handlers.

It's because accept() is for TCP sockets only.

@AndreasMadsen

This comment has been minimized.

Copy link
Member

replied Feb 1, 2012

I thought so, thanks for making this clear :)

Please sign in to comment.
You can’t perform that action at this time.