Skip to content

Commit

Permalink
Fix potential hang when joining threads.
Browse files Browse the repository at this point in the history
If the thread termination invokes user code after `th->status` becomes
`THREAD_KILLED`, and the user unblock function causes that `th->status` to
become something else (e.g. `THREAD_RUNNING`), threads waiting in
`thread_join_sleep` will hang forever. We move the unblock function call
to before the thread status is updated, and allow threads to join as soon
as `th->value` becomes defined.

This reverts commit 6505c77.
  • Loading branch information
ioquatix committed Aug 3, 2021
1 parent 785c70e commit 2d4f29e
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 36 deletions.
16 changes: 14 additions & 2 deletions test/fiber/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ def close

self.run
ensure
@urgent.each(&:close)
@urgent = nil
if @urgent
@urgent.each(&:close)
@urgent = nil
end

@closed = true

Expand Down Expand Up @@ -240,3 +242,13 @@ def unblock(blocker, fiber)
raise "Broken unblock!"
end
end

class SleepingUnblockScheduler < Scheduler
# This method is invoked when the thread is exiting.
def unblock(blocker, fiber)
super

# This changes the current thread state to `THREAD_RUNNING` which causes `thread_join_sleep` to hang.
sleep(0.1)
end
end
39 changes: 39 additions & 0 deletions test/fiber/test_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,31 @@ def test_thread_join
assert_equal :done, thread.value
end

def test_thread_join_implicit
sleeping = false
finished = false

thread = Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler

Fiber.schedule do
sleeping = true
sleep(0.1)
finished = true
end

:done
end

Thread.pass until sleeping

thread.join

assert_equal :done, thread.value
assert finished, "Scheduler thread's task should be finished!"
end

def test_thread_join_blocking
thread = Thread.new do
scheduler = Scheduler.new
Expand Down Expand Up @@ -66,4 +91,18 @@ def test_broken_unblock
thread.join
end
end

def test_thread_join_hang
thread = Thread.new do
scheduler = SleepingUnblockScheduler.new

Fiber.set_scheduler scheduler

Fiber.schedule do
Thread.new{sleep(0.01)}.value
end
end

thread.join
end
end
75 changes: 48 additions & 27 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ thread_cleanup_func_before_exec(void *th_ptr)
{
rb_thread_t *th = th_ptr;
th->status = THREAD_KILLED;

// The thread stack doesn't exist in the forked process:
th->ec->machine.stack_start = th->ec->machine.stack_end = NULL;

Expand Down Expand Up @@ -688,7 +689,7 @@ rb_vm_proc_local_ep(VALUE proc)
VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self,
int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler);

static void
static VALUE
thread_do_start_proc(rb_thread_t *th)
{
VALUE args = th->invoke_arg.proc.args;
Expand All @@ -702,7 +703,6 @@ thread_do_start_proc(rb_thread_t *th)
th->ec->root_lep = rb_vm_proc_local_ep(procval);
th->ec->root_svar = Qfalse;

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);
vm_check_ints_blocking(th->ec);

if (th->invoke_type == thread_invoke_type_ractor_proc) {
Expand All @@ -713,11 +713,12 @@ thread_do_start_proc(rb_thread_t *th)
rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr);
vm_check_ints_blocking(th->ec);

// kick thread
th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE);
return rb_vm_invoke_proc_with_self(
th->ec, proc, self,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE
);
}
else {
args_len = RARRAY_LENINT(args);
Expand All @@ -733,38 +734,46 @@ thread_do_start_proc(rb_thread_t *th)

vm_check_ints_blocking(th->ec);

// kick thread
th->value = rb_vm_invoke_proc(th->ec, proc,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE);
}

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);

if (th->invoke_type == thread_invoke_type_ractor_proc) {
rb_ractor_atexit(th->ec, th->value);
return rb_vm_invoke_proc(
th->ec, proc,
args_len, args_ptr,
th->invoke_arg.proc.kw_splat,
VM_BLOCK_HANDLER_NONE
);
}
}

