Skip to content
Browse files

Greatly improved memory management

There were a lot of problems with garbage collection before,
particularly with infinite loop generators. The problem is that the
handle to the fiber was always held in a deadlock, since we pass it as
the `this` context to the Fiber entry. So now `yield()` is a global,
to avoid this problem.

Additionally we are now giving v8 proper hints for externally-allocated
memory, so it can handle garbage collection better. Also the coroutine
library can free finished coroutines correctly now.

Incidentally, node-fibers doesn't use any node-specific API's anymore;
if you change the node.h include to be v8.h, it will still compile and
run just fine.
  • Loading branch information...
1 parent b194567 commit 6a77a260d62e07a2247c4c5e5da7bc687da1a4a6 @laverdet committed Jan 18, 2011
Showing with 206 additions and 84 deletions.
  1. +20 −9 coroutine.cc
  2. +18 −21 coroutine.h
  3. +168 −54 node-fibers.cc
View
29 coroutine.cc
@@ -69,18 +69,18 @@ class Thread {
size_t fiber_ids;
stack<size_t> freed_fiber_ids;
vector<vector<const void*> > fls_data;
-
static vector<pthread_dtor_t> dtors;
public:
pthread_t handle;
Coroutine* current_fiber;
+ Coroutine* delete_me;
static void free(void* that) {
delete static_cast<Thread*>(that);
}
- Thread() : fiber_ids(1), fls_data(1), handle(NULL) {
+ Thread() : fiber_ids(1), fls_data(1), handle(NULL), delete_me(NULL) {
current_fiber = new Coroutine(*this, 0);
}
@@ -89,7 +89,9 @@ class Thread {
}
void fiber_did_finish(Coroutine& fiber) {
- // delete ???
+ freed_fiber_ids.push(fiber.id);
+ assert(delete_me == NULL);
+ delete_me = &fiber;
}
Coroutine& new_fiber(Coroutine::entry_t& entry, void* arg) {
@@ -106,17 +108,17 @@ class Thread {
}
void* get_specific(pthread_key_t key) {
- if (fls_data[current_fiber->getid()].size() <= key) {
+ if (fls_data[current_fiber->id].size() <= key) {
return NULL;
}
- return (void*)fls_data[current_fiber->getid()][key];
+ return (void*)fls_data[current_fiber->id][key];
}
void set_specific(pthread_key_t key, const void* data) {
- if (fls_data[current_fiber->getid()].size() <= key) {
- fls_data[current_fiber->getid()].resize(key + 1);
+ if (fls_data[current_fiber->id].size() <= key) {
+ fls_data[current_fiber->id].resize(key + 1);
}
- fls_data[current_fiber->getid()][key] = data;
+ fls_data[current_fiber->id][key] = data;
}
void key_create(pthread_key_t* key, pthread_dtor_t dtor) {
@@ -150,7 +152,6 @@ const bool Coroutine::is_local_storage_enabled() {
return did_hook_pthreads;
}
-
Coroutine::Coroutine(Thread& t, size_t id) : thread(t), id(id) {}
Coroutine::Coroutine(Thread& t, size_t id, entry_t& entry, void* arg) :
@@ -172,6 +173,12 @@ void Coroutine::run() {
thread.current_fiber = this;
swapcontext(&current.context, &context);
thread.current_fiber = &current;
+ if (thread.delete_me) {
+ // TODO: Why does deleting and then reseting cause seg faults? Bad news..
+ thread.delete_me = NULL;
+ Coroutine* cr = thread.delete_me;
+ delete cr;
+ }
}
Coroutine& Coroutine::new_fiber(entry_t* entry, void* arg) {
@@ -182,6 +189,10 @@ void* Coroutine::bottom() const {
return stack.get() - STACK_SIZE;
}
+size_t Coroutine::size() const {
+ return sizeof(Coroutine) + STACK_SIZE;
+}
+
bool Coroutine::operator==(const Coroutine& that) const {
return this == &that;
}
View
39 coroutine.h
@@ -7,6 +7,7 @@
class Coroutine {
public:
+ friend class Thread;
typedef void(entry_t)(void*);
private:
@@ -16,20 +17,7 @@ class Coroutine {
std::auto_ptr<char> stack;
static void trampoline(Coroutine& that, entry_t& entry, void* arg);
-
- public:
- /**
- * Returns the currently-running fiber.
- */
- static Coroutine& current();
-
- /**
- * Is Coroutine-local storage via pthreads enabled? The Coroutine library should work fine
- * without this, but programs that are not aware of coroutines may panic if they make
- * assumptions about the stack. In order to enable this you must LD_PRELOAD (or equivalent)
- * this library.
- */
- static const bool is_local_storage_enabled();
+ ~Coroutine() {}
/**
* Constructor for currently running "fiber". This is really just original thread, but we
@@ -44,11 +32,19 @@ class Coroutine {
*/
Coroutine(Thread& t, size_t id, entry_t& entry, void* arg);
+ public:
/**
- * Don't delete Coroutines, they will delete themselves.
- * TODO: Actually they don't!
+ * Returns the currently-running fiber.
+ */
+ static Coroutine& current();
+
+ /**
+ * Is Coroutine-local storage via pthreads enabled? The Coroutine library should work fine
+ * without this, but programs that are not aware of coroutines may panic if they make
+ * assumptions about the stack. In order to enable this you must LD_PRELOAD (or equivalent)
+ * this library.
*/
- ~Coroutine();
+ static const bool is_local_storage_enabled();
/**
* Start or resume execution in this fiber. Note there is no explicit yield() function,
@@ -67,10 +63,11 @@ class Coroutine {
*/
void* bottom() const;
+ /**
+ * Returns the size this Coroutine takes up in the heap.
+ */
+ size_t size() const;
+
bool operator==(const Coroutine& that) const;
bool operator==(const Coroutine* that) const;
-
- size_t getid() const {
- return id;
- }
};
View
222 node-fibers.cc
@@ -1,4 +1,5 @@
#include "coroutine.h"
+#include <assert.h>
#include <node/node.h>
#include <iostream>
@@ -7,40 +8,109 @@
using namespace std;
using namespace v8;
-using namespace node;
-class Fiber: ObjectWrap {
+class Fiber {
private:
static Locker locker; // Node does not use locks or threads, so we need a global lock
static Persistent<FunctionTemplate> tmpl;
static Fiber* current;
+ Persistent<Object> handle;
Persistent<Function> cb;
Persistent<Context> v8_context;
Persistent<Value> yielded;
+ bool yielded_exception;
Coroutine* entry_fiber;
Coroutine* this_fiber;
bool started;
bool zombie;
public:
- Fiber(Persistent<Function> cb, Persistent<Context> v8_context) :
- ObjectWrap(),
+ Fiber(Persistent<Object> handle, Persistent<Function> cb, Persistent<Context> v8_context) :
+ handle(handle),
cb(cb),
v8_context(v8_context),
started(false),
- zombie(false) {}
- virtual ~Fiber() {
- if (this->started) {
- this->zombie = true;
+ zombie(false) {
+ MakeWeak();
+ handle->SetPointerInInternalField(0, this);
+ }
- // Swap context back to `Fiber::Yield()` to finish execution.
- // TODO: What to do about thrown JS exceptions here?
- this->this_fiber->run();
- }
+ virtual ~Fiber() {
+ assert(!this->started);
+ handle.Dispose();
cb.Dispose();
v8_context.Dispose();
- yielded.Dispose();
+ }
+
+ /**
+ * Call MakeWeak if it's ok for v8 to garbage collect this Fiber.
+ * i.e. After fiber completes, while yielded, or before started
+ */
+ void MakeWeak() {
+ handle.MakeWeak(this, WeakCallback);
+ }
+
+ /**
+ * And call ClearWeak if it's not ok for v8 to garbage collect this Fiber.
+ * i.e. While running.
+ */
+ void ClearWeak() {
+ handle.ClearWeak();
+ }
+
+ /**
+ * Called when there are no more references to this object in Javascript. If this happens and
+ * the fiber is currently suspended we'll unwind the fiber's stack by throwing exceptions in
+ * order to clear all references.
+ */
+ static void WeakCallback(Persistent<Value> value, void *data) {
+ Fiber& that = *static_cast<Fiber*>(data);
+ assert(that.handle == value);
+ assert(value.IsNearDeath());
+ assert(current != &that);
+ if (that.started) {
+ Locker locker;
+ HandleScope scope;
+ that.zombie = true;
+
+ // Swap context back to `Fiber::Yield()` which will throw an exception to unwind the stack.
+ // Futher calls to yield from this fiber will also throw.
+ that.yielded = Persistent<Value>::New(
+ Exception::Error(String::New("This Fiber is a zombie")));
+ that.yielded_exception = true;
+ {
+ Unlocker locker;
+ that.this_fiber->run();
+ assert(!that.started);
+ }
+
+ that.yielded.Dispose();
+ if (that.yielded_exception) {
+ // TODO: Check for Zombie exception?
+ }
+
+ // It's possible that someone else grabbed a reference to the currently running fiber while
+ // we were unwinding it. In this case they can reuse the fiber, but the stack in progress
+ // is already gone.
+ if (!value.IsNearDeath()) {
+ that.zombie = false;
+ that.MakeWeak();
+ return;
+ }
+ }
+
+ delete &that;
+ }
+
+ /**
+ * Unwrap a Fiber instance from a `this` pointer.
+ * TODO: Check that `handle` is actually a Fiber or Bad Things may happen.
+ */
+ static Fiber& Unwrap(Handle<Object> handle) {
+ assert(!handle.IsEmpty());
+ assert(handle->InternalFieldCount() == 1);
+ return *static_cast<Fiber*>(handle->GetPointerFromInternalField(0));
}
/**
@@ -53,13 +123,13 @@ class Fiber: ObjectWrap {
tmpl->SetClassName(String::NewSymbol("Fiber"));
Handle<ObjectTemplate> proto = tmpl->PrototypeTemplate();
- proto->Set(String::New("run"), FunctionTemplate::New(Run));
- proto->Set(String::New("yield"), FunctionTemplate::New(Yield));
- proto->SetAccessor(String::New("started"), GetStarted);
+ proto->Set(String::NewSymbol("run"), FunctionTemplate::New(Run));
+ proto->SetAccessor(String::NewSymbol("started"), GetStarted);
Handle<Function> fn = tmpl->GetFunction();
- fn->SetAccessor(String::New("current"), GetCurrent);
+ fn->SetAccessor(String::NewSymbol("current"), GetCurrent);
target->Set(String::NewSymbol("Fiber"), fn);
+ target->Set(String::NewSymbol("yield"), FunctionTemplate::New(Yield)->GetFunction());
}
/**
@@ -79,10 +149,10 @@ class Fiber: ObjectWrap {
}
Handle<Function> fn = Local<Function>::Cast(args[0]);
- Fiber* that = new Fiber(
+ new Fiber(
+ Persistent<Object>::New(args.This()),
Persistent<Function>::New(fn),
Persistent<Context>::New(Context::GetCurrent()));
- that->Wrap(args.This());
return args.This();
}
@@ -92,70 +162,98 @@ class Fiber: ObjectWrap {
*/
static Handle<Value> Run(const Arguments& args) {
HandleScope scope;
- Fiber* that = ObjectWrap::Unwrap<Fiber>(args.This());
- that->entry_fiber = &Coroutine::current();
+ Fiber& that = Unwrap(args.This());
+ that.entry_fiber = &Coroutine::current();
- if (!that->started) {
+ if (!that.started) {
// Create a new context with entry point `Fiber::RunFiber()`.
- that->started = true;
+ that.started = true;
void** data = new void*[2];
data[0] = (void*)&args;
- data[1] = that;
- that->this_fiber = &that->entry_fiber->new_fiber((void (*)(void*))RunFiber, data);
+ data[1] = &that;
+ that.this_fiber = &that.entry_fiber->new_fiber((void (*)(void*))RunFiber, data);
+ V8::AdjustAmountOfExternalAllocatedMemory(that.this_fiber->size());
} else {
// If the fiber is currently running put the first parameter to `run()` on `yielded`, then
// the pending call to `yield()` will return that value. `yielded` in this case is just a
// misnomer, we're just reusing the same handle.
+ that.yielded_exception = false;
if (args.Length()) {
- that->yielded = Persistent<Value>::New(args[0]);
+ that.yielded = Persistent<Value>::New(args[0]);
} else {
- that->yielded = Persistent<Value>::New(Undefined());
+ that.yielded = Persistent<Value>::New(Undefined());
}
}
// This will jump into either `RunFiber()` or `Yield()`, depending on if the fiber was
// already running.
Fiber* last_fiber = current;
- current = that;
+ current = &that;
{
Unlocker unlocker;
- that->this_fiber->run();
+ that.this_fiber->run();
}
// At this point the fiber either returned or called `yield()`.
current = last_fiber;
// Return the yielded value.
- Handle<Value> val = Local<Value>::New(that->yielded);
- that->yielded.Dispose();
- return val;
+ Handle<Value> val = Local<Value>::New(that.yielded);
+ that.yielded.Dispose();
+ if (that.yielded_exception) {
+ return ThrowException(val);
+ } else {
+ return val;
+ }
}
/**
* This is the entry point for a new fiber, from `run()`.
*/
static void RunFiber(void** data) {
const Arguments* args = (const Arguments*)data[0];
- Fiber* that = (Fiber*)data[1];
+ Fiber& that = *(Fiber*)data[1];
delete[] data;
Locker locker;
HandleScope scope;
+ // Set stack guard for this "thread"
ResourceConstraints constraints;
- constraints.set_stack_limit((uint32_t*)that->this_fiber->bottom());
+ constraints.set_stack_limit((uint32_t*)that.this_fiber->bottom());
SetResourceConstraints(&constraints);
- that->v8_context->Enter();
+ TryCatch try_catch;
+ that.ClearWeak();
+ that.v8_context->Enter();
+
if (args->Length()) {
Local<Value> argv[1] = { Local<Value>::New((*args)[0]) };
- that->yielded = Persistent<Value>::New(that->cb->Call(args->This(), 1, argv));
+ that.yielded = Persistent<Value>::New(that.cb->Call(that.v8_context->Global(), 1, argv));
} else {
- that->yielded = Persistent<Value>::New(that->cb->Call(args->This(), 0, NULL));
+ that.yielded = Persistent<Value>::New(that.cb->Call(that.v8_context->Global(), 0, NULL));
}
- that->v8_context->Exit();
+
+ if (try_catch.HasCaught()) {
+ that.yielded.Dispose();
+ that.yielded = Persistent<Value>::New(try_catch.Exception());
+ that.yielded_exception = true;
+ } else {
+ that.yielded_exception = false;
+ }
+
+ // Do not invoke the garbage collector if there's no context on the stack. It will seg fault
+ // otherwise.
+ V8::AdjustAmountOfExternalAllocatedMemory(-that.this_fiber->size());
+
+ // Don't make weak until after notifying the garbage collector. Otherwise it may try and
+ // free this very fiber!
+ that.MakeWeak();
+
+ // Now safe to leave the context, this stack is done with JS.
+ that.v8_context->Exit();
// The function returned (instead of yielding).
- that->started = false;
+ that.started = false;
}
/**
@@ -164,41 +262,54 @@ class Fiber: ObjectWrap {
*/
static Handle<Value> Yield(const Arguments& args) {
HandleScope scope;
- Fiber* that = ObjectWrap::Unwrap<Fiber>(args.This());
- if (that->zombie) {
+ Fiber& that = *current;
+ if (that.zombie) {
THROW(Exception::Error, "This Fiber is a zombie");
}
+ that.yielded_exception = false;
if (args.Length()) {
- that->yielded = Persistent<Value>::New(args[0]);
+ that.yielded = Persistent<Value>::New(args[0]);
} else {
- that->yielded = Persistent<Value>::New(Undefined());
+ that.yielded = Persistent<Value>::New(Undefined());
}
- // Return control back to `Fiber::run()`
+ // While not running this can be garbage collected if no one has a handle.
+ that.MakeWeak();
+
+ // Return control back to `Fiber::run()`. While control is outside this function we mark it as
+ // ok to garbage collect. If no one ever has a handle to resume the function it's harmful to
+ // keep the handle around.
{
Unlocker unlocker;
- that->entry_fiber->run();
+ that.entry_fiber->run();
}
- current = that;
+ // Now `run()` has been called again.
+
+ // Don't garbage collect anymore!
+ that.ClearWeak();
// `yielded` will contain the first parameter to `run()`
- Handle<Value> val = Local<Value>::New(that->yielded);
- that->yielded.Dispose();
- return val;
+ Handle<Value> val = Local<Value>::New(that.yielded);
+ that.yielded.Dispose();
+ if (that.yielded_exception) {
+ return ThrowException(val);
+ } else {
+ return val;
+ }
}
/**
* Getters for `started`, and `current`.
*/
static Handle<Value> GetStarted(Local<String> property, const AccessorInfo& info) {
- Fiber* that = ObjectWrap::Unwrap<Fiber>(info.This());
- return Boolean::New(that->started);
+ Fiber& that = Unwrap(info.This());
+ return Boolean::New(that.started);
}
static Handle<Value> GetCurrent(Local<String> property, const AccessorInfo& info) {
if (current) {
- return current->handle_;
+ return current->handle;
} else {
return Undefined();
}
@@ -228,9 +339,12 @@ You will not be able to use Fiber without this support enabled.");
}
extern "C" void init(Handle<Object> target) {
+ HandleScope scope;
+ Handle<Object> global = Context::GetCurrent()->Global();
if (Coroutine::is_local_storage_enabled()) {
- Fiber::Init(target);
+ Fiber::Init(global);
} else {
- target->Set(String::New("Fiber"), FunctionTemplate::New(FiberNotSupported)->GetFunction());
+ global->Set(
+ String::NewSymbol("Fiber"), FunctionTemplate::New(FiberNotSupported)->GetFunction());
}
}

0 comments on commit 6a77a26

Please sign in to comment.
Something went wrong with that request. Please try again.