Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M:N thread scheduler for Ractors #8629

Merged
merged 1 commit into from Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions bootstraptest/test_thread.rb
Expand Up @@ -242,6 +242,20 @@
end
}

assert_equal 'true', %{
Thread.new{}.join
begin
Process.waitpid2 fork{
Thread.new{
sleep 0.1
}.join
}
true
rescue NotImplementedError
true
end
}

assert_equal 'ok', %{
open("zzz_t1.rb", "w") do |f|
f.puts <<-END
Expand Down
3 changes: 3 additions & 0 deletions common.mk
Expand Up @@ -15323,6 +15323,7 @@ ruby.$(OBJEXT): $(top_srcdir)/internal/ruby_parser.h
ruby.$(OBJEXT): $(top_srcdir)/internal/serial.h
ruby.$(OBJEXT): $(top_srcdir)/internal/static_assert.h
ruby.$(OBJEXT): $(top_srcdir)/internal/string.h
ruby.$(OBJEXT): $(top_srcdir)/internal/thread.h
ruby.$(OBJEXT): $(top_srcdir)/internal/variable.h
ruby.$(OBJEXT): $(top_srcdir)/internal/vm.h
ruby.$(OBJEXT): $(top_srcdir)/internal/warnings.h
Expand Down Expand Up @@ -17536,6 +17537,7 @@ thread.$(OBJEXT): $(top_srcdir)/internal/time.h
thread.$(OBJEXT): $(top_srcdir)/internal/variable.h
thread.$(OBJEXT): $(top_srcdir)/internal/vm.h
thread.$(OBJEXT): $(top_srcdir)/internal/warnings.h
thread.$(OBJEXT): {$(VPATH)}$(COROUTINE_H)
thread.$(OBJEXT): {$(VPATH)}assert.h
thread.$(OBJEXT): {$(VPATH)}atomic.h
thread.$(OBJEXT): {$(VPATH)}backward/2/assume.h
Expand Down Expand Up @@ -17730,6 +17732,7 @@ thread.$(OBJEXT): {$(VPATH)}thread.h
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
thread.$(OBJEXT): {$(VPATH)}thread_native.h
thread.$(OBJEXT): {$(VPATH)}thread_pthread_mn.c
thread.$(OBJEXT): {$(VPATH)}thread_sync.c
thread.$(OBJEXT): {$(VPATH)}thread_sync.rbinc
thread.$(OBJEXT): {$(VPATH)}timev.h
Expand Down
2 changes: 2 additions & 0 deletions configure.ac
Expand Up @@ -1342,6 +1342,8 @@ AC_CHECK_HEADERS(syscall.h)
AC_CHECK_HEADERS(time.h)
AC_CHECK_HEADERS(ucontext.h)
AC_CHECK_HEADERS(utime.h)
AC_CHECK_HEADERS(sys/epoll.h)

AS_CASE("$target_cpu", [x64|x86_64|i[3-6]86*], [
AC_CHECK_HEADERS(x86intrin.h)
])
Expand Down
9 changes: 7 additions & 2 deletions debug.c
Expand Up @@ -443,6 +443,10 @@ setup_debug_log(void)
(ruby_debug_log_mode & ruby_debug_log_memory) ? "[mem]" : "",
(ruby_debug_log_mode & ruby_debug_log_stderr) ? "[stderr]" : "",
(ruby_debug_log_mode & ruby_debug_log_file) ? "[file]" : "");
if (debug_log.output_file[0]) {
fprintf(stderr, "RUBY_DEBUG_LOG filename=%s\n", debug_log.output_file);
}

rb_nativethread_lock_initialize(&debug_log.lock);

setup_debug_log_filter();
Expand Down Expand Up @@ -609,10 +613,11 @@ ruby_debug_log(const char *file, int line, const char *func_name, const char *fm
// ractor information
if (ruby_single_main_ractor == NULL) {
rb_ractor_t *cr = th ? th->ractor : NULL;
rb_vm_t *vm = GET_VM();

if (r && len < MAX_DEBUG_LOG_MESSAGE_LEN) {
r = snprintf(buff + len, MAX_DEBUG_LOG_MESSAGE_LEN - len, "\tr:#%d/%u",
cr ? (int)rb_ractor_id(cr) : -1, GET_VM()->ractor.cnt);
r = snprintf(buff + len, MAX_DEBUG_LOG_MESSAGE_LEN - len, "\tr:#%d/%u (%u)",
cr ? (int)rb_ractor_id(cr) : -1, vm->ractor.cnt, vm->ractor.sched.running_cnt);

if (r < 0) rb_bug("ruby_debug_log returns %d", r);
len += r;
Expand Down
4 changes: 2 additions & 2 deletions dir.c
Expand Up @@ -805,7 +805,7 @@ dir_read(VALUE dir)
struct dirent *dp;

GetDIR(dir, dirp);
errno = 0;
rb_errno_set(0);
if ((dp = READDIR(dirp->dir, dirp->enc)) != NULL) {
return rb_external_str_new_with_enc(dp->d_name, NAMLEN(dp), dirp->enc);
}
Expand Down Expand Up @@ -1723,7 +1723,7 @@ nogvl_opendir_at(void *ptr)
/* fallthrough*/
case 0:
if (fd >= 0) close(fd);
errno = e;
rb_errno_set(e);
}
}
#else /* !USE_OPENDIR_AT */
Expand Down
18 changes: 18 additions & 0 deletions eval.c
Expand Up @@ -2110,3 +2110,21 @@ Init_eval(void)
id_signo = rb_intern_const("signo");
id_status = rb_intern_const("status");
}

int
rb_errno(void)
{
return *rb_orig_errno_ptr();
}

