Skip to content

Commit

Permalink
Added Fiber#dispose and auto-cleanup of Fiber.
Browse files Browse the repository at this point in the history
A Fiber is composed of both managed (ie object memory) resources and unmanaged
(ie pthread) resources. When a Fiber completes normally, the unmanaged
resources can be reclaimed simply.

When a Fiber that has not completed normally goes out of 'scope' (ie is no
longer reachable from any reachable object), the garbage collector cannot
simply reclaim the managed resources allocated to the Fiber because the Fiber
composes native resources (ie the pthread instance). In this case, the Fiber's
pthread invoke function needs to be forced to exit so that the pthread
instance can be reclaimed.
  • Loading branch information
brixen committed Jun 26, 2016
1 parent faf044a commit 93465e4
Show file tree
Hide file tree
Showing 15 changed files with 181 additions and 7 deletions.
5 changes: 5 additions & 0 deletions core/fiber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ def transfer(*args)
raise PrimitiveFailure, "Fiber#transfer primitive failed"
end

def dispose
Rubinius.primitive :fiber_dispose
raise PrimitiveFailure, "Fiber#dispose primitive failed"
end

def alive?
status != "dead"
end
Expand Down
57 changes: 51 additions & 6 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,35 @@ namespace rubinius {
}
}

void Fiber::cancel(STATE) {
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

vm()->thread_state()->raise_fiber_cancel();

state->vm()->set_suspending();

restart_context(state->vm());
wakeup();

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

vm()->limited_wait_for([this]{ return vm()->running_p(); });

// Release the canceled Fiber.
state->vm()->set_suspended();

vm()->limited_wait_for([this]{ return vm()->zombie_p(); });

vm()->set_canceled();

state->vm()->set_running();
}

