Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
Optimize writing strings with Socket.write
Browse files Browse the repository at this point in the history
  • Loading branch information
piscisaureus committed May 9, 2012
1 parent 4ddafbd commit 0e57aaf
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 22 deletions.
4 changes: 2 additions & 2 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ function setupChannel(target, channel) {
return false;
}

var buffer = Buffer(JSON.stringify(message) + '\n');
var string = JSON.stringify(message) + '\n';

// Update simultaneous accepts on Windows
net._setSimultaneousAccepts(sendHandle);

var writeReq = channel.write(buffer, 0, buffer.length, sendHandle);
var writeReq = channel.writeUtf8String(string, sendHandle);

if (!writeReq) {
var er = errnoException(errno, 'write', 'cannot write to IPC channel.');
Expand Down
48 changes: 43 additions & 5 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,22 @@ Socket.prototype.write = function(data, arg1, arg2) {
}
}

// Change strings to buffers. SLOW
if (typeof data === 'string') {
data = new Buffer(data, encoding);
encoding = (encoding || 'utf8').toLowerCase();
switch (encoding) {
case 'utf8':
case 'utf-8':
case 'ascii':
case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
// This encoding can be handled in the binding layer.
break;

default:
data = new Buffer(data, encoding);
}
} else if (!Buffer.isBuffer(data)) {
throw new TypeError('First argument must be a buffer or a string.');
}
Expand Down Expand Up @@ -509,8 +522,33 @@ Socket.prototype._write = function(data, encoding, cb) {
return false;
}

// `encoding` is unused right now, `data` is always a buffer.
var writeReq = this._handle.write(data);
var writeReq;

if (Buffer.isBuffer(data)) {
writeReq = this._handle.writeBuffer(data);

} else {
switch (encoding) {
case 'utf8':
case 'utf-8':
writeReq = this._handle.writeUtf8String(data);
break;

case 'ascii':
writeReq = this._handle.writeAsciiString(data);
break;

case 'ucs2':
case 'ucs-2':
case 'utf16le':
case 'utf-16le':
writeReq = this._handle.writeUcs2String(data);
break;

default:
assert(0);
}
}

