From 906b73a759fa95b17c8ed5524ec1a1f01b80e2b4 Mon Sep 17 00:00:00 2001 From: Marcel Laverdet Date: Sun, 16 Jan 2011 14:09:21 +0900 Subject: [PATCH] Initial commit --- LICENSE | 24 ++++ Makefile | 15 +++ README | 37 ++++++ coroutine.cc | 336 +++++++++++++++++++++++++++++++++++++++++++++++++ coroutine.h | 74 +++++++++++ fiber-shim | 2 + node-fibers.cc | 236 ++++++++++++++++++++++++++++++++++ 7 files changed, 724 insertions(+) create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README create mode 100644 coroutine.cc create mode 100644 coroutine.h create mode 100755 fiber-shim create mode 100644 node-fibers.cc diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..91e952f --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +Copyright 2011 Marcel Laverdet +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to +deal in the Software without restriction, including without limitation the +rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +sell copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +IN THE SOFTWARE. + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + +I am providing code in this repository to you under an open source license. +Because this is my personal repository, the license you receive to my code is +from me and not from my employer (Facebook). diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..2c9549e --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +NODE_PREFIX := $(shell node --vars | egrep ^NODE_PREFIX: | cut -c14-) +# CPPFLAGS = -ggdb -O0 -Wall -I$(NODE_PREFIX)/include -I$(NODE_PREFIX)/include/node -I/Users/marcel/code/v8/include +CPPFLAGS = -m64 -ggdb -O0 -Wall -I$(NODE_PREFIX)/include -I$(NODE_PREFIX)/include/node + +all: node-fibers.node + +coroutine.dylib: coroutine.cc + $(CXX) $(CPPFLAGS) -dynamiclib -o $@ $^ + +node-fibers.node: node-fibers.cc coroutine.dylib + $(CXX) $(CPPFLAGS) -bundle -undefined dynamic_lookup -o $@ $^ + +clean: + -$(RM) node-fibers.node coroutine.dylib + -$(RM) -r *.dSYM diff --git a/README b/README new file mode 100644 index 0000000..0767a22 --- /dev/null +++ b/README @@ -0,0 +1,37 @@ +Fiber support for Node and v8. + +To build this software: + + make + +Only OS X is supported right now. + +If you intend to use fibers, be sure to start node with the included +`fiber-shim` script. This is a quick example of what you can do with +node-fibers: + + $ cat generator.js + var util = require('util'); + var Fiber = require('./node-fibers').Fiber; + + var inc = Fiber(function(start) { + var total = start; + while (true) { + total += this.yield(total); + } + }); + + for (var ii = inc.run(1); ii < 10; ii = inc.run(1)) { + util.print(ii + '\n'); + } + + $ ./fiber-shim node generator.js + 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8 + 9 diff --git a/coroutine.cc b/coroutine.cc new file mode 100644 index 0000000..a908940 --- /dev/null +++ b/coroutine.cc @@ -0,0 +1,336 @@ +#include "coroutine.h" +#include +#define __GNU_SOURCE +#include +#include +#include + +#include +#include +#include + +#define STACK_SIZE (1024 * 1024 * 2) + +#include +using namespace std; + +/** + * These are all the pthread functions we hook. Some are more complicated to hook than others. + */ +typedef void(*pthread_dtor_t)(void*); + +static int (*o_pthread_create)(pthread_t*, const pthread_attr_t*, void*(*)(void*), void*); +static int (*o_pthread_key_delete)(pthread_key_t); +static int (*o_pthread_equal)(pthread_t, pthread_t); +static void* (*o_pthread_getspecific)(pthread_key_t); +static int (*o_pthread_join)(pthread_key_t, void**); +static pthread_t (*o_pthread_self)(void); +static int (*o_pthread_setspecific)(pthread_key_t, const void*); + +typedef int(pthread_key_create_t)(pthread_key_t*, pthread_dtor_t); +static pthread_key_create_t* o_pthread_key_create = NULL; +static pthread_key_create_t& dyn_pthread_key_create(); + +/** + * Very early on when this library is loaded there are callers to pthread_key_create, + * pthread_getspecific, and pthread_setspecific. We normally could pass these calls through down to + * the original implementation but that doesn't work because dlsym() tries to get a lock which ends + * up calling pthread_getspecific and pthread_setspecific. So have to implement our own versions of + * these functions assuming one thread only and then as soon as we can, put all that saved data into + * real TLS. + * + * This code assumes the underlying pthread library uses increasing TLS keys, while remaining + * under a constant number of them. These are both not safe assumptions since pthread_t is + * technically opaque. + */ +static const size_t MAX_EARLY_KEYS = 500; +static const void* pthread_early_vals[500] = { NULL }; +static pthread_key_t last_non_fiber_key = NULL; + +/** + * General bookkeeping for this library. + */ +static bool initialized = false; +static bool did_hook_pthreads = false; +static pthread_key_t thread_key; + +/** + * Boing + */ +void thread_trampoline(void** data); + +/** + * Thread is only used internally for this library. It keeps track of all the fibers this thread + * is currently running, and handles all the fiber-local storage logic. We store a handle to a + * Thread object in TLS, and then it emulates TLS on top of fibers. + */ +class Thread { + private: + size_t fiber_ids; + stack freed_fiber_ids; + vector > fls_data; + + static vector dtors; + + public: + pthread_t handle; + Coroutine* current_fiber; + + static void free(void* that) { + delete static_cast(that); + } + + Thread() : fiber_ids(1), fls_data(1), handle(NULL) { + current_fiber = new Coroutine(*this, 0); + } + + ~Thread() { + assert(freed_fiber_ids.size() == fiber_ids); + } + + void fiber_did_finish(Coroutine& fiber) { + // delete ??? + } + + Coroutine& new_fiber(Coroutine::entry_t& entry, void* arg) { + size_t id; + if (!freed_fiber_ids.empty()) { + id = freed_fiber_ids.top(); + freed_fiber_ids.pop(); + // TODO: clear existing TLS + } else { + fls_data.resize(fls_data.size() + 1); + id = fiber_ids++; + } + return *new Coroutine(*this, id, *entry, arg); + } + + void* get_specific(pthread_key_t key) { + if (fls_data[current_fiber->getid()].size() <= key) { + return NULL; + } + return (void*)fls_data[current_fiber->getid()][key]; + } + + void set_specific(pthread_key_t key, const void* data) { + if (fls_data[current_fiber->getid()].size() <= key) { + fls_data[current_fiber->getid()].resize(key + 1); + } + fls_data[current_fiber->getid()][key] = data; + } + + void key_create(pthread_key_t* key, pthread_dtor_t dtor) { + dtors.push_back(dtor); + *key = dtors.size() - 1; // TODO: This is NOT thread-safe! =O + } +}; +vector Thread::dtors; + +/** + * Just a simple library initialization hook. + */ +class Loader { + public: Loader(); +}; + +/** + * Coroutine class definition + */ +void Coroutine::trampoline(Coroutine& that, entry_t& entry, void* arg) { + entry(arg); + that.thread.fiber_did_finish(that); +} + +Coroutine& Coroutine::current() { + Thread& thread = *static_cast(o_pthread_getspecific(thread_key)); + return *thread.current_fiber; +} + +const bool Coroutine::is_local_storage_enabled() { + return did_hook_pthreads; +} + + +Coroutine::Coroutine(Thread& t, size_t id) : thread(t), id(id) {} + +Coroutine::Coroutine(Thread& t, size_t id, entry_t& entry, void* arg) : + thread(t), + id(id), + stack(new char[STACK_SIZE]) { + getcontext(&context); + context.uc_stack.ss_size = STACK_SIZE; + context.uc_stack.ss_sp = stack.get(); + makecontext(&context, (void(*)(void))trampoline, 3, this, entry, arg); +} + +void Coroutine::run() { + Coroutine& current = *thread.current_fiber; + if (current == this) { + throw runtime_error("fiber is already running"); + } + context.uc_link = ¤t.context; + thread.current_fiber = this; + swapcontext(¤t.context, &context); + thread.current_fiber = ¤t; +} + +Coroutine& Coroutine::new_fiber(entry_t* entry, void* arg) { + return thread.new_fiber(*entry, arg); +} + +void* Coroutine::bottom() const { + return stack.get() - STACK_SIZE; +} + +bool Coroutine::operator==(const Coroutine& that) const { + return this == &that; +} + +bool Coroutine::operator==(const Coroutine* that) const { + return this == that; +} + +/** + * TLS hooks + */ + +// See comment above MAX_EARLY_KEYS as to why these functions are difficult to hook. +// Note well that in the `!initialized` case there is no heap. Calls to malloc, etc will crash your +// shit. +void* pthread_getspecific(pthread_key_t key) { + if (initialized) { + if (last_non_fiber_key >= key) { + // If this key was reserved before the library loaded then go to the original TLS. This should + // generally be very low-level stuff. + return o_pthread_getspecific(key); + } + Thread& thread = *static_cast(o_pthread_getspecific(thread_key)); + return thread.get_specific(key - last_non_fiber_key - 1); + } else { + // We can't invoke the original function because dlsym tries to call pthread_getspecific + return const_cast(pthread_early_vals[key]); + } +} + +int pthread_setspecific(pthread_key_t key, const void* data) { + if (initialized) { + if (last_non_fiber_key >= key) { + return o_pthread_setspecific(key, data); + } + Thread& thread = *static_cast(o_pthread_getspecific(thread_key)); + thread.set_specific(key - last_non_fiber_key - 1, data); + return 0; + } else { + pthread_early_vals[key] = data; + return 0; + } +} + +static pthread_key_create_t& dyn_pthread_key_create() { + if (o_pthread_key_create == NULL) { + o_pthread_key_create = (pthread_key_create_t*)dlsym(RTLD_NEXT, "pthread_key_create"); + } + return *o_pthread_key_create; +} + +int pthread_key_create(pthread_key_t* key, pthread_dtor_t dtor) { + if (initialized) { + Thread& thread = *static_cast(o_pthread_getspecific(thread_key)); + thread.key_create(key, dtor); + *key += last_non_fiber_key + 1; + return 0; + } else { + int ret = dyn_pthread_key_create()(key, dtor); + assert(*key < MAX_EARLY_KEYS); + if (*key > last_non_fiber_key) { + last_non_fiber_key = *key; + } + return ret; + } +} + +/** + * Other pthread-related hooks. + */ + +// Entry point for pthread_create. We need this to record the Thread in real TLS. +void thread_trampoline(void** args_vector) { + void* (*entry)(void*) = (void*(*)(void*))args_vector[0]; + void* arg = args_vector[1]; + Thread& thread = *static_cast(args_vector[1]); + delete[] args_vector; + thread.handle = o_pthread_self(); + o_pthread_setspecific(thread_key, &thread); + entry(arg); +} + +int pthread_create(pthread_t* handle, const pthread_attr_t* attr, void* (*entry)(void*), void* arg) { + assert(initialized); + void** args_vector = new void*[3]; + args_vector[0] = (void*)entry; + args_vector[1] = arg; + Thread* thread = new Thread; + args_vector[2] = thread; + *handle = (pthread_t)thread; + return o_pthread_create( + &thread->handle, attr, (void* (*)(void*))thread_trampoline, (void*)args_vector); +} + +int pthread_key_delete(pthread_key_t key) { + assert(initialized); + if (key <= last_non_fiber_key) { + return o_pthread_key_delete(key); + } else { + // TODO: Call all dtors + return 0; + } +} + +int pthread_equal(pthread_t left, pthread_t right) { + return left == right; +} + +int pthread_join(pthread_t thread, void** retval) { + assert(initialized); + // pthread_join should return EDEADLK if you try to join with yourself.. + return pthread_join(reinterpret_cast(thread).handle, retval); +} + +pthread_t pthread_self() { + assert(initialized); + did_hook_pthreads = true; + Thread& thread = *static_cast(o_pthread_getspecific(thread_key)); + return (pthread_t)thread.current_fiber; +} + +/** + * Initialization of this library. By the time we make it here the heap should be good to go. Also + * it's possible the TLS functions have been called, so we need to clean up that mess. + */ +Loader::Loader() { + // Grab hooks to the real version of all hooked functions. + o_pthread_create = (int(*)(pthread_t*, const pthread_attr_t*, void* (*)(void*), void*))dlsym(RTLD_NEXT, "pthread_create"); + o_pthread_key_delete = (int(*)(pthread_key_t))dlsym(RTLD_NEXT, "pthread_key_delete"); + o_pthread_equal = (int(*)(pthread_t, pthread_t))dlsym(RTLD_NEXT, "pthread_equal"); + o_pthread_getspecific = (void*(*)(pthread_key_t))dlsym(RTLD_NEXT, "pthread_getspecific"); + o_pthread_join = (int(*)(pthread_key_t, void**))dlsym(RTLD_NEXT, "pthread_join"); + o_pthread_self = (pthread_t(*)(void))dlsym(RTLD_NEXT, "pthread_self"); + o_pthread_setspecific = (int(*)(pthread_key_t, const void*))dlsym(RTLD_NEXT, "pthread_setspecific"); + dyn_pthread_key_create(); + + // Create a real TLS key to store the handle to Thread. + o_pthread_key_create(&thread_key, Thread::free); + if (thread_key > last_non_fiber_key) { + last_non_fiber_key = thread_key; + } + Thread* thread = new Thread; + thread->handle = o_pthread_self(); + o_pthread_setspecific(thread_key, thread); + + // Put all the data from the fake pthread_setspecific into real TLS + initialized = true; + for (size_t ii = 0; ii < last_non_fiber_key; ++ii) { + pthread_setspecific((pthread_key_t)ii, pthread_early_vals[ii]); + } +}; +Loader loader; diff --git a/coroutine.h b/coroutine.h new file mode 100644 index 0000000..d024abf --- /dev/null +++ b/coroutine.h @@ -0,0 +1,74 @@ +#define _XOPEN_SOURCE +#include +#include +#include + +class Coroutine { + public: + typedef void(entry_t)(void*); + + private: + class Thread& thread; + size_t id; + ucontext_t context; + std::auto_ptr stack; + + static void trampoline(Coroutine& that, entry_t& entry, void* arg); + + public: + /** + * Returns the currently-running fiber. + */ + static Coroutine& current(); + + /** + * Is Coroutine-local storage via pthreads enabled? The Coroutine library should work fine + * without this, but programs that are not aware of coroutines may panic if they make + * assumptions about the stack. In order to enable this you must LD_PRELOAD (or equivalent) + * this library. + */ + static const bool is_local_storage_enabled(); + + /** + * Constructor for currently running "fiber". This is really just original thread, but we + * need a way to get back into the main thread after yielding to a fiber. Basically this + * shouldn't be called from anywhere. + */ + Coroutine(Thread& t, size_t id); + + /** + * This constructor will actually create a new fiber context. Execution does not begin + * until you call run() for the first time. + */ + Coroutine(Thread& t, size_t id, entry_t& entry, void* arg); + + /** + * Don't delete Coroutines, they will delete themselves. + * TODO: Actually they don't! + */ + ~Coroutine(); + + /** + * Start or resume execution in this fiber. Note there is no explicit yield() function, + * you must manually run another fiber. + */ + void run(); + + /** + * Create a new fiber. This just calls back into Thread because Coroutine is the only public + * class in this library. + */ + Coroutine& new_fiber(entry_t* entry, void* arg = NULL); + + /** + * Returns address of the lowest usable byte in this Coroutine's stack. + */ + void* bottom() const; + + bool operator==(const Coroutine& that) const; + bool operator==(const Coroutine* that) const; + + size_t getid() const { + return id; + } +}; diff --git a/fiber-shim b/fiber-shim new file mode 100755 index 0000000..ff6718c --- /dev/null +++ b/fiber-shim @@ -0,0 +1,2 @@ +#!/bin/sh +DYLD_INSERT_LIBRARIES=`pwd`/coroutine.dylib DYLD_FORCE_FLAT_NAMESPACE=1 $@ diff --git a/node-fibers.cc b/node-fibers.cc new file mode 100644 index 0000000..4dbf926 --- /dev/null +++ b/node-fibers.cc @@ -0,0 +1,236 @@ +#include "coroutine.h" +#include + +#include + +#define THROW(x, m) return ThrowException(x(String::New(m))) + +using namespace std; +using namespace v8; +using namespace node; + +class Fiber: ObjectWrap { + private: + static Locker locker; // Node does not use locks or threads, so we need a global lock + static Persistent tmpl; + static Fiber* current; + + Persistent cb; + Persistent v8_context; + Persistent yielded; + Coroutine* entry_fiber; + Coroutine* this_fiber; + bool started; + bool zombie; + + public: + Fiber(Persistent cb, Persistent v8_context) : + ObjectWrap(), + cb(cb), + v8_context(v8_context), + started(false), + zombie(false) {} + virtual ~Fiber() { + if (this->started) { + this->zombie = true; + + // Swap context back to `Fiber::Yield()` to finish execution. + // TODO: What to do about thrown JS exceptions here? + this->this_fiber->run(); + } + cb.Dispose(); + v8_context.Dispose(); + yielded.Dispose(); + } + + /** + * Initialize the Fiber library. + */ + static void Init(Handle target) { + HandleScope scope; + tmpl = Persistent::New(FunctionTemplate::New(New)); + tmpl->InstanceTemplate()->SetInternalFieldCount(1); + tmpl->SetClassName(String::NewSymbol("Fiber")); + + Handle proto = tmpl->PrototypeTemplate(); + proto->Set(String::New("run"), FunctionTemplate::New(Run)); + proto->Set(String::New("yield"), FunctionTemplate::New(Yield)); + proto->SetAccessor(String::New("started"), GetStarted); + + Handle fn = tmpl->GetFunction(); + fn->SetAccessor(String::New("current"), GetCurrent); + target->Set(String::NewSymbol("Fiber"), fn); + } + + /** + * Instantiate a new Fiber object. When a fiber is created it only grabs a handle to the + * callback; it doesn't create any new contexts until run() is called. + */ + static Handle New(const Arguments& args) { + if (!args.IsConstructCall()) { + return FromConstructorTemplate(tmpl, args); + } + + HandleScope scope; + if (args.Length() != 1) { + THROW(Exception::TypeError, "Fiber expects 1 argument"); + } else if (!args[0]->IsFunction()) { + THROW(Exception::TypeError, "Fiber expects a function"); + } + + Handle fn = Local::Cast(args[0]); + Fiber* that = new Fiber( + Persistent::New(fn), + Persistent::New(Context::GetCurrent())); + that->Wrap(args.This()); + return args.This(); + } + + /** + * Begin or resume the current fiber. If the fiber is not currently running a new context will + * be created and the callback will start. Otherwise we switch back into the exist context. + */ + static Handle Run(const Arguments& args) { + HandleScope scope; + Fiber* that = ObjectWrap::Unwrap(args.This()); + that->entry_fiber = &Coroutine::current(); + + if (!that->started) { + // Create a new context with entry point `Fiber::RunFiber()`. + that->started = true; + void** data = new void*[2]; + data[0] = (void*)&args; + data[1] = that; + that->this_fiber = &that->entry_fiber->new_fiber((void (*)(void*))RunFiber, data); + } else { + // If the fiber is currently running put the first parameter to `run()` on `yielded`, then + // the pending call to `yield()` will return that value. `yielded` in this case is just a + // misnomer, we're just reusing the same handle. + if (args.Length()) { + that->yielded = Persistent::New(args[0]); + } else { + that->yielded = Persistent::New(Undefined()); + } + } + + // This will jump into either `RunFiber()` or `Yield()`, depending on if the fiber was + // already running. + Fiber* last_fiber = current; + current = that; + { + Unlocker unlocker; + that->this_fiber->run(); + } + // At this point the fiber either returned or called `yield()`. + current = last_fiber; + + // Return the yielded value. + Handle val = Local::New(that->yielded); + that->yielded.Dispose(); + return val; + } + + /** + * This is the entry point for a new fiber, from `run()`. + */ + static void RunFiber(void** data) { + const Arguments* args = (const Arguments*)data[0]; + Fiber* that = (Fiber*)data[1]; + delete[] data; + + Locker locker; + HandleScope scope; + + ResourceConstraints constraints; + constraints.set_stack_limit((uint32_t*)that->this_fiber->bottom()); + SetResourceConstraints(&constraints); + + that->v8_context->Enter(); + if (args->Length()) { + Local argv[1] = { Local::New((*args)[0]) }; + that->yielded = Persistent::New(that->cb->Call(args->This(), 1, argv)); + } else { + that->yielded = Persistent::New(that->cb->Call(args->This(), 0, NULL)); + } + that->v8_context->Exit(); + + // The function returned (instead of yielding). + that->started = false; + } + + /** + * Yield control back to the function that called `run()`. The first parameter to this function + * is returned from `run()`. The context is saved, to be later resumed from `run()`. + */ + static Handle Yield(const Arguments& args) { + HandleScope scope; + Fiber* that = ObjectWrap::Unwrap(args.This()); + if (that->zombie) { + THROW(Exception::Error, "This Fiber is a zombie"); + } + + if (args.Length()) { + that->yielded = Persistent::New(args[0]); + } else { + that->yielded = Persistent::New(Undefined()); + } + + // Return control back to `Fiber::run()` + { + Unlocker unlocker; + that->entry_fiber->run(); + } + current = that; + + // `yielded` will contain the first parameter to `run()` + Handle val = Local::New(that->yielded); + that->yielded.Dispose(); + return val; + } + + /** + * Getters for `started`, and `current`. + */ + static Handle GetStarted(Local property, const AccessorInfo& info) { + Fiber* that = ObjectWrap::Unwrap(info.This()); + return Boolean::New(that->started); + } + + static Handle GetCurrent(Local property, const AccessorInfo& info) { + if (current) { + return current->handle_; + } else { + return Undefined(); + } + } +}; + +Persistent Fiber::tmpl; +Locker Fiber::locker; +Fiber* Fiber::current = NULL; + +/** + * If the library wasn't preloaded then we should gracefully fail instead of segfaulting if they + * attempt to use a Fiber. + */ +Handle FiberNotSupported(const Arguments&) { + THROW(Exception::Error, +"Fiber support was not enabled when you ran node. To enable support for fibers, please run \ +node with the included `fiber-shim` script. For example, instead of running:\n\ +\n\ + node script.js\n\ +\n\ +You should run:\n\ +\n\ + ./fiber-shim node script.js\n\ +\n\ +You will not be able to use Fiber without this support enabled."); +} + +extern "C" void init(Handle target) { + if (Coroutine::is_local_storage_enabled()) { + Fiber::Init(target); + } else { + target->Set(String::New("Fiber"), FunctionTemplate::New(FiberNotSupported)->GetFunction()); + } +}