Skip to content

Commit

Permalink
Fix timeout calculation
Browse files Browse the repository at this point in the history
* The struct was getting constructed properly, but it was being wiped out when returned because it local to the function. Now it gets calculated inside a dedicated function `kqueue_wait` that is colocated with the `kevent` call. This fixes an issue where the timeout is always 0 and constantly returns triggering tons of thread activity

* `kqueue` specific code for creation and unregistering have also been moved to their own functions, `kqueue_create` and `kqueue_unregister_waiting`, for code clarity

* Like the change from `kq_fd` to `event_fd`, `kq_wait` has been changed to `event_wait` to distance the naming from the `kqueue` function
  • Loading branch information
jpcamara committed Dec 14, 2023
1 parent 87ad31f commit 15db3e3
Showing 1 changed file with 79 additions and 68 deletions.
147 changes: 79 additions & 68 deletions thread_pthread_mn.c
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,78 @@ verify_waiting_list(void)
#endif
}

#if HAVE_SYS_EVENT_H // kqueue helpers

static enum thread_sched_waiting_flag
kqueue_translate_filter_to_flags(int16_t filter)
{
switch (filter) {
case EVFILT_READ:
return thread_sched_waiting_io_read;
case EVFILT_WRITE:
return thread_sched_waiting_io_write;
case EVFILT_TIMER:
return thread_sched_waiting_timeout;
default:
rb_bug("kevent filter:%d not supported", filter);
}
}

static int
kqueue_wait(rb_vm_t *vm)
{
struct timespec calculated_timeout;
struct timespec *timeout = NULL;
int timeout_ms = timer_thread_set_timeout(vm);

if (timeout_ms >= 0) {
calculated_timeout.tv_sec = timeout_ms / 1000;
calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
timeout = &calculated_timeout;
}

return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
}

static void
kqueue_create(void)
{
if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
int flags = fcntl(timer_th.event_fd, F_GETFD);
if (flags == -1) {
rb_bug("kqueue GETFD failed (errno:%d)", errno);
}

flags |= FD_CLOEXEC;
if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
rb_bug("kqueue SETFD failed (errno:%d)", errno);
}
}

static void
kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
{
if (flags) {
struct kevent ke[2];
int num_events = 0;

if (flags & thread_sched_waiting_io_read) {
EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
num_events++;
}
if (flags & thread_sched_waiting_io_write) {
EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
num_events++;
}
if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
perror("kevent");
rb_bug("unregister/kevent fails. errno:%d", errno);
}
}
}

#endif // HAVE_SYS_EVENT_H

// return false if the fd is not waitable or not need to wait.
static bool
timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
Expand Down Expand Up @@ -720,23 +792,7 @@ timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiti
{
RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
#if HAVE_SYS_EVENT_H
if (flags) {
struct kevent ke[2];
int num_events = 0;

if (flags & thread_sched_waiting_io_read) {
EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
num_events++;
}
if (flags & thread_sched_waiting_io_write) {
EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
num_events++;
}
if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
perror("kevent");
rb_bug("unregister/kevent fails. errno:%d", errno);
}
}
kqueue_unregister_waiting(fd, flags);
#else
// Linux 2.6.9 or later is needed to pass NULL as data.
if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
Expand All @@ -756,16 +812,7 @@ static void
timer_thread_setup_mn(void)
{
#if HAVE_SYS_EVENT_H
if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
int flags = fcntl(timer_th.event_fd, F_GETFD);
if (flags == -1) {
rb_bug("kqueue GETFD failed (errno:%d)", errno);
}

flags |= FD_CLOEXEC;
if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
rb_bug("kqueue SETFD failed (errno:%d)", errno);
}
kqueue_create();
RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd);
#else
if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
Expand All @@ -776,47 +823,11 @@ timer_thread_setup_mn(void)
timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
}

#if HAVE_SYS_EVENT_H // kqueue helpers

static enum thread_sched_waiting_flag
kqueue_translate_filter_to_flags(int16_t filter)
{
switch (filter) {
case EVFILT_READ:
return thread_sched_waiting_io_read;
case EVFILT_WRITE:
return thread_sched_waiting_io_write;
case EVFILT_TIMER:
return thread_sched_waiting_timeout;
default:
rb_bug("kevent filter:%d not supported", filter);
}
}

static struct timespec*
kqueue_timer_thread_set_timeout(rb_vm_t *vm)
{
struct timespec calculated_timeout;
struct timespec *timeout = NULL;
int timeout_ms = timer_thread_set_timeout(vm);

if (timeout_ms >= 0) {
calculated_timeout.tv_sec = timeout_ms / 1000;
calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
timeout = &calculated_timeout;
}

return timeout;
}

#endif // HAVE_SYS_EVENT_H

static int
kq_wait(rb_vm_t *vm)
event_wait(rb_vm_t *vm)
{
#if HAVE_SYS_EVENT_H
struct timespec *timeout = kqueue_timer_thread_set_timeout(vm);
int r = kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
int r = kqueue_wait(vm);
#else
int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
#endif
Expand All @@ -840,7 +851,7 @@ kq_wait(rb_vm_t *vm)
static void
timer_thread_polling(rb_vm_t *vm)
{
int r = kq_wait(vm);
int r = event_wait(vm);

RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);

Expand Down Expand Up @@ -962,7 +973,7 @@ timer_thread_polling(rb_vm_t *vm)
}
}

#else // HAVE_SYS_EPOLL_H
#else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H

static void
timer_thread_setup_mn(void)
Expand Down Expand Up @@ -1012,4 +1023,4 @@ timer_thread_polling(rb_vm_t *vm)
}
}

#endif // HAVE_SYS_EPOLL_H
#endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H

0 comments on commit 15db3e3

Please sign in to comment.