Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into codedb-ffi-io
Browse files Browse the repository at this point in the history
  • Loading branch information
brixen committed Jun 27, 2016
2 parents 3cbca7e + 44d5f80 commit 9a0726c
Show file tree
Hide file tree
Showing 26 changed files with 573 additions and 182 deletions.
10 changes: 10 additions & 0 deletions core/fiber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ def self.main
raise PrimitiveFailure, "Fiber.main primitive failed"
end

def self.count
Rubinius.primitive :fiber_s_count
raise PrimitiveFailure, "Fiber.count primitive failed"
end

def status
Rubinius.primitive :fiber_status
raise PrimitiveFailure, "Fiber#status primitive failed"
Expand All @@ -49,6 +54,11 @@ def transfer(*args)
raise PrimitiveFailure, "Fiber#transfer primitive failed"
end

def dispose
Rubinius.primitive :fiber_dispose
raise PrimitiveFailure, "Fiber#dispose primitive failed"
end

def alive?
status != "dead"
end
Expand Down
5 changes: 5 additions & 0 deletions core/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ def self.list
Kernel.raise PrimitiveFailure, "Thread.list primitive failed"
end

def self.count
Rubinius.primitive :thread_count
Kernel.raise PrimitiveFailure, "Thread.count primitive failed"
end

