Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Use Rubinius.lock instead of Channel #1887

Merged
merged 1 commit into from

2 participants

@ryoqun
Collaborator

Hi, As discussed earlier, I made thread use Rubinius.{lock, unlock, synchronize} instead of Channel.

Mostly, this was a straight forward transition.

I commented wherever I thought the code needed to be commented. If there are any other unclear code, please let me know. I'll add comments. :)

Also, because I'm not a Rubinius locking expert, I may be plain wrong.. :p

Any comments are very appreciated!

vm/builtin/thread.cpp
@@ -258,6 +266,11 @@ static intptr_t thread_debug_id(pthread_t thr) {
if(error) {
Exception::thread_error(state, strerror(error));
}
+
+ // Wait until the new thread locks the thread object and unlock init_lock_.
+ init_lock_.lock();
+ init_lock_.unlock();
+
@dbussink Owner
dbussink added a note

You can't use init_lock_ here directly, since we're after methods that get a gctoken here. That means that the GC might have moved the this pointer. This is also why you see vm->thread-> being used, since that is properly updated when we GC.

Also, why is it locking and unlocking the same lock here? Not really sure I get why

Hmm, looks like the line numbers confused me. So the first comment isn't true. Still wondering a bit about the second one though

@ryoqun Collaborator
ryoqun added a note

Hi, thanks for comments.

About the second comment, I added more explanation.

For background, I originally wanted to lock the thread object at Thread#setup as @lock was initialized as a new Channel instance previously.

I couldn't do that because it seems that I can't do Rubinius.lock(self) in a thread and do Rubinius.unlock(self) in another thread. So I moved the lock of the thread object to Thread::in_new_thread() from Thread#setup. And, I made the parent thread wait for the child thread to lock the thread object by this lock/unlock pair of init_lock_.

@ryoqun Collaborator
ryoqun added a note

I think there is a better code snippet for doing what I described in my previous comment probably... If it's true, let me know! I'll update accordingly!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
vm/builtin/thread.cpp
@@ -258,6 +266,17 @@ static intptr_t thread_debug_id(pthread_t thr) {
if(error) {
Exception::thread_error(state, strerror(error));
}
+
+ // We can't return from here until the new thread completes a minimal
+ // initialization. After the initialization, it unlocks init_lock_.
+ // So, wait here until we can lock init_lock_ after that.
+ init_lock_.lock();
+
+ // We locked init_lock_. And we are sure that the new thread completed
+ // the initialization.
+ // Locking init_lock_ isn't needed anymore, so unlock it.
+ init_lock_.unlock();
+
@dbussink Owner
dbussink added a note

One of the things I worry about is the following case. The in_new_thread code could trigger a GC (it uses methods accepting a GCToken), so we might have moved the this pointer when we wait for the lock.

We probably need to copy 'this' into a variable and mark that variable as OnStack so we handle that case correctly. BTW, I'm online on irc if you want to discuss stuff

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@dbussink
Owner

One more thing, shouldn't the logic in Object* Thread::fork(STATE) be applied to Object* Thread::fork_attached(STATE) too? Or maybe we should make attached a boolean argument to the method and merge the two implementations.

@ryoqun
Collaborator

You're correct. I simply overlooked Thread::fork_attached.

For the fix, I chosen a different approach for the refactoring. If it's not good style, I'll update. I tend to be picky for refactoring..

First, I thought it's not good that two functions are doing slightly different thing while their names are similar with the shared verb (fork). So, I renamed Thread::fork_attached to Thread::fork_as_signal_handler, because the fork_attached is only used for signal handlers and will be so for a while. Also, fork_attached is the single specialization of Thread::fork, so it's acceptable to specialize the function much more. In other words, do so to be "for signal handlers", rather than just to be "(be) attached". If the function would be used by several components, it should be named like the current generic fork_attached.

Secondly, I moved signal-handler-specific code from the call site of fork_attached. This is valid because fork_attached became specific by renaming.

Lastly, I created a helper function to be called from both fork and fork_as_signal_handler. It calls pthread_create and lock/unlock init_lock_.

Additionally, in my opinion, the following code at the call site of fork_attached reads much easier:

void SignalHandler::run(STATE) {
  thread_.get()->fork_as_signal_handler(state); // Reader's reasoning: OK, we're specifically forking this thread as a signal handler. We're doing rather unusual thing (setting up the signal handler), so it's plausible that we must fork thread differently in some way.
}

than

void SignalHandler::run(STATE) {
  thread_.get()->fork(state, true); // Reader's reasoning: Ugh, what does the true mean?? Is fork used with true very often?? I need to read the implementation of fork as well..
}

This is based on the general principle that when two or more versions of similar functions are implemented, it's more readable when their differentiated behaviour is encoded into the names of separated functions, rather then the switch parameter of the single function.

@dbussink dbussink merged commit a0c592b into rubinius:master
@ryoqun
Collaborator

Thanks for merging!

@dbussink I'm really thank you for patiently reviewing this pull request!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Sep 10, 2012
  1. @ryoqun
This page is out of date. Refresh to see the latest.
View
28 kernel/bootstrap/thread.rb
@@ -148,10 +148,9 @@ def thread_is_setup?
end
def alive?
- @lock.receive
- @alive
- ensure
- @lock.send nil
+ Rubinius.synchronize(self) do
+ @alive
+ end
end
def stop?
@@ -169,10 +168,9 @@ def kill
alias_method :terminate, :kill
def sleeping?
- @lock.receive
- @sleep
- ensure
- @lock.send nil
+ Rubinius.synchronize(self) do
+ @sleep
+ end
end
def status
@@ -205,12 +203,12 @@ def add_to_group(group)
def join_inner(timeout = undefined)
result = nil
- @lock.receive
+ Rubinius.lock(self)
begin
if @alive
jc = Rubinius::Channel.new
@joins << jc
- @lock.send nil
+ Rubinius.unlock(self)
begin
if timeout.equal? undefined
while true
@@ -231,23 +229,23 @@ def join_inner(timeout = undefined)
end
end
ensure
- @lock.receive
+ Rubinius.lock(self)
end
end
Kernel.raise @exception if @exception
result = yield
ensure
- @lock.send nil
+ Rubinius.unlock(self)
end
result
end
private :join_inner
def raise(exc=$!, msg=nil, trace=nil)
- @lock.receive
+ Rubinius.lock(self)
unless @alive
- @lock.send nil
+ Rubinius.unlock(self)
return self
end
@@ -268,7 +266,7 @@ def raise(exc=$!, msg=nil, trace=nil)
Kernel.raise exc if self == Thread.current
ensure
- @lock.send nil
+ Rubinius.unlock(self)
end
raise_prim exc
View
10 kernel/bootstrap/thread18.rb
@@ -53,11 +53,10 @@ def self.critical=(value)
def __run__()
begin
begin
- @lock.send nil
+ Rubinius.unlock(self)
@result = @block.call(*@args)
ensure
- @lock.receive
- unlock_locks
+ Rubinius.lock(self)
@joins.each { |join| join.send self }
end
rescue Die
@@ -68,7 +67,8 @@ def __run__()
@exception = e # unless @dying
ensure
@alive = false
- @lock.send nil
+ Rubinius.unlock(self)
+ unlock_locks
end
if @exception
@@ -87,8 +87,6 @@ def setup(prime_lock)
@exception = nil
@critical = false
@dying = false
- @lock = Rubinius::Channel.new
- @lock.send nil if prime_lock
@joins = []
end
View
10 kernel/bootstrap/thread19.rb
@@ -38,11 +38,10 @@ def self.stop
def __run__()
begin
begin
- @lock.send nil
+ Rubinius.unlock(self)
@result = @block.call(*@args)
ensure
- @lock.receive
- unlock_locks
+ Rubinius.lock(self)
@joins.each { |join| join.send self }
end
rescue Die
@@ -54,7 +53,8 @@ def __run__()
@exception = e # unless @dying
ensure
@alive = false
- @lock.send nil
+ Rubinius.unlock(self)
+ unlock_locks
end
if @exception
@@ -73,8 +73,6 @@ def setup(prime_lock)
@exception = nil
@critical = false
@dying = false
- @lock = Rubinius::Channel.new
- @lock.send nil if prime_lock
@joins = []
@killed = false
end
View
38 vm/builtin/thread.cpp
@@ -180,6 +180,28 @@ namespace rubinius {
return cFalse;
}
+ int Thread::start_new_thread(STATE, const pthread_attr_t &attrs) {
+ Thread* self = this;
+ OnStack<1> os(state, self);
+
+ int error = pthread_create(&vm_->os_thread(), &attrs, in_new_thread, (void*)vm_);
+ if(error) {
+ return error;
+ }
+
+ // We can't return from here until the new thread completes a minimal
+ // initialization. After the initialization, it unlocks init_lock_.
+ // So, wait here until we can lock init_lock_ after that.
+ self->init_lock_.lock();
+
+ // We locked init_lock_. And we are sure that the new thread completed
+ // the initialization.
+ // Locking init_lock_ isn't needed anymore, so unlock it.
+ self->init_lock_.unlock();
+
+ return 0;
+ }
+
void* Thread::in_new_thread(void* ptr) {
VM* vm = reinterpret_cast<VM*>(ptr);
@@ -194,7 +216,6 @@ namespace rubinius {
utilities::thread::Thread::set_os_name(tn.str().c_str());
state->set_call_frame(0);
- vm->shared.gc_dependent(state);
if(cDebugThreading) {
std::cerr << "[THREAD " << vm->thread_id()
@@ -203,8 +224,18 @@ namespace rubinius {
vm->set_root_stack(reinterpret_cast<uintptr_t>(&calculate_stack), THREAD_STACK_SIZE);
+ GCTokenImpl gct;
+
+ // Lock the thread object and unlock it at __run__ in the ruby land.
+ vm->thread->hard_lock(state, gct);
+
vm->thread->init_lock_.unlock();
+ // Become GC-dependent after unlocking init_lock_ to avoid deadlocks.
+ // gc_dependent may lock when it detects GC is happening. Also the parent
+ // thread is locked until init_lock_ is unlocked by this child thread.
+ vm->shared.gc_dependent(state);
+
vm->shared.tool_broker()->thread_start(state);
Object* ret = vm->thread->runner_(state);
vm->shared.tool_broker()->thread_stop(state);
@@ -217,7 +248,6 @@ namespace rubinius {
vm->thread->init_lock_.lock();
- GCTokenImpl gct;
std::list<ObjectHeader*>& los = vm->locked_objects();
for(std::list<ObjectHeader*>::iterator i = los.begin();
@@ -253,7 +283,7 @@ namespace rubinius {
pthread_attr_setstacksize(&attrs, THREAD_STACK_SIZE);
pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
- int error = pthread_create(&vm_->os_thread(), &attrs, in_new_thread, (void*)vm_);
+ int error = start_new_thread(state, attrs);
if(error) {
Exception::thread_error(state, strerror(error));
@@ -266,7 +296,7 @@ namespace rubinius {
pthread_attr_init(&attrs);
pthread_attr_setstacksize(&attrs, THREAD_STACK_SIZE);
- return pthread_create(&vm_->os_thread(), &attrs, in_new_thread, (void*)vm_);
+ return start_new_thread(state, attrs);
}
Object* Thread::pass(STATE, CallFrame* calling_environment) {
View
1  vm/builtin/thread.hpp
@@ -249,6 +249,7 @@ namespace rubinius {
static Thread* create(STATE, VM* target, Object* self, Run runner,
bool main_thread = false);
+ int start_new_thread(STATE, const pthread_attr_t &attrs);
static void* in_new_thread(void*);
public: /* TypeInfo */
Something went wrong with that request. Please try again.