diff --git a/lib/_http_server.js b/lib/_http_server.js index 6769f4a1521bd9..3f6d2a702892ed 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -79,6 +79,8 @@ const STATUS_CODES = exports.STATUS_CODES = { 511 : 'Network Authentication Required' // RFC 6585 }; +const kOnExecute = HTTPParser.kOnExecute | 0; + function ServerResponse(req) { OutgoingMessage.call(this); @@ -317,6 +319,18 @@ function connectionListener(socket) { socket.on('end', socketOnEnd); socket.on('data', socketOnData); + // We are consuming socket, so it won't get any actual data + socket.on('resume', onSocketResume); + socket.on('pause', onSocketPause); + + socket.on('drain', socketOnDrain); + + // Override on to unconsume on `data`, `readable` listeners + socket.on = socketOnWrap; + + parser.consume(socket._handle._externalStream); + parser[kOnExecute] = onParserExecute; + // TODO(isaacs): Move all these functions out of here function socketOnError(e) { self.emit('clientError', e, this); @@ -326,6 +340,19 @@ function connectionListener(socket) { assert(!socket._paused); debug('SERVER socketOnData %d', d.length); var ret = parser.execute(d); + + onParserExecuteCommon(ret, d); + } + + function onParserExecute(ret, d) { + debug('SERVER socketOnParserExecute %d', ret); + onParserExecuteCommon(ret, undefined); + + // Kick-off next ticks + setImmediate(function() {}); + } + + function onParserExecuteCommon(ret, d) { if (ret instanceof Error) { debug('parse error'); socket.destroy(ret); @@ -335,9 +362,13 @@ function connectionListener(socket) { var req = parser.incoming; debug('SERVER upgrade or connect', req.method); + if (!d) + d = parser.getCurrentBuffer(); + socket.removeListener('data', socketOnData); socket.removeListener('end', socketOnEnd); socket.removeListener('close', serverSocketCloseListener); + parser.unconsume(socket._handle._externalStream); parser.finish(); freeParser(parser, req, null); parser = null; @@ -400,7 +431,6 @@ function connectionListener(socket) { socket.resume(); } } - socket.on('drain', socketOnDrain); function parserOnIncoming(req, shouldKeepAlive) { incoming.push(req); @@ -480,3 +510,24 @@ function connectionListener(socket) { } } exports._connectionListener = connectionListener; + +function onSocketResume() { + this._handle.readStart(); +} + +function onSocketPause() { + this._handle.readStop(); +} + +function socketOnWrap(ev, fn) { + var res = net.Socket.prototype.on.call(this, ev, fn); + if (!this.parser) { + this.on = net.Socket.prototype.on; + return res; + } + + if (ev === 'data' || ev === 'readable') + this.parser.unconsume(this._handle._externalStream); + + return res; +} diff --git a/src/env-inl.h b/src/env-inl.h index 369bc2f780eb90..cbc8c4ff1f5eaf 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -178,6 +178,7 @@ inline Environment::Environment(v8::Local context, printed_error_(false), trace_sync_io_(false), debugger_agent_(this), + http_parser_buffer_(nullptr), context_(context->GetIsolate(), context) { // We'll be creating new objects so make sure we've entered the context. v8::HandleScope handle_scope(isolate()); @@ -200,6 +201,7 @@ inline Environment::~Environment() { isolate_data()->Put(); delete[] heap_statistics_buffer_; + delete[] http_parser_buffer_; } inline void Environment::CleanupHandles() { @@ -338,6 +340,15 @@ inline void Environment::set_heap_statistics_buffer(uint32_t* pointer) { heap_statistics_buffer_ = pointer; } +inline char* Environment::http_parser_buffer() const { + return http_parser_buffer_; +} + +inline void Environment::set_http_parser_buffer(char* buffer) { + CHECK_EQ(http_parser_buffer_, nullptr); // Should be set only once. + http_parser_buffer_ = buffer; +} + inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) { return ContainerOf(&Environment::cares_timer_handle_, handle); } diff --git a/src/env.h b/src/env.h index 1801ffecd3ccea..3115c5fe538d63 100644 --- a/src/env.h +++ b/src/env.h @@ -427,6 +427,9 @@ class Environment { inline uint32_t* heap_statistics_buffer() const; inline void set_heap_statistics_buffer(uint32_t* pointer); + inline char* http_parser_buffer() const; + inline void set_http_parser_buffer(char* buffer); + inline void ThrowError(const char* errmsg); inline void ThrowTypeError(const char* errmsg); inline void ThrowRangeError(const char* errmsg); @@ -524,6 +527,8 @@ class Environment { uint32_t* heap_statistics_buffer_ = nullptr; + char* http_parser_buffer_; + #define V(PropertyName, TypeName) \ v8::Persistent PropertyName ## _; ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V) diff --git a/src/node_http_parser.cc b/src/node_http_parser.cc index 6c5d76ecf6cc95..b08bc0f88acb9e 100644 --- a/src/node_http_parser.cc +++ b/src/node_http_parser.cc @@ -6,6 +6,8 @@ #include "base-object-inl.h" #include "env.h" #include "env-inl.h" +#include "stream_base.h" +#include "stream_base-inl.h" #include "util.h" #include "util-inl.h" #include "v8.h" @@ -36,6 +38,7 @@ namespace node { using v8::Array; using v8::Boolean; using v8::Context; +using v8::EscapableHandleScope; using v8::Exception; using v8::Function; using v8::FunctionCallbackInfo; @@ -54,6 +57,7 @@ const uint32_t kOnHeaders = 0; const uint32_t kOnHeadersComplete = 1; const uint32_t kOnBody = 2; const uint32_t kOnMessageComplete = 3; +const uint32_t kOnExecute = 4; #define HTTP_CB(name) \ @@ -295,7 +299,7 @@ class Parser : public BaseObject { HTTP_DATA_CB(on_body) { - HandleScope scope(env()->isolate()); + EscapableHandleScope scope(env()->isolate()); Local obj = object(); Local cb = obj->Get(kOnBody); @@ -303,6 +307,15 @@ class Parser : public BaseObject { if (!cb->IsFunction()) return 0; + // We came from consumed stream + if (current_buffer_.IsEmpty()) { + // Make sure Buffer will be in parent HandleScope + current_buffer_ = scope.Escape(Buffer::Copy( + env()->isolate(), + current_buffer_data_, + current_buffer_len_).ToLocalChecked()); + } + Local argv[3] = { current_buffer_, Integer::NewFromUnsigned(env()->isolate(), at - current_buffer_data_), @@ -374,8 +387,6 @@ class Parser : public BaseObject { // var bytesParsed = parser->execute(buffer); static void Execute(const FunctionCallbackInfo& args) { - Environment* env = Environment::GetCurrent(args); - Parser* parser = Unwrap(args.Holder()); CHECK(parser->current_buffer_.IsEmpty()); CHECK_EQ(parser->current_buffer_len_, 0); @@ -390,40 +401,11 @@ class Parser : public BaseObject { // amount of overhead. Nothing else will run while http_parser_execute() // runs, therefore this pointer can be set and used for the execution. parser->current_buffer_ = buffer_obj; - parser->current_buffer_len_ = buffer_len; - parser->current_buffer_data_ = buffer_data; - parser->got_exception_ = false; - size_t nparsed = - http_parser_execute(&parser->parser_, &settings, buffer_data, buffer_len); - - parser->Save(); + Local ret = parser->Execute(buffer_data, buffer_len); - // Unassign the 'buffer_' variable - parser->current_buffer_.Clear(); - parser->current_buffer_len_ = 0; - parser->current_buffer_data_ = nullptr; - - // If there was an exception in one of the callbacks - if (parser->got_exception_) - return; - - Local nparsed_obj = Integer::New(env->isolate(), nparsed); - // If there was a parse error in one of the callbacks - // TODO(bnoordhuis) What if there is an error on EOF? - if (!parser->parser_.upgrade && nparsed != buffer_len) { - enum http_errno err = HTTP_PARSER_ERRNO(&parser->parser_); - - Local e = Exception::Error(env->parse_error_string()); - Local obj = e->ToObject(env->isolate()); - obj->Set(env->bytes_parsed_string(), nparsed_obj); - obj->Set(env->code_string(), - OneByteString(env->isolate(), http_errno_name(err))); - - args.GetReturnValue().Set(e); - } else { - args.GetReturnValue().Set(nparsed_obj); - } + if (!ret.IsEmpty()) + args.GetReturnValue().Set(ret); } @@ -478,7 +460,148 @@ class Parser : public BaseObject { } - private: + static void Consume(const FunctionCallbackInfo& args) { + Parser* parser = Unwrap(args.Holder()); + Local stream_obj = args[0].As(); + StreamBase* stream = static_cast(stream_obj->Value()); + CHECK_NE(stream, nullptr); + + stream->Consume(); + + parser->prev_alloc_cb_ = stream->alloc_cb(); + parser->prev_read_cb_ = stream->read_cb(); + + stream->set_alloc_cb({ OnAllocImpl, parser }); + stream->set_read_cb({ OnReadImpl, parser }); + } + + + static void Unconsume(const FunctionCallbackInfo& args) { + Parser* parser = Unwrap(args.Holder()); + + // Already unconsumed + if (parser->prev_alloc_cb_.is_empty()) + return; + + CHECK(args[0]->IsExternal()); + Local stream_obj = args[0].As(); + StreamBase* stream = static_cast(stream_obj->Value()); + CHECK_NE(stream, nullptr); + + stream->set_alloc_cb(parser->prev_alloc_cb_); + stream->set_read_cb(parser->prev_read_cb_); + } + + + static void GetCurrentBuffer(const FunctionCallbackInfo& args) { + Parser* parser = Unwrap(args.Holder()); + + Local ret = Buffer::Copy( + parser->env(), + parser->current_buffer_data_, + parser->current_buffer_len_).ToLocalChecked(); + + args.GetReturnValue().Set(ret); + } + + protected: + static const size_t kAllocBufferSize = 64 * 1024; + + static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) { + Parser* parser = static_cast(ctx); + Environment* env = parser->env(); + + if (env->http_parser_buffer() == nullptr) + env->set_http_parser_buffer(new char[kAllocBufferSize]); + + buf->base = env->http_parser_buffer(); + buf->len = kAllocBufferSize; + } + + + static void OnReadImpl(ssize_t nread, + const uv_buf_t* buf, + uv_handle_type pending, + void* ctx) { + Parser* parser = static_cast(ctx); + HandleScope scope(parser->env()->isolate()); + + if (nread < 0) { + uv_buf_t tmp_buf; + tmp_buf.base = nullptr; + tmp_buf.len = 0; + parser->prev_read_cb_.fn(nread, + &tmp_buf, + pending, + parser->prev_read_cb_.ctx); + return; + } + + // Ignore, empty reads have special meaning in http parser + if (nread == 0) + return; + + parser->current_buffer_.Clear(); + Local ret = parser->Execute(buf->base, nread); + + // Exception + if (ret.IsEmpty()) + return; + + Local obj = parser->object(); + Local cb = obj->Get(kOnExecute); + + if (!cb->IsFunction()) + return; + + // Hooks for GetCurrentBuffer + parser->current_buffer_len_ = nread; + parser->current_buffer_data_ = buf->base; + + cb.As()->Call(obj, 1, &ret); + + parser->current_buffer_len_ = 0; + parser->current_buffer_data_ = nullptr; + } + + + Local Execute(char* data, size_t len) { + EscapableHandleScope scope(env()->isolate()); + + current_buffer_len_ = len; + current_buffer_data_ = data; + got_exception_ = false; + + size_t nparsed = + http_parser_execute(&parser_, &settings, data, len); + + Save(); + + // Unassign the 'buffer_' variable + current_buffer_.Clear(); + current_buffer_len_ = 0; + current_buffer_data_ = nullptr; + + // If there was an exception in one of the callbacks + if (got_exception_) + return scope.Escape(Local()); + + Local nparsed_obj = Integer::New(env()->isolate(), nparsed); + // If there was a parse error in one of the callbacks + // TODO(bnoordhuis) What if there is an error on EOF? + if (!parser_.upgrade && nparsed != len) { + enum http_errno err = HTTP_PARSER_ERRNO(&parser_); + + Local e = Exception::Error(env()->parse_error_string()); + Local obj = e->ToObject(env()->isolate()); + obj->Set(env()->bytes_parsed_string(), nparsed_obj); + obj->Set(env()->code_string(), + OneByteString(env()->isolate(), http_errno_name(err))); + + return scope.Escape(e); + } + return scope.Escape(nparsed_obj); + } Local CreateHeaders() { // num_values_ is either -1 or the entry # of the last header @@ -542,6 +665,8 @@ class Parser : public BaseObject { Local current_buffer_; size_t current_buffer_len_; char* current_buffer_data_; + StreamResource::Callback prev_alloc_cb_; + StreamResource::Callback prev_read_cb_; static const struct http_parser_settings settings; }; @@ -581,6 +706,8 @@ void InitHttpParser(Handle target, Integer::NewFromUnsigned(env->isolate(), kOnBody)); t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnMessageComplete"), Integer::NewFromUnsigned(env->isolate(), kOnMessageComplete)); + t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnExecute"), + Integer::NewFromUnsigned(env->isolate(), kOnExecute)); Local methods = Array::New(env->isolate()); #define V(num, name, string) \ @@ -595,6 +722,9 @@ void InitHttpParser(Handle target, env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize); env->SetProtoMethod(t, "pause", Parser::Pause); env->SetProtoMethod(t, "resume", Parser::Pause); + env->SetProtoMethod(t, "consume", Parser::Consume); + env->SetProtoMethod(t, "unconsume", Parser::Unconsume); + env->SetProtoMethod(t, "getCurrentBuffer", Parser::GetCurrentBuffer); target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "HTTPParser"), t->GetFunction()); diff --git a/test/parallel/test-http-server-unconsume.js b/test/parallel/test-http-server-unconsume.js new file mode 100644 index 00000000000000..0d33263ad6ffa6 --- /dev/null +++ b/test/parallel/test-http-server-unconsume.js @@ -0,0 +1,30 @@ +'use strict'; +var common = require('../common'); +var assert = require('assert'); +var http = require('http'); +var net = require('net'); + +var received = ''; + +var server = http.createServer(function(req, res) { + res.writeHead(200); + res.end(); + + req.socket.on('data', function(data) { + received += data; + }); + + server.close(); +}).listen(common.PORT, function() { + var socket = net.connect(common.PORT, function() { + socket.write('PUT / HTTP/1.1\r\n\r\n'); + + socket.once('data', function() { + socket.end('hello world'); + }); + }); +}); + +process.on('exit', function() { + assert.equal(received, 'hello world'); +});