Skip to content

Commit d65d2fb

Browse files
committed
Do not poll first
Before this patch, the MN scheduler waits for I/O with the following steps: 1. `poll(fd, timeout=0)` to check whether the fd is ready or not. 2. If the fd is not ready, wait with the MN thread scheduler. 3. Call `func` to issue the blocking I/O call. The advantage of the advance `poll()` is that we can wait for I/O readiness on any fd. However, the `poll()` call becomes pure overhead for fds that are already ready. This patch changes the steps as follows: 1. Call `func` to issue the blocking I/O call. 2. If `func` returns `EWOULDBLOCK`, the fd is `O_NONBLOCK` and we need to wait until the fd becomes ready, so wait with the MN thread scheduler. In this case we only ever wait for `O_NONBLOCK` fds; for other fds the thread simply waits inside the blocking operation itself, such as the `read()` system call, so there is no need for an advance `poll()` to check readiness. With this patch we can observe a performance improvement on a microbenchmark which repeats blocking I/O (on a non-`O_NONBLOCK` fd) with and without the MN thread scheduler. ```ruby require 'benchmark' f = open('/dev/null', 'w') f.sync = true TN = 1 N = 1_000_000 / TN Benchmark.bm{|x| x.report{ TN.times.map{ Thread.new{ N.times{f.print '.'} } }.each(&:join) } } __END__ TN = 1 user system total real ruby32 0.393966 0.101122 0.495088 ( 0.495235) ruby33 0.493963 0.089521 0.583484 ( 0.584091) ruby33+MN 0.639333 0.200843 0.840176 ( 0.840291) <- Slow this+MN 0.512231 0.099091 0.611322 ( 0.611074) <- Good ```
1 parent 6c25291 commit d65d2fb

File tree

5 files changed

+79
-45
lines changed

5 files changed

+79
-45
lines changed

internal/thread.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,6 @@ int ruby_thread_has_gvl_p(void); /* for ext/fiddle/closure.c */
7474
RUBY_SYMBOL_EXPORT_END
7575

7676
int rb_threadptr_execute_interrupts(struct rb_thread_struct *th, int blocking_timing);
77+
bool rb_thread_mn_schedulable(VALUE thread);
7778

7879
#endif /* INTERNAL_THREAD_H */

io.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,6 +1146,11 @@ static int nogvl_wait_for(VALUE th, rb_io_t *fptr, short events, struct timeval
11461146
static inline int
11471147
io_internal_wait(VALUE thread, rb_io_t *fptr, int error, int events, struct timeval *timeout)
11481148
{
1149+
if (!timeout && rb_thread_mn_schedulable(thread)) {
1150+
RUBY_ASSERT(errno == EWOULDBLOCK || errno == EAGAIN);
1151+
return -1;
1152+
}
1153+
11491154
int ready = nogvl_wait_for(thread, fptr, events, timeout);
11501155

11511156
if (ready > 0) {

rjit_c.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,6 +1497,7 @@ def C.rb_thread_struct
14971497
nt: [CType::Pointer.new { self.rb_native_thread }, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), nt)")],
14981498
ec: [CType::Pointer.new { self.rb_execution_context_t }, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), ec)")],
14991499
sched: [self.rb_thread_sched_item, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), sched)")],
1500+
mn_schedulable: [self._Bool, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), mn_schedulable)")],
15001501
serial: [self.rb_atomic_t, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), serial)")],
15011502
last_status: [self.VALUE, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), last_status)")],
15021503
calling: [CType::Pointer.new { self.rb_calling_info }, Primitive.cexpr!("OFFSETOF((*((struct rb_thread_struct *)NULL)), calling)")],

thread.c

Lines changed: 71 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1687,15 +1687,22 @@ thread_io_wake_pending_closer(struct waiting_fd *wfd)
16871687
}
16881688
}
16891689

