Skip to content

Commit

Permalink
Reduce syscall overhead in native epoll transport when a lot of timeouts are scheduled (#12145)
Browse files Browse the repository at this point in the history


Motivation:

At the moment we might end up calling timerfd_settime every time a new timer is scheduled. This can produce quite a lot of overhead. We should try to reduce the number of syscalls when possible.

Modifications:

- If we are using a Linux kernel >= 5.11, use epoll_pwait2(...) directly.
- If the scheduled timeout is big enough, just use epoll_wait(...) without timerfd_settime and accept some inaccuracy.

Result:

Fixes #11695
  • Loading branch information
normanmaurer committed Mar 18, 2022
1 parent 40382fb commit 86004b7
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 36 deletions.
Expand Up @@ -29,6 +29,7 @@
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand All @@ -44,6 +45,8 @@
*/
class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);

static {
// Ensure JNI is initialized by the time this class is loaded by this time!
Expand Down Expand Up @@ -276,14 +279,15 @@ public int registeredChannels() {
return channels.size();
}

private int epollWait(long deadlineNanos) throws IOException {
private long epollWait(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer
return Native.epollWait(epollFd, events, timerFd,
Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD); // disarm timer
}
long totalDelay = deadlineToDelayNanos(deadlineNanos);
int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
}

private int epollWaitNoTimerChange() throws IOException {
Expand Down Expand Up @@ -347,8 +351,11 @@ protected void run() {
strategy = epollWaitNoTimerChange();
} else {
// Timerfd needs to be re-armed or disarmed
prevDeadlineNanos = curDeadlineNanos;
strategy = epollWait(curDeadlineNanos);
long result = epollWait(curDeadlineNanos);
// The result contains the actual return value and if a timer was used or not.
// We need to "unpack" using the helper methods exposed in Native.
strategy = Native.epollReady(result);
prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
}
}
} finally {
Expand Down
Expand Up @@ -150,8 +150,6 @@ public static FileDescriptor newTimerFd() {
private static native int timerFd();
public static native void eventFdWrite(int fd, long value);
public static native void eventFdRead(int fd);
static native void timerFdRead(int fd);
static native void timerFdSetTime(int fd, int sec, int nsec) throws IOException;

public static FileDescriptor newEpollCreate() {
return new FileDescriptor(epollCreate());
Expand All @@ -165,6 +163,12 @@ public static FileDescriptor newEpollCreate() {
@Deprecated
public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs) throws IOException {
long result = epollWait(epollFd, events, timerFd, timeoutSec, timeoutNs, -1);
return epollReady(result);
}

static long epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs, long millisThreshold) throws IOException {
if (timeoutSec == 0 && timeoutNs == 0) {
// Zero timeout => poll (aka return immediately)
return epollWait(epollFd, events, 0);
Expand All @@ -174,12 +178,23 @@ public static int epollWait(FileDescriptor epollFd, EpollEventArray events, File
timeoutSec = 0;
timeoutNs = 0;
}
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs);
long result = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs, millisThreshold);
int ready = epollReady(result);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
return ready;
return result;
}

// IMPORTANT: This needs to be consistent with what is used in netty_epoll_native.c
/**
 * Unpacks the number of ready events from the packed {@code epollWait0(...)} result.
 * The ready count lives in the upper 32 bits of the returned {@code long}; the packing
 * must stay consistent with what is used in netty_epoll_native.c.
 */
static int epollReady(long result) {
    // Shift the ready-count word down; the (int) cast keeps exactly those 32 bits,
    // so an unsigned shift is equivalent to the arithmetic one here.
    final long readyWord = result >>> 32;
    return (int) readyWord;
}

// IMPORTANT: This needs to be consistent with what is used in netty_epoll_native.c
/**
 * Unpacks from the packed {@code epollWait0(...)} result whether the native side armed
 * the timerfd for this wait. The flag is carried in the low byte of the {@code long};
 * the packing must stay consistent with what is used in netty_epoll_native.c.
 */
static boolean epollTimerWasUsed(long result) {
    // A non-zero low byte means the timer was armed during the native call.
    final long timerFlag = result & 0xff;
    return timerFlag != 0;
}

static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException {
Expand Down Expand Up @@ -210,7 +225,8 @@ public static int epollBusyWait(FileDescriptor epollFd, EpollEventArray events)
return ready;
}

private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
private static native long epollWait0(
int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs, long millisThreshold);
private static native int epollWait(int efd, long address, int len, int timeout);
private static native int epollBusyWait0(int efd, long address, int len);

Expand Down
67 changes: 42 additions & 25 deletions transport-native-epoll/src/main/c/netty_epoll_native.c
Expand Up @@ -84,6 +84,7 @@

// optional
extern int epoll_create1(int flags) __attribute__((weak));
extern int epoll_pwait2(int epfd, struct epoll_event *events, int maxevents, const struct timespec *timeout, const sigset_t *sigmask) __attribute__((weak));

#ifndef __USE_GNU
struct mmsghdr {
Expand Down Expand Up @@ -213,15 +214,6 @@ static void netty_epoll_native_eventFdRead(JNIEnv* env, jclass clazz, jint fd) {
}
}

// Drains the timerfd after it fired: reads the 8-byte expiration counter so the fd
// stops being readable. Throws a ChannelException (carrying errno) to the Java caller
// when read() fails.
static void netty_epoll_native_timerFdRead(JNIEnv* env, jclass clazz, jint fd) {
    // timerfd reads always deliver a uint64_t expiration count.
    uint64_t timerFireCount;

    if (read(fd, &timerFireCount, sizeof(uint64_t)) < 0) {
        // it is expected that this is only called where there is known to be activity, so this is an error.
        netty_unix_errors_throwChannelExceptionErrorNo(env, "read() failed: ", errno);
    }
}

static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) {
jint efd;
if (epoll_create1) {
Expand Down Expand Up @@ -250,16 +242,6 @@ static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) {
return efd;
}

// Arms the timerfd as a one-shot timer expiring after tvSec seconds + tvNsec nanoseconds
// (a zero value disarms it, per timerfd_settime(2)). it_interval is zeroed so the timer
// never repeats. Throws an IOException (carrying errno) to the Java caller on failure.
static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint timerFd, jint tvSec, jint tvNsec) {
    struct itimerspec ts;
    // Only it_value is set below; clear it_interval so this stays a one-shot timer.
    memset(&ts.it_interval, 0, sizeof(struct timespec));
    ts.it_value.tv_sec = tvSec;
    ts.it_value.tv_nsec = tvNsec;
    if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
        netty_unix_errors_throwIOExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
    }
}

