Skip to content

Commit

Permalink
Simplify Fiber.
Browse files Browse the repository at this point in the history
  • Loading branch information
brixen committed Jun 18, 2016
1 parent fdc3539 commit 36b5013
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 142 deletions.
205 changes: 76 additions & 129 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "on_stack.hpp"
#include "memory.hpp"
#include "metrics.hpp"
#include "thread_phase.hpp"

#include "builtin/array.hpp"
#include "builtin/class.hpp"
Expand Down Expand Up @@ -38,80 +39,69 @@ namespace rubinius {
return vm()->fiber()->vm() == vm()->thread()->vm();
}

Object* Fiber::unpack_arguments(STATE, Arguments& args) {
void Fiber::unpack_arguments(STATE, Arguments& args) {
switch(args.total()) {
case 0:
return cNil;
value(state, cNil);
break;
case 1:
return args.get_argument(0);
value(state, args.get_argument(0));
break;
default:
return args.as_array(state);
value(state, args.as_array(state));
break;
}

// Some versions of GCC can't understand dataflow.
return cNil;
}

Object* Fiber::start_fiber(STATE, Fiber* f, Arguments& args) {
Fiber* fiber = f;
OnStack<1> os(state, fiber);

fiber->arguments(state, args.as_array(state));
fiber->function(Fiber::continue_fiber);
void Fiber::start(STATE, Arguments& args) {
arguments(state, args.as_array(state));

pthread_attr_t attrs;
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, fiber->stack_size()->to_native());
pthread_attr_setstacksize(&attrs, stack_size()->to_native());
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);

pthread_create(&fiber->vm()->os_thread(), &attrs,
Fiber::run, (void*)fiber->vm());
pthread_create(&vm()->os_thread(), &attrs,
Fiber::run, (void*)vm());

pthread_attr_destroy(&attrs);

// Wait for Fiber thread to start up and pause.
while(!fiber->vm()->wait_flag());

return continue_fiber(state, fiber, args);
while(!vm()->wait_flag());
}

