Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 48 additions & 36 deletions ext/io/event/selector/uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <liburing.h>
#include <poll.h>
#include <stdbool.h>
#include <stdint.h>
#include <time.h>

Expand Down Expand Up @@ -282,9 +283,23 @@ VALUE IO_Event_Selector_URing_initialize(VALUE self, VALUE loop) {
#ifdef IORING_SETUP_TASKRUN_FLAG
flags |= IORING_SETUP_TASKRUN_FLAG;
#endif
// IORING_SETUP_SUBMIT_ALL (kernel 5.18+): keep processing the rest of the SQE
// batch even when one fails, reducing the frequency of short submits.
#ifdef IORING_SETUP_SUBMIT_ALL
flags |= IORING_SETUP_SUBMIT_ALL;
#endif

int result = io_uring_queue_init(URING_ENTRIES, &selector->ring, flags);

#ifdef IORING_SETUP_SUBMIT_ALL
if (result == -EINVAL) {
// IORING_SETUP_SUBMIT_ALL was added in Linux 5.18; retry without it.
if (DEBUG) fprintf(stderr, "IO_Event_Selector_URing_initialize: no IORING_SETUP_SUBMIT_ALL\n");
flags &= ~IORING_SETUP_SUBMIT_ALL;
result = io_uring_queue_init(URING_ENTRIES, &selector->ring, flags);
}
#endif

if (result < 0) {
rb_syserr_fail(-result, "IO_Event_Selector_URing_initialize:io_uring_queue_init");
}
Expand Down Expand Up @@ -406,59 +421,46 @@ void IO_Event_Selector_URing_dump_completion_queue(struct IO_Event_Selector_URin
}
}

// Flush the submission queue if pending operations are present.
// Flush the submission queue, optionally yielding if unsuccessful.
static
int io_uring_submit_flush(struct IO_Event_Selector_URing *selector) {
if (selector->pending) {
if (DEBUG) fprintf(stderr, "io_uring_submit_flush(pending=%ld)\n", selector->pending);

// Try to submit:
int io_uring_submit_all(struct IO_Event_Selector_URing *selector, bool yield) {
while (selector->pending > 0) {
int result = io_uring_submit(&selector->ring);

if (result >= 0) {
// If it was submitted, reset pending count:
selector->pending = 0;
} else if (result != -EBUSY && result != -EAGAIN) {
rb_syserr_fail(-result, "io_uring_submit_flush:io_uring_submit");
// io_uring_submit() returns the number of submitted SQEs
selector->pending -= result;
} else if (result == -EBUSY || result == -EAGAIN) {
if (yield) IO_Event_Selector_yield(&selector->backend);
} else {
rb_syserr_fail(-result, "io_uring_submit_all:io_uring_submit");
return result;
}

return result;
}

if (DEBUG) {
IO_Event_Selector_URing_dump_completion_queue(selector);
}


if (DEBUG) IO_Event_Selector_URing_dump_completion_queue(selector);
return 0;
}

// Flush the submission queue if pending operations are present.
static
int io_uring_submit_flush(struct IO_Event_Selector_URing *selector) {
if (DEBUG) fprintf(stderr, "io_uring_submit_now(pending=%ld)\n", selector->pending);

return io_uring_submit_all(selector, false);
}

// Immediately flush the submission queue, yielding to the event loop if it was not successful.
static
int io_uring_submit_now(struct IO_Event_Selector_URing *selector) {
if (DEBUG) fprintf(stderr, "io_uring_submit_now(pending=%ld)\n", selector->pending);

while (true) {
int result = io_uring_submit(&selector->ring);

if (result >= 0) {
selector->pending = 0;
if (DEBUG) IO_Event_Selector_URing_dump_completion_queue(selector);
return result;
}

if (result == -EBUSY || result == -EAGAIN) {
IO_Event_Selector_yield(&selector->backend);
} else {
rb_syserr_fail(-result, "io_uring_submit_now:io_uring_submit");
}
}
return io_uring_submit_all(selector, true);
}

// Submit a pending operation. This does not submit the operation immediately, but instead defers it to the next call to `io_uring_submit_flush` or `io_uring_submit_now`. This is useful for operations that are not urgent, but should be used with care as it can lead to a deadlock if the submission queue is not flushed.
static
void io_uring_submit_pending(struct IO_Event_Selector_URing *selector) {
selector->pending += 1;

if (DEBUG) fprintf(stderr, "io_uring_submit_pending(ring=%p, pending=%ld)\n", &selector->ring, selector->pending);
}

Expand All @@ -471,7 +473,8 @@ struct io_uring_sqe * io_get_sqe(struct IO_Event_Selector_URing *selector) {

sqe = io_uring_get_sqe(&selector->ring);
}


selector->pending += 1;
return sqe;
}

Expand Down Expand Up @@ -1136,7 +1139,6 @@ int select_internal_without_gvl(struct select_arguments *arguments) {
io_uring_prep_read(sqe, IO_Event_Interrupt_descriptor(&selector->interrupt), &selector->wakeup_value, sizeof(selector->wakeup_value), 0);
io_uring_sqe_set_data(sqe, &selector->interrupt);
selector->wakeup_registered = 1;
selector->pending += 1;
}

io_uring_submit_flush(selector);
Expand Down Expand Up @@ -1318,9 +1320,19 @@ static int IO_Event_Selector_URing_supported_p(void) {
#endif
#ifdef IORING_SETUP_TASKRUN_FLAG
flags |= IORING_SETUP_TASKRUN_FLAG;
#endif
#ifdef IORING_SETUP_SUBMIT_ALL
flags |= IORING_SETUP_SUBMIT_ALL;
#endif
int result = io_uring_queue_init(32, &ring, flags);

#ifdef IORING_SETUP_SUBMIT_ALL
if (result == -EINVAL) {
flags &= ~IORING_SETUP_SUBMIT_ALL;
result = io_uring_queue_init(32, &ring, flags);
}
#endif

if (result < 0) {
rb_warn("io_uring_queue_init() was available at compile time but failed at run time: %s\n", strerror(-result));

Expand Down
2 changes: 2 additions & 0 deletions releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
- Add support for the `io_close` fiber-scheduler hook (Ruby 4.0+). The `URing` selector performs the close asynchronously via the ring; the `Debug::Selector` and `TestScheduler` wrappers forward to the underlying selector when supported.
- Improve `WorkerPool` GC compaction support and add proper write barriers, fixing potential use-after-free under compacting GC.
- Keep blocked scheduler fibers alive during GC by registering them as roots in `TestScheduler#block`, preventing premature collection and the resulting use-after-free crash on resume.
- Correctly handle short `io_uring_submit()` results in the `URing` selector. `io_uring_submit()` returns the number of SQEs actually accepted by the kernel and can be short (SQE prep errors, `ENOMEM`, transient `EAGAIN`); the old accounting reset `pending = 0` on any success and silently lost track of unsubmitted SQEs.
- Enable `IORING_SETUP_SUBMIT_ALL` (kernel 5.18+) on the `URing` selector so the kernel keeps processing the rest of an SQE batch past individual errors, reducing the frequency of short submits in practice.

## v1.15.1

Expand Down
Loading