void
rb_errno_set(int e)
{
*rb_orig_errno_ptr() = e;
}

int *
rb_errno_ptr(void)
{
return rb_orig_errno_ptr();
}
18 changes: 18 additions & 0 deletions include/ruby/ruby.h
Expand Up @@ -270,6 +270,24 @@ RBIMPL_ATTR_FORMAT(RBIMPL_PRINTF_FORMAT, 3, 0)
*/
int ruby_vsnprintf(char *str, size_t n, char const *fmt, va_list ap);

// TODO: doc

#include <errno.h>

int rb_errno(void);
void rb_errno_set(int);
int *rb_errno_ptr(void);

static inline int *
rb_orig_errno_ptr(void)
{
return &errno;
}

#define rb_orig_errno errno
#undef errno
#define errno (*rb_errno_ptr())

/** @cond INTERNAL_MACRO */
#if RBIMPL_HAS_WARNING("-Wgnu-zero-variadic-macro-arguments")
# /* Skip it; clang -pedantic doesn't like the following */
Expand Down
1 change: 1 addition & 0 deletions internal/thread.h
Expand Up @@ -50,6 +50,7 @@ void rb_mutex_allow_trap(VALUE self, int val);
VALUE rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data);
VALUE rb_mutex_owned_p(VALUE self);
VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g, VALUE h, ID mid);
void ruby_mn_threads_params(void);

int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);

Expand Down
64 changes: 35 additions & 29 deletions process.c
Expand Up @@ -685,10 +685,16 @@ rb_last_status_set(int status, rb_pid_t pid)
GET_THREAD()->last_status = rb_process_status_new(pid, status, 0);
}

static void
last_status_clear(rb_thread_t *th)
{
th->last_status = Qnil;
}

void
rb_last_status_clear(void)
{
GET_THREAD()->last_status = Qnil;
last_status_clear(GET_THREAD());
}

static rb_pid_t
Expand Down Expand Up @@ -1654,26 +1660,13 @@ before_exec(void)
before_exec_async_signal_safe();
}

/* This function should be async-signal-safe. Actually it is. */
static void
after_exec_async_signal_safe(void)
{
}

static void
after_exec_non_async_signal_safe(void)
after_exec(void)
{
rb_thread_reset_timer_thread();
rb_thread_start_timer_thread();
}

static void
after_exec(void)
{
after_exec_async_signal_safe();
after_exec_non_async_signal_safe();
}

#if defined HAVE_WORKING_FORK || defined HAVE_DAEMON
static void
before_fork_ruby(void)
Expand All @@ -1686,10 +1679,14 @@ after_fork_ruby(rb_pid_t pid)
{
rb_threadptr_pending_interrupt_clear(GET_THREAD());
if (pid == 0) {
// child
clear_pid_cache();
rb_thread_atfork();
}
after_exec();
else {
// parent
after_exec();
}
}
#endif

Expand Down Expand Up @@ -4210,16 +4207,19 @@ rb_fork_ruby2(struct rb_process_status *status)

while (1) {
prefork();
disable_child_handler_before_fork(&old);

before_fork_ruby();
pid = rb_fork();
err = errno;
if (status) {
status->pid = pid;
status->error = err;
disable_child_handler_before_fork(&old);
{
pid = rb_fork();
err = errno;
if (status) {
status->pid = pid;
status->error = err;
}
}
after_fork_ruby(pid);
disable_child_handler_fork_parent(&old); /* yes, bad name */
after_fork_ruby(pid);

if (pid >= 0) { /* fork succeed */
return pid;
Expand Down Expand Up @@ -4663,11 +4663,16 @@ static VALUE
do_spawn_process(VALUE arg)
{
struct spawn_args *argp = (struct spawn_args *)arg;

rb_execarg_parent_start1(argp->execarg);

return (VALUE)rb_spawn_process(DATA_PTR(argp->execarg),
argp->errmsg.ptr, argp->errmsg.buflen);
}

NOINLINE(static rb_pid_t
rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen));

static rb_pid_t
rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen)
{
Expand All @@ -4676,8 +4681,10 @@ rb_execarg_spawn(VALUE execarg_obj, char *errmsg, size_t errmsg_buflen)
args.execarg = execarg_obj;
args.errmsg.ptr = errmsg;
args.errmsg.buflen = errmsg_buflen;
return (rb_pid_t)rb_ensure(do_spawn_process, (VALUE)&args,
execarg_parent_end, execarg_obj);

rb_pid_t r = (rb_pid_t)rb_ensure(do_spawn_process, (VALUE)&args,
execarg_parent_end, execarg_obj);
return r;
}

static rb_pid_t
Expand Down Expand Up @@ -4820,26 +4827,25 @@ rb_spawn(int argc, const VALUE *argv)
static VALUE
rb_f_system(int argc, VALUE *argv, VALUE _)
{
rb_thread_t *th = GET_THREAD();
VALUE execarg_obj = rb_execarg_new(argc, argv, TRUE, TRUE);
struct rb_execarg *eargp = rb_execarg_get(execarg_obj);

struct rb_process_status status = {0};
eargp->status = &status;

rb_last_status_clear();
last_status_clear(th);

// This function can set the thread's last status.
// May be different from waitpid_state.pid on exec failure.
rb_pid_t pid = rb_execarg_spawn(execarg_obj, 0, 0);

if (pid > 0) {
VALUE status = rb_process_status_wait(pid, 0);

struct rb_process_status *data = rb_check_typeddata(status, &rb_process_status_type);

// Set the last status:
rb_obj_freeze(status);
GET_THREAD()->last_status = status;
th->last_status = status;

if (data->status == EXIT_SUCCESS) {
return Qtrue;
Expand Down