Skip to content

Commit

Permalink
Cleaned up ThreadGroup.
Browse files Browse the repository at this point in the history
  • Loading branch information
brixen committed May 16, 2015
1 parent 433d9b8 commit 00c04b9
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 40 deletions.
1 change: 1 addition & 0 deletions kernel/bootstrap/load_order.txt
Expand Up @@ -44,6 +44,7 @@ stat.rbc
string.rbc
symbol.rbc
thread.rbc
thread_mirror.rbc
thunk.rbc
time.rbc
true.rbc
Expand Down
7 changes: 2 additions & 5 deletions kernel/bootstrap/thread.rb
Expand Up @@ -219,10 +219,6 @@ def group
@group
end

def add_to_group(group)
@group = group
end

def raise(exc=undefined, msg=nil, trace=nil)
Rubinius.lock(self)

Expand Down Expand Up @@ -377,7 +373,6 @@ def __run__
Rubinius.check_interrupts
ensure
unlock_locks
@joins.each { |join| join.send self }
end
end
rescue Exception => e
Expand All @@ -390,6 +385,8 @@ def __run__
end
end

Rubinius::Mirror.reflect(@group).remove self

if Rubinius.thread_state[0] == :thread_kill
@killed = true
end
Expand Down
9 changes: 9 additions & 0 deletions kernel/bootstrap/thread_mirror.rb
@@ -0,0 +1,9 @@
module Rubinius
class Mirror
class Thread < Mirror
def group=(group)
Rubinius.invoke_primitive :object_set_ivar, @object, :@group, group
end
end
end
end
1 change: 1 addition & 0 deletions kernel/common/load_order.txt
Expand Up @@ -80,6 +80,7 @@ symbol.rbc
mutex.rbc
thread.rbc
thread_group.rbc
thread_group_mirror.rbc
throw_catch.rbc
time.rbc
true.rbc
Expand Down
31 changes: 10 additions & 21 deletions kernel/common/thread_group.rb
Expand Up @@ -7,34 +7,23 @@ def initialize
Default = ThreadGroup.new

def add(thread)
if thread.group
thread.group.remove(thread)
end
thread.add_to_group self
if g = thread.group
raise ThreadError, "can't move from the enclosed thread group" if g.enclosed?

@threads.delete_if do |w|
obj = w.__object__
!(obj and obj.alive?)
gm = Rubinius::Mirror.reflect g
gm.remove thread
end

@threads << WeakRef.new(thread)
self
end
tm = Rubinius::Mirror.reflect thread
tm.group = self

def remove(thread)
if enclosed?
raise ThreadError, "can't move from the enclosed thread group"
end
@threads.delete_if { |w| w.__object__ == thread }
@threads << thread

self
end

def list
list = []
@threads.each do |w|
obj = w.__object__
list << obj if obj and obj.alive?
end
list
@threads
end

def enclose
Expand Down
13 changes: 13 additions & 0 deletions kernel/common/thread_group_mirror.rb
@@ -0,0 +1,13 @@
module Rubinius
class Mirror
class ThreadGroup < Mirror
self.subject = ::ThreadGroup

def remove(thread)
ary = Rubinius.invoke_primitive :object_get_ivar, @object, :@threads
ary.delete thread
end

end
end
end
9 changes: 0 additions & 9 deletions vm/builtin/thread.cpp
Expand Up @@ -72,7 +72,6 @@ namespace rubinius {
thr->result(state, cFalse);
thr->exception(state, nil<Exception>());
thr->critical(state, cFalse);
thr->joins(state, Array::create(state, 1));
thr->killed(state, cFalse);
thr->priority(state, Fixnum::from(0));
thr->pid(state, Fixnum::from(0));
Expand Down Expand Up @@ -474,14 +473,6 @@ namespace rubinius {
return Location::mri_backtrace(state, cf);
}

void Thread::release_joins(STATE, GCToken gct, CallFrame* calling_environment) {
for(native_int i = 0; i < joins_->size(); ++i) {
if(Channel* chn = try_as<Channel>(joins_->get(state, i))) {
chn->send(state, gct, this, calling_environment);
}
}
}

void Thread::stopped() {
alive_ = cFalse;
}
Expand Down
3 changes: 0 additions & 3 deletions vm/builtin/thread.hpp
Expand Up @@ -44,7 +44,6 @@ namespace rubinius {
Object* result_; // slot
Exception* exception_; // slot
Object* critical_; // slot
Array* joins_; // slot
Object* killed_; // slot
Fixnum* priority_; // slot
Fixnum* pid_; // slot
Expand Down Expand Up @@ -86,7 +85,6 @@ namespace rubinius {
attr_accessor(result, Object);
attr_accessor(exception, Exception);
attr_accessor(critical, Object);
attr_accessor(joins, Array);
attr_accessor(killed, Object);
attr_accessor(priority, Fixnum);
attr_accessor(pid, Fixnum);
Expand Down Expand Up @@ -271,7 +269,6 @@ namespace rubinius {

void init_lock();
void stopped();
void release_joins(STATE, GCToken gct, CallFrame* calling_environment);

/**
* Create a Thread object.
Expand Down
2 changes: 0 additions & 2 deletions vm/capi/thread.cpp
Expand Up @@ -294,7 +294,6 @@ extern "C" {
self->hard_lock(state, gct, call_frame, false);
Exception* exc = capi::c_as<Exception>(self->current_exception(state));
self->exception(state, exc);
self->release_joins(state, gct, call_frame);
self->alive(state, cFalse);
self->hard_unlock(state, gct, call_frame);
}
Expand All @@ -314,7 +313,6 @@ extern "C" {
OnStack<1> os(state, self);

self->hard_lock(state, gct, &cf, false);
self->release_joins(state, gct, &cf);
self->alive(state, cFalse);
self->hard_unlock(state, gct, &cf);

Expand Down

3 comments on commit 00c04b9

@godfat
Copy link
Contributor

@godfat godfat commented on 00c04b9 Jul 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess ThreadGroup#add is not thread safe itself? Should I use a mutex to protect it while adding another thread to the group manually?

Thanks!

Reference: https://travis-ci.org/godfat/pork/jobs/71427711

@brixen
Copy link
Member Author

@brixen brixen commented on 00c04b9 Jul 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@godfat hm, this is an interesting case. Could you please open an issue so we can discuss it. While ThreadGroup is simply using an Array underneath (hence the error you saw) so by itself, it's not thread-safe. However, given the facility of ThreadGroup, I'm inclined to make the operations synchronized on Rubinius.

@godfat
Copy link
Contributor

@godfat godfat commented on 00c04b9 Jul 17, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed. Thanks for looking into it! #3467

Please sign in to comment.