Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

M:N Threads, now w/ macOS support (kqueue) #9178

Merged
merged 2 commits into from Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions configure.ac
Expand Up @@ -1352,6 +1352,7 @@ AC_CHECK_HEADERS(time.h)
AC_CHECK_HEADERS(ucontext.h)
AC_CHECK_HEADERS(utime.h)
AC_CHECK_HEADERS(sys/epoll.h)
AC_CHECK_HEADERS(sys/event.h)

AS_CASE("$target_cpu", [x64|x86_64|i[3-6]86*], [
AC_CHECK_HEADERS(x86intrin.h)
Expand Down
7 changes: 7 additions & 0 deletions process.c
Expand Up @@ -3354,6 +3354,13 @@ run_exec_dup2(VALUE ary, VALUE tmpbuf, struct rb_execarg *sargp, char *errmsg, s
ERRMSG("dup");
goto fail;
}
// without this, kqueue timer_th.event_fd fails with a reserved FD did not have close-on-exec
// in #assert_close_on_exec because the FD_CLOEXEC is not dup'd by default
if (fd_get_cloexec(pairs[i].oldfd, errmsg, errmsg_buflen)) {
if (fd_set_cloexec(extra_fd, errmsg, errmsg_buflen)) {
goto fail;
}
}
rb_update_max_fd(extra_fd);
}
else {
Expand Down
46 changes: 29 additions & 17 deletions thread.c
Expand Up @@ -4254,6 +4254,27 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}

// Wait for fd readiness via the M:N thread scheduler, when applicable.
//
// For a thread running on a shared (non-dedicated) native thread under the
// pthread M:N scheduler, convert the optional timeval timeout to hrtime and
// delegate to thread_sched_wait_events().
//
// Returns true only when the scheduler wait timed out before the fd became
// ready; returns false when the fd is ready, or when this path does not
// apply (dedicated native thread, or a non-pthread build) and the caller
// must perform its own blocking wait.
static bool
thread_sched_wait_events_timeval(rb_thread_t *th, int fd, int events, struct timeval *timeout)
{
#ifdef RUBY_THREAD_PTHREAD_H
    if (!th->nt->dedicated) {
        rb_hrtime_t rel, *prel;

        if (timeout) {
            rel = rb_timeval2hrtime(timeout);
            prel = &rel;
        }
        else {
            prel = NULL; // NULL relative timeout => wait indefinitely
        }

        return thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
    }
#endif // RUBY_THREAD_PTHREAD_H
    return false; // not handled by the M:N scheduler; caller blocks itself
}

#ifdef USE_POLL

/* The same with linux kernel. TODO: make platform independent definition. */
Expand Down Expand Up @@ -4282,23 +4303,9 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
wfd.fd = fd;
wfd.busy = NULL;

#ifdef RUBY_THREAD_PTHREAD_H
if (!th->nt->dedicated) {
rb_hrtime_t rel, *prel;

if (timeout) {
rel = rb_timeval2hrtime(timeout);
prel = &rel;
}
else {
prel = NULL;
}

if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
return 0; // timeout
}
if (thread_sched_wait_events_timeval(th, fd, events, timeout)) {
return 0; // timeout
}
#endif

RB_VM_LOCK_ENTER();
{
Expand Down Expand Up @@ -4433,14 +4440,19 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct select_args args;
int r;
VALUE ptr = (VALUE)&args;
rb_thread_t *th = GET_THREAD();

if (thread_sched_wait_events_timeval(th, fd, events, timeout)) {
return 0; // timeout
}

args.as.fd = fd;
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
args.tv = timeout;
args.wfd.fd = fd;
args.wfd.th = GET_THREAD();
args.wfd.th = th;
args.wfd.busy = NULL;

RB_VM_LOCK_ENTER();
Expand Down
24 changes: 18 additions & 6 deletions thread_pthread.c
Expand Up @@ -62,6 +62,10 @@ static const void *const condattr_monotonic = NULL;

#include COROUTINE_H

#ifndef HAVE_SYS_EVENT_H
#define HAVE_SYS_EVENT_H 0
#endif

#ifndef HAVE_SYS_EPOLL_H
#define HAVE_SYS_EPOLL_H 0
#else
Expand All @@ -78,6 +82,9 @@ static const void *const condattr_monotonic = NULL;
#elif HAVE_SYS_EPOLL_H
#include <sys/epoll.h>
#define USE_MN_THREADS 1
#elif HAVE_SYS_EVENT_H
#include <sys/event.h>
#define USE_MN_THREADS 1
#else
#define USE_MN_THREADS 0
#endif
Expand Down Expand Up @@ -2794,10 +2801,15 @@ static struct {

int comm_fds[2]; // r, w

#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
int event_fd; // kernel event queue fd (epoll/kqueue)
#endif
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
#define EPOLL_EVENTS_MAX 0x10
int epoll_fd;
struct epoll_event finished_events[EPOLL_EVENTS_MAX];
#elif HAVE_SYS_EVENT_H && USE_MN_THREADS
#define KQUEUE_EVENTS_MAX 0x10
struct kevent finished_events[KQUEUE_EVENTS_MAX];
#endif

// waiting threads list
Expand Down Expand Up @@ -3083,7 +3095,7 @@ rb_thread_create_timer_thread(void)

CLOSE_INVALIDATE_PAIR(timer_th.comm_fds);
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally made this `#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS`, but several specs failed with EBADF errors. The kqueue(2) docs (https://man.openbsd.org/kqueue.2) say: "The queue is not inherited by a child created with fork(2). Similarly, kqueues cannot be passed across UNIX-domain sockets." That is possibly why the fd is already invalid at this point.

close_invalidate(&timer_th.epoll_fd, "close epoll_fd");
close_invalidate(&timer_th.event_fd, "close event_fd");
#endif
rb_native_mutex_destroy(&timer_th.waiting_lock);
}
Expand All @@ -3094,8 +3106,8 @@ rb_thread_create_timer_thread(void)
// open communication channel
setup_communication_pipe_internal(timer_th.comm_fds);

// open epoll fd
timer_thread_setup_nm();
// open event fd
timer_thread_setup_mn();
}

pthread_create(&timer_th.pthread_id, NULL, timer_thread_func, GET_VM());
Expand Down Expand Up @@ -3176,8 +3188,8 @@ rb_reserved_fd_p(int fd)

if (fd == timer_th.comm_fds[0] ||
fd == timer_th.comm_fds[1]
#if HAVE_SYS_EPOLL_H && USE_MN_THREADS
|| fd == timer_th.epoll_fd
#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS
|| fd == timer_th.event_fd
#endif
) {
goto check_fork_gen;
Expand Down