Skip to content

Commit

Permalink
Fix Thread and Channel to interact properly
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Phoenix committed Oct 3, 2008
1 parent 88fc943 commit 719c98f
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 24 deletions.
8 changes: 6 additions & 2 deletions kernel/bootstrap/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ def self.pass
Kernel.raise PrimitiveFailure, "primitive failed"
end

def self.dequeue
def dequeue
Ruby.primitive :thread_dequeue
Kernel.raise PrimitiveFailure, "primitive failed"
end


def self.dequeue
Thread.current.dequeue
end

def run
Ruby.primitive :thread_run
Kernel.raise ThreadError, "killed thread"
Expand Down
4 changes: 4 additions & 0 deletions kernel/common/compile.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ def self.single_load(dir, rb, rbc, ext, requiring, options)
# Store it for the future
Rubinius::CompiledFile.dump cm, rbc_path
else
if $DEBUG_LOADING
STDERR.puts "[Loading #{rbc_path}]"
end

compile_feature(rb, requiring) do
cm = load_from_rbc(rbc_path, version_number)
# cm is nil if the file is out of date, version wise.
Expand Down
2 changes: 1 addition & 1 deletion kernel/common/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ def inspect

def setup(prime_lock)
@group = nil
@sleep = true
@alive = true
@result = nil
@exception = nil
@critical = false
@locals = LookupTable.new
@lock = Channel.new
@lock.send nil if prime_lock
@joins = []
end

def initialize(*args)
Expand Down
9 changes: 8 additions & 1 deletion rakelib/vm.rake
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,14 @@ def compile_c(obj, src)
$llvm_c.delete_if { |e| e.index("-O") == 0 }
end

flags = (INCLUDES + FLAGS + $llvm_c).join(' ')
flags = INCLUDES + FLAGS + $llvm_c

# GROSS
if src == "vm/test/runner.cpp"
flags.delete_if { |f| /-O.*/.match(f) }
end

flags = flags.join(" ")

if $verbose
sh "#{CC} #{flags} -c -o #{obj} #{src} 2>&1"
Expand Down
5 changes: 4 additions & 1 deletion vm/builtin/block_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ namespace rubinius {
} else {
val = Qnil;
}
task->pop(); // Remove this from the stack.
BlockContext* ctx = create_context(state, task->active());
if(task->profiler) {
profiler::Method* prof_meth = task->profiler->enter_method(
Expand All @@ -79,6 +78,10 @@ namespace rubinius {
prof_meth->set_position(method_->file(), method_->start_line(state));
}
}

// HACK: manually clear the stack used as args.
task->active()->clear_stack(msg.stack);

task->make_active(ctx);
task->push(val);
}
Expand Down
26 changes: 26 additions & 0 deletions vm/builtin/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "builtin/fixnum.hpp"
#include "builtin/float.hpp"
#include "builtin/io.hpp"
#include "builtin/contexts.hpp"

#include "event.hpp"

Expand Down Expand Up @@ -38,6 +39,29 @@ namespace rubinius {
return Qnil;
}

bool Channel::receive_prim(STATE, Executable* exec, Task* task, Message& msg) {
// TODO check arity
//

// HACK manually clear the stack of msg's values
task->active()->clear_stack(msg.stack);

if(!value_->nil_p()) {
OBJECT val = as<List>(value_)->shift(state);
task->push(val);
return false;
}

/* We push nil on the stack to reserve a place to put the result. */
state->return_value(Qfalse);

G(current_thread)->sleep_for(state, this);
waiting_->append(state, G(current_thread));

state->run_best_thread();
return true;
}