if (!writeReq || typeof writeReq !== 'object') {
this._destroy(errnoException(errno, 'write'), cb);
Expand All @@ -525,7 +563,7 @@ Socket.prototype._write = function(data, encoding, cb) {
};


function afterWrite(status, handle, req, buffer) {
function afterWrite(status, handle, req) {
var self = handle.socket;

// callback may come after call to destroy.
Expand Down
6 changes: 5 additions & 1 deletion src/pipe_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,13 @@ void PipeWrap::Initialize(Handle<Object> target) {

NODE_SET_PROTOTYPE_METHOD(t, "readStart", StreamWrap::ReadStart);
NODE_SET_PROTOTYPE_METHOD(t, "readStop", StreamWrap::ReadStop);
NODE_SET_PROTOTYPE_METHOD(t, "write", StreamWrap::Write);
NODE_SET_PROTOTYPE_METHOD(t, "shutdown", StreamWrap::Shutdown);

NODE_SET_PROTOTYPE_METHOD(t, "writeBuffer", StreamWrap::WriteBuffer);
NODE_SET_PROTOTYPE_METHOD(t, "writeAsciiString", StreamWrap::WriteAsciiString);
NODE_SET_PROTOTYPE_METHOD(t, "writeUtf8String", StreamWrap::WriteUtf8String);
NODE_SET_PROTOTYPE_METHOD(t, "writeUtf16String", StreamWrap::WriteUcs2String);

NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(t, "listen", Listen);
NODE_SET_PROTOTYPE_METHOD(t, "connect", Connect);
Expand Down
198 changes: 190 additions & 8 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ using v8::TryCatch;
using v8::Context;
using v8::Arguments;
using v8::Integer;
using v8::Number;
using v8::Exception;


#define UNWRAP \
Expand All @@ -64,10 +66,25 @@ using v8::Integer;


typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
typedef class ReqWrap<uv_write_t> WriteWrap;

class WriteWrap: public ReqWrap<uv_write_t> {
public:
void* operator new(size_t size, char* storage) { return storage; }

// This is just to keep the compiler happy. It should never be called, since
// we don't use exceptions in node.
void operator delete(void* ptr, char* storage) { assert(0); }

protected:
// People should not be using the non-placement new and delete operator on a
// WriteWrap. Ensure this never happens.
void* operator new (size_t size) { assert(0); };
void operator delete(void* ptr) { assert(0); };
};


static Persistent<String> buffer_sym;
static Persistent<String> bytes_sym;
static Persistent<String> write_queue_size_sym;
static Persistent<String> onread_sym;
static Persistent<String> oncomplete_sym;
Expand All @@ -84,6 +101,7 @@ void StreamWrap::Initialize(Handle<Object> target) {
HandleWrap::Initialize(target);

buffer_sym = NODE_PSYMBOL("buffer");
bytes_sym = NODE_PSYMBOL("bytes");
write_queue_size_sym = NODE_PSYMBOL("writeQueueSize");
onread_sym = NODE_PSYMBOL("onread");
oncomplete_sym = NODE_PSYMBOL("oncomplete");
Expand Down Expand Up @@ -226,7 +244,7 @@ void StreamWrap::OnRead2(uv_pipe_t* handle, ssize_t nread, uv_buf_t buf,
}


Handle<Value> StreamWrap::Write(const Arguments& args) {
Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {
HandleScope scope;

UNWRAP
Expand All @@ -248,7 +266,15 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
length = args[2]->IntegerValue();
}

WriteWrap* req_wrap = new WriteWrap();
if (length > INT_MAX) {
uv_err_t err;
err.code = UV_ENOBUFS;
SetErrno(err);
return scope.Close(v8::Null());
}

char* storage = new char[sizeof(WriteWrap)];
WriteWrap* req_wrap = new (storage) WriteWrap();

req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);

Expand Down Expand Up @@ -280,19 +306,175 @@ Handle<Value> StreamWrap::Write(const Arguments& args) {
}

req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) length));

wrap->UpdateWriteQueueSize();

if (r) {
SetErrno(uv_last_error(uv_default_loop()));
delete req_wrap;
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null());
} else {
return scope.Close(req_wrap->object_);
}
}


enum WriteEncoding {
kAscii,
kUtf8,
kUcs2
};

