Skip to content
Permalink
Browse files

Unify KQueue and Epoll wait timeout approach

Motivation:
KQueueEventLoop and EpollEventLoop implement different approaches to applying a timeout of their respective poll calls. Epoll attempts to ensure the desired timeout is satisfied at the java layer and at the JNI layer, but it should be sufficient to account for spurious wakups at the JNI layer. Epoll timeout granularity is also limited to milliseconds which may be too large for some latency sensitive applications.

Modifications:
- Make EpollEventLoop wait method look like KQueueEventLoop
- Epoll should support a finer timeout granularity via timerfd_create. We can hide most of these details behind the epollWait0 JNI call to avoid crossing additional JNI boundaries.

Result:
More consistent timeout approach between KQueue and Epoll.
  • Loading branch information...
Scottmitch committed Aug 2, 2017
1 parent 1d7c3fb commit fe2dd973e9cc537027f75cefa447a6d8ebe5695d
@@ -27,6 +27,7 @@
#include <netinet/tcp.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
@@ -50,16 +51,6 @@
#define TCP_FASTOPEN 23
#endif

/**
* On older Linux kernels, epoll can't handle timeout
* values bigger than (LONG_MAX - 999ULL)/HZ.
*
* See:
* - https://github.com/libevent/libevent/blob/master/epoll.c#L138
* - http://cvs.schmorp.de/libev/ev_epoll.c?revision=1.68&view=markup
*/
#define MAX_EPOLL_TIMEOUT_MSEC (35*60*1000)

// optional
extern int epoll_create1(int flags) __attribute__((weak));

@@ -86,8 +77,6 @@ jfieldID packetPortFieldId = NULL;
jfieldID packetMemoryAddressFieldId = NULL;
jfieldID packetCountFieldId = NULL;

clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock

// util methods
static int getSysctlValue(const char * property, int* returnValue) {
int rc = -1;
@@ -117,18 +106,25 @@ static jint netty_epoll_native_eventFd(JNIEnv* env, jclass clazz) {
jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);

if (eventFD < 0) {
int err = errno;
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", err);
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", errno);
}
return eventFD;
}

static jint netty_epoll_native_timerFd(JNIEnv* env, jclass clazz) {
jint timerFD = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);

if (timerFD < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_create() failed: ", errno);
}
return timerFD;
}

static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) {
jint eventFD = eventfd_write(fd, (eventfd_t) value);

if (eventFD < 0) {
int err = errno;
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", err);
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write() failed: ", errno);
}
}

@@ -141,6 +137,15 @@ static void netty_epoll_native_eventFdRead(JNIEnv* env, jclass clazz, jint fd) {
}
}

static void netty_epoll_native_timerFdRead(JNIEnv* env, jclass clazz, jint fd) {
uint64_t timerFireCount;

if (read(fd, &timerFireCount, sizeof(uint64_t)) < 0) {
// it is expected that this is only called where there is known to be activity, so this is an error.
netty_unix_errors_throwChannelExceptionErrorNo(env, "read() failed: ", errno);
}
}

static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) {
jint efd;
if (epoll_create1) {
@@ -169,42 +174,44 @@ static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) {
return efd;
}

static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
struct timespec ts;
jlong timeBeforeWait, timeNow;
int timeDiff, result, err;

if (timeout > MAX_EPOLL_TIMEOUT_MSEC) {
// Workaround for bug in older linux kernels that can not handle bigger timeout then MAX_EPOLL_TIMEOUT_MSEC.
timeout = MAX_EPOLL_TIMEOUT_MSEC;
}

clock_gettime(waitClockId, &ts);
timeBeforeWait = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;

for (;;) {
result = epoll_wait(efd, ev, len, timeout);
if (result >= 0) {
return result;
}
if ((err = errno) == EINTR) {
if (timeout > 0) {
clock_gettime(waitClockId, &ts);
timeNow = ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
timeDiff = timeNow - timeBeforeWait;
timeout -= timeDiff;
if (timeDiff < 0 || timeout <= 0) {
return 0;
}
timeBeforeWait = timeNow;
} else if (timeout == 0) {
return 0;
int result, err;

if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
do {
result = epoll_wait(efd, ev, len, 0);
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
} else {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
return -1;
}
} else {
return -err;
}
}
do {
result = epoll_wait(efd, ev, len, -1);
if (result > 0) {
// Detect timeout, and preserve the epoll_wait API.
if (result == 1 && ev[0].data.fd == timerFd) {
// We assume that timerFD is in ET mode. So we must consume this event to ensure we are notified
// of future timer events because ET mode only notifies a single time until the event is consumed.
uint64_t timerFireCount;
// We don't care what the result is. We just want to consume the wakeup event and reset ET.
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;
}
return result;
}
} while((err = errno) == EINTR);
}
return -err;
}

