Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threads in a ractor will be killed with the ractor #3754

Merged
merged 2 commits into from Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 33 additions & 1 deletion bootstraptest/test_ractor.rb
Expand Up @@ -396,6 +396,39 @@ def test n
end
}

# threads in a ractor will killed
assert_equal '{:ok=>3}', %q{
Ractor.new Ractor.current do |main|
q = Queue.new
Thread.new do
q << true
loop{}
ensure
main << :ok
end

Thread.new do
q << true
while true
end
ensure
main << :ok
end

Thread.new do
q << true
sleep 1
ensure
main << :ok
end

# wait for the start of all threads
3.times{q.pop}
end

3.times.map{Ractor.receive}.tally
}

# unshareable object are copied
assert_equal 'false', %q{
obj = 'str'.dup
Expand Down Expand Up @@ -516,7 +549,6 @@ def check obj1
results << check(C.new(false).freeze) # false
}


# move example2: String
# touching moved object causes an error
assert_equal 'hello world', %q{
Expand Down
1 change: 0 additions & 1 deletion eval_intern.h
Expand Up @@ -293,7 +293,6 @@ rb_cref_t *rb_vm_cref(void);
rb_cref_t *rb_vm_cref_replace_with_duplicated_cref(void);
VALUE rb_vm_call_cfunc(VALUE recv, VALUE (*func)(VALUE), VALUE arg, VALUE block_handler, VALUE filename);
void rb_vm_set_progname(VALUE filename);
void rb_thread_terminate_all(void);
VALUE rb_vm_cbase(void);

/* vm_backtrace.c */
Expand Down
4 changes: 2 additions & 2 deletions ractor.c
Expand Up @@ -1591,7 +1591,7 @@ rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r)
}
}

void rb_thread_terminate_all(void); // thread.c
void rb_thread_terminate_all(rb_thread_t *th); // thread.c

static void
ractor_terminal_interrupt_all(rb_vm_t *vm)
Expand Down Expand Up @@ -1620,7 +1620,7 @@ rb_ractor_terminate_all(void)
ractor_terminal_interrupt_all(vm); // kill all ractors
RB_VM_UNLOCK();
}
rb_thread_terminate_all(); // kill other threads in main-ractor and wait
rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait

RB_VM_LOCK();
{
Expand Down
24 changes: 7 additions & 17 deletions thread.c
Expand Up @@ -341,7 +341,6 @@ rb_thread_s_debug_set(VALUE self, VALUE val)
#endif

NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start));
static void timer_thread_function(rb_execution_context_t *ec);
void ruby_sigchld_handler(rb_vm_t *); /* signal.c */

static void
Expand Down Expand Up @@ -588,16 +587,15 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
}

void
rb_thread_terminate_all(void)
rb_thread_terminate_all(rb_thread_t *th)
{
rb_thread_t *volatile th = GET_THREAD(); /* main thread */
rb_ractor_t *cr = th->ractor;
rb_execution_context_t * volatile ec = th->ec;
rb_ractor_t *r = th->ractor;
volatile int sleeping = 0;

if (r->threads.main != th) {
if (cr->threads.main != th) {
rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)",
(void *)r->threads.main, (void *)th);
(void *)cr->threads.main, (void *)th);
}

/* unlock all locking mutexes */
Expand All @@ -607,9 +605,9 @@ rb_thread_terminate_all(void)
if (EC_EXEC_TAG() == TAG_NONE) {
retry:
thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th);
terminate_all(th->ractor, th);
terminate_all(cr, th);

