Skip to content

Commit

Permalink
Fixed race starting Fiber.
Browse files Browse the repository at this point in the history
  • Loading branch information
brixen committed Jun 20, 2016
1 parent 5f22ac8 commit b429232
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 67 deletions.
149 changes: 85 additions & 64 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,49 +78,53 @@ namespace rubinius {
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

if(!vm()->suspended_p()) {
Exception::raise_fiber_error(state, "attempt to restart non-suspended fiber");
}

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
std::ostringstream msg;
msg << "attempt to restart non-suspended (" << vm()->transition_flag() << ") fiber";
Exception::raise_fiber_error(state, msg.str().c_str());
}

state->vm()->set_suspending();

restart_context(state->vm());
wakeup();

{
UnmanagedPhase unmanaged(state);

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
}
}
}

while(!vm()->running_p()) {
; // spin wait
}

{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());
state->vm()->thread()->current_fiber(state, this);
}
}

void Fiber::suspend(STATE) {
void Fiber::suspend_and_continue(STATE) {
{
std::unique_lock<std::mutex> lk(vm()->wait_mutex());
vm()->set_suspended();

vm()->set_suspended();
{
UnmanagedPhase unmanaged(state);
vm()->wait_condition().wait(lk);
}

while(!wakeup_p()) {
vm()->wait_condition().wait(lk);
}
}
clear_wakeup();
vm()->set_resuming();
}

while(invoke_context()->running_p()) {
; // spin wait
}

{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

vm()->set_running();

while(invoke_context()->suspending_p()) {
while(!restart_context()->suspended_p()) {
; // spin wait
}

Expand Down Expand Up @@ -159,7 +163,7 @@ namespace rubinius {

NativeMethod::init_thread(state);

vm->fiber()->suspend(state);
vm->fiber()->suspend_and_continue(state);

Object* value = vm->fiber()->block()->send(state, G(sym_call),
as<Array>(vm->thread()->fiber_value()), vm->fiber()->block());
Expand All @@ -179,8 +183,12 @@ namespace rubinius {
vm->fiber()->invoke_context()->fiber()->restart(state);
}

vm->fiber()->status(eDead);
vm->fiber()->vm()->set_finished();
{
std::lock_guard<std::mutex> guard(vm->wait_mutex());

vm->fiber()->status(eDead);
vm->set_suspended();
}

vm->unmanaged_phase();

Expand Down Expand Up @@ -295,60 +303,69 @@ namespace rubinius {
}

Object* Fiber::resume(STATE, Arguments& args) {
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to resume fiber across threads");
}
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

unpack_arguments(state, args);

if(status() == eCreated) {
start(state, args);
} else if(status() == eTransfer) {
Exception::raise_fiber_error(state, "attempt to resume transfered fiber");
} else if(status() == eRunning) {
Exception::raise_fiber_error(state, "attempt to resume running fiber");
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to resume dead fiber");
} else if(root_p()) {
Exception::raise_fiber_error(state, "attempt to resume root fiber");
}
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to resume fiber across threads");
} else if(status() == eTransfer) {
Exception::raise_fiber_error(state, "attempt to resume transfered fiber");
} else if(status() == eRunning) {
Exception::raise_fiber_error(state, "attempt to resume running fiber");
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to resume dead fiber");
} else if(root_p()) {
Exception::raise_fiber_error(state, "attempt to resume root fiber");
}

unpack_arguments(state, args);
invoke_context(state->vm());

status(eRunning);
invoke_context(state->vm());
if(status() == eCreated) {
start(state, args);
}

status(eRunning);
}

// Being cooperative...
restart(state);

// Through the worm hole...
state->vm()->fiber()->suspend(state);
state->vm()->fiber()->suspend_and_continue(state);

// We're back...
return return_value(state);
}

Object* Fiber::transfer(STATE, Arguments& args) {
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to transfer fiber across threads");
}
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

unpack_arguments(state, args);
if(state->vm()->thread() != thread()) {
Exception::raise_fiber_error(state, "attempt to transfer fiber across threads");
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to transfer to dead fiber");
} else if(state->vm()->fiber() == this) {
// This should arguably be a FiberError
return args.as_array(state);
}

if(status() == eCreated) {
start(state, args);
} else if(state->vm()->fiber() == this) {
return args.as_array(state);
} else if(status() == eDead) {
Exception::raise_fiber_error(state, "attempt to transfer to dead fiber");
}
unpack_arguments(state, args);
invoke_context(state->vm());

if(status() == eCreated) {
start(state, args);
}

status(eTransfer);
invoke_context(state->vm());
status(eTransfer);
}

