Use non-blocking lock for managed transition & checkpointing.
brixen committed Jun 26, 2016
1 parent 2395d05 commit 435e0b1
Showing 5 changed files with 61 additions and 83 deletions.
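
In outline: the blocking waiting_lock/try_lock path is replaced by a non-blocking spinning_lock that claims phase_flag_ with compare_exchange_strong and runs a callback while holding it. Thread-phase transitions (managed_phase, blocking_phase) and stop-the-world checkpointing (VM::checkpoint, via the new check_stop) now route through that one primitive, which is what lets restore_phase, the LockUnmanaged guard, and the LockStatus bookkeeping at call sites be deleted.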
10 changes: 7 additions & 3 deletions machine/memory/finalizer.cpp
@@ -175,7 +175,9 @@ namespace rubinius {
     MachineThread::wakeup(state);
 
     while(thread_running_) {
-      LockUnmanaged<std::mutex> guard(state, list_mutex());
+      UnmanagedPhase unmanaged(state);
+      std::lock_guard<std::mutex> guard(list_mutex());
+
       list_condition().notify_one();
     }
   }
@@ -240,15 +242,17 @@ namespace rubinius {
   void FinalizerThread::native_finalizer(STATE, Object* obj, FinalizerFunction func) {
     if(finishing_) return;
 
-    LockUnmanaged<std::mutex> guard(state, list_mutex());
+    UnmanagedPhase unmanaged(state);
+    std::lock_guard<std::mutex> guard(list_mutex());
 
     add_finalizer(state, new NativeFinalizer(state, obj, func));
   }
 
   void FinalizerThread::extension_finalizer(STATE, Object* obj, FinalizerFunction func) {
     if(finishing_) return;
 
-    LockUnmanaged<std::mutex> guard(state, list_mutex());
+    UnmanagedPhase unmanaged(state);
+    std::lock_guard<std::mutex> guard(list_mutex());
 
     add_finalizer(state, new ExtensionFinalizer(state, obj, func));
   }
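The deleted LockUnmanaged guard bundled the thread-phase transition and the mutex acquisition into a single RAII type; the replacement stacks two independent guards, entering the unmanaged phase before possibly blocking on the mutex. A minimal standalone sketch of that ordering, with a hypothetical ScopedFlag standing in for UnmanagedPhase (UnmanagedPhase itself is defined elsewhere in the tree):

#include <iostream>
#include <mutex>

// Stand-in for UnmanagedPhase: flips a flag for the guard's lifetime.
struct ScopedFlag {
  bool& flag_;
  explicit ScopedFlag(bool& flag) : flag_(flag) { flag_ = true; }
  ~ScopedFlag() { flag_ = false; }
};

std::mutex list_mutex;
bool unmanaged = false;

void add_entry() {
  ScopedFlag phase(unmanaged);                    // leave managed code first...
  std::lock_guard<std::mutex> guard(list_mutex);  // ...then block on the mutex
  std::cout << "locked, unmanaged=" << unmanaged << "\n";
}  // guards unwind in reverse: mutex released, then the flag is cleared

int main() {
  add_entry();
}

Ordering matters here: if the thread may sleep on the mutex, it should already be out of the managed phase, so a concurrent stop-the-world pause is not held up waiting for it.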
68 changes: 41 additions & 27 deletions machine/thread_nexus.cpp
@@ -15,13 +15,11 @@
 
 namespace rubinius {
   void ThreadNexus::blocking_phase(VM* vm) {
-    bool held = waiting_lock(vm);
-    vm->set_thread_phase(eBlocking);
-    if(!held) unlock();
+    spinning_lock(vm, [vm]{ vm->set_thread_phase(eBlocking); });
   }
 
   void ThreadNexus::managed_phase(VM* vm) {
-    if(!waiting_lock(vm)) unlock();
+    spinning_lock(vm, [vm]{ vm->set_thread_phase(eManaged); });
   }
 
   void ThreadNexus::unmanaged_phase(VM* vm) {
@@ -32,23 +30,6 @@ namespace rubinius {
     vm->set_thread_phase(eWaiting);
   }
 
-  void ThreadNexus::restore_phase(VM* vm, Phase phase) {
-    switch(phase) {
-    case eManaged:
-      managed_phase(vm);
-      break;
-    case eBlocking:
-      blocking_phase(vm);
-      break;
-    case eUnmanaged:
-      unmanaged_phase(vm);
-      break;
-    case eWaiting:
-      waiting_phase(vm);
-      break;
-    }
-  }
-
   bool ThreadNexus::blocking_p(VM* vm) {
     atomic::memory_barrier();
     return (vm->thread_phase() & eBlocking) == eBlocking;
@@ -200,7 +181,7 @@ namespace rubinius {
 
       ns += delay();
 
-      detect_deadlock(ns, lock_limit);
+      detect_deadlock(ns, cLockLimit);
 
       id = 0;
     }
@@ -212,7 +193,7 @@ namespace rubinius {
     while(!try_checkpoint(vm)) {
       ns += delay();
 
-      detect_deadlock(ns, lock_limit);
+      detect_deadlock(ns, cLockLimit);
    }
 
     /* Lock and hold the waiting_mutex to prevent any other thread from
@@ -222,7 +203,7 @@ namespace rubinius {
     while(!waiting_mutex_.try_lock()) {
       ns += delay();
 
-      detect_deadlock(ns, lock_limit);
+      detect_deadlock(ns, cLockLimit);
     }
 
     /* Hold the logger lock so that the multi-process semaphore that the
@@ -232,7 +213,7 @@ namespace rubinius {
     while(!logger::try_lock()) {
       ns += delay();
 
-      detect_deadlock(ns, lock_limit);
+      detect_deadlock(ns, cLockLimit);
     }
 
     return to_lock_status(held);
@@ -273,7 +254,7 @@ namespace rubinius {
 
       ns += delay();
 
-      detect_deadlock(ns, lock_limit, other_vm);
+      detect_deadlock(ns, cLockLimit, other_vm);
      }
    }
  }
@@ -303,7 +284,7 @@ namespace rubinius {
 
       ns += delay();
 
-      detect_deadlock(ns, lock_limit, other_vm);
+      detect_deadlock(ns, cLockLimit, other_vm);
      }
    }
  }
@@ -335,4 +316,37 @@ namespace rubinius {
     vm->set_thread_phase(eManaged);
     return false;
   }
+
+  void ThreadNexus::spinning_lock(VM* vm, std::function<void ()> f) {
+    uint32_t id = 0;
+    int spin = 0;
+    bool held = false;
+    uint64_t ns = 0;
+
+    vm->set_thread_phase(eWaiting);
+
+    while(!phase_flag_.compare_exchange_strong(id, vm->thread_id())) {
+      if(id == vm->thread_id()) {
+        /* The exchange failed, but it was because the value was already set
+         * to our id, so we hold the "lock".
+         */
+        held = true;
+        break;
+      }
+
+      if(++spin > cSpinLimit) {
+        ns += delay();
+
+        detect_deadlock(ns, cLockLimit);
+      }
+
+      id = 0;
+    }
+
+    // Call the provided function while holding the lock.
+    f();
+
+    // Release the lock unless we already held it.
+    if(!held) phase_flag_ = 0;
+  }
 }
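
A standalone model of the pattern spinning_lock implements, under illustrative names (PhaseLock, run_locked); the real code spins on phase_flag_, backs off through delay(), and calls detect_deadlock, all omitted here. The shape is: CAS until the flag is ours, treat finding our own id in the flag as already holding the lock, run the callback, and release only if this call acquired it.

#include <atomic>
#include <functional>
#include <iostream>
#include <thread>

class PhaseLock {
  std::atomic<uint32_t> owner_{0};  // 0 = unheld, like phase_flag_

public:
  void run_locked(uint32_t thread_id, std::function<void ()> f) {
    uint32_t id = 0;
    bool held = false;

    while(!owner_.compare_exchange_strong(id, thread_id)) {
      if(id == thread_id) {
        held = true;  // the flag already carries our id: re-entrant call
        break;
      }
      id = 0;         // CAS stored the current owner in id; reset and retry
    }

    f();              // critical section, serialized by the flag

    if(!held) owner_ = 0;  // release only if this call acquired the flag
  }
};

int main() {
  PhaseLock lock;
  int counter = 0;

  auto work = [&](uint32_t id) {
    for(int i = 0; i < 100000; i++) lock.run_locked(id, [&]{ counter++; });
  };

  std::thread a(work, 1), b(work, 2);
  a.join();
  b.join();

  std::cout << counter << "\n";  // 200000: no lost updates
}

The held case is presumably what lets a thread that already owns phase_flag_ (for instance, one coordinating a stop) call the phase-transition functions again without deadlocking against itself.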
28 changes: 8 additions & 20 deletions machine/thread_nexus.hpp
@@ -9,6 +9,7 @@
 
 #include <atomic>
 #include <condition_variable>
+#include <functional>
 #include <list>
 #include <mutex>
 
@@ -37,7 +38,9 @@ namespace rubinius {
     ThreadList threads_;
     uint32_t thread_ids_;
 
-    const static uint64_t lock_limit = 5000000000;
+    const static uint64_t cLockLimit = 5000000000;
+    const static int cSpinLimit = 10000;
+
 
   public:
     enum Phase {
@@ -123,28 +126,15 @@ namespace rubinius {
     }
 
     bool waiting_lock(VM* vm);
+    void spinning_lock(VM* vm, std::function<void ()> f);
 
     LockStatus fork_lock(VM* vm);
     void fork_unlock(LockStatus status);
 
-    LockStatus try_lock(VM* vm) {
+    void check_stop(VM* vm, std::function<void ()> f) {
       while(stop_p()) {
-        bool held = waiting_lock(vm);
-
-        // Assumption about stop_ may change while we progress.
-        if(stop_p()) {
-          if(try_checkpoint(vm)) {
-            if(stop_p()) {
-              unset_stop();
-              return to_lock_status(held);
-            }
-          }
-        }
-
-        // Either we're not stop_'ing or something blocked us from serializing.
-        if(!held) unlock();
+        spinning_lock(vm, [&, this]{ f(); unset_stop(); });
       }
-
-      return eNotLocked;
     }
 
     LockStatus lock(VM* vm) {
@@ -173,8 +163,6 @@ namespace rubinius {
     void delete_vm(VM* vm);
 
     void after_fork_child(STATE);
-
-    void restore_phase(VM* vm, Phase phase);
   };
 }

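Design note: try_lock returned a tri-state LockStatus that every caller had to unwind (eLocked meant the caller still owed an unlock()); check_stop inverts this into a callback run while the flag is held, so the acquisition, the stop_p() check, and unset_stop() live in one place and the caller cannot forget the release.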
26 changes: 1 addition & 25 deletions machine/thread_phase.hpp
@@ -76,29 +76,6 @@ namespace rubinius {
     }
   };
 
-  template <typename T>
-  class LockUnmanaged {
-    T& lock_;
-    State* state_;
-    ThreadNexus::Phase phase_;
-
-  public:
-    LockUnmanaged(STATE, T& in_lock)
-      : lock_(in_lock)
-      , state_(state)
-      , phase_(state->vm()->thread_phase())
-    {
-      state_->vm()->thread_nexus()->waiting_phase(state_->vm());
-
-      lock_.lock();
-    }
-
-    ~LockUnmanaged() {
-      lock_.unlock();
-      state_->vm()->thread_nexus()->restore_phase(state_->vm(), phase_);
-    }
-  };
-
   template <typename T>
   class LockWaiting {
     T& lock_;
@@ -107,12 +84,11 @@ namespace rubinius {
     LockWaiting(STATE, T& in_lock)
       : lock_(in_lock)
     {
-      ThreadNexus::Phase phase = state->vm()->thread_phase();
       state->vm()->thread_nexus()->waiting_phase(state->vm());
 
       lock_.lock();
 
-      state->vm()->thread_nexus()->restore_phase(state->vm(), phase);
+      state->vm()->thread_nexus()->managed_phase(state->vm());
     }
 
     ~LockWaiting() {
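With LockUnmanaged deleted, LockWaiting no longer saves and restores the phase recorded at construction; it simply returns the thread to the managed phase after acquiring the lock. Together these two changes remove the remaining callers of restore_phase, which is why that method could be deleted from ThreadNexus above.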
12 changes: 4 additions & 8 deletions machine/vm.hpp
@@ -467,14 +467,10 @@ namespace rubinius {
     void checkpoint(STATE) {
       metrics().machine.checkpoints++;
 
-      ThreadNexus::LockStatus status = thread_nexus_->try_lock(this);
-      if(status != ThreadNexus::eNotLocked) {
-        metrics().machine.stops++;
-
-        collect_maybe(state);
-
-        if(status == ThreadNexus::eLocked) thread_nexus_->unlock();
-      }
+      thread_nexus_->check_stop(this, [this, state]{
+        metrics().machine.stops++;
+        collect_maybe(state);
+      });
 
       if(profile_counter_++ >= profile_interval_) {
         update_profile(state);
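Net effect at the call site: checkpoint() no longer branches on a LockStatus. Every pass still counts a checkpoint, but the stops metric and the possible collection in collect_maybe run only when check_stop actually invokes the lambda, i.e. when a stop was requested.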
