Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Streams2 + net + http: writev #5246

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 69 additions & 6 deletions lib/_stream_writable.js
Expand Up @@ -34,6 +34,7 @@ util.inherits(Writable, Stream);

function WriteReq(chunk, encoding, cb) {
this.chunk = chunk;
this.isBuffer = Buffer.isBuffer(chunk);
this.encoding = encoding;
this.callback = cb;
}
Expand All @@ -51,6 +52,19 @@ function WritableState(options, stream) {
// contains buffers or objects.
this.objectMode = !!options.objectMode;

// Coalesce input and write at once on next tick
this.coalesce = options.coalesce || false;
if (this.coalesce && this.objectMode)
throw new Error('Can\'t use objectMode and coalesce simultaneously');

// Unlimited buffers count in ._writev
if (this.coalesce === true)
this.coalesce = Infinity;

// > 0 if data was buffered to be later coalesced, but
// process.nextTick() wasn't called yet.
this.coalesceQueued = 0;

// cast to ints.
this.highWaterMark = ~~this.highWaterMark;

Expand Down Expand Up @@ -195,20 +209,39 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
var ret = state.length < state.highWaterMark;
state.needDrain = !ret;

if (state.writing)
if (state.writing) {
state.buffer.push(new WriteReq(chunk, encoding, cb));
else
doWrite(stream, state, len, chunk, encoding, cb);
} else if (state.coalesce && stream._writev) {
state.writing = true;
state.coalesceQueued++;
state.buffer.push(new WriteReq(chunk, encoding, cb));

// Write all at once on next tick
process.nextTick(function() {
// Coalesce was dequeued
if (state.coalesceQueued === 0)
return;

state.coalesceQueued--;
if (!state.bufferProcessing && state.buffer.length)
clearBuffer(stream, state);
});
} else {
doWrite(stream, state, len, chunk, encoding, false, cb);
}

return ret;
}