1690-
static int
1691-
thread_io_wait_events(rb_thread_t *th, rb_execution_context_t *ec, int fd, int events, struct timeval *timeout, struct waiting_fd *wfd)
1690+
static bool
1691+
thread_io_mn_schedulable(rb_thread_t *th, int events, const struct timeval *timeout)
16921692
{
16931693
#if defined(USE_MN_THREADS) && USE_MN_THREADS
1694-
if (!th_has_dedicated_nt(th) &&
1695-
(events || timeout) &&
1696-
th->blocking // no fiber scheduler
1697-
) {
1698-
int r;
1694+
return !th_has_dedicated_nt(th) && (events || timeout) && th->blocking;
1695+
#else
1696+
return false;
1697+
#endif
1698+
}
1699+
1700+
// true if need retry
1701+
static bool
1702+
thread_io_wait_events(rb_thread_t *th, int fd, int events, const struct timeval *timeout)
1703+
{
1704+
#if defined(USE_MN_THREADS) && USE_MN_THREADS
1705+
if (thread_io_mn_schedulable(th, events, timeout)) {
16991706
rb_hrtime_t rel, *prel;
17001707

17011708
if (timeout) {
@@ -1708,20 +1715,40 @@ thread_io_wait_events(rb_thread_t *th, rb_execution_context_t *ec, int fd, int e
17081715

17091716
VM_ASSERT(prel || (events & (RB_WAITFD_IN | RB_WAITFD_OUT)));
17101717

1711-
thread_io_setup_wfd(th, fd, wfd);
1712-
{
1713-
// wait readable/writable
1714-
r = thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel);
1718+
if (thread_sched_wait_events(TH_SCHED(th), th, fd, waitfd_to_waiting_flag(events), prel)) {
1719+
// timeout
1720+
return false;
17151721
}
1716-
thread_io_wake_pending_closer(wfd);
1722+
else {
1723+
return true;
1724+
}
1725+
}
1726+
#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
1727+
return false;
1728+
}
17171729

1718-
RUBY_VM_CHECK_INTS_BLOCKING(ec);
1730+
// assume read/write
1731+
static bool
1732+
blocking_call_retryable_p(int r, int eno)
1733+
{
1734+
if (r != -1) return false;
17191735

1720-
return r;
1736+
switch (eno) {
1737+
case EAGAIN:
1738+
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
1739+
case EWOULDBLOCK:
1740+
#endif
1741+
return true;
1742+
default:
1743+
return false;
17211744
}
1722-
#endif // defined(USE_MN_THREADS) && USE_MN_THREADS
1745+
}
17231746

1724-
return 0;
1747+
bool
1748+
rb_thread_mn_schedulable(VALUE thval)
1749+
{
1750+
rb_thread_t *th = rb_thread_ptr(thval);
1751+
return th->mn_schedulable;
17251752
}
17261753

17271754
VALUE
@@ -1733,12 +1760,11 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
17331760
RUBY_DEBUG_LOG("th:%u fd:%d ev:%d", rb_th_serial(th), fd, events);
17341761

17351762
struct waiting_fd waiting_fd;
1736-
1737-
thread_io_wait_events(th, ec, fd, events, NULL, &waiting_fd);
1738-
17391763
volatile VALUE val = Qundef; /* shouldn't be used */
17401764
volatile int saved_errno = 0;
17411765
enum ruby_tag_type state;
1766+
bool prev_mn_schedulable = th->mn_schedulable;
1767+
th->mn_schedulable = thread_io_mn_schedulable(th, events, NULL);
17421768

17431769
// `errno` is only valid when there is an actual error - but we can't
17441770
// extract that from the return value of `func` alone, so we clear any
@@ -1747,16 +1773,26 @@ rb_thread_io_blocking_call(rb_blocking_function_t *func, void *data1, int fd, in
17471773
errno = 0;
17481774

17491775
thread_io_setup_wfd(th, fd, &waiting_fd);
1776+
{
1777+
EC_PUSH_TAG(ec);
1778+
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1779+
retry:
1780+
BLOCKING_REGION(waiting_fd.th, {
1781+
val = func(data1);
1782+
saved_errno = errno;
1783+
}, ubf_select, waiting_fd.th, FALSE);
1784+
1785+
if (events &&
1786+
blocking_call_retryable_p((int)val, saved_errno) &&
1787+
thread_io_wait_events(th, fd, events, NULL)) {
1788+
RUBY_VM_CHECK_INTS_BLOCKING(ec);
1789+
goto retry;
1790+
}
1791+
}
1792+
EC_POP_TAG();
17501793

1751-
EC_PUSH_TAG(ec);
1752-
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1753-
BLOCKING_REGION(waiting_fd.th, {
1754-
val = func(data1);
1755-
saved_errno = errno;
1756-
}, ubf_select, waiting_fd.th, FALSE);
1794+
th->mn_schedulable = prev_mn_schedulable;
17571795
}
1758-
EC_POP_TAG();
1759-
17601796
/*
17611797
* must be deleted before jump
17621798
* this will delete either from waiting_fds or on-stack struct rb_io_close_wait_list
@@ -4316,20 +4352,20 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
43164352
rb_execution_context_t *ec = GET_EC();
43174353
rb_thread_t *th = rb_ec_thread_ptr(ec);
43184354

4319-
if (thread_io_wait_events(th, ec, fd, events, timeout, &wfd)) {
4320-
return 0; // timeout
4321-
}
4322-
43234355
thread_io_setup_wfd(th, fd, &wfd);
43244356

43254357
EC_PUSH_TAG(wfd.th->ec);
43264358
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
43274359
rb_hrtime_t *to, rel, end = 0;
4360+
struct timeval tv;
4361+
43284362
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4363+
43294364
timeout_prepare(&to, &rel, &end, timeout);
43304365
fds[0].fd = fd;
43314366
fds[0].events = (short)events;
43324367
fds[0].revents = 0;
4368+
43334369
do {
43344370
nfds = 1;
43354371

@@ -4344,7 +4380,9 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
43444380
}, ubf_select, wfd.th, TRUE);
43454381

43464382
RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec);
4347-
} while (wait_retryable(&result, lerrno, to, end));
4383+
} while (wait_retryable(&result, lerrno, to, end) &&
4384+
thread_io_wait_events(th, fd, events, rb_hrtime2timeval(&tv, to)) &&
4385+
wait_retryable(&result, lerrno, to, end));
43484386
}
43494387
EC_POP_TAG();
43504388

@@ -4452,24 +4490,12 @@ rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
44524490
rb_execution_context_t *ec = GET_EC();
44534491
rb_thread_t *th = rb_ec_thread_ptr(ec);
44544492

4455-
if (thread_io_wait_events(th, ec, fd, events, timeout, &args.wfd)) {
4456-
return 0; // timeout
4457-
}
4458-
44594493
args.as.fd = fd;
44604494
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
44614495
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
44624496
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
44634497
args.tv = timeout;
4464-
args.wfd.fd = fd;
4465-
args.wfd.th = th;
4466-
args.wfd.busy = NULL;
4467-
4468-
RB_VM_LOCK_ENTER();
4469-
{
4470-
ccan_list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node);
4471-
}
4472-
RB_VM_LOCK_LEAVE();
4498+
thread_io_setup_wfd(th, fd, &args.wfd);
44734499

44744500
r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr);
44754501
if (r == -1)

vm_core.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,6 +1076,7 @@ typedef struct rb_thread_struct {
10761076
rb_execution_context_t *ec;
10771077

10781078
struct rb_thread_sched_item sched;
1079+
bool mn_schedulable;
10791080
rb_atomic_t serial; // only for RUBY_DEBUG_LOG()
10801081

10811082
VALUE last_status; /* $? */

0 commit comments

Comments
 (0)