Skip to content

Commit

Permalink
http_parser: consume StreamBase instance
Browse files Browse the repository at this point in the history
Consume StreamBase instance and operate on incoming data directly
without allocating Buffer instances. Improves performance.

PR-URL: #2355
Reviewed-By: Trevor Norris <trev.norris@gmail.com>
  • Loading branch information
indutny committed Aug 26, 2015
1 parent d616768 commit 59b91f1
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 37 deletions.
53 changes: 52 additions & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ const STATUS_CODES = exports.STATUS_CODES = {
511 : 'Network Authentication Required' // RFC 6585
};

const kOnExecute = HTTPParser.kOnExecute | 0;


function ServerResponse(req) {
OutgoingMessage.call(this);
Expand Down Expand Up @@ -317,6 +319,18 @@ function connectionListener(socket) {
socket.on('end', socketOnEnd);
socket.on('data', socketOnData);

// We are consuming socket, so it won't get any actual data
socket.on('resume', onSocketResume);
socket.on('pause', onSocketPause);

socket.on('drain', socketOnDrain);

// Override on to unconsume on `data`, `readable` listeners
socket.on = socketOnWrap;

parser.consume(socket._handle._externalStream);
parser[kOnExecute] = onParserExecute;

// TODO(isaacs): Move all these functions out of here
function socketOnError(e) {
self.emit('clientError', e, this);
Expand All @@ -326,6 +340,19 @@ function connectionListener(socket) {
assert(!socket._paused);
debug('SERVER socketOnData %d', d.length);
var ret = parser.execute(d);

onParserExecuteCommon(ret, d);
}

function onParserExecute(ret, d) {
debug('SERVER socketOnParserExecute %d', ret);
onParserExecuteCommon(ret, undefined);

// Kick-off next ticks
setImmediate(function() {});
}

function onParserExecuteCommon(ret, d) {
if (ret instanceof Error) {
debug('parse error');
socket.destroy(ret);
Expand All @@ -335,9 +362,13 @@ function connectionListener(socket) {
var req = parser.incoming;
debug('SERVER upgrade or connect', req.method);

if (!d)
d = parser.getCurrentBuffer();

socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('close', serverSocketCloseListener);
parser.unconsume(socket._handle._externalStream);
parser.finish();
freeParser(parser, req, null);
parser = null;
Expand Down Expand Up @@ -400,7 +431,6 @@ function connectionListener(socket) {
socket.resume();
}
}
socket.on('drain', socketOnDrain);

function parserOnIncoming(req, shouldKeepAlive) {
incoming.push(req);
Expand Down Expand Up @@ -480,3 +510,24 @@ function connectionListener(socket) {
}
}
exports._connectionListener = connectionListener;

function onSocketResume() {
this._handle.readStart();
}

function onSocketPause() {
this._handle.readStop();
}

function socketOnWrap(ev, fn) {
var res = net.Socket.prototype.on.call(this, ev, fn);
if (!this.parser) {
this.on = net.Socket.prototype.on;
return res;
}

if (ev === 'data' || ev === 'readable')
this.parser.unconsume(this._handle._externalStream);

return res;
}
11 changes: 11 additions & 0 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ inline Environment::Environment(v8::Local<v8::Context> context,
printed_error_(false),
trace_sync_io_(false),
debugger_agent_(this),
http_parser_buffer_(nullptr),
context_(context->GetIsolate(), context) {
// We'll be creating new objects so make sure we've entered the context.
v8::HandleScope handle_scope(isolate());
Expand All @@ -200,6 +201,7 @@ inline Environment::~Environment() {
isolate_data()->Put();

delete[] heap_statistics_buffer_;
delete[] http_parser_buffer_;
}

inline void Environment::CleanupHandles() {
Expand Down Expand Up @@ -338,6 +340,15 @@ inline void Environment::set_heap_statistics_buffer(uint32_t* pointer) {
heap_statistics_buffer_ = pointer;
}

inline char* Environment::http_parser_buffer() const {
return http_parser_buffer_;
}

inline void Environment::set_http_parser_buffer(char* buffer) {
CHECK_EQ(http_parser_buffer_, nullptr); // Should be set only once.
http_parser_buffer_ = buffer;
}

inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
return ContainerOf(&Environment::cares_timer_handle_, handle);
}
Expand Down
5 changes: 5 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@ class Environment {
inline uint32_t* heap_statistics_buffer() const;
inline void set_heap_statistics_buffer(uint32_t* pointer);

inline char* http_parser_buffer() const;
inline void set_http_parser_buffer(char* buffer);

inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
inline void ThrowRangeError(const char* errmsg);
Expand Down Expand Up @@ -524,6 +527,8 @@ class Environment {

uint32_t* heap_statistics_buffer_ = nullptr;

char* http_parser_buffer_;

#define V(PropertyName, TypeName) \
v8::Persistent<TypeName> PropertyName ## _;
ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V)
Expand Down

0 comments on commit 59b91f1

Please sign in to comment.