Use a real Ruby mutex in rb_io_close_wait_list #7884

Merged
7 changes: 3 additions & 4 deletions internal/thread.h

@@ -11,7 +11,6 @@
 #include "ruby/ruby.h"          /* for VALUE */
 #include "ruby/intern.h"        /* for rb_blocking_function_t */
 #include "ccan/list/list.h"     /* for list in rb_io_close_wait_list */
-#include "ruby/thread_native.h" /* for mutexes in rb_io_close_wait_list */

 struct rb_thread_struct; /* in vm_core.h */

@@ -55,9 +54,9 @@ VALUE rb_exec_recursive_outer_mid(VALUE (*f)(VALUE g, VALUE h, int r), VALUE g,
 int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);

 struct rb_io_close_wait_list {
-    struct ccan_list_head list;
-    rb_nativethread_lock_t mu;
-    rb_nativethread_cond_t cv;
+    struct ccan_list_head pending_fd_users;
+    VALUE closing_thread;
+    VALUE wakeup_mutex;
 };
 int rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy);
 void rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy);
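For orientation, here is the new struct again with each field's role spelled out; the comments are mine, not part of the patch:

struct rb_io_close_wait_list {
    struct ccan_list_head pending_fd_users; /* waiting_fd entries still blocked on the fd being closed */
    VALUE closing_thread;                   /* thread inside rb_thread_fd_close(), woken when the list drains */
    VALUE wakeup_mutex;                     /* hidden Ruby Mutex serializing the handshake; Qnil when there are no users */
};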
10 changes: 1 addition & 9 deletions io.c

@@ -5422,14 +5422,6 @@ maygvl_fclose(FILE *file, int keepgvl)
 static void free_io_buffer(rb_io_buffer_t *buf);
 static void clear_codeconv(rb_io_t *fptr);

-static void*
-call_close_wait_nogvl(void *arg)
-{
-    struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
-    rb_notify_fd_close_wait(busy);
-    return NULL;
-}
-
 static void
 fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
                     struct rb_io_close_wait_list *busy)
@@ -5475,7 +5467,7 @@ fptr_finalize_flush(rb_io_t *fptr, int noraise, int keepgvl,
     // Ensure waiting_fd users do not hit EBADF.
     if (busy) {
         // Wait for them to exit before we call close().
-        (void)rb_thread_call_without_gvl(call_close_wait_nogvl, busy, RUBY_UBF_IO, 0);
+        rb_notify_fd_close_wait(busy);
     }

     // Disable for now.
96 changes: 53 additions & 43 deletions thread.c

@@ -1663,6 +1663,27 @@ rb_thread_call_without_gvl(void *(*func)(void *data), void *data1,
     return rb_nogvl(func, data1, ubf, data2, 0);
 }

+static void
+rb_thread_io_wake_pending_closer(struct waiting_fd *wfd)
+{
+    bool has_waiter = wfd->busy && RB_TEST(wfd->busy->wakeup_mutex);
+    if (has_waiter) {
+        rb_mutex_lock(wfd->busy->wakeup_mutex);
+    }
+
+    /* Needs to be protected with RB_VM_LOCK because we don't know if
+       wfd is on the global list of pending FD ops or if it's on a
+       struct rb_io_close_wait_list close-waiter. */
+    RB_VM_LOCK_ENTER();
+    ccan_list_del(&wfd->wfd_node);
+    RB_VM_LOCK_LEAVE();
+
+    if (has_waiter) {
+        rb_thread_wakeup(wfd->busy->closing_thread);
+        rb_mutex_unlock(wfd->busy->wakeup_mutex);
+    }
+}
+
 VALUE
 rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
 {
@@ -1700,20 +1721,9 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)

     /*
      * must be deleted before jump
-     * this will delete either from waiting_fds or on-stack CCAN_LIST_HEAD(busy)
+     * this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
      */
-    RB_VM_LOCK_ENTER();
-    {
-        if (waiting_fd.busy) {
-            rb_native_mutex_lock(&waiting_fd.busy->mu);
-        }
-        ccan_list_del(&waiting_fd.wfd_node);
-        if (waiting_fd.busy) {
-            rb_native_cond_broadcast(&waiting_fd.busy->cv);
-            rb_native_mutex_unlock(&waiting_fd.busy->mu);
-        }
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&waiting_fd);

     if (state) {
         EC_JUMP_TAG(ec, state);
@@ -2474,8 +2484,9 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
 {
     rb_vm_t *vm = GET_THREAD()->vm;
     struct waiting_fd *wfd = 0, *next;
-    ccan_list_head_init(&busy->list);
+    ccan_list_head_init(&busy->pending_fd_users);
     int has_any;
+    VALUE wakeup_mutex;

     RB_VM_LOCK_ENTER();
     {
@@ -2485,7 +2496,7 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
             VALUE err;

             ccan_list_del(&wfd->wfd_node);
-            ccan_list_add(&busy->list, &wfd->wfd_node);
+            ccan_list_add(&busy->pending_fd_users, &wfd->wfd_node);

             wfd->busy = busy;
             err = th->vm->special_exceptions[ruby_error_stream_closed];
@@ -2494,34 +2505,39 @@ rb_notify_fd_close(int fd, struct rb_io_close_wait_list *busy)
             }
         }
     }
-    has_any = !ccan_list_empty(&busy->list);
+
+    has_any = !ccan_list_empty(&busy->pending_fd_users);
+    busy->closing_thread = rb_thread_current();
+    wakeup_mutex = Qnil;
     if (has_any) {
-        rb_native_mutex_initialize(&busy->mu);
-        rb_native_cond_initialize(&busy->cv);
+        wakeup_mutex = rb_mutex_new();
+        RBASIC_CLEAR_CLASS(wakeup_mutex); /* hide from ObjectSpace */
     }
+    busy->wakeup_mutex = wakeup_mutex;

     RB_VM_LOCK_LEAVE();

+    /* If the caller didn't pass *busy as a pointer to something on the stack,
+       we need to guard this mutex object on _our_ C stack for the duration
+       of this function. */
+    RB_GC_GUARD(wakeup_mutex);
     return has_any;
 }

Review thread on the RBASIC_CLEAR_CLASS(wakeup_mutex) line:

Contributor (author): Is it OK to use a Ruby mutex like this inside thread.c/io.c? It works, and does what I want, but I don't know if this kind of thing is considered wrong for some reason?

Member: I think it's okay, and I'd even say you don't need to worry about exposing it to object space.
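To make that exchange concrete: RBASIC_CLEAR_CLASS() only clears the object's class pointer, so the mutex stays a live Thread::Mutex that the GC must still reach through some marked reference. A minimal sketch of the idiom, assuming Ruby's internal headers are in scope (RBASIC_CLEAR_CLASS is internal, not public C API, and hidden_mutex_new is an invented name):

#include "ruby/ruby.h"
/* Assumes RBASIC_CLEAR_CLASS from Ruby's internal headers; the exact
   header location varies across Ruby versions. */

static VALUE
hidden_mutex_new(void)
{
    VALUE mu = rb_mutex_new(); /* an ordinary Thread::Mutex */
    RBASIC_CLEAR_CLASS(mu);    /* klass = 0: invisible to ObjectSpace and Ruby code */
    /* Nothing else references mu, so the caller must keep it reachable,
       e.g. in a local pinned with RB_GC_GUARD(), which is exactly what
       rb_notify_fd_close() does above. */
    return mu;
}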

 void
 rb_notify_fd_close_wait(struct rb_io_close_wait_list *busy)
 {
-    rb_native_mutex_lock(&busy->mu);
-    while (!ccan_list_empty(&busy->list)) {
-        rb_native_cond_wait(&busy->cv, &busy->mu);
-    };
-    rb_native_mutex_unlock(&busy->mu);
-    rb_native_mutex_destroy(&busy->mu);
-    rb_native_cond_destroy(&busy->cv);
-}
+    if (!RB_TEST(busy->wakeup_mutex)) {
+        /* There was nobody else using this file when we closed it, so we
+           never bothered to allocate a mutex */
+        return;
+    }

-static void*
-call_notify_fd_close_wait_nogvl(void *arg)
-{
-    struct rb_io_close_wait_list *busy = (struct rb_io_close_wait_list *)arg;
-    rb_notify_fd_close_wait(busy);
-    return NULL;
+    rb_mutex_lock(busy->wakeup_mutex);
+    while (!ccan_list_empty(&busy->pending_fd_users)) {
+        rb_mutex_sleep(busy->wakeup_mutex, Qnil);
+    }
+    rb_mutex_unlock(busy->wakeup_mutex);
 }
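The handshake may be easier to follow stripped of VM details. Below is a plain-pthreads analogue (all names invented; an illustration, not the patch's code). The important difference is that the real code sleeps in rb_mutex_sleep() and wakes via rb_thread_wakeup(), so the closing thread blocks as an ordinary Ruby thread and releases the GVL by itself, which is also why the rb_thread_call_without_gvl() trampolines in io.c and thread.c could be deleted:

#include <pthread.h>

/* Pthreads sketch of the close handshake. */
struct close_wait_list {
    int             pending; /* stand-in for the pending_fd_users list */
    pthread_mutex_t mu;      /* stand-in for wakeup_mutex */
    pthread_cond_t  cv;      /* the real code wakes a specific thread instead */
};

/* Closing thread: block until every pending fd user has checked out
   (compare rb_notify_fd_close_wait()). */
static void
close_wait(struct close_wait_list *busy)
{
    pthread_mutex_lock(&busy->mu);
    while (busy->pending > 0) {
        pthread_cond_wait(&busy->cv, &busy->mu);
    }
    pthread_mutex_unlock(&busy->mu);
}

/* Interrupted fd user: check out and wake the closer
   (compare rb_thread_io_wake_pending_closer()). */
static void
wake_pending_closer(struct close_wait_list *busy)
{
    pthread_mutex_lock(&busy->mu);
    busy->pending--;
    pthread_cond_signal(&busy->cv);
    pthread_mutex_unlock(&busy->mu);
}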

@@ -2530,7 +2546,7 @@ rb_thread_fd_close(int fd)
     struct rb_io_close_wait_list busy;

     if (rb_notify_fd_close(fd, &busy)) {
-        rb_thread_call_without_gvl(call_notify_fd_close_wait_nogvl, &busy, RUBY_UBF_IO, 0);
+        rb_notify_fd_close_wait(&busy);
     }
 }
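Condensed from the functions above, the closer's side of the protocol now reads like this (a sketch of the flow, not literal source; error handling and the eventual close() are elided):

struct rb_io_close_wait_list busy;    /* lives on the closer's C stack */

if (rb_notify_fd_close(fd, &busy)) {  /* interrupt pending users; true if any */
    rb_notify_fd_close_wait(&busy);   /* sleep on busy.wakeup_mutex until
                                         pending_fd_users drains */
}
/* now no thread can observe the fd mid-close and hit EBADF */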
}

@@ -4273,6 +4289,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)

     wfd.th = GET_THREAD();
     wfd.fd = fd;
+    wfd.busy = NULL;

     RB_VM_LOCK_ENTER();
     {
@@ -4324,11 +4341,7 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
     }
     EC_POP_TAG();

-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&wfd);

     if (state) {
         EC_JUMP_TAG(wfd.th->ec, state);
@@ -4402,11 +4415,7 @@ select_single_cleanup(VALUE ptr)
 {
     struct select_args *args = (struct select_args *)ptr;

-    RB_VM_LOCK_ENTER();
-    {
-        ccan_list_del(&args->wfd.wfd_node);
-    }
-    RB_VM_LOCK_LEAVE();
+    rb_thread_io_wake_pending_closer(&args->wfd);
     if (args->read) rb_fd_term(args->read);
     if (args->write) rb_fd_term(args->write);
     if (args->except) rb_fd_term(args->except);
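select_single_cleanup() runs as the ensure half of an rb_ensure() pairing, which is what guarantees the wfd_node is unlinked (and the closer woken) even when the select raises. The wiring looks roughly like this (a sketch; the actual helper in thread.c may differ in detail):

static VALUE
select_single_with_cleanup(VALUE ptr)
{
    /* run select_single(), then select_single_cleanup() unconditionally,
       whether select_single() returns normally or jumps out */
    return rb_ensure(select_single, ptr, select_single_cleanup, ptr);
}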
@@ -4429,6 +4438,7 @@
     args.tv = timeout;
     args.wfd.fd = fd;
     args.wfd.th = GET_THREAD();
+    args.wfd.busy = NULL;

     RB_VM_LOCK_ENTER();
     {