Object* Fiber::continue_fiber(STATE, Fiber* f, Arguments& args) {
Fiber* fiber = f;
OnStack<1> os(state, fiber);

{
std::lock_guard<std::mutex> guard(fiber->vm()->wait_mutex());

fiber->value(state, unpack_arguments(state, args));
void Fiber::restart(STATE) {
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

state->vm()->unmanaged_phase();
state->vm()->set_wait_flag(true);
state->vm()->thread()->vm()->set_current_fiber(vm());

state->vm()->thread()->vm()->set_current_fiber(fiber->vm());
}

while(fiber->vm()->wait_flag()) {
std::lock_guard<std::mutex> guard(fiber->vm()->wait_mutex());
fiber->vm()->wait_condition().notify_one();
while(vm()->wait_flag()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
}
}

void Fiber::suspend(STATE) {
{
std::unique_lock<std::mutex> lk(state->vm()->wait_mutex());
std::unique_lock<std::mutex> lk(vm()->wait_mutex());
vm()->set_wait_flag(true);

// Through the worm hole...
while(!fiber->vm()->wait_flag()) {
state->vm()->wait_condition().wait(lk);
{
UnmanagedPhase unmanaged(state);
vm()->wait_condition().wait(lk);
}

// We're back...
state->vm()->set_wait_flag(false);
vm()->set_wait_flag(false);
}

state->vm()->managed_phase();
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());
}

if(state->vm()->thread_state()->current_exception()->nil_p()) {
return fiber->value();
Object* Fiber::return_value(STATE) {
if(vm()->thread_state()->current_exception()->nil_p()) {
return value();
} else {
invoke_context()->thread_state()->set_state(vm()->thread_state());
return NULL;
}
}
Expand All @@ -138,50 +128,20 @@ namespace rubinius {

NativeMethod::init_thread(state);

{
std::unique_lock<std::mutex> lk(vm->wait_mutex());

vm->set_wait_flag(true);

// Through the worm hole...
while(!vm->fiber()->resume_context()->wait_flag()) {
vm->wait_condition().wait(lk);
}

// We're back...
if(vm->fiber()->status() != eTransfer) {
vm->fiber()->status(eRunning);
}
vm->set_wait_flag(false);
}

vm->managed_phase();
vm->fiber()->suspend(state);

Object* value = vm->fiber()->block()->send(state, G(sym_call),
vm->fiber()->arguments(), vm->fiber()->block());
vm->set_call_frame(NULL);

{
std::lock_guard<std::mutex> guard(vm->fiber()->resume_context()->wait_mutex());

if(value) {
vm->fiber()->value(state, value);
} else {
vm->fiber()->resume_context()->thread_state()->set_state(vm->thread_state());
}

vm->fiber()->status(eDead);
value = vm->fiber()->return_value(state);

vm->unmanaged_phase();
vm->fiber()->vm()->set_wait_flag(true);
vm->fiber()->invoke_context()->fiber()->restart(state);

state->vm()->thread()->vm()->set_current_fiber(vm->fiber()->resume_context());
}
vm->fiber()->status(eDead);
vm->fiber()->vm()->set_wait_flag(true);

while(vm->fiber()->resume_context()->wait_flag()) {
std::lock_guard<std::mutex> guard(vm->fiber()->resume_context()->wait_mutex());
vm->fiber()->resume_context()->wait_condition().notify_one();
}
vm->unmanaged_phase();

state->shared().report_profile(state);

Expand Down Expand Up @@ -218,7 +178,7 @@ namespace rubinius {
fiber->thread_name(state, String::create(state, vm->name().c_str()));
fiber->fiber_id(Fixnum::from(0));
fiber->status(eRunning);
fiber->function(Fiber::continue_fiber);
fiber->invoke_context(vm);

return fiber;
}
Expand Down Expand Up @@ -291,7 +251,9 @@ namespace rubinius {
}

Object* Fiber::resume(STATE, Arguments& args) {
if(status() == eTransfer) {
if(status() == eCreated) {
start(state, args);
} else if(status() == eTransfer) {
Exception::raise_fiber_error(state, "attempt to resume transfered fiber");
} else if(status() == eRunning) {
Exception::raise_fiber_error(state, "attempt to resume running fiber");
Expand All @@ -302,24 +264,40 @@ namespace rubinius {
}

status(eRunning);
resume_context(state->vm());
unpack_arguments(state, args);
invoke_context(state->vm());

// Being cooperative...
restart(state);

return _function_(state, this, args);
// Through the worm hole...
state->vm()->fiber()->suspend(state);

// We're back...
return return_value(state);
}

Object* Fiber::transfer(STATE, Arguments& args) {
if(state->vm()->fiber() == this) {
if(status() == eCreated) {
start(state, args);
} else if(state->vm()->fiber() == this) {
return args.as_array(state);
}

if(status() == eDead) {
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to transfer to dead fiber");
}

status(eTransfer);
resume_context(state->vm()->thread()->vm());
unpack_arguments(state, args);
invoke_context(state->vm()->thread()->vm());

// Being cooperative...
restart(state);

return _function_(state, this, args);
// Through the worm hole...
state->vm()->fiber()->suspend(state);

// We're back...
return return_value(state);
}

Object* Fiber::s_yield(STATE, Arguments& args) {
Expand All @@ -332,43 +310,17 @@ namespace rubinius {
Exception::raise_fiber_error(state, "can't yield from transferred fiber");
}

{
std::lock_guard<std::mutex> guard(fiber->resume_context()->wait_mutex());
fiber->unpack_arguments(state, args);
fiber->status(eYielding);

fiber->value(state, unpack_arguments(state, args));
fiber->status(eYielding);
// Being cooperative...
fiber->invoke_context()->fiber()->restart(state);

state->vm()->unmanaged_phase();
fiber->vm()->set_wait_flag(true);
// Through the worm hole...
fiber->suspend(state);

state->vm()->thread()->vm()->set_current_fiber(fiber->resume_context());
}

while(fiber->resume_context()->wait_flag()) {
std::lock_guard<std::mutex> guard(fiber->resume_context()->wait_mutex());
fiber->resume_context()->wait_condition().notify_one();
}

{
std::unique_lock<std::mutex> lk(state->vm()->wait_mutex());

// Through the worm hole...
while(!fiber->resume_context()->wait_flag()) {
state->vm()->wait_condition().wait(lk);
}

// We're back...
fiber->status(eRunning);
state->vm()->set_wait_flag(false);
}

state->vm()->managed_phase();

if(state->vm()->thread_state()->current_exception()->nil_p()) {
return fiber->value();
} else {
return NULL;
}
// We're back...
return fiber->invoke_context()->fiber()->return_value(state);
}

void Fiber::finalize(STATE, Fiber* fib) {
Expand All @@ -377,9 +329,4 @@ namespace rubinius {
fib->thread_name()->c_str(state), fib->fiber_id()->to_native());
}
}

void Fiber::Info::mark(Object* obj, memory::ObjectMark& mark) {
auto_mark(obj, mark);
// Fiber* fib = force_as<Fiber>(obj);
}
}
22 changes: 9 additions & 13 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,8 @@ namespace rubinius {
private:
attr_field(start_time, uint64_t);

typedef Object* (*FiberFunction)(STATE, Fiber* fiber, Arguments& args);

attr_field(function, FiberFunction);

attr_field(vm, VM*);
attr_field(resume_context, VM*);
attr_field(invoke_context, VM*);

std::atomic<Status> status_;

Expand All @@ -70,19 +66,13 @@ namespace rubinius {
obj->source(nil<String>());
obj->status(eCreated);
obj->start_time(get_current_time());
obj->function(Fiber::start_fiber);
obj->vm(NULL);
obj->resume_context(NULL);
obj->invoke_context(state->vm());
}

static void finalize(STATE, Fiber* fib);
static Object* unpack_arguments(STATE, Arguments& args);

static void* run(void*);

static Object* start_fiber(STATE, Fiber* fiber, Arguments& args);
static Object* continue_fiber(STATE, Fiber* fiber, Arguments& args);

static Fiber* create(STATE, VM* vm);

// Rubinius.primitive :fiber_new
Expand All @@ -104,6 +94,13 @@ namespace rubinius {

bool root_p();

void unpack_arguments(STATE, Arguments& args);
Object* return_value(STATE);

void start(STATE, Arguments& args);
void restart(STATE);
void suspend(STATE);

// Rubinius.primitive :fiber_status
String* status(STATE);

Expand All @@ -118,7 +115,6 @@ namespace rubinius {
class Info : public TypeInfo {
public:
BASIC_TYPEINFO(TypeInfo)
virtual void mark(Object* t, memory::ObjectMark& mark);
};
};
}
Expand Down
1 change: 1 addition & 0 deletions machine/builtin/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ namespace rubinius {

void Thread::finalize_instance(STATE) {
if(vm() && vm()->zombie_p()) {
fiber_mutex_.std::mutex::~mutex();
VM::discard(state, vm());
vm(NULL);
}
Expand Down
Loading

0 comments on commit 36b5013

Please sign in to comment.