OBJECT Channel::receive(STATE) {
if(!value_->nil_p()) {
OBJECT val = as<List>(value_)->shift(state);
Expand All @@ -50,7 +74,9 @@ namespace rubinius {

G(current_thread)->sleep_for(state, this);
waiting_->append(state, G(current_thread));

state->run_best_thread();

return Qnil;
}

Expand Down
7 changes: 6 additions & 1 deletion vm/builtin/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ namespace rubinius {
class List;
class IO;
class IOBuffer;
class Task;
class Message;
class Executable;

class Channel : public Object {
public:
Expand All @@ -36,7 +39,9 @@ namespace rubinius {
// Ruby.primitive :channel_send
OBJECT send(STATE, OBJECT);

// Ruby.primitive :channel_receive
// Ruby.primitive? :channel_receive
bool receive_prim(STATE, Executable* exec, Task* task, Message& msg);

OBJECT receive(STATE);
bool has_readers_p();

Expand Down
1 change: 1 addition & 0 deletions vm/builtin/object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ namespace rubinius {
msg.name = name;
msg.recv = this;
msg.lookup_from = this->lookup_begin(state);
msg.stack = 0;

msg.set_arguments(state, args);

Expand Down
12 changes: 11 additions & 1 deletion vm/builtin/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ namespace rubinius {

MethodContext* Task::current_context(STATE) {
MethodContext* context = active_;
context->reference(state); // HACK not implemented yet
context->reference(state);
return context;
}

Expand Down Expand Up @@ -262,6 +262,12 @@ namespace rubinius {

OBJECT Task::call_object(STATE, OBJECT recv, SYMBOL meth, Array* args) {
recv->send_on_task(state, this, meth, args);

// HACK by calling directly into another Task, the context cache gets
// confused. So we force references to the top contexts here to keep
// the cache sane.
active_->reference(state);
home_->reference(state);
return Qtrue;
}

Expand Down Expand Up @@ -725,9 +731,13 @@ namespace rubinius {
// If we're switching tasks, return to the task monitor
if(state->interrupts.switch_task) {
state->interrupts.switch_task = false;
return;
}

}

if(state->wait_events) return;

if(state->om->collect_young_now || state->om->collect_mature_now) {
return;
}
Expand Down
3 changes: 3 additions & 0 deletions vm/builtin/task.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ namespace rubinius {
public:
Halt(const char* str) : std::runtime_error(str) { }
};

class WaitOnEvents {
};
};
}

Expand Down
30 changes: 29 additions & 1 deletion vm/builtin/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "builtin/symbol.hpp"
#include "builtin/task.hpp"
#include "builtin/contexts.hpp"
#include "builtin/channel.hpp"

#include "objectmemory.hpp"
#include "vm.hpp"
Expand All @@ -29,6 +30,8 @@ namespace rubinius {
thr->priority(state, Fixnum::from(2));

thr->boot_task(state);
thr->sleep(state, Qtrue);
thr->alive(state, Qtrue);

return thr;
}
Expand Down Expand Up @@ -56,18 +59,43 @@ namespace rubinius {
return this;
}

// Called when VM is putting this Thread back into play. It
// doesn't mean it's about to run, just that it's scheduled
// to do so.
void Thread::woken(STATE) {
sleep(state, Qfalse);
channel(state, (Channel*)Qnil);
}

void Thread::set_top(STATE, OBJECT val) {
task_->active()->set_top(val);
}

void Thread::sleep_for(STATE, Channel* chan) {
channel(state, chan);
set_ivar(state, state->symbol("@sleep"), Qtrue);
sleep(state, Qtrue);
}

Thread* Thread::wakeup(STATE) {
state->queue_thread(this);
return this;
}

OBJECT Thread::raise(STATE, Exception* exc) {
return task_->raise(state, exc);
}

bool Thread::dequeue_prim(STATE, Executable* exec, Task* task, Message& msg) {
alive(state, Qfalse);
task_ = (Task*)Qnil;

// TODO make sure this isn't in global.scheduled_threads

// TODO clear the channel's events rather than making sure there isn't one
assert(channel()->nil_p());

state->run_best_thread();
return true;
}

}
14 changes: 14 additions & 0 deletions vm/builtin/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
namespace rubinius {
class Channel;
class Task;
class Exception;

class Thread : public Object {
public:
Expand All @@ -19,13 +20,17 @@ namespace rubinius {
Task* task_; // slot
Channel* channel_; // slot
FIXNUM priority_; // slot
OBJECT alive_; // slot
OBJECT sleep_; // slot

public:
/* accessors */

attr_accessor(task, Task);
attr_accessor(channel, Channel);
attr_accessor(priority, Fixnum);
attr_accessor(sleep, Object);
attr_accessor(alive, Object);

/* interface */

Expand All @@ -51,6 +56,15 @@ namespace rubinius {
// Ruby.primitive :thread_schedule
Thread* wakeup(STATE);

// Ruby.primitive :thread_raise
OBJECT raise(STATE, Exception* exc);

// Ruby.primitive? :thread_dequeue
bool dequeue_prim(STATE, Executable* exec, Task* task, Message& msg);

// Called by the VM when this thread has been schedule to run.
void Thread::woken(STATE);

class Info : public TypeInfo {
public:
BASIC_TYPEINFO(TypeInfo)
Expand Down
5 changes: 4 additions & 1 deletion vm/external_libs/libev/ev.c
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,10 @@ ev_recommended_backends (void)
flags &= ~EVBACKEND_KQUEUE;
#endif
#ifdef __APPLE__
// flags &= ~EVBACKEND_KQUEUE; for documentation
// kqueue is broken for anything but sockets and pipes
flags &= ~EVBACKEND_KQUEUE;

// poll uses kquueue
flags &= ~EVBACKEND_POLL;
#endif

Expand Down
2 changes: 1 addition & 1 deletion vm/test/test_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class TestChannel : public CxxTest::TestSuite {

chan->receive(state);
TS_ASSERT_EQUALS(G(current_thread), other);
TS_ASSERT_EQUALS(orig->get_ivar(state, state->symbol("@sleep")), Qtrue);
TS_ASSERT_EQUALS(orig->sleep(), Qtrue);
}

void test_receive_causes_event_block() {
Expand Down
4 changes: 2 additions & 2 deletions vm/test/test_compiled_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class TestCompiledFile : public CxxTest::TestSuite {
CompiledMethod* cm = as<CompiledMethod>(cf->body(state));
TS_ASSERT(cm);

Task* task = Task::create(state);
Task* task = state->new_task();

Message msg(state);
msg.set_args(0);
Expand All @@ -70,7 +70,7 @@ class TestCompiledFile : public CxxTest::TestSuite {

cm->execute(state, task, msg);

TS_ASSERT_THROWS(task->execute(), Task::Halt);
TS_ASSERT_THROWS(state->run_and_monitor(), Task::Halt);

Class* cls = try_as<Class>(G(object)->get_const(state, "Blah"));
TS_ASSERT(cls);
Expand Down
Loading

0 comments on commit 719c98f

Please sign in to comment.