static jint netty_epoll_native_epollCtlAdd0(JNIEnv* env, jclass clazz, jint efd, jint fd, jint flags) {
@@ -311,8 +318,7 @@ static jstring netty_epoll_native_kernelVersion(JNIEnv* env, jclass clazz) {
if (res == 0) {
return (*env)->NewStringUTF(env, name.release);
}
int err = errno;
netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", err);
netty_unix_errors_throwRuntimeExceptionErrorNo(env, "uname() failed: ", errno);
return NULL;
}

@@ -409,10 +415,12 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
static const JNINativeMethod fixed_method_table[] = {
{ "eventFd", "()I", (void *) netty_epoll_native_eventFd },
{ "timerFd", "()I", (void *) netty_epoll_native_timerFd },
{ "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite },
{ "eventFdRead", "(I)V", (void *) netty_epoll_native_eventFdRead },
{ "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJII)I", (void *) netty_epoll_native_epollWait0 },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },
{ "epollCtlDel0", "(II)I", (void *) netty_epoll_native_epollCtlDel0 },
@@ -572,11 +580,6 @@ static jint netty_epoll_native_JNI_OnLoad(JNIEnv* env, const char* packagePrefix
return JNI_ERR;
}

if (!netty_unix_util_initialize_wait_clock(&waitClockId)) {
fprintf(stderr, "FATAL: could not find a clock for clock_gettime!\n");
return JNI_ERR;
}

return NETTY_JNI_VERSION;
}

@@ -39,6 +39,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static java.lang.Math.min;

/**
* {@link EventLoop} which uses epoll under the covers. Only works on Linux!
*/
@@ -55,6 +57,7 @@

private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
private final boolean allowGrowing;
private final EpollEventArray events;
@@ -63,7 +66,7 @@
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return Native.epollWait(epollFd.intValue(), events, 0);
return epollWaitNow();
}
};
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
@@ -89,6 +92,7 @@ public Integer call() throws Exception {
boolean success = false;
FileDescriptor epollFd = null;
FileDescriptor eventFd = null;
FileDescriptor timerFd = null;
try {
this.epollFd = epollFd = Native.newEpollCreate();
this.eventFd = eventFd = Native.newEventFd();
@@ -97,6 +101,12 @@ public Integer call() throws Exception {
} catch (IOException e) {
throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
}
this.timerFd = timerFd = Native.newTimerFd();
try {
Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
} catch (IOException e) {
throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
}
success = true;
} finally {
if (!success) {
@@ -114,6 +124,13 @@ public Integer call() throws Exception {
// ignore
}
}
if (timerFd != null) {
try {
timerFd.close();
} catch (Exception e) {
// ignore
}
}
}
}
}
@@ -204,43 +221,23 @@ public void setIoRatio(int ioRatio) {
this.ioRatio = ioRatio;
}

private int epollWait(boolean oldWakenUp) throws IOException {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
int ready = Native.epollWait(epollFd.intValue(), events, 0);
if (ready > 0) {
return ready;
}
}
break;
}

// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
if (hasTasks() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
return Native.epollWait(epollFd.intValue(), events, 0);
}
private int epollWait(boolean oldWakeup) throws IOException {
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
if (oldWakeup && hasTasks()) {
return epollWaitNow();
}

int selectedKeys = Native.epollWait(epollFd.intValue(), events, (int) timeoutMillis);
selectCnt ++;
long totalDelay = delayNanos(System.nanoTime());
int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
return Native.epollWait(epollFd, events, timerFd, delaySeconds,
(int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
}

if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
return selectedKeys;
}
currentTimeNanos = System.nanoTime();
}
return 0;
private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, timerFd, 0, 0);
}

@Override
@@ -347,7 +344,7 @@ private static void handleLoopException(Throwable t) {

private void closeAll() {
try {
Native.epollWait(epollFd.intValue(), events, 0);
epollWaitNow();
} catch (IOException ignore) {
// ignore on close
}
@@ -368,8 +365,11 @@ private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
// consume wakeup event
Native.eventFdRead(eventFd.intValue());
// consume wakeup event.
Native.eventFdRead(fd);
} else if (fd == timerFd.intValue()) {
// consume wakeup event, necessary because the timer is added with ET mode.
Native.timerFdRead(fd);
} else {
final long ev = events.events(i);

@@ -438,6 +438,11 @@ protected void cleanup() {
} catch (IOException e) {
logger.warn("Failed to close the event fd.", e);
}
try {
timerFd.close();
} catch (IOException e) {
logger.warn("Failed to close the timer fd.", e);
}
} finally {
// release native memory
iovArray.release();
@@ -97,24 +97,32 @@ public static FileDescriptor newEventFd() {
return new FileDescriptor(eventFd());
}

public static FileDescriptor newTimerFd() {
return new FileDescriptor(timerFd());
}

private static native int eventFd();
private static native int timerFd();
public static native void eventFdWrite(int fd, long value);
public static native void eventFdRead(int fd);
static native void timerFdRead(int fd);

public static FileDescriptor newEpollCreate() {
return new FileDescriptor(epollCreate());
}

private static native int epollCreate();

public static int epollWait(int efd, EpollEventArray events, int timeout) throws IOException {
int ready = epollWait0(efd, events.memoryAddress(), events.length(), timeout);
public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs) throws IOException {
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
return ready;
}
private static native int epollWait0(int efd, long address, int len, int timeout);
private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);

public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {
int res = epollCtlAdd0(efd, fd, flags);

0 comments on commit fe2dd97

Please sign in to comment.
You can’t perform that action at this time.