static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;
Expand All @@ -273,11 +255,46 @@ static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jl
return -err;
}

// This method is deprecated!
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
// This needs to be consistent with Native.java
#define EPOLL_WAIT_RESULT(V, ARM_TIMER) ((jlong) ((uint64_t) ((uint32_t) V) << 32 | ARM_TIMER))

static jlong netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec, jlong millisThreshold) {
// only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop.
uint32_t armTimer = millisThreshold <= 0 ? 1 : 0;
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
if (millisThreshold > 0 && (tvSec != 0 || tvNsec != 0)) {
// Let's try to reduce the syscalls as much as possible as timerfd_settime(...) can be expensive:
// See https://github.com/netty/netty/issues/11695

if (epoll_pwait2) {
// We have epoll_pwait2(...), this means we can just pass in the itimerspec directly and not need an
// extra syscall even for very small timeouts.
struct timespec ts = { tvSec, tvNsec };
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;
do {
result = epoll_pwait2(efd, ev, len, &ts, NULL);
if (result >= 0) {
return EPOLL_WAIT_RESULT(result, armTimer);
}
} while((err = errno) == EINTR);
return EPOLL_WAIT_RESULT(-err, armTimer);
}

int millis = tvNsec / 1000000;
// Check if we can reduce the syscall overhead by just use epoll_wait. This is done in cases when we can
// tolerate some "drift".
if (tvNsec == 0 ||
// Let's use the threshold to accept that we may be not 100 % accurate and ignore anything that
// is smaller then 1 ms.
millis >= millisThreshold ||
tvSec > 0) {
millis += tvSec * 1000;
int result = netty_epoll_native_epollWait(env, clazz, efd, address, len, millis);
return EPOLL_WAIT_RESULT(result, armTimer);
}
}
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
Expand All @@ -286,8 +303,10 @@ static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, j
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
return -1;
}
armTimer = 1;
}
return netty_epoll_native_epollWait(env, clazz, efd, address, len, -1);
int result = netty_epoll_native_epollWait(env, clazz, efd, address, len, -1);
return EPOLL_WAIT_RESULT(result, armTimer);
}

static inline void cpu_relax() {
Expand Down Expand Up @@ -666,10 +685,8 @@ static const JNINativeMethod fixed_method_table[] = {
{ "timerFd", "()I", (void *) netty_epoll_native_timerFd },
{ "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite },
{ "eventFdRead", "(I)V", (void *) netty_epoll_native_eventFdRead },
{ "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead },
{ "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated!
{ "epollWait0", "(IJIIIIJ)J", (void *) netty_epoll_native_epollWait0 },
{ "epollWait", "(IJII)I", (void *) netty_epoll_native_epollWait },
{ "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
Expand Down

0 comments on commit 86004b7

Please sign in to comment.