Permalink
Browse files

HTTP works somewhat on net2 now

However it's not working very well: Hitting a 'hello world' server with many
requests (ab -t 60 -c 10) will cause it to crash with the following error.

  Obtained 3 stack frames.

  ./node(_Z11print_tracev+0x1c) [0x80d1b3c]
  ./node(_ZN4node6Parser7ExecuteERKN2v89ArgumentsE+0x69) [0x80d3759]
  ./node [0x811f44b]
  TypeError: Already parsing a buffer
      at Socket.<anonymous> (/home/ryan/projects/node/lib/http2.js:393:20)
      at IOWatcher.callback (/home/ryan/projects/node/lib/net.js:81:12)
      at node.js:985:9
      at node.js:989:1
  • Loading branch information...
1 parent dda1d68 commit 741e3fa91bdeec0c5cdc6e96bc4ce0ae29acae75 @ry ry committed Jan 26, 2010
Showing with 124 additions and 31 deletions.
  1. +10 −4 lib/net.js
  2. +15 −13 src/node_buffer.cc
  3. +4 −1 src/node_buffer.h
  4. +46 −0 src/node_http_parser.cc
  5. +10 −10 src/node_io_watcher.cc
  6. +38 −3 src/node_io_watcher.h
  7. +1 −0 src/node_object_wrap.h