function doWrite(stream, state, len, chunk, encoding, cb) {
function doWrite(stream, state, len, chunk, encoding, writev, cb) {
state.writelen = len;
state.writecb = cb;
state.writing = true;
state.sync = true;
stream._write(chunk, encoding, state.onwrite);
if (!writev)
stream._write(chunk, encoding, state.onwrite);
else
stream._writev(chunk, len, state.onwrite);
state.sync = false;
}

Expand Down Expand Up @@ -274,6 +307,7 @@ function onwriteDrain(stream, state) {

// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
var coalesce = state.coalesce;
state.bufferProcessing = true;

for (var c = 0; c < state.buffer.length; c++) {
Expand All @@ -282,8 +316,24 @@ function clearBuffer(stream, state) {
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;
var chunks = null;
var chunksLen = len;

// Coalesce multiple Buffer chunks and write them at once with ._writev()
if (coalesce && stream._writev && entry.isBuffer) {
chunks = [entry.chunk];
while (chunks.length < coalesce &&
c + 1 < state.buffer.length && state.buffer[c + 1].isBuffer) {
var next = state.buffer[++c].chunk;
chunks.push(next);
chunksLen += next.length;
}
}

doWrite(stream, state, len, chunk, encoding, cb);
if (chunks && chunks.length > 1)
doWrite(stream, state, chunksLen, chunks, encoding, true, cb);
else
doWrite(stream, state, len, chunk, encoding, false, cb);

// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
Expand All @@ -302,10 +352,23 @@ function clearBuffer(stream, state) {
state.buffer.length = 0;
}

// Internal, use at your risk
Writable.prototype._clearBuffer = function() {
var state = this._writableState;

if ((!state.writing || state.coalesceQueued) &&
!state.bufferProcessing &&
state.buffer.length) {
clearBuffer(this, state);
}
};

Writable.prototype._write = function(chunk, encoding, cb) {
cb(new Error('not implemented'));
};

Writable.prototype._writev = null;

Writable.prototype.end = function(chunk, encoding, cb) {
var state = this._writableState;

Expand Down
5 changes: 3 additions & 2 deletions lib/http.js
Expand Up @@ -779,9 +779,10 @@ OutgoingMessage.prototype.write = function(chunk, encoding) {
} else {
// buffer
len = chunk.length;
this._send(len.toString(16) + CRLF);
this._send(new Buffer(len.toString(16)));
this._send(crlf_buf);
this._send(chunk);
ret = this._send(CRLF);
ret = this._send(crlf_buf);
}
} else {
ret = this._send(chunk, encoding);
Expand Down
31 changes: 27 additions & 4 deletions lib/net.js
Expand Up @@ -437,6 +437,10 @@ Socket.prototype._destroy = function(exception, cb) {

self._connecting = false;

// In coalesce mode buffer might be not empty at this point,
// try writing it out.
this._clearBuffer();

this.readable = this.writable = false;

timers.unenroll(this);
Expand Down Expand Up @@ -597,15 +601,15 @@ Socket.prototype.write = function(chunk, encoding, cb) {
};


Socket.prototype._write = function(data, encoding, cb) {
Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// If we are still connecting, then buffer this for later.
// The Writable logic will buffer up any more writes while
// waiting for this one to be done.
if (this._connecting) {
this._pendingData = data;
this._pendingEncoding = encoding;
this.once('connect', function() {
this._write(data, encoding, cb);
this._writeGeneric(writev, data, encoding, cb);
});
return;
}
Expand All @@ -619,8 +623,16 @@ Socket.prototype._write = function(data, encoding, cb) {
return false;
}

var enc = Buffer.isBuffer(data) ? 'buffer' : encoding;
var writeReq = createWriteReq(this._handle, data, enc);
var writeReq;
if (writev) {
var writeReq = this._handle.writeBuffer(data);
// Retain chunks
if (writeReq)
writeReq._chunks = data.slice();
} else {
var enc = Buffer.isBuffer(data) ? 'buffer' : encoding;
var writeReq = createWriteReq(this._handle, data, enc);
}

if (!writeReq || typeof writeReq !== 'object')
return this._destroy(errnoException(process._errno, 'write'), cb);
Expand All @@ -636,6 +648,16 @@ Socket.prototype._write = function(data, encoding, cb) {
writeReq.cb = cb;
};


Socket.prototype._writev = function(chunks, size, cb) {
this._writeGeneric(true, chunks, size, cb);
};


Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};

function createWriteReq(handle, data, encoding) {
switch (encoding) {
case 'buffer':
Expand Down Expand Up @@ -1143,6 +1165,7 @@ function onconnection(clientHandle) {

var socket = new Socket({
handle: clientHandle,
coalesce: true,
allowHalfOpen: self.allowHalfOpen
});
socket.readable = socket.writable = true;
Expand Down
57 changes: 46 additions & 11 deletions src/stream_wrap.cc
Expand Up @@ -288,26 +288,61 @@ Handle<Value> StreamWrap::WriteBuffer(const Arguments& args) {

UNWRAP(StreamWrap)

// The first argument is a buffer.
assert(args.Length() >= 1 && Buffer::HasInstance(args[0]));
Local<Object> buffer_obj = args[0]->ToObject();
size_t offset = 0;
size_t length = Buffer::Length(buffer_obj);
assert(args.Length() >= 1);

// Determine buffer count
unsigned long count;
size_t length = 0;
bool is_writev = args[0]->IsArray();
if (!is_writev) {
assert(Buffer::HasInstance(args[0]));
// The first argument is a buffer.
count = 1;
length = Buffer::Length(args[0]);
} else {
// The first argument is multiple buffer.
assert(args[0]->IsArray());
count = args[0].As<v8::Array>()->Length();
}

uv_buf_t buf_[128];
uv_buf_t* buf = buf_;

// Allocate buffers if on-stack is not enough
if (count > ARRAY_SIZE(buf_))
buf = new uv_buf_t[count];

// Create Write request
char* storage = new char[sizeof(WriteWrap)];
WriteWrap* req_wrap = new (storage) WriteWrap();

req_wrap->object_->SetHiddenValue(buffer_sym, buffer_obj);
if (is_writev) {
v8::Local<v8::Array> chunks = args[0].As<v8::Array>();
for (unsigned long i = 0; i < count; i++) {
v8::Local<v8::Value> chunk = chunks->Get(i);
assert(Buffer::HasInstance(chunk));
buf[i].base = Buffer::Data(chunk);
buf[i].len = Buffer::Length(chunk);
length += buf[i].len;
}
} else {
// Simple non-writev case
buf[0].base = Buffer::Data(args[0]);
buf[0].len = length;

uv_buf_t buf;
buf.base = Buffer::Data(buffer_obj) + offset;
buf.len = length;
req_wrap->object_->SetHiddenValue(buffer_sym, args[0]);
}

int r = uv_write(&req_wrap->req_,
wrap->stream_,
&buf,
1,
buf,
count,
StreamWrap::AfterWrite);

// Deallocate not on-stack buffers
if (buf != buf_)
delete[] buf;

req_wrap->Dispatched();
req_wrap->object_->Set(bytes_sym,
Integer::NewFromUnsigned(length));
Expand Down
4 changes: 3 additions & 1 deletion test/simple/test-http-client-timeout-event.js
Expand Up @@ -54,5 +54,7 @@ server.listen(options.port, options.host, function() {
req.destroy();
assert.equal(timeout_events, 1);
}, 10);
req.end();
setTimeout(function () {
req.end();
}, 5);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

});
2 changes: 1 addition & 1 deletion test/simple/test-http-upgrade-server.js
Expand Up @@ -155,7 +155,7 @@ function test_standard_http() {
writeReq(conn, 'GET / HTTP/1.1\r\n\r\n');
});

conn.on('data', function(data) {
conn.once('data', function(data) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

assert.equal('string', typeof data);
assert.equal('HTTP/1.1 200', data.substr(0, 12));
conn.end();
Expand Down
76 changes: 76 additions & 0 deletions test/simple/test-stream2-writev.js
@@ -0,0 +1,76 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

var common = require('../common.js');
var W = require('_stream_writable');
var D = require('_stream_duplex');
var assert = require('assert');

var util = require('util');
util.inherits(TestWriter, W);

function TestWriter() {
W.call(this, { coalesce: 2, decodeStrings: false });
this.result = [];
};

TestWriter.prototype._write = function(chunk, encoding, cb) {
writeCalled++;
this.result.push(new Buffer(chunk, encoding));
setTimeout(cb, 3);
};

TestWriter.prototype._writev = function(chunks, len, cb) {
writevCalled++;
assert.ok(Array.isArray(chunks));
assert.ok(chunks.length <= 2);
assert.ok(chunks.every(function(chunk) {
return Buffer.isBuffer(chunk);
}));
this.result = this.result.concat(chunks);
setTimeout(cb, 3);
};

var finishCalled = false;
var writeCalled = 0;
var writevCalled = 0;
var t = new TestWriter();

[ 'hello', 'world', '!', '!' ].forEach(function(chunk) {
t.write(new Buffer(chunk));
});
t.write('opa');
[ 'hello', 'world', '!', '!' ].forEach(function(chunk) {
t.write(new Buffer(chunk));
});

t.once('finish', function() {
finishCalled = true;
});
t.end();

process.once('exit', function() {
assert.ok(finishCalled);
assert.equal(writeCalled, 1);
assert.equal(writevCalled, 4);
assert.equal(t.result.length, 9);
assert.equal(t.result.join(' '), 'hello world ! ! opa hello world ! !');
});