Skip to content

Commit

Permalink
Kill Fibers when their Thread dies
Browse files Browse the repository at this point in the history
  • Loading branch information
evanphx committed Jan 10, 2012
1 parent 794333e commit c690b3c
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 18 deletions.
32 changes: 23 additions & 9 deletions vm/builtin/fiber.cpp
Expand Up @@ -47,9 +47,8 @@ namespace rubinius {
fib->top_ = 0; fib->top_ = 0;
fib->root_ = true; fib->root_ = true;
fib->status_ = Fiber::eRunning; fib->status_ = Fiber::eRunning;
fib->vm_ = state->vm();


fib->data_ = new FiberData(true); fib->data_ = new FiberData(state->vm(), true);


state->memory()->needs_finalization(fib, (FinalizerFunction)&Fiber::finalize); state->memory()->needs_finalization(fib, (FinalizerFunction)&Fiber::finalize);


Expand All @@ -72,9 +71,6 @@ namespace rubinius {


OnStack<1> os(&state, fib); OnStack<1> os(&state, fib);


// Affix this fiber to this thread now.
fib->vm_ = vm;

Array* result = nil<Array>(); Array* result = nil<Array>();
Object* obj = fib->starter()->send(&state, NULL, state.globals().sym_call.get(), fib->value(), Qnil, false); Object* obj = fib->starter()->send(&state, NULL, state.globals().sym_call.get(), fib->value(), Qnil, false);
// GC has run! Don't use stack vars! // GC has run! Don't use stack vars!
Expand Down Expand Up @@ -127,10 +123,9 @@ namespace rubinius {
fib->prev(state, nil<Fiber>()); fib->prev(state, nil<Fiber>());
fib->top_ = 0; fib->top_ = 0;
fib->root_ = false; fib->root_ = false;
fib->vm_ = 0;
fib->status_ = Fiber::eSleeping; fib->status_ = Fiber::eSleeping;


fib->data_ = new FiberData; fib->data_ = 0;


state->memory()->needs_finalization(fib, (FinalizerFunction)&Fiber::finalize); state->memory()->needs_finalization(fib, (FinalizerFunction)&Fiber::finalize);


Expand All @@ -142,14 +137,22 @@ namespace rubinius {


Object* Fiber::resume(STATE, Arguments& args, CallFrame* calling_environment) { Object* Fiber::resume(STATE, Arguments& args, CallFrame* calling_environment) {
#ifdef FIBER_ENABLED #ifdef FIBER_ENABLED
if(status_ == Fiber::eDead) { if(!data_) {
data_ = state->vm()->new_fiber_data();
}

if(status_ == Fiber::eDead || data_->dead_p()) {
Exception::fiber_error(state, "dead fiber called"); Exception::fiber_error(state, "dead fiber called");
} }


if(!prev_->nil_p()) { if(!prev_->nil_p()) {
Exception::fiber_error(state, "double resume"); Exception::fiber_error(state, "double resume");
} }


if(data_->thread() && data_->thread() != state->vm()) {
Exception::fiber_error(state, "cross thread fiber resuming is illegal");
}

Array* val = args.as_array(state); Array* val = args.as_array(state);
value(state, val); value(state, val);


Expand Down Expand Up @@ -191,10 +194,18 @@ namespace rubinius {


Object* Fiber::transfer(STATE, Arguments& args, CallFrame* calling_environment) { Object* Fiber::transfer(STATE, Arguments& args, CallFrame* calling_environment) {
#ifdef FIBER_ENABLED #ifdef FIBER_ENABLED
if(status_ == Fiber::eDead) { if(!data_) {
data_ = state->vm()->new_fiber_data();
}

if(status_ == Fiber::eDead || data_->dead_p()) {
Exception::fiber_error(state, "dead fiber called"); Exception::fiber_error(state, "dead fiber called");
} }


if(data_->thread() && data_->thread() != state->vm()) {
Exception::fiber_error(state, "cross thread fiber resuming is illegal");
}

Array* val = args.as_array(state); Array* val = args.as_array(state);
value(state, val); value(state, val);


Expand Down Expand Up @@ -283,6 +294,7 @@ namespace rubinius {


void Fiber::finalize(STATE, Fiber* fib) { void Fiber::finalize(STATE, Fiber* fib) {
#ifdef FIBER_ENABLED #ifdef FIBER_ENABLED
if(!fib->data_) return;
fib->data_->orphan(state); fib->data_->orphan(state);
delete fib->data_; delete fib->data_;
#endif #endif
Expand All @@ -295,6 +307,8 @@ namespace rubinius {


Fiber* fib = (Fiber*)obj; Fiber* fib = (Fiber*)obj;


if(!fib->data_) return;

AddressDisplacement dis(fib->data_->data_offset(), AddressDisplacement dis(fib->data_->data_offset(),
fib->data_->data_lower_bound(), fib->data_->data_lower_bound(),
fib->data_->data_upper_bound()); fib->data_->data_upper_bound());
Expand Down
5 changes: 0 additions & 5 deletions vm/builtin/fiber.hpp
Expand Up @@ -26,7 +26,6 @@ namespace rubinius {
Exception* exception_; // slot Exception* exception_; // slot
CallFrame* top_; CallFrame* top_;
Status status_; Status status_;
VM* vm_;


bool root_; bool root_;


Expand Down Expand Up @@ -60,10 +59,6 @@ namespace rubinius {
return data_->machine(); return data_->machine();
} }


VM* vm() {
return vm_;
}

FiberData* data() { FiberData* data() {
return data_; return data_;
} }
Expand Down
11 changes: 11 additions & 0 deletions vm/fiber_data.cpp
Expand Up @@ -93,6 +93,12 @@ static void fiber_makectx(fiber_context_t* ctx, void* func, void** stack_bottom,
} }


namespace rubinius { namespace rubinius {

FiberData::~FiberData() {
if(!thread_) return;
thread_->remove_fiber_data(this);
}

void FiberData::take_stack(STATE) { void FiberData::take_stack(STATE) {
assert(stack_); assert(stack_);


Expand Down Expand Up @@ -196,4 +202,9 @@ namespace rubinius {
fiber_switch(&dummy, machine()); fiber_switch(&dummy, machine());
} }
} }

void FiberData::die() {
status_ = eDead;
stack_ = 0;
}
} }
16 changes: 15 additions & 1 deletion vm/fiber_data.hpp
Expand Up @@ -51,6 +51,7 @@ namespace rubinius {
eDead eDead
} status_; } status_;


VM* thread_;
FiberStack* stack_; FiberStack* stack_;


void* heap_; void* heap_;
Expand All @@ -61,14 +62,25 @@ namespace rubinius {


public: public:


FiberData(bool root=false) FiberData(VM* thread, bool root=false)
: status_(root ? eOnStack : eInitial) : status_(root ? eOnStack : eInitial)
, thread_(thread)
, stack_(0) , stack_(0)
, heap_(0) , heap_(0)
, heap_size_(0) , heap_size_(0)
, heap_capacity_(0) , heap_capacity_(0)
{} {}


~FiberData();

bool dead_p() {
return status_ == eDead;
}

VM* thread() {
return thread_;
}

VariableRootBuffers& variable_root_buffers() { VariableRootBuffers& variable_root_buffers() {
return variable_root_buffers_; return variable_root_buffers_;
} }
Expand Down Expand Up @@ -127,6 +139,8 @@ namespace rubinius {


void switch_to(STATE, FiberData* from); void switch_to(STATE, FiberData* from);
void switch_and_orphan(STATE, FiberData* from); void switch_and_orphan(STATE, FiberData* from);

void die();
}; };


} }
Expand Down
26 changes: 25 additions & 1 deletion vm/fiber_stack.cpp
Expand Up @@ -43,12 +43,36 @@ namespace rubinius {
dec_ref(); dec_ref();
} }


FiberStacks::FiberStacks(SharedState& shared) FiberStacks::FiberStacks(VM* thread, SharedState& shared)
: max_stacks_(shared.config.fiber_stacks) : max_stacks_(shared.config.fiber_stacks)
, stack_size_(shared.config.fiber_stack_size) , stack_size_(shared.config.fiber_stack_size)
, thread_(thread)
, trampoline_(0) , trampoline_(0)
{} {}


FiberStacks::~FiberStacks() {
for(Datas::iterator i = datas_.begin();
i != datas_.end();
++i)
{
(*i)->die();
}

for(Stacks::iterator i = stacks_.begin();
i != stacks_.end();
++i)
{
i->free();
}
}

FiberData* FiberStacks::new_data() {
FiberData* data = new FiberData(thread_);
datas_.push_back(data);

return data;
}

FiberStack* FiberStacks::allocate() { FiberStack* FiberStacks::allocate() {
for(Stacks::iterator i = stacks_.begin(); for(Stacks::iterator i = stacks_.begin();
i != stacks_.end(); i != stacks_.end();
Expand Down
12 changes: 11 additions & 1 deletion vm/fiber_stack.hpp
Expand Up @@ -65,18 +65,28 @@ namespace rubinius {


private: private:
typedef std::list<FiberStack> Stacks; typedef std::list<FiberStack> Stacks;
typedef std::list<FiberData*> Datas;


size_t max_stacks_; size_t max_stacks_;
size_t stack_size_; size_t stack_size_;


VM* thread_;
Stacks stacks_; Stacks stacks_;
Datas datas_;
void* trampoline_; void* trampoline_;


public: public:
FiberStacks(SharedState& shared); FiberStacks(VM* thread, SharedState& shared);
~FiberStacks();


FiberStack* allocate(); FiberStack* allocate();


void remove_data(FiberData* data) {
datas_.remove(data);
}

FiberData* new_data();

void* trampoline(); void* trampoline();
}; };
} }
Expand Down
2 changes: 1 addition & 1 deletion vm/vm.cpp
Expand Up @@ -67,7 +67,7 @@ namespace rubinius {
, stack_start_(0) , stack_start_(0)
, run_signals_(false) , run_signals_(false)
, thread_step_(false) , thread_step_(false)
, fiber_stacks_(shared) , fiber_stacks_(this, shared)


, shared(shared) , shared(shared)
, waiting_channel_(this, (Channel*)Qnil) , waiting_channel_(this, (Channel*)Qnil)
Expand Down
8 changes: 8 additions & 0 deletions vm/vm.hpp
Expand Up @@ -269,6 +269,14 @@ namespace rubinius {
return fiber_stacks_.trampoline(); return fiber_stacks_.trampoline();
} }


FiberData* new_fiber_data() {
return fiber_stacks_.new_data();
}

void remove_fiber_data(FiberData* data) {
fiber_stacks_.remove_data(data);
}

VariableRootBuffers& current_root_buffers(); VariableRootBuffers& current_root_buffers();


public: public:
Expand Down

0 comments on commit c690b3c

Please sign in to comment.