View
@@ -28,7 +28,7 @@ var getaddrinfo = process.getaddrinfo;
var needsLookup = process.needsLookup;
var EINPROGRESS = process.EINPROGRESS;
var ENOENT = process.ENOENT;
-var END_OF_FILE = 42;
+var END_OF_FILE = 0;
function Socket (peerInfo) {
process.EventEmitter.call();
@@ -39,6 +39,7 @@ function Socket (peerInfo) {
self.recvBuffer = null;
self.readWatcher = new IOWatcher();
+ self.readWatcher.host = this;
self.readWatcher.callback = function () {
// If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one.
@@ -61,9 +62,9 @@ function Socket (peerInfo) {
}
} else {
bytesRead = read(self.fd,
- self.recvBuffer,
- self.recvBuffer.used,
- self.recvBuffer.length - self.recvBuffer.used);
+ self.recvBuffer,
+ self.recvBuffer.used,
+ self.recvBuffer.length - self.recvBuffer.used);
}
debug('bytesRead ' + bytesRead + '\n');
@@ -99,6 +100,7 @@ function Socket (peerInfo) {
}
};
self.writeWatcher = new IOWatcher();
+ self.writeWatcher.host = this;
self.writeWatcher.callback = self._doFlush;
self.writable = false;
@@ -131,6 +133,7 @@ Socket.prototype._allocateNewRecvBuf = function () {
var newBufferSize = 1024; // TODO make this adjustable from user API
+ /*
if (toRead) {
// Is the extra system call even worth it?
var bytesToRead = toRead(self.fd);
@@ -145,6 +148,7 @@ Socket.prototype._allocateNewRecvBuf = function () {
newBufferSize = 128;
}
}
+ */
self.recvBuffer = new process.Buffer(newBufferSize);
self.recvBuffer.used = 0;
@@ -421,6 +425,7 @@ Socket.prototype.forceClose = function (exception) {
this.writeWatcher.stop();
this.readWatcher.stop();
+
close(this.fd);
debug('close socket ' + this.fd);
this.fd = null;
@@ -455,6 +460,7 @@ function Server (listener) {
}
self.watcher = new IOWatcher();
+ self.watcher.host = self;
self.watcher.callback = function (readable, writeable) {
while (self.fd) {
var peerInfo = accept(self.fd);
View
@@ -33,24 +33,15 @@ bool IsBuffer(v8::Handle<v8::Value> val) {
/* Determines the absolute position for a relative offset */
-static inline size_t buffer_abs_off(buffer *buffer, size_t off) {
+size_t buffer_abs_off(buffer *buffer, size_t off) {
struct buffer *root = buffer_root(buffer);
off += buffer->offset;
return MIN(root->length, off);
}
-static inline void buffer_ref(struct buffer *buffer) {
- assert(buffer->root == NULL);
- buffer->refs++;
-}
-
-
-static inline void buffer_unref(struct buffer *buffer) {
- assert(buffer->root == NULL);
- assert(buffer->refs > 0);
- buffer->refs--;
- if (buffer->refs == 0 && buffer->weak) free(buffer);
+void buffer_ref(struct buffer *buffer) {
+ buffer_root(buffer)->refs++;
}
@@ -69,15 +60,26 @@ static void RootWeakCallback(Persistent<Value> value, void *data)
struct buffer *buffer = static_cast<struct buffer*>(data);
assert(buffer->root == NULL); // this is the root
assert(value == buffer->handle);
- buffer->handle.Dispose();
+ value.ClearWeak();
if (buffer->refs) {
buffer->weak = true;
} else {
+ buffer->handle.Dispose();
free(buffer);
}
}
+void buffer_unref(struct buffer *buffer) {
+ struct buffer * root = buffer_root(buffer);
+ assert(root->refs > 0);
+ root->refs--;
+ if (root->refs == 0 && root->weak) {
+ root->handle.MakeWeak(root, RootWeakCallback);
+ }
+}
+
+
static void SliceWeakCallback(Persistent<Value> value, void *data)
{
struct buffer *buffer = static_cast<struct buffer*>(data);
View
@@ -56,6 +56,9 @@ static inline size_t buffer_remaining(struct buffer *buffer, size_t off) {
return end - buffer_p(buffer, off);
}
-}
+void buffer_ref(struct buffer *buffer);
+void buffer_unref(struct buffer *buffer);
+
+} // namespace node buffer
#endif // NODE_BUFFER
View
@@ -19,6 +19,30 @@
// No copying is performed when slicing the buffer, only small reference
// allocations.
+ #include <execinfo.h>
+ #include <stdio.h>
+ #include <stdlib.h>
+
+ /* Obtain a backtrace and print it to stdout. */
+ void
+ print_trace (void)
+ {
+ void *array[10];
+ size_t size;
+ char **strings;
+ size_t i;
+
+ size = backtrace (array, 10);
+ strings = backtrace_symbols (array, size);
+
+ printf ("Obtained %zd stack frames.\n", size);
+
+ for (i = 0; i < size; i++)
+ printf ("%s\n", strings[i]);
+
+ free (strings);
+ }
+
namespace node {
using namespace v8;
@@ -93,6 +117,7 @@ static Persistent<String> should_keep_alive_sym;
, Integer::New(length) \
}; \
Local<Value> ret = cb->Call(parser->handle_, 3, argv); \
+ assert(parser->buffer_); \
return ret.IsEmpty() ? -1 : 0; \
}
@@ -141,6 +166,10 @@ class Parser : public ObjectWrap {
parser_.data = this;
}
+ ~Parser() {
+ assert(buffer_ == NULL && "Destroying a parser while it's parsing");
+ }
+
DEFINE_HTTP_CB(on_message_begin)
DEFINE_HTTP_CB(on_message_complete)
@@ -215,6 +244,7 @@ class Parser : public ObjectWrap {
Parser *parser = ObjectWrap::Unwrap<Parser>(args.This());
if (parser->buffer_) {
+ print_trace();
return ThrowException(Exception::TypeError(
String::New("Already parsing a buffer")));
}
@@ -243,9 +273,13 @@ class Parser : public ObjectWrap {
// Assign 'buffer_' while we parse. The callbacks will access that varible.
parser->buffer_ = buffer;
+ buffer_ref(parser->buffer_);
+
size_t nparsed =
http_parser_execute(&(parser->parser_), buffer_p(buffer, off), len);
+ buffer_unref(parser->buffer_);
+
// Unassign the 'buffer_' variable
assert(parser->buffer_);
parser->buffer_ = NULL;
@@ -266,6 +300,17 @@ class Parser : public ObjectWrap {
return scope.Close(nparsed_obj);
}
+ static Handle<Value> ExecuteEOF(const Arguments& args) {
+ HandleScope scope;
+
+ Parser *parser = ObjectWrap::Unwrap<Parser>(args.This());
+
+ assert(!parser->buffer_);
+ http_parser_execute(&parser->parser_, NULL, 0);
+
+ return Undefined();
+ }
+
private:
@@ -282,6 +327,7 @@ void InitHttpParser(Handle<Object> target) {
//t->SetClassName(String::NewSymbol("HTTPParser"));
NODE_SET_PROTOTYPE_METHOD(t, "execute", Parser::Execute);
+ NODE_SET_PROTOTYPE_METHOD(t, "executeEOF", Parser::ExecuteEOF);
target->Set(String::NewSymbol("HTTPParser"), t->GetFunction());
View
@@ -59,16 +59,17 @@ void IOWatcher::Callback(EV_P_ ev_io *w, int revents) {
//
-// var io = new process.IOWatcher(function (readable, writable) {
-//
-// });
+// var io = new process.IOWatcher();
+// io.callback = function (readable, writable) { ... };
// io.set(fd, true, false);
// io.start();
//
Handle<Value> IOWatcher::New(const Arguments& args) {
HandleScope scope;
IOWatcher *s = new IOWatcher();
+
+
s->Wrap(args.This());
return args.This();
@@ -78,19 +79,18 @@ Handle<Value> IOWatcher::New(const Arguments& args) {
Handle<Value> IOWatcher::Start(const Arguments& args) {
HandleScope scope;
- IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
+ IOWatcher *io = Unwrap(args.Holder());
ev_io_start(EV_DEFAULT_UC_ &io->watcher_);
-
- io->Ref();
+ assert(ev_is_active(&io->watcher_));
return Undefined();
}
Handle<Value> IOWatcher::Set(const Arguments& args) {
HandleScope scope;
- IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
+ IOWatcher *io = Unwrap(args.Holder());
if (!args[0]->IsInt32()) {
return ThrowException(Exception::TypeError(
@@ -122,16 +122,16 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
Handle<Value> IOWatcher::Stop(const Arguments& args) {
HandleScope scope;
- IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(args.Holder());
+ IOWatcher *io = Unwrap(args.This());
io->Stop();
return Undefined();
}
void IOWatcher::Stop () {
- if (watcher_.active) {
+ if (ev_is_active(&watcher_)) {
ev_io_stop(EV_DEFAULT_UC_ &watcher_);
- Unref();
+ assert(!ev_is_active(&watcher_));
}
}
View
@@ -7,33 +7,68 @@
namespace node {
-class IOWatcher : ObjectWrap {
+class IOWatcher {
public:
static void Initialize(v8::Handle<v8::Object> target);
protected:
static v8::Persistent<v8::FunctionTemplate> constructor_template;
- IOWatcher() : ObjectWrap() {
+ IOWatcher() {
ev_init(&watcher_, IOWatcher::Callback);
watcher_.data = this;
}
~IOWatcher() {
- ev_io_stop(EV_DEFAULT_UC_ &watcher_);
+ assert(!ev_is_active(&watcher_));
}
static v8::Handle<v8::Value> New(const v8::Arguments& args);
static v8::Handle<v8::Value> Start(const v8::Arguments& args);
static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
static v8::Handle<v8::Value> Set(const v8::Arguments& args);
+ inline void Wrap(v8::Handle<v8::Object> handle) {
+ assert(handle_.IsEmpty());
+ assert(handle->InternalFieldCount() > 0);
+ handle_ = v8::Persistent<v8::Object>::New(handle);
+ handle_->SetInternalField(0, v8::External::New(this));
+ MakeWeak();
+ }
+
+ inline void MakeWeak(void) {
+ handle_.MakeWeak(this, WeakCallback);
+ }
+
+
+
private:
static void Callback(EV_P_ ev_io *watcher, int revents);
+ static void WeakCallback (v8::Persistent<v8::Value> value, void *data)
+ {
+ IOWatcher *io = static_cast<IOWatcher*>(data);
+ assert(value == io->handle_);
+ if (!ev_is_active(&io->watcher_)) {
+ value.Dispose();
+ delete io;
+ } else {
+ //value.ClearWeak();
+ io->MakeWeak();
+ }
+ }
+
+ static IOWatcher* Unwrap(v8::Handle<v8::Object> handle) {
+ assert(!handle.IsEmpty());
+ assert(handle->InternalFieldCount() > 0);
+ return static_cast<IOWatcher*>(v8::Handle<v8::External>::Cast(
+ handle->GetInternalField(0))->Value());
+ }
+
void Stop();
ev_io watcher_;
+ v8::Persistent<v8::Object> handle_;
};
} // namespace node
View
@@ -53,6 +53,7 @@ class ObjectWrap {
assert(!handle_.IsEmpty());
assert(handle_.IsWeak());
refs_++;
+ MakeWeak();
}
/* Unref() marks an object as detached from the event loop. This is its

0 comments on commit 741e3fa

Please sign in to comment.