template <WriteEncoding encoding>
Handle<Value> StreamWrap::WriteStringImpl(const Arguments& args) {
HandleScope scope;
int r;

UNWRAP

if (args.Length() < 1)
return ThrowTypeError("Not enough arguments");

Local<String> string = args[0]->ToString();

// Compute the size of the storage that the string will be flattened into.
size_t storage_size;
switch (encoding) {
case kAscii:
storage_size = string->Length();
break;

case kUtf8:
if (!(string->MayContainNonAscii())) {
// If the string has only ascii characters, we know exactly how big
// the storage should be.
storage_size = string->Length();
} else if (string->Length() < 65536) {
// A single UCS2 codepoint never takes up more than 3 utf8 bytes.
// Unless the string is really long we just allocate so much space that
// we're certain the string fits in there entirely.
// TODO: maybe check handle->write_queue_size instead of string length?
storage_size = 3 * string->Length();
} else {
// The string is really long. Compute the allocation size that we
// actually need.
storage_size = string->Utf8Length();
}
break;

case kUcs2:
storage_size += string->Length() * sizeof(uint16_t);
break;

default:
// Unreachable.
assert(0);
}

if (storage_size > INT_MAX) {
uv_err_t err;
err.code = UV_ENOBUFS;
SetErrno(err);
return scope.Close(v8::Null());
}

char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
WriteWrap* req_wrap = new (storage) WriteWrap();

char* data = reinterpret_cast<char*>(ROUND_UP(
reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
size_t data_size;
switch (encoding) {
case kAscii:
data_size = string->WriteAscii(data, 0, -1,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
break;

case kUtf8:
data_size = string->WriteUtf8(data, -1, NULL,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
break;

case kUcs2: {
int chars_copied = string->Write((uint16_t*) data, 0, -1,
String::NO_NULL_TERMINATION | String::HINT_MANY_WRITES_EXPECTED);
data_size = chars_copied * sizeof(uint16_t);
break;
}

default:
// Unreachable
assert(0);
}

assert(data_size <= storage_size);

uv_buf_t buf;
buf.base = data;
buf.len = data_size;

bool ipc_pipe = wrap->stream_->type == UV_NAMED_PIPE &&
((uv_pipe_t*)wrap->stream_)->ipc;

if (!ipc_pipe) {
r = uv_write(&req_wrap->req_,
wrap->stream_,
&buf,
1,
StreamWrap::AfterWrite);

} else {
uv_stream_t* send_stream = NULL;

if (args[1]->IsObject()) {
Local<Object> send_stream_obj = args[1]->ToObject();
assert(send_stream_obj->InternalFieldCount() > 0);
StreamWrap* send_stream_wrap = static_cast<StreamWrap*>(
send_stream_obj->GetPointerFromInternalField(0));
send_stream = send_stream_wrap->GetStream();
}

r = uv_write2(&req_wrap->req_,
wrap->stream_,
&buf,
1,
send_stream,
StreamWrap::AfterWrite);
}

req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym, Number::New((uint32_t) data_size));

wrap->UpdateWriteQueueSize();

if (r) {
SetErrno(uv_last_error(uv_default_loop()));
req_wrap->~WriteWrap();
delete[] storage;
return scope.Close(v8::Null());
} else {
return scope.Close(req_wrap->object_);
}
}


Handle<Value> StreamWrap::WriteAsciiString(const Arguments& args) {
return WriteStringImpl<kAscii>(args);
}


Handle<Value> StreamWrap::WriteUtf8String(const Arguments& args) {
return WriteStringImpl<kUtf8>(args);
}


Handle<Value> StreamWrap::WriteUcs2String(const Arguments& args) {
return WriteStringImpl<kUcs2>(args);
}


void StreamWrap::AfterWrite(uv_write_t* req, int status) {
WriteWrap* req_wrap = (WriteWrap*) req->data;
StreamWrap* wrap = (StreamWrap*) req->handle->data;
Expand All @@ -309,16 +491,16 @@ void StreamWrap::AfterWrite(uv_write_t* req, int status) {

wrap->UpdateWriteQueueSize();

Local<Value> argv[4] = {
Local<Value> argv[] = {
Integer::New(status),
Local<Value>::New(wrap->object_),
Local<Value>::New(req_wrap->object_),
req_wrap->object_->GetHiddenValue(buffer_sym),
Local<Value>::New(req_wrap->object_)
};

MakeCallback(req_wrap->object_, oncomplete_sym, ARRAY_SIZE(argv), argv);

delete req_wrap;
req_wrap->~WriteWrap();
delete[] reinterpret_cast<char*>(req_wrap);
}


Expand Down
9 changes: 8 additions & 1 deletion src/stream_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ class StreamWrap : public HandleWrap {
static void Initialize(v8::Handle<v8::Object> target);

// JavaScript functions
static v8::Handle<v8::Value> Write(const v8::Arguments& args);
static v8::Handle<v8::Value> ReadStart(const v8::Arguments& args);
static v8::Handle<v8::Value> ReadStop(const v8::Arguments& args);
static v8::Handle<v8::Value> Shutdown(const v8::Arguments& args);

static v8::Handle<v8::Value> WriteBuffer(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteAsciiString(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteUtf8String(const v8::Arguments& args);
static v8::Handle<v8::Value> WriteUcs2String(const v8::Arguments& args);

protected:
StreamWrap(v8::Handle<v8::Object> object, uv_stream_t* stream);
virtual ~StreamWrap() { }
Expand All @@ -61,6 +65,9 @@ class StreamWrap : public HandleWrap {
static void OnReadCommon(uv_stream_t* handle, ssize_t nread,
uv_buf_t buf, uv_handle_type pending);

template <enum WriteEncoding encoding>
static v8::Handle<v8::Value> WriteStringImpl(const v8::Arguments& args);

size_t slab_offset_;
uv_stream_t* stream_;
};
Expand Down
Loading

0 comments on commit 0e57aaf

Please sign in to comment.