Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAST HTTP YEAH #2355

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ const STATUS_CODES = exports.STATUS_CODES = {
511 : 'Network Authentication Required' // RFC 6585
};

const kOnExecute = HTTPParser.kOnExecute | 0;


function ServerResponse(req) {
OutgoingMessage.call(this);
Expand Down Expand Up @@ -317,6 +319,21 @@ function connectionListener(socket) {
socket.on('end', socketOnEnd);
socket.on('data', socketOnData);

// We are consuming socket, so it won't get any actual data
socket.on('resume', onSocketResume);
socket.on('pause', onSocketPause);

socket.on('drain', socketOnDrain);

// Override on to unconsume on `data`, `readable` listeners
socket.on = socketOnWrap;

var external = socket._handle._externalStream;
if (external)
parser.consume(external);
external = null;
parser[kOnExecute] = onParserExecute;

// TODO(isaacs): Move all these functions out of here
function socketOnError(e) {
self.emit('clientError', e, this);
Expand All @@ -326,6 +343,16 @@ function connectionListener(socket) {
assert(!socket._paused);
debug('SERVER socketOnData %d', d.length);
var ret = parser.execute(d);

onParserExecuteCommon(ret, d);
}

function onParserExecute(ret, d) {
debug('SERVER socketOnParserExecute %d', ret);
onParserExecuteCommon(ret, undefined);
}

function onParserExecuteCommon(ret, d) {
if (ret instanceof Error) {
debug('parse error');
socket.destroy(ret);
Expand All @@ -335,9 +362,13 @@ function connectionListener(socket) {
var req = parser.incoming;
debug('SERVER upgrade or connect', req.method);

if (!d)
d = parser.getCurrentBuffer();

socket.removeListener('data', socketOnData);
socket.removeListener('end', socketOnEnd);
socket.removeListener('close', serverSocketCloseListener);
parser.unconsume(socket._handle._externalStream);
parser.finish();
freeParser(parser, req, null);
parser = null;
Expand Down Expand Up @@ -400,7 +431,6 @@ function connectionListener(socket) {
socket.resume();
}
}
socket.on('drain', socketOnDrain);

function parserOnIncoming(req, shouldKeepAlive) {
incoming.push(req);
Expand Down Expand Up @@ -480,3 +510,24 @@ function connectionListener(socket) {
}
}
exports._connectionListener = connectionListener;

function onSocketResume() {
this._handle.readStart();
}

function onSocketPause() {
this._handle.readStop();
}

function socketOnWrap(ev, fn) {
var res = net.Socket.prototype.on.call(this, ev, fn);
if (!this.parser) {
this.on = net.Socket.prototype.on;
return res;
}

if (ev === 'data' || ev === 'readable')
this.parser.unconsume(this._handle._externalStream);

return res;
}
11 changes: 11 additions & 0 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ inline Environment::Environment(v8::Local<v8::Context> context,
printed_error_(false),
trace_sync_io_(false),
debugger_agent_(this),
http_parser_buffer_(nullptr),
context_(context->GetIsolate(), context) {
// We'll be creating new objects so make sure we've entered the context.
v8::HandleScope handle_scope(isolate());
Expand All @@ -200,6 +201,7 @@ inline Environment::~Environment() {
isolate_data()->Put();

delete[] heap_statistics_buffer_;
delete[] http_parser_buffer_;
}

inline void Environment::CleanupHandles() {
Expand Down Expand Up @@ -338,6 +340,15 @@ inline void Environment::set_heap_statistics_buffer(uint32_t* pointer) {
heap_statistics_buffer_ = pointer;
}

inline char* Environment::http_parser_buffer() const {
return http_parser_buffer_;
}

inline void Environment::set_http_parser_buffer(char* buffer) {
CHECK_EQ(http_parser_buffer_, nullptr); // Should be set only once.
http_parser_buffer_ = buffer;
}

inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
return ContainerOf(&Environment::cares_timer_handle_, handle);
}
Expand Down
35 changes: 35 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ using v8::Local;
using v8::Message;
using v8::StackFrame;
using v8::StackTrace;
using v8::TryCatch;

void Environment::PrintSyncTrace() const {
if (!trace_sync_io_)
Expand Down Expand Up @@ -55,4 +56,38 @@ void Environment::PrintSyncTrace() const {
fflush(stderr);
}


bool Environment::KickNextTick() {
TickInfo* info = tick_info();

if (info->in_tick()) {
return true;
}

if (info->length() == 0) {
isolate()->RunMicrotasks();
}

if (info->length() == 0) {
info->set_index(0);
return true;
}

info->set_in_tick(true);

// process nextTicks after call
TryCatch try_catch;
try_catch.SetVerbose(true);
tick_callback_function()->Call(process_object(), 0, nullptr);

info->set_in_tick(false);

if (try_catch.HasCaught()) {
info->set_last_threw(true);
return false;
}

return true;
}

} // namespace node
7 changes: 7 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,9 +424,14 @@ class Environment {
void PrintSyncTrace() const;
inline void set_trace_sync_io(bool value);

bool KickNextTick();

inline uint32_t* heap_statistics_buffer() const;
inline void set_heap_statistics_buffer(uint32_t* pointer);

inline char* http_parser_buffer() const;
inline void set_http_parser_buffer(char* buffer);

inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
inline void ThrowRangeError(const char* errmsg);
Expand Down Expand Up @@ -524,6 +529,8 @@ class Environment {

uint32_t* heap_statistics_buffer_ = nullptr;

char* http_parser_buffer_;

#define V(PropertyName, TypeName) \
v8::Persistent<TypeName> PropertyName ## _;
ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V)
Expand Down
27 changes: 1 addition & 26 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,6 @@ Handle<Value> MakeCallback(Environment* env,
// If you hit this assertion, you forgot to enter the v8::Context first.
CHECK_EQ(env->context(), env->isolate()->GetCurrentContext());

Local<Object> process = env->process_object();
Local<Object> object, domain;
bool has_async_queue = false;
bool has_domain = false;
Expand Down Expand Up @@ -1092,32 +1091,8 @@ Handle<Value> MakeCallback(Environment* env,
return Undefined(env->isolate());
}

Environment::TickInfo* tick_info = env->tick_info();

if (tick_info->in_tick()) {
return ret;
}

if (tick_info->length() == 0) {
env->isolate()->RunMicrotasks();
}

if (tick_info->length() == 0) {
tick_info->set_index(0);
return ret;
}

tick_info->set_in_tick(true);

// process nextTicks after call
env->tick_callback_function()->Call(process, 0, nullptr);

tick_info->set_in_tick(false);

if (try_catch.HasCaught()) {
tick_info->set_last_threw(true);
if (!env->KickNextTick())
return Undefined(env->isolate());
}

return ret;
}
Expand Down
Loading