while (rb_ractor_living_thread_num(th->ractor) > 1) {
while (rb_ractor_living_thread_num(cr) > 1) {
rb_hrtime_t rel = RB_HRTIME_PER_SEC;
/*q
* Thread exiting routine in thread_start_func_2 notify
Expand Down Expand Up @@ -855,6 +853,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start)
}

if (th->invoke_type == thread_invoke_type_ractor_proc) {
rb_thread_terminate_all(th);
rb_ractor_teardown(th->ec);
}

Expand Down Expand Up @@ -4574,15 +4573,6 @@ rb_threadptr_check_signal(rb_thread_t *mth)
}
}

static void
timer_thread_function(rb_execution_context_t *ec)
{
// strictly speaking, accessing gvl->owner is not thread-safe
if (ec) {
RUBY_VM_SET_TIMER_INTERRUPT(ec);
}
}

static void
async_bug_fd(const char *mesg, int errno_arg, int fd)
{
Expand Down
12 changes: 8 additions & 4 deletions thread_pthread.c
Expand Up @@ -194,6 +194,7 @@ designate_timer_thread(rb_global_vm_lock_t *gvl)
static void
do_gvl_timer(rb_global_vm_lock_t *gvl, rb_thread_t *th)
{
rb_vm_t *vm = GET_VM();
static rb_hrtime_t abs;
native_thread_data_t *nd = &th->native_thread_data;

Expand All @@ -208,22 +209,25 @@ do_gvl_timer(rb_global_vm_lock_t *gvl, rb_thread_t *th)
gvl->timer_err = native_cond_timedwait(&nd->cond.gvlq, &gvl->lock, &abs);

ubf_wakeup_all_threads();
ruby_sigchld_handler(GET_VM());
ruby_sigchld_handler(vm);

if (UNLIKELY(rb_signal_buff_size())) {
if (th == GET_VM()->ractor.main_thread) {
if (th == vm->ractor.main_thread) {
RUBY_VM_SET_TRAP_INTERRUPT(th->ec);
}
else {
threadptr_trap_interrupt(GET_VM()->ractor.main_thread);
threadptr_trap_interrupt(vm->ractor.main_thread);
}
}

/*
* Timeslice. Warning: the process may fork while this
* thread is contending for GVL:
*/
if (gvl->owner) timer_thread_function(gvl->owner->ec);
if (gvl->owner) {
// strictly speaking, accessing "gvl->owner" is not thread-safe
RUBY_VM_SET_TIMER_INTERRUPT(gvl->owner->ec);
}
gvl->timer = 0;
}

Expand Down
10 changes: 3 additions & 7 deletions thread_win32.c
Expand Up @@ -710,13 +710,9 @@ timer_thread_func(void *dummy)
rb_vm_t *vm = GET_VM();
thread_debug("timer_thread\n");
rb_w32_set_thread_description(GetCurrentThread(), L"ruby-timer-thread");
while (WaitForSingleObject(timer_thread.lock, TIME_QUANTUM_USEC/1000) ==
WAIT_TIMEOUT) {
rb_execution_context_t *running_ec = vm->ractor.main_ractor->threads.running_ec;

if (running_ec) {
timer_thread_function(running_ec);
}
while (WaitForSingleObject(timer_thread.lock,
TIME_QUANTUM_USEC/1000) == WAIT_TIMEOUT) {
vm->clock++;
ruby_sigchld_handler(vm); /* probably no-op */
rb_threadptr_check_signal(vm->ractor.main_thread);
}
Expand Down
2 changes: 2 additions & 0 deletions thread_win32.h
Expand Up @@ -16,6 +16,8 @@
# undef _WIN32
# endif

#define USE_VM_CLOCK 1

WINBASEAPI BOOL WINAPI
TryEnterCriticalSection(IN OUT LPCRITICAL_SECTION lpCriticalSection);

Expand Down
22 changes: 21 additions & 1 deletion vm_core.h
Expand Up @@ -649,6 +649,10 @@ typedef struct rb_vm_struct {
const struct rb_builtin_function *builtin_function_table;
int builtin_inline_index;

#if USE_VM_CLOCK
uint32_t clock;
#endif

/* params */
struct { /* size in byte */
size_t thread_vm_stack_size;
Expand Down Expand Up @@ -845,6 +849,9 @@ struct rb_execution_context_struct {
/* interrupt flags */
rb_atomic_t interrupt_flag;
rb_atomic_t interrupt_mask; /* size should match flag */
#if USE_VM_CLOCK
uint32_t checked_clock;
#endif

rb_fiber_t *fiber_ptr;
struct rb_thread_struct *thread_ptr;
Expand Down Expand Up @@ -1845,7 +1852,20 @@ enum {
#define RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec) ATOMIC_OR((ec)->interrupt_flag, VM_BARRIER_INTERRUPT_MASK)
#define RUBY_VM_INTERRUPTED(ec) ((ec)->interrupt_flag & ~(ec)->interrupt_mask & \
(PENDING_INTERRUPT_MASK|TRAP_INTERRUPT_MASK))
#define RUBY_VM_INTERRUPTED_ANY(ec) ((ec)->interrupt_flag & ~(ec)->interrupt_mask)

static inline bool
RUBY_VM_INTERRUPTED_ANY(rb_execution_context_t *ec)
{
#if USE_VM_CLOCK
uint32_t current_clock = rb_ec_vm_ptr(ec)->clock;

if (current_clock != ec->checked_clock) {
ec->checked_clock = current_clock;
RUBY_VM_SET_TIMER_INTERRUPT(ec);
}
#endif
return ec->interrupt_flag & ~(ec)->interrupt_mask;
}

VALUE rb_exc_set_backtrace(VALUE exc, VALUE bt);
int rb_signal_buff_size(void);
Expand Down