static void
thread_do_start(rb_thread_t *th)
{
native_set_thread_name(th);
VALUE result = Qundef;

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef);

switch (th->invoke_type) {
case thread_invoke_type_proc:
result = thread_do_start_proc(th);
break;

case thread_invoke_type_ractor_proc:
thread_do_start_proc(th);
result = thread_do_start_proc(th);
rb_ractor_atexit(th->ec, result);
break;

case thread_invoke_type_func:
th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
break;

case thread_invoke_type_none:
rb_bug("unreachable");
}

rb_fiber_scheduler_set(Qnil);

th->value = result;

EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef);
}

void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
Expand Down Expand Up @@ -817,6 +826,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)

thread_debug("thread start (get lock): %p\n", (void *)th);

// Ensure that we are not joinable.
VM_ASSERT(th->value == Qundef);

EC_PUSH_TAG(th->ec);

if ((state = EC_EXEC_TAG()) == TAG_NONE) {
Expand Down Expand Up @@ -857,6 +869,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
th->value = Qnil;
}

// The thread is effectively finished and can be joined.
VM_ASSERT(th->value != Qundef);

rb_threadptr_join_list_wakeup(th);
rb_threadptr_unlock_all_locking_mutexes(th);

if (th->invoke_type == thread_invoke_type_ractor_proc) {
rb_thread_terminate_all(th);
rb_ractor_teardown(th->ec);
Expand All @@ -874,9 +892,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
rb_threadptr_raise(ractor_main_th, 1, &errinfo);
}

rb_threadptr_join_list_wakeup(th);
rb_threadptr_unlock_all_locking_mutexes(th);

EC_POP_TAG();

rb_ec_clear_current_thread_trace_func(th->ec);
Expand Down Expand Up @@ -1153,6 +1168,12 @@ remove_from_join_list(VALUE arg)

static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);

static int
thread_finished(rb_thread_t *th)
{
return th->status == THREAD_KILLED || th->value != Qundef;
}

static VALUE
thread_join_sleep(VALUE arg)
{
Expand All @@ -1179,7 +1200,7 @@ thread_join_sleep(VALUE arg)
end = rb_hrtime_add(*limit, rb_hrtime_now());
}

while (target_th->status != THREAD_KILLED) {
while (!thread_finished(target_th)) {
VALUE scheduler = rb_fiber_scheduler_current();

if (scheduler != Qnil) {
Expand Down Expand Up @@ -3319,11 +3340,11 @@ rb_thread_status(VALUE thread)
static VALUE
rb_thread_alive_p(VALUE thread)
{
if (rb_threadptr_dead(rb_thread_ptr(thread))) {
return Qfalse;
if (thread_finished(rb_thread_ptr(thread))) {
return Qfalse;
}
else {
return Qtrue;
return Qtrue;
}
}

Expand Down
17 changes: 10 additions & 7 deletions vm.c
Original file line number Diff line number Diff line change
Expand Up @@ -3075,6 +3075,8 @@ th_init(rb_thread_t *th, VALUE self)
th->thread_id_string[0] = '\0';
#endif

th->value = Qundef;

#if OPT_CALL_THREADED_CODE
th->retval = Qundef;
#endif
Expand All @@ -3087,16 +3089,17 @@ static VALUE
ruby_thread_init(VALUE self)
{
rb_thread_t *th = GET_THREAD();
rb_thread_t *targe_th = rb_thread_ptr(self);
rb_thread_t *target_th = rb_thread_ptr(self);
rb_vm_t *vm = th->vm;

targe_th->vm = vm;
th_init(targe_th, self);
target_th->vm = vm;
th_init(target_th, self);

target_th->top_wrapper = 0;
target_th->top_self = rb_vm_top_self();
target_th->ec->root_svar = Qfalse;
target_th->ractor = th->ractor;

targe_th->top_wrapper = 0;
targe_th->top_self = rb_vm_top_self();
targe_th->ec->root_svar = Qfalse;
targe_th->ractor = th->ractor;
return self;
}

Expand Down

0 comments on commit 2d4f29e

Please sign in to comment.