Skip to content

Commit

Permalink
Reworked locking around fork().
Browse files Browse the repository at this point in the history
  • Loading branch information
brixen committed Jun 24, 2016
1 parent eb85896 commit 2d7d69c
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 73 deletions.
13 changes: 7 additions & 6 deletions machine/builtin/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ namespace rubinius {

if(!vm()->suspended_p()) {
std::ostringstream msg;
msg << "attempt to restart non-suspended (" << vm()->transition_flag() << ") fiber";
msg << "attempt to restart non-suspended ("
<< vm()->fiber_transition_flag() << ") fiber";
Exception::raise_fiber_error(state, msg.str().c_str());
}

Expand All @@ -97,8 +98,8 @@ namespace rubinius {
wakeup();

while(vm()->suspended_p()) {
std::lock_guard<std::mutex> guard(vm()->wait_mutex());
vm()->wait_condition().notify_one();
std::lock_guard<std::mutex> guard(vm()->fiber_wait_mutex());
vm()->fiber_wait_condition().notify_one();
}
}

Expand All @@ -111,11 +112,11 @@ namespace rubinius {
UnmanagedPhase unmanaged(state);

{
std::unique_lock<std::mutex> lk(vm()->wait_mutex());
std::unique_lock<std::mutex> lk(vm()->fiber_wait_mutex());

vm()->set_suspended();
while(!wakeup_p()) {
vm()->wait_condition().wait(lk);
vm()->fiber_wait_condition().wait(lk);
}
clear_wakeup();
vm()->set_resuming();
Expand Down Expand Up @@ -186,7 +187,7 @@ namespace rubinius {
}

{
std::lock_guard<std::mutex> guard(vm->wait_mutex());
std::lock_guard<std::mutex> guard(vm->fiber_wait_mutex());

vm->fiber()->status(eDead);
vm->set_suspended();
Expand Down
18 changes: 7 additions & 11 deletions machine/builtin/system.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,21 +382,19 @@ namespace rubinius {

static int fork_exec(STATE, int errors_fd) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork_exec(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

// If execvp() succeeds, we'll read EOF and know.
fcntl(errors_fd, F_SETFD, FD_CLOEXEC);

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid == 0) {
// We're in the child...
Expand Down Expand Up @@ -707,7 +705,7 @@ namespace rubinius {

Object* System::vm_exec(STATE, String* path, Array* args) {
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->exec_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

/* Setting up the command and arguments may raise an exception so do it
* before everything else.
Expand Down Expand Up @@ -861,18 +859,16 @@ namespace rubinius {
return force_as<Fixnum>(Primitives::failure());
#else
state->vm()->thread_nexus()->waiting_phase(state->vm());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->fork_mutex());
std::lock_guard<std::mutex> guard(state->vm()->thread_nexus()->process_mutex());

state->shared().machine_threads()->before_fork(state);
state->memory()->set_interrupt();

ThreadNexus::LockStatus status = state->vm()->thread_nexus()->lock(state->vm());
ThreadNexus::LockStatus status = state->vm()->thread_nexus()->fork_lock(state->vm());

int pid = ::fork();

if(status == ThreadNexus::eLocked) {
state->vm()->thread_nexus()->unlock();
}
state->vm()->thread_nexus()->fork_unlock(status);

if(pid > 0) {
// We're in the parent...
Expand Down
39 changes: 31 additions & 8 deletions machine/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ namespace rubinius {
loglevel_ = level;
}

void lock() {
if(logger_) {
logger_->lock();
}
}

bool try_lock() {
if(logger_) {
return logger_->try_lock();
} else {
return false;
}
}

void unlock() {
if(logger_) {
logger_->unlock();
}
}

void close() {
delete logger_;
}
Expand Down Expand Up @@ -124,7 +144,7 @@ namespace rubinius {

void write(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

char buf[LOGGER_MSG_SIZE];

Expand All @@ -136,7 +156,7 @@ namespace rubinius {

void fatal(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eFatal) return;

Expand All @@ -150,7 +170,7 @@ namespace rubinius {

void error(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eError) return;

Expand All @@ -164,7 +184,7 @@ namespace rubinius {

void warn(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eWarn) return;

Expand All @@ -178,7 +198,7 @@ namespace rubinius {

void info(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eInfo) return;

Expand All @@ -192,7 +212,7 @@ namespace rubinius {

void debug(const char* message, va_list args) {
if(logger_) {
std::lock_guard<locks::spinlock_mutex> guard(logger_->lock());
std::lock_guard<locks::spinlock_mutex> guard(logger_->spinlock());

if(loglevel_ < eDebug) return;

Expand All @@ -215,10 +235,13 @@ namespace rubinius {

char* Logger::timestamp() {
time_t clock;
struct tm lt;

time(&clock);
strftime(formatted_time_, LOGGER_TIME_SIZE, "%b %e %H:%M:%S",
localtime(&clock));
localtime_r(&clock, &lt);

strftime(formatted_time_, LOGGER_TIME_SIZE, "%b %e %H:%M:%S", &lt);

return formatted_time_;
}

Expand Down
17 changes: 16 additions & 1 deletion machine/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ namespace rubinius {


void open(logger_type type, const char* identifier, logger_level level=eWarn, ...);
void lock();
bool try_lock();
void unlock();
void close();

void write(const char* message, ...);
Expand Down Expand Up @@ -70,9 +73,21 @@ namespace rubinius {

char* timestamp();

rubinius::locks::spinlock_mutex& lock() {
rubinius::locks::spinlock_mutex& spinlock() {
return lock_;
}

    // Acquire the logger's spinlock, blocking until it is available.
    void lock() {
      lock_.lock();
    }

    // Attempt to acquire the logger's spinlock without blocking.
    // Returns true if the lock was taken.
    bool try_lock() {
      return lock_.try_lock();
    }

    // Release the logger's spinlock.
    void unlock() {
      lock_.unlock();
    }
};

class Syslog : public Logger {
Expand Down
91 changes: 85 additions & 6 deletions machine/thread_nexus.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "logger.hpp"
#include "shared_state.hpp"
#include "thread_nexus.hpp"
#include "vm.hpp"
Expand Down Expand Up @@ -150,6 +151,16 @@ namespace rubinius {
}
}

void ThreadNexus::detect_deadlock(uint64_t nanoseconds, uint64_t limit) {
if(nanoseconds > limit) {
logger::fatal("thread nexus: unable to lock, possible deadlock");

list_threads();

rubinius::abort();
}
}

uint64_t ThreadNexus::delay() {
static int i = 0;
static int delay[] = {
Expand All @@ -167,6 +178,76 @@ namespace rubinius {
return ns;
}

  /* Serialize the system in preparation for fork()/fork_exec().
   *
   * Acquires, in order:
   *   1. phase_flag_       — the thread-phase "lock" (spin on CAS);
   *   2. waiting_mutex_    — so no other thread holds it across fork();
   *   3. a checkpoint of all other threads (try_checkpoint);
   *   4. the logger lock   — so the logger's lock is not held across fork().
   * Each spin loop calls detect_deadlock(), which aborts the process if the
   * wait exceeds lock_limit. Release is via fork_unlock(), which must undo
   * these in a compatible order.
   *
   * Returns to_lock_status(held): whether phase_flag_ was newly taken here
   * or was already held by this thread, so fork_unlock() knows whether to
   * clear it.
   */
  ThreadNexus::LockStatus ThreadNexus::fork_lock(VM* vm) {
    waiting_phase(vm);

    /* Preserve the state of the phase_flag_ in situations where we have the
     * entire system serialized.
     */
    uint32_t id = 0;
    bool held = false;
    uint64_t ns = 0;

    while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
      if(id == vm->thread_id()) {
        /* The exchange failed, but it was because the value was already set
         * to our id, so we hold the "lock".
         */
        vm->set_thread_phase(eManaged);
        held = true;
        break;
      }

      ns += delay();

      detect_deadlock(ns, lock_limit);

      // compare_exchange_strong stored the observed value into `id`;
      // reset the expected value to 0 (unlocked) before retrying.
      id = 0;
    }

    /* Lock and hold the waiting_mutex to prevent any other thread from
     * holding it across a fork() call.
     */
    ns = 0;

    while(!waiting_mutex_.try_lock()) {
      ns += delay();

      detect_deadlock(ns, lock_limit);
    }

    // Checkpoint all the other threads.
    ns = 0;

    while(!try_checkpoint(vm)) {
      ns += delay();

      detect_deadlock(ns, lock_limit);
    }

    /* Hold the logger lock so that the multi-process semaphore that the
     * logger depends on is not held across fork() calls.
     */
    ns = 0;

    while(!logger::try_lock()) {
      ns += delay();

      detect_deadlock(ns, lock_limit);
    }

    return to_lock_status(held);
  }

void ThreadNexus::fork_unlock(LockStatus status) {
if(status == eLocked) {
phase_flag_ = 0;
}

logger::unlock();
waiting_mutex_.unlock();
}

bool ThreadNexus::try_checkpoint(VM* vm) {
timer::StopWatch<timer::nanoseconds> timer(
vm->metrics().lock.stop_the_world_ns);
Expand Down Expand Up @@ -232,9 +313,7 @@ namespace rubinius {

vm->set_thread_phase(eWaiting);

while(!phase_flag_.compare_exchange_strong(id, vm->thread_id(),
std::memory_order_acq_rel))
{
while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
if(id == vm->thread_id()) {
/* The exchange failed, but it was because the value was already set
* to our id, so we hold the "lock".
Expand All @@ -244,9 +323,9 @@ namespace rubinius {
}

{
std::unique_lock<std::mutex> lk(wait_mutex_);
wait_condition_.wait(lk,
[this]{ return phase_flag_.load(std::memory_order_acquire) == 0; });
std::unique_lock<std::mutex> lk(waiting_mutex_);
waiting_condition_.wait(lk,
[this]{ return phase_flag_ == 0; });
}

id = 0;
Expand Down
Loading

0 comments on commit 2d7d69c

Please sign in to comment.