Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Thread#join when scheduler is used (2nd attempt). #4689

Merged
merged 1 commit into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions test/fiber/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ def close

self.run
ensure
@urgent.each(&:close)
@urgent = nil
if @urgent
@urgent.each(&:close)
@urgent = nil
end

@closed = true

Expand Down Expand Up @@ -240,3 +242,13 @@ def unblock(blocker, fiber)
raise "Broken unblock!"
end
end

class SleepingUnblockScheduler < Scheduler
# This method is invoked when the thread is exiting.
def unblock(blocker, fiber)
super

# This changes the current thread state to `THREAD_RUNNING` which causes `thread_join_sleep` to hang.
sleep(0.1)
end
end
39 changes: 39 additions & 0 deletions test/fiber/test_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@ def test_thread_join
assert_equal :done, thread.value
end

def test_thread_join_implicit
sleeping = false
finished = false

thread = Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler

Fiber.schedule do
sleeping = true
sleep(0.1)
finished = true
end

:done
end

Thread.pass until sleeping

thread.join

assert_equal :done, thread.value
assert finished, "Scheduler thread's task should be finished!"
end

def test_thread_join_blocking
thread = Thread.new do
scheduler = Scheduler.new
Expand Down Expand Up @@ -66,4 +91,18 @@ def test_broken_unblock
thread.join
end
end

def test_thread_join_hang
thread = Thread.new do
scheduler = SleepingUnblockScheduler.new

Fiber.set_scheduler scheduler

Fiber.schedule do
Thread.new{sleep(0.01)}.value
end
end

thread.join
end
end
75 changes: 48 additions & 27 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
{
rb_thread_t *th = th_ptr;
th->status = THREAD_KILLED;

// The thread stack doesn't exist in the forked process:
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;

Expand Down Expand Up @@ -688,7 +689,7 @@ rb_vm_proc_local_ep(VALUE proc)
VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);

static void
static VALUE
thread_do_start_proc(rb_thread_t *th)
{
VALUE args = th->invoke_arg.proc.args;
Expand All @@ -702,7 +703,6 @@ thread_do_start_proc(rb_thread_t *th)
th->ec->root_lep = rb_vm_proc_local_ep(procval);
th->ec->root_svar = Qfalse;

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
vm_check_ints_blocking(th->ec);

if (th->invoke_type == thread_invoke_type_ractor_proc) {
Expand All @@ -713,11 +713,12 @@ thread_do_start_proc(rb_thread_t *th)
rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
vm_check_ints_blocking(th->ec);

// kick thread
th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE);
return rb_vm_invoke_proc_with_self(
th->ec, proc, self,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE
);
}
else {
args_len = RARRAY_LENINT(args);
Expand All @@ -733,38 +734,46 @@ thread_do_start_proc(rb_thread_t *th)

vm_check_ints_blocking(th->ec);

// kick thread
th->value = rb_vm_invoke_proc(th->ec, proc,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE);
}

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);

if (th->invoke_type == thread_invoke_type_ractor_proc) {
rb_ractor_atexit(th->ec, th->value);
return rb_vm_invoke_proc(
th->ec, proc,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE
);
}
}

static void
thread_do_start(rb_thread_t *th)
{
native_set_thread_name(th);
VALUE result = Qundef;

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);

switch (th->invoke_type) {
case thread_invoke_type_proc:
result = thread_do_start_proc(th);
break;

case thread_invoke_type_ractor_proc:
thread_do_start_proc(th);
result = thread_do_start_proc(th);
rb_ractor_atexit(th->ec, result);
break;

case thread_invoke_type_func:
th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
break;

case thread_invoke_type_none:
rb_bug("unreachable");
}

rb_fiber_scheduler_set(Qnil);

th->value = result;

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
}

void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
Expand Down Expand Up @@ -817,6 +826,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)

thread_debug("thread start (get lock): %p\n", (void *)th);

// Ensure that we are not joinable.
VM_ASSERT(th->value == Qundef);

EC_PUSH_TAG(th->ec);

if ((state = EC_EXEC_TAG()) == TAG_NONE) {
Expand Down Expand Up @@ -857,6 +869,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
th->value = Qnil;
}

// The thread is effectively finished and can be joined.
VM_ASSERT(th->value != Qundef);

rb_threadptr_join_list_wakeup(th);
rb_threadptr_unlock_all_locking_mutexes(th);

if (th->invoke_type == thread_invoke_type_ractor_proc) {
rb_thread_terminate_all(th);
rb_ractor_teardown(th->ec);
Expand All @@ -874,9 +892,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
}

rb_threadptr_join_list_wakeup(th);
rb_threadptr_unlock_all_locking_mutexes(th);

EC_POP_TAG();

rb_ec_clear_current_thread_trace_func(th->ec);
Expand Down Expand Up @@ -1153,6 +1168,12 @@ remove_from_join_list(VALUE arg)

static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);

static int
thread_finished(rb_thread_t *th)
{
return th->status == THREAD_KILLED || th->value != Qundef;
}

static VALUE
thread_join_sleep(VALUE arg)
{
Expand All @@ -1179,7 +1200,7 @@ thread_join_sleep(VALUE arg)
end = rb_hrtime_add(*limit, rb_hrtime_now());
}

while (target_th->status != THREAD_KILLED) {
while (!thread_finished(target_th)) {
VALUE scheduler = rb_fiber_scheduler_current();

if (scheduler != Qnil) {
Expand Down Expand Up @@ -3319,11 +3340,11 @@ rb_thread_status(VALUE thread)
static VALUE
rb_thread_alive_p(VALUE thread)
{
if (rb_threadptr_dead(rb_thread_ptr(thread))) {
return Qfalse;
if (thread_finished(rb_thread_ptr(thread))) {
return Qfalse;
}
else {
return Qtrue;
return Qtrue;
}
}

Expand Down
17 changes: 10 additions & 7 deletions vm.c
Original file line number Diff line number Diff line change
Expand Up @@ -3075,6 +3075,8 @@ th_init(rb_thread_t *th, VALUE self)
th->thread_id_string[0] = '\0';
#endif

th->value = Qundef;

#if OPT_CALL_THREADED_CODE
th->retval = Qundef;
#endif
Expand All @@ -3087,16 +3089,17 @@ static VALUE
ruby_thread_init(VALUE self)
{
rb_thread_t *th = GET_THREAD();
rb_thread_t *targe_th = rb_thread_ptr(self);
rb_thread_t *target_th = rb_thread_ptr(self);
rb_vm_t *vm = th->vm;

targe_th->vm = vm;
th_init(targe_th, self);
target_th->vm = vm;
th_init(target_th, self);

target_th->top_wrapper = 0;
target_th->top_self = rb_vm_top_self();
target_th->ec->root_svar = Qfalse;
target_th->ractor = th->ractor;

targe_th->top_wrapper = 0;
targe_th->top_self = rb_vm_top_self();
targe_th->ec->root_svar = Qfalse;
targe_th->ractor = th->ractor;
return self;
}

Expand Down