void Fiber::suspend_and_continue(STATE) {
UnmanagedPhase unmanaged(state);

Expand Down Expand Up @@ -138,6 +167,8 @@ namespace rubinius {
Object* Fiber::return_value(STATE) {
if(vm()->thread_state()->raise_reason() == cNone) {
return state->vm()->thread()->fiber_value();
} else if(vm()->thread_state()->raise_reason() == cFiberCancel) {
return NULL;
} else {
invoke_context()->thread_state()->set_state(vm()->thread_state());
return NULL;
Expand Down Expand Up @@ -178,12 +209,14 @@ namespace rubinius {
vm->thread()->fiber_value(state, cNil);
}

if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
if(vm->thread_state()->raise_reason() != cFiberCancel) {
if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
}
}

{
Expand Down Expand Up @@ -378,6 +411,12 @@ namespace rubinius {
return return_value(state);
}

Object* Fiber::dispose(STATE) {
cancel(state);

return this;
}

Object* Fiber::s_yield(STATE, Arguments& args) {
Fiber* fiber = state->vm()->fiber();
OnStack<1> os(state, fiber);
Expand Down Expand Up @@ -424,6 +463,12 @@ namespace rubinius {
}

if(fiber->vm()) {
if(!state->shared().halting_p()) {
if(!fiber->vm()->zombie_p()) {
fiber->cancel(state);
}
}

if(fiber->vm()->zombie_p()) {
VM::discard(state, fiber->vm());
fiber->vm(NULL);
Expand Down
4 changes: 4 additions & 0 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ namespace rubinius {

void start(STATE, Arguments& args);
void restart(STATE);
void cancel(STATE);
void suspend_and_continue(STATE);

// Rubinius.primitive :fiber_status
Expand All @@ -134,6 +135,9 @@ namespace rubinius {
// Rubinius.primitive :fiber_transfer
Object* transfer(STATE, Arguments& args);

// Rubinius.primitive :fiber_dispose
Object* dispose(STATE);

public: /* TypeInfo */

class Info : public TypeInfo {
Expand Down
3 changes: 3 additions & 0 deletions machine/builtin/system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,9 @@ namespace rubinius {
case cThreadKill:
reason = state->symbol("thread_kill");
break;
case cFiberCancel:
reason = state->symbol("fiber_cancel");
break;
}

tuple->put(state, 0, reason);
Expand Down
6 changes: 6 additions & 0 deletions machine/environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ namespace rubinius {
void Environment::halt(STATE, int exit_code) {
utilities::thread::Mutex::LockGuard guard(halt_lock_);

state->shared().set_halting();

if(state->shared().config.system_log_lifetime.value) {
logger::write("process: exit: %s %d %fs",
shared->pid.c_str(), exit_code, shared->run_time());
Expand All @@ -562,6 +564,8 @@ namespace rubinius {
shared->machine_threads()->shutdown(state);
}

shared->finalizer()->dispose(state);

shared->thread_nexus()->lock(state->vm());

shared->finalizer()->finish(state);
Expand Down Expand Up @@ -743,5 +747,7 @@ namespace rubinius {

State main_state(vm);
state->shared().start_signals(&main_state);

state->shared().set_running();
}
}
2 changes: 2 additions & 0 deletions machine/instructions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ Object* MachineCode::interpreter(STATE, MachineCode* const mcode) {
case cExit:
call_frame->scope->flush_to_heap(state);
return NULL;
case cFiberCancel:
return NULL;
default:
break;
} // switch
Expand Down
39 changes: 38 additions & 1 deletion machine/memory/finalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@

namespace rubinius {
namespace memory {
void NativeFinalizer::dispose(STATE) {
// TODO: consider building this on the TypeInfo structure.
if(Fiber* fiber = try_as<Fiber>(object())) {
if(!fiber->vm()->zombie_p()) fiber->cancel(state);
}
}

void NativeFinalizer::finalize(STATE) {
(*finalizer_)(state, object());
}
Expand All @@ -35,6 +42,9 @@ namespace rubinius {
}
}

void ExtensionFinalizer::dispose(STATE) {
}

void ExtensionFinalizer::finalize(STATE) {
ManagedPhase managed(state);

Expand Down Expand Up @@ -84,6 +94,9 @@ namespace rubinius {
}
}

void ManagedFinalizer::dispose(STATE) {
}

void ManagedFinalizer::finalize(STATE) {
ManagedPhase managed(state);

Expand Down Expand Up @@ -168,6 +181,7 @@ namespace rubinius {
}

void FinalizerThread::initialize(STATE) {
Thread::create(state, vm());
synchronization_ = new Synchronization();
}

Expand Down Expand Up @@ -212,12 +226,35 @@ namespace rubinius {
vm()->metrics().gc.objects_finalized++;
}
}

state->vm()->thread()->vm()->set_zombie(state);
}

void FinalizerThread::dispose(STATE) {
finishing_ = true;

std::lock_guard<std::mutex> guard(list_mutex());

for(FinalizerObjects::iterator i = process_list_.begin();
i != process_list_.end();
++i)
{
FinalizerObject* fo = *i;
fo->dispose(state);
}

for(FinalizerObjects::iterator i = live_list_.begin();
i != live_list_.end();
++i)
{
FinalizerObject* fo = *i;
fo->dispose(state);
}
}

void FinalizerThread::finish(STATE) {
finishing_ = true;

// TODO: cleanup
while(!process_list_.empty()) {
FinalizerObject* fo = process_list_.back();
process_list_.pop_back();
Expand Down
5 changes: 5 additions & 0 deletions machine/memory/finalizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace rubinius {
object_ = obj;
}

virtual void dispose(STATE) = 0;
virtual void finalize(STATE) = 0;
virtual void mark(ImmixGC* gc) = 0;
virtual bool match_p(STATE, Object* object, Object* finalizer) = 0;
Expand All @@ -53,6 +54,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
Expand All @@ -67,6 +69,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer) { return false; }
Expand All @@ -81,6 +84,7 @@ namespace rubinius {
, finalizer_(finalizer)
{ }

void dispose(STATE);
void finalize(STATE);
void mark(ImmixGC* gc);
bool match_p(STATE, Object* object, Object* finalizer);
Expand Down Expand Up @@ -128,6 +132,7 @@ namespace rubinius {
}

void finish(STATE);
void dispose(STATE);

void native_finalizer(STATE, Object* obj, FinalizerFunction func);
void extension_finalizer(STATE, Object* obj, FinalizerFunction func);
Expand Down
2 changes: 2 additions & 0 deletions machine/memory/immix_marker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ namespace memory {
}

state->memory()->clear_mature_mark_in_progress();

state->vm()->thread()->vm()->set_zombie(state);
}
}
}
1 change: 1 addition & 0 deletions machine/raise_reason.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace rubinius {
cExit,
cCatchThrow,
cThreadKill,
cFiberCancel,
};
}

Expand Down
1 change: 1 addition & 0 deletions machine/shared_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace rubinius {
, type_info_lock_()
, code_resource_lock_()
, use_capi_lock_(false)
, phase_(eBooting)
, om(NULL)
, global_cache(new GlobalCache)
, config(config)
Expand Down
34 changes: 34 additions & 0 deletions machine/shared_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "capi/capi_constants.h"

#include <unistd.h>
#include <atomic>
#include <string>
#include <vector>

Expand Down Expand Up @@ -82,6 +83,13 @@ namespace rubinius {
*/

class SharedState {
public:
enum Phase {
eBooting,
eRunning,
eHalting,
};

private:
ThreadNexus* thread_nexus_;
MachineThreads* machine_threads_;
Expand Down Expand Up @@ -126,6 +134,8 @@ namespace rubinius {
bool use_capi_lock_;
int primitive_hits_[Primitives::cTotalPrimitives];

std::atomic<Phase> phase_;

public:
Globals globals;
Memory* om;
Expand All @@ -141,6 +151,30 @@ namespace rubinius {
SharedState(Environment* env, Configuration& config, ConfigParser& cp);
~SharedState();

bool booting_p() {
return phase_ == eBooting;
}

void set_booting() {
phase_ = eBooting;
}

bool running_p() {
return phase_ == eRunning;
}

void set_running() {
phase_ = eRunning;
}

bool halting_p() {
return phase_ == eHalting;
}

void set_halting() {
phase_ = eHalting;
}

int size();

void set_initialized() {
Expand Down
Loading

0 comments on commit 93465e4

Please sign in to comment.