def self.stop
sleep
nil
Expand Down
88 changes: 74 additions & 14 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ namespace rubinius {

if(!vm()->suspended_p()) {
std::ostringstream msg;
msg << "attempt to restart non-suspended (" << vm()->transition_flag() << ") fiber";
msg << "attempt to restart non-suspended ("
<< vm()->fiber_transition_flag() << ") fiber";
Exception::raise_fiber_error(state, msg.str().c_str());
}

Expand All @@ -97,8 +98,8 @@ namespace rubinius {
wakeup();

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

Expand All @@ -107,15 +108,44 @@ namespace rubinius {
}
}

void Fiber::cancel(STATE) {
{
std::lock_guard<std::mutex> guard(state->vm()->thread()->fiber_mutex());

vm()->thread_state()->raise_fiber_cancel();

state->vm()->set_suspending();

restart_context(state->vm());
wakeup();

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

vm()->limited_wait_for([this]{ return vm()->running_p(); });

// Release the canceled Fiber.
state->vm()->set_suspended();

vm()->limited_wait_for([this]{ return vm()->zombie_p(); });

vm()->set_canceled();

state->vm()->set_running();
}

void Fiber::suspend_and_continue(STATE) {
UnmanagedPhase unmanaged(state);

{
std::unique_lock<std::mutex> lk(vm()->wait_mutex());
std::unique_lock<std::mutex> lk(vm()->fiber_wait_mutex());

vm()->set_suspended();
while(!wakeup_p()) {
vm()->wait_condition().wait(lk);
vm()->fiber_wait_condition().wait(lk);
}
clear_wakeup();
vm()->set_resuming();
Expand All @@ -137,6 +167,8 @@ namespace rubinius {
Object* Fiber::return_value(STATE) {
if(vm()->thread_state()->raise_reason() == cNone) {
return state->vm()->thread()->fiber_value();
} else if(vm()->thread_state()->raise_reason() == cFiberCancel) {
return NULL;
} else {
invoke_context()->thread_state()->set_state(vm()->thread_state());
return NULL;
Expand Down Expand Up @@ -177,16 +209,18 @@ namespace rubinius {
vm->thread()->fiber_value(state, cNil);
}

if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
if(vm->thread_state()->raise_reason() != cFiberCancel) {
if(vm->fiber()->status() == eTransfer) {
// restart the root Fiber
vm->thread()->fiber()->invoke_context(vm);
vm->thread()->fiber()->restart(state);
} else {
vm->fiber()->invoke_context()->fiber()->restart(state);
}
}

{
std::lock_guard<std::mutex> guard(vm->wait_mutex());
std::lock_guard<std::mutex> guard(vm->fiber_wait_mutex());

vm->fiber()->status(eDead);
vm->set_suspended();
Expand Down Expand Up @@ -377,6 +411,12 @@ namespace rubinius {
return return_value(state);
}

Object* Fiber::dispose(STATE) {
cancel(state);

return this;
}

Object* Fiber::s_yield(STATE, Arguments& args) {
Fiber* fiber = state->vm()->fiber();
OnStack<1> os(state, fiber);
Expand Down Expand Up @@ -412,10 +452,30 @@ namespace rubinius {
return state->vm()->thread()->fiber();
}

void Fiber::finalize(STATE, Fiber* fib) {
Fixnum* Fiber::s_count(STATE) {
return state->shared().vm_fibers_count(state);
}

void Fiber::finalize(STATE, Fiber* fiber) {
if(state->shared().config.machine_fiber_log_finalizer.value) {
logger::write("fiber: finalizer: %s, %d",
fib->thread_name()->c_str(state), fib->fiber_id()->to_native());
fiber->thread_name()->c_str(state), fiber->fiber_id()->to_native());
}

if(fiber->vm()) {
if(!state->shared().halting_p()) {
if(!fiber->vm()->zombie_p()) {
fiber->cancel(state);
}
}

if(fiber->vm()->zombie_p()) {
VM::discard(state, fiber->vm());
fiber->vm(NULL);
} else {
logger::write("fiber: finalizer: fiber not completed: %s, %d",
fiber->thread_name()->c_str(state), fiber->fiber_id()->to_native());
}
}
}
}
7 changes: 7 additions & 0 deletions machine/builtin/fiber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ namespace rubinius {
// Rubinius.primitive :fiber_s_main
static Fiber* s_main(STATE);

// Rubinius.primitive :fiber_s_count
static Fixnum* s_count(STATE);

bool root_p();

Status status() {
Expand Down Expand Up @@ -120,6 +123,7 @@ namespace rubinius {

void start(STATE, Arguments& args);
void restart(STATE);
void cancel(STATE);
void suspend_and_continue(STATE);

// Rubinius.primitive :fiber_status
Expand All @@ -131,6 +135,9 @@ namespace rubinius {
// Rubinius.primitive :fiber_transfer
Object* transfer(STATE, Arguments& args);

// Rubinius.primitive :fiber_dispose
Object* dispose(STATE);

public: /* TypeInfo */

class Info : public TypeInfo {
Expand Down
42 changes: 28 additions & 14 deletions machine/builtin/system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,14 @@ namespace rubinius {
args[idx] = 0;

// If we added anything, then exec, otherwise fall through and fail.
if(idx > 0) execvp(args[0], args);
if(idx > 0) {
for(int i = 0; i < 5; i++) {
if(::execvp(args[0], args) < 0) {
if(errno != EAGAIN) break;
}
}
}

// If we failed, clean up the args.
delete[] args;
}
Expand Down Expand Up @@ -385,21 +392,19 @@ namespace rubinius {

static int fork_exec(STATE, int errors_fd) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork_exec(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

// If execvp() succeeds, we'll read EOF and know.
fcntl(errors_fd, F_SETFD, FD_CLOEXEC);

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid == 0) {
// We're in the child...
Expand Down Expand Up @@ -465,7 +470,11 @@ namespace rubinius {
}

if(exe.argc()) {
(void)::execvp(exe.command(), exe.argv());
for(int i = 0; i < 5; i++) {
if(::execvp(exe.command(), exe.argv()) < 0) {
if(errno != EAGAIN) break;
}
}
} else {
exec_sh_fallback(state, exe.command(), exe.command_size());
}
Expand Down Expand Up @@ -710,7 +719,7 @@ namespace rubinius {

Object* System::vm_exec(STATE, String* path, Array* args) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->exec_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

/* Setting up the command and arguments may raise an exception so do it
* before everything else.
Expand Down Expand Up @@ -755,7 +764,11 @@ namespace rubinius {
}

if(exe.argc()) {
(void)::execvp(exe.command(), exe.argv());
for(int i = 0; i < 5; i++) {
if(::execvp(exe.command(), exe.argv()) < 0) {
if(errno != EAGAIN) break;
}
}
} else {
exec_sh_fallback(state, exe.command(), exe.command_size());
}
Expand Down Expand Up @@ -864,18 +877,16 @@ namespace rubinius {
return force_as<Fixnum>(Primitives::failure());
#else
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid > 0) {
// We're in the parent...
Expand Down Expand Up @@ -1731,6 +1742,9 @@ namespace rubinius {
case cThreadKill:
reason = state->symbol("thread_kill");
break;
case cFiberCancel:
reason = state->symbol("fiber_cancel");
break;
}

tuple->put(state, 0, reason);
Expand Down
16 changes: 8 additions & 8 deletions machine/builtin/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,10 @@ namespace rubinius {
logger::write("thread: finalizer: %s", thread->vm()->name().c_str());
}

thread->finalize_instance(state);
}

void Thread::finalize_instance(STATE) {
if(vm() && vm()->zombie_p()) {
fiber_mutex_.std::mutex::~mutex();
VM::discard(state, vm());
vm(NULL);
if(thread->vm() && thread->vm()->zombie_p()) {
thread->fiber_mutex().std::mutex::~mutex();
VM::discard(state, thread->vm());
thread->vm(NULL);
}
}

Expand Down Expand Up @@ -432,6 +428,10 @@ namespace rubinius {
return state->shared().vm_threads(state);
}

Fixnum* Thread::count(STATE) {
return state->shared().vm_threads_count(state);
}

Object* Thread::set_priority(STATE, Fixnum* new_priority) {
priority(state, new_priority);
return new_priority;
Expand Down
4 changes: 3 additions & 1 deletion machine/builtin/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ namespace rubinius {
// Rubinius.primitive :thread_list
static Array* list(STATE);

// Rubinius.primitive :thread_count
static Fixnum* count(STATE);

public: /* Instance primitives */

void fork(STATE);
Expand Down Expand Up @@ -237,7 +240,6 @@ namespace rubinius {
static Thread* create(STATE, Class* klass, VM* vm);

static void finalize(STATE, Thread* thread);
void finalize_instance(STATE);

int start_thread(STATE, void* (*function)(void*));
static void* run(void*);
Expand Down
6 changes: 6 additions & 0 deletions machine/environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,8 @@ namespace rubinius {
void Environment::halt(STATE, int exit_code) {
utilities::thread::Mutex::LockGuard guard(halt_lock_);

state->shared().set_halting();

if(state->shared().config.system_log_lifetime.value) {
logger::write("process: exit: %s %d %fs",
shared->pid.c_str(), exit_code, shared->run_time());
Expand All @@ -562,6 +564,8 @@ namespace rubinius {
shared->machine_threads()->shutdown(state);
}

shared->finalizer()->dispose(state);

shared->thread_nexus()->lock(state->vm());

shared->finalizer()->finish(state);
Expand Down Expand Up @@ -743,5 +747,7 @@ namespace rubinius {

State main_state(vm);
state->shared().start_signals(&main_state);

state->shared().set_running();
}
}
Loading

0 comments on commit 9a0726c

Please sign in to comment.