Skip to content

Commit

Permalink
MN: fix "raise on close"
Browse files Browse the repository at this point in the history
Introduce `thread_io_wait_events()` to make 1 function to call
`thread_sched_wait_events()`.

In `thread_io_wait_events()`, manipulate `waiting_fd` so that an
exception is raised correctly when the IO is closed.
  • Loading branch information
ko1 committed Dec 22, 2023
1 parent 19d082d commit bbfc262
Showing 1 changed file with 50 additions and 54 deletions.
104 changes: 50 additions & 54 deletions thread.c
Expand Up @@ -1646,8 +1646,14 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
return rb_nogvl(func, data1, ubf, data2, 0);
}

/* Translate an RB_WAITFD_* event mask into the corresponding
 * thread-scheduler waiting-flag mask.  The scheduler's flag values
 * are simply the waitfd event bits shifted left by one. */
static int
waitfd_to_waiting_flag(int wfd_event)
{
    int waiting_flags = wfd_event << 1;
    return waiting_flags;
}

static void
rb_thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
{
wfd->fd = fd;
wfd->th = th;
Expand All @@ -1661,7 +1667,7 @@ rb_thread_io_setup_wfd(rb_thread_t *th, int fd, struct waiting_fd *wfd)
}

static void
rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
thread_io_wake_pending_closer(struct waiting_fd *wfd)
{
bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
if (has_waiter) {
Expand All @@ -1682,9 +1688,37 @@ rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
}

static int
waitfd_to_waiting_flag(int wfd_event)
thread_io_wait_events(rb_thread_t *th, rb_execution_context_t *ec, int fd, int events, struct timeval *timeout, struct waiting_fd *wfd)
{
return wfd_event << 1;
#if defined(USE_MN_THREADS) && USE_MN_THREADS
if (!th_has_dedicated_nt(th) && (events || timeout)) {
int r;
rb_hrtime_t rel, *prel;

if (timeout) {
rel = rb_timeval2hrtime(timeout);
prel = &rel;
}
else {
prel = NULL;
}

VM_ASSERT(prel || events == RB_WAITFD_IN || events == RB_WAITFD_OUT);

thread_io_setup_wfd(th, fd, wfd);
{
// wait readable/writable
r = thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
}
thread_io_wake_pending_closer(wfd);

RUBY_VM_CHECK_INTS_BLOCKING(ec);

return r;
}
#endif // defined(USE_MN_THREADS) && USE_MN_THREADS

return 0;
}

VALUE
Expand All @@ -1697,20 +1731,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in

struct waiting_fd waiting_fd;

#ifdef RUBY_THREAD_PTHREAD_H
if (events && !th_has_dedicated_nt(th)) {
VM_ASSERT(events == RB_WAITFD_IN || events == RB_WAITFD_OUT);

rb_thread_io_setup_wfd(th, fd, &waiting_fd);
{
// wait readable/writable
thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), NULL);
}
rb_thread_io_wake_pending_closer(&waiting_fd);

RUBY_VM_CHECK_INTS_BLOCKING(ec);
}
#endif
thread_io_wait_events(th, ec, fd, events, NULL, &waiting_fd);

volatile VALUE val = Qundef; /* shouldn't be used */
volatile int saved_errno = 0;
Expand All @@ -1722,7 +1743,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
// `func` or not (as opposed to some previously set value).
errno = 0;

rb_thread_io_setup_wfd(th, fd, &waiting_fd);
thread_io_setup_wfd(th, fd, &waiting_fd);

EC_PUSH_TAG(ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
Expand All @@ -1737,7 +1758,7 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
* must be deleted before jump
* this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
*/
rb_thread_io_wake_pending_closer(&waiting_fd);
thread_io_wake_pending_closer(&waiting_fd);

if (state) {
EC_JUMP_TAG(ec, state);
Expand Down Expand Up @@ -4265,27 +4286,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}

/*
 * Wait for `fd` to become ready for `events` via the M:N thread
 * scheduler, converting the relative `timeout` (NULL = wait forever)
 * from struct timeval to rb_hrtime_t.
 *
 * Only takes the scheduler path on pthread builds when this thread has
 * no dedicated native thread; otherwise performs no wait and returns 0.
 * Returns the result of thread_sched_wait_events() — callers appear to
 * treat a true return as "timed out" (NOTE(review): confirm against
 * thread_sched_wait_events()'s contract).
 */
static bool
thread_sched_wait_events_timeval(rb_thread_t *th, int fd, int events, struct timeval *timeout)
{
#ifdef RUBY_THREAD_PTHREAD_H
/* M:N path: only when no dedicated native thread backs this Ruby thread */
if (!th->nt->dedicated) {
rb_hrtime_t rel, *prel;

if (timeout) {
/* relative timeout in scheduler's high-resolution time unit */
rel = rb_timeval2hrtime(timeout);
prel = &rel;
}
else {
/* NULL timeout: block until the fd event fires */
prel = NULL;
}

return thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
}
#endif // RUBY_THREAD_PTHREAD_H
return 0;
}

#ifdef USE_POLL

/* The same with linux kernel. TODO: make platform independent definition. */
Expand All @@ -4310,19 +4310,14 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
int state;
volatile int lerrno;

rb_thread_t *th = wfd.th = GET_THREAD();
wfd.fd = fd;
wfd.busy = NULL;
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);

if (thread_sched_wait_events_timeval(th, fd, events, timeout)) {
if (thread_io_wait_events(th, ec, fd, events, timeout, &wfd)) {
return 0; // timeout
}

RB_VM_LOCK_ENTER();
{
ccan_list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
}
RB_VM_LOCK_LEAVE();
thread_io_setup_wfd(th, fd, &wfd);

EC_PUSH_TAG(wfd.th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
Expand Down Expand Up @@ -4350,7 +4345,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
}
EC_POP_TAG();

rb_thread_io_wake_pending_closer(&wfd);
thread_io_wake_pending_closer(&wfd);

if (state) {
EC_JUMP_TAG(wfd.th->ec, state);
Expand Down Expand Up @@ -4424,7 +4419,7 @@ select_single_cleanup(VALUE ptr)
{
struct select_args *args = (struct select_args *)ptr;

rb_thread_io_wake_pending_closer(&args->wfd);
thread_io_wake_pending_closer(&args->wfd);
if (args->read) rb_fd_term(args->read);
if (args->write) rb_fd_term(args->write);
if (args->except) rb_fd_term(args->except);
Expand All @@ -4451,9 +4446,10 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct select_args args;
int r;
VALUE ptr = (VALUE)&args;
rb_thread_t *th = GET_THREAD();
rb_execution_context_t *ec = GET_EC();
rb_thread_t *th = rb_ec_thread_ptr(ec);

if (thread_sched_wait_events_timeval(th, fd, events, timeout)) {
if (thread_io_wait_events(th, ec, fd, events, timeout, &args.wfd)) {
return 0; // timeout
}

Expand Down

0 comments on commit bbfc262

Please sign in to comment.