Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Coroutine recycling

Instantiating new coroutines is quite expensive. Between allocating a
new stack, getcontext(), and makecontext() we spend a good amount of
time redoing work. The stack allocation cost is mitigated a little bit
with the pool allocator, however it can't beat full object recylcing.

This implements a pool of finished coroutines which are then reset and
then returned again via `create_fiber`.
  • Loading branch information...
commit fbef75c58fdd2701e2fd79dbd2a334cdb95dd904 1 parent 46480cc
@laverdet authored
Showing with 101 additions and 66 deletions.
  1. +47 −28 coroutine.cc
  2. +19 −7 coroutine.h
  3. +35 −31 node-fibers.cc
View
75 coroutine.cc
@@ -9,14 +9,18 @@
#include <stack>
#include <vector>
+#include <boost/ptr_container/ptr_vector.hpp>
+
// TODO: It's clear I'm missing something with respects to ResourceConstraints.set_stack_limit.
// No matter what I give it, it seems the stack size is always the same. And then if the actual
// amount of memory allocated for the stack is too small it seg faults. It seems 265k is as low as
// I can go without fixing the underlying bug.
#define STACK_SIZE (1024 * 265)
+#define MAX_POOL_SIZE 120
#include <iostream>
using namespace std;
+using namespace boost;
/**
* These are all the pthread functions we hook. Some are more complicated to hook than others.
@@ -70,10 +74,11 @@ void thread_trampoline(void** data);
*/
class Thread {
private:
+ static vector<pthread_dtor_t> dtors;
size_t fiber_ids;
stack<size_t> freed_fiber_ids;
vector<vector<void*> > fls_data;
- static vector<pthread_dtor_t> dtors;
+ ptr_vector<Coroutine> fiber_pool;
public:
pthread_t handle;
@@ -113,14 +118,25 @@ class Thread {
}
void fiber_did_finish(Coroutine& fiber) {
- freed_fiber_ids.push(fiber.id);
- assert(delete_me == NULL);
- delete_me = &fiber;
- coroutine_fls_dtor(fiber);
+ if (fiber_pool.size() < MAX_POOL_SIZE) {
+ fiber_pool.push_back(&fiber);
+ } else {
+ freed_fiber_ids.push(fiber.id);
+ coroutine_fls_dtor(fiber);
+ // Can't delete right now because we're currently on this stack!
+ assert(delete_me == NULL);
+ delete_me = &fiber;
+ }
}
- Coroutine& new_fiber(Coroutine::entry_t& entry, void* arg) {
+ Coroutine& create_fiber(Coroutine::entry_t& entry, void* arg) {
size_t id;
+ if (!fiber_pool.empty()) {
+ Coroutine& fiber = *fiber_pool.pop_back().release();
+ fiber.reset(entry, arg);
+ return fiber;
+ }
+
if (!freed_fiber_ids.empty()) {
id = freed_fiber_ids.top();
freed_fiber_ids.pop();
@@ -178,9 +194,10 @@ class Loader {
/**
* Coroutine class definition
*/
-void Coroutine::trampoline(Coroutine& that, entry_t& entry, void* arg) {
- entry(arg);
- that.thread.fiber_did_finish(that);
+void Coroutine::trampoline(Coroutine &that) {
+ while (true) {
+ that.entry(that.arg);
+ }
}
Coroutine& Coroutine::current() {
@@ -197,32 +214,42 @@ Coroutine::Coroutine(Thread& t, size_t id) : thread(t), id(id) {}
Coroutine::Coroutine(Thread& t, size_t id, entry_t& entry, void* arg) :
thread(t),
id(id),
- stack(STACK_SIZE) {
+ stack(STACK_SIZE),
+ entry(entry),
+ arg(arg) {
getcontext(&context);
context.uc_stack.ss_size = STACK_SIZE;
context.uc_stack.ss_sp = &stack[0];
- makecontext(&context, (void(*)(void))trampoline, 3, this, entry, arg);
+ makecontext(&context, (void(*)(void))trampoline, 1, this);
+}
+
+Coroutine& Coroutine::create_fiber(entry_t* entry, void* arg) {
+ Thread& thread = *static_cast<Thread*>(o_pthread_getspecific(thread_key));
+ return thread.create_fiber(*entry, arg);
+}
+
+void Coroutine::reset(entry_t* entry, void* arg) {
+ this->entry = entry;
+ this->arg = arg;
}
void Coroutine::run() {
Coroutine& current = *thread.current_fiber;
- if (current == this) {
- throw runtime_error("fiber is already running");
- }
- context.uc_link = &current.context;
+ assert(&current != this);
thread.current_fiber = this;
swapcontext(&current.context, &context);
thread.current_fiber = &current;
if (thread.delete_me) {
- // TODO: Why does deleting and then reseting cause seg faults? Bad news..
+ assert(thread.delete_me == this);
+ Thread& thread = this->thread;
+ delete thread.delete_me;
thread.delete_me = NULL;
- Coroutine* cr = thread.delete_me;
- delete cr;
}
}
-Coroutine& Coroutine::new_fiber(entry_t* entry, void* arg) {
- return thread.new_fiber(*entry, arg);
+void Coroutine::finish(Coroutine& next) {
+ this->thread.fiber_did_finish(*this);
+ swapcontext(&context, &next.context);
}
void* Coroutine::bottom() const {
@@ -233,14 +260,6 @@ size_t Coroutine::size() const {
return sizeof(Coroutine) + STACK_SIZE;
}
-bool Coroutine::operator==(const Coroutine& that) const {
- return this == &that;
-}
-
-bool Coroutine::operator==(const Coroutine* that) const {
- return this == that;
-}
-
/**
* TLS hooks
*/
View
26 coroutine.h
@@ -6,10 +6,12 @@
#include <ucontext.h>
#include <ext/pool_allocator.h>
#include <vector>
+#include <boost/ptr_container/ptr_vector.hpp>
class Coroutine {
public:
friend class Thread;
+ friend void boost::checked_delete<const Coroutine>(const Coroutine*);
typedef void(entry_t)(void*);
private:
@@ -20,8 +22,10 @@ class Coroutine {
size_t id;
ucontext_t context;
std::vector<char_noinit, __gnu_cxx::__pool_alloc<char_noinit> > stack;
+ entry_t* entry;
+ void* arg;
- static void trampoline(Coroutine& that, entry_t& entry, void* arg);
+ static void trampoline(Coroutine& that);
~Coroutine() {}
/**
@@ -37,6 +41,11 @@ class Coroutine {
*/
Coroutine(Thread& t, size_t id, entry_t& entry, void* arg);
+ /**
+ * Resets the context of this coroutine from the start. Used to recyle old coroutines.
+ */
+ void reset(entry_t* entry, void* arg);
+
public:
/**
* Returns the currently-running fiber.
@@ -44,6 +53,11 @@ class Coroutine {
static Coroutine& current();
/**
+ * Create a new fiber.
+ */
+ static Coroutine& create_fiber(entry_t* entry, void* arg = NULL);
+
+ /**
* Is Coroutine-local storage via pthreads enabled? The Coroutine library should work fine
* without this, but programs that are not aware of coroutines may panic if they make
* assumptions about the stack. In order to enable this you must LD_PRELOAD (or equivalent)
@@ -58,10 +72,11 @@ class Coroutine {
void run();
/**
- * Create a new fiber. This just calls back into Thread because Coroutine is the only public
- * class in this library.
+ * Finish this coroutine.. This will halt execution of this coroutine and resume execution
+ * of `next`. If you do not call this function, and instead just return from `entry` the
+ * application will exit. This function may or may not actually return.
*/
- Coroutine& new_fiber(entry_t* entry, void* arg = NULL);
+ void finish(Coroutine& next);
/**
* Returns address of the lowest usable byte in this Coroutine's stack.
@@ -72,7 +87,4 @@ class Coroutine {
* Returns the size this Coroutine takes up in the heap.
*/
size_t size() const;
-
- bool operator==(const Coroutine& that) const;
- bool operator==(const Coroutine* that) const;
};
View
66 node-fibers.cc
@@ -81,6 +81,7 @@ class Fiber {
that.yielded_exception = true;
{
Unlocker locker;
+ that.entry_fiber = &Coroutine::current();
that.this_fiber->run();
assert(!that.started);
}
@@ -171,7 +172,7 @@ class Fiber {
void** data = new void*[2];
data[0] = (void*)&args;
data[1] = &that;
- that.this_fiber = &that.entry_fiber->new_fiber((void (*)(void*))RunFiber, data);
+ that.this_fiber = &Coroutine::create_fiber((void (*)(void*))RunFiber, data);
V8::AdjustAmountOfExternalAllocatedMemory(that.this_fiber->size());
} else {
// If the fiber is currently running put the first parameter to `run()` on `yielded`, then
@@ -214,46 +215,49 @@ class Fiber {
Fiber& that = *(Fiber*)data[1];
delete[] data;
- Locker locker;
- HandleScope scope;
+ {
+ Locker locker;
+ HandleScope scope;
- // Set stack guard for this "thread"
- ResourceConstraints constraints;
- constraints.set_stack_limit((uint32_t*)that.this_fiber->bottom());
- SetResourceConstraints(&constraints);
+ // Set stack guard for this "thread"
+ ResourceConstraints constraints;
+ constraints.set_stack_limit((uint32_t*)that.this_fiber->bottom());
+ SetResourceConstraints(&constraints);
- TryCatch try_catch;
- that.ClearWeak();
- that.v8_context->Enter();
+ TryCatch try_catch;
+ that.ClearWeak();
+ that.v8_context->Enter();
- if (args->Length()) {
- Local<Value> argv[1] = { Local<Value>::New((*args)[0]) };
- that.yielded = Persistent<Value>::New(that.cb->Call(that.v8_context->Global(), 1, argv));
- } else {
- that.yielded = Persistent<Value>::New(that.cb->Call(that.v8_context->Global(), 0, NULL));
- }
+ if (args->Length()) {
+ Local<Value> argv[1] = { Local<Value>::New((*args)[0]) };
+ that.yielded = Persistent<Value>::New(that.cb->Call(that.v8_context->Global(), 1, argv));
+ } else {
+ that.yielded = Persistent<Value>::New(that.cb->Call(that.v8_context->Global(), 0, NULL));
+ }
- if (try_catch.HasCaught()) {
- that.yielded.Dispose();
- that.yielded = Persistent<Value>::New(try_catch.Exception());
- that.yielded_exception = true;
- } else {
- that.yielded_exception = false;
- }
+ if (try_catch.HasCaught()) {
+ that.yielded.Dispose();
+ that.yielded = Persistent<Value>::New(try_catch.Exception());
+ that.yielded_exception = true;
+ } else {
+ that.yielded_exception = false;
+ }
- // Do not invoke the garbage collector if there's no context on the stack. It will seg fault
- // otherwise.
- V8::AdjustAmountOfExternalAllocatedMemory(-that.this_fiber->size());
+ // Do not invoke the garbage collector if there's no context on the stack. It will seg fault
+ // otherwise.
+ V8::AdjustAmountOfExternalAllocatedMemory(-that.this_fiber->size());
- // Don't make weak until after notifying the garbage collector. Otherwise it may try and
- // free this very fiber!
- that.MakeWeak();
+ // Don't make weak until after notifying the garbage collector. Otherwise it may try and
+ // free this very fiber!
+ that.MakeWeak();
- // Now safe to leave the context, this stack is done with JS.
- that.v8_context->Exit();
+ // Now safe to leave the context, this stack is done with JS.
+ that.v8_context->Exit();
+ }
// The function returned (instead of yielding).
that.started = false;
+ that.this_fiber->finish(*that.entry_fiber);
}
/**
Please sign in to comment.
Something went wrong with that request. Please try again.