// Being cooperative...
restart(state);

// Through the worm hole...
state->vm()->fiber()->suspend(state);
state->vm()->fiber()->suspend_and_continue(state);

// We're back...
return return_value(state);
Expand All @@ -358,20 +375,24 @@ namespace rubinius {
Fiber* fiber = state->vm()->fiber();
OnStack<1> os(state, fiber);

if(fiber->root_p()) {
Exception::raise_fiber_error(state, "can't yield from root fiber");
} else if(fiber->status() == eTransfer) {
Exception::raise_fiber_error(state, "can't yield from transferred fiber");
}
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

fiber->unpack_arguments(state, args);
fiber->status(eYielding);
if(fiber->root_p()) {
Exception::raise_fiber_error(state, "can't yield from root fiber");
} else if(fiber->status() == eTransfer) {
Exception::raise_fiber_error(state, "can't yield from transferred fiber");
}

fiber->unpack_arguments(state, args);
fiber->status(eYielding);
}

// Being cooperative...
fiber->invoke_context()->fiber()->restart(state);

// Through the worm hole...
fiber->suspend(state);
fiber->suspend_and_continue(state);

// We're back...
return fiber->return_value(state);
Expand Down
22 changes: 19 additions & 3 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ namespace rubinius {

attr_field(vm, VM*);
attr_field(invoke_context, VM*);
attr_field(restart_context, VM*);

std::atomic<Status> status_;
std::atomic<bool> wakeup_;

public:
static void bootstrap(STATE);
Expand All @@ -63,10 +65,12 @@ namespace rubinius {
obj->fiber_id(Fixnum::from(++Fiber::fiber_ids_));
obj->source(nil<String>());
obj->thread(state->vm()->thread());
obj->status(eCreated);
obj->start_time(get_current_time());
obj->vm(NULL);
obj->invoke_context(state->vm());
obj->restart_context(state->vm());
obj->status(eCreated);
obj->clear_wakeup();
}

static void finalize(STATE, Fiber* fib);
Expand All @@ -89,6 +93,8 @@ namespace rubinius {
// Rubinius.primitive :fiber_s_main
static Fiber* s_main(STATE);

bool root_p();

Status status() {
return status_;
}
Expand All @@ -97,14 +103,24 @@ namespace rubinius {
status_ = status;
}

bool root_p();
void wakeup() {
wakeup_ = true;
}

void clear_wakeup() {
wakeup_ = false;
}

bool wakeup_p() {
return wakeup_;
}

void unpack_arguments(STATE, Arguments& args);
Object* return_value(STATE);

void start(STATE, Arguments& args);
void restart(STATE);
void suspend(STATE);
void suspend_and_continue(STATE);

// Rubinius.primitive :fiber_status
String* status(STATE);
Expand Down
8 changes: 8 additions & 0 deletions machine/vm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ namespace rubinius {
return wait_condition_;
}

FiberTransition transition_flag() {
return transition_flag_;
}

bool suspending_p() const {
return transition_flag_ == eSuspending;
}
Expand All @@ -207,6 +211,10 @@ namespace rubinius {
return transition_flag_ == eRunning;
}

bool finished_p() const {
return transition_flag_ == eFinished;
}

void set_suspending() {
transition_flag_ = eSuspending;
}
Expand Down

0 comments on commit b429232

Please sign in to comment.