Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid unnecessary event loop wakeups #9605

Merged
merged 1 commit into from
Oct 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 0 additions & 4 deletions transport-native-epoll/src/main/c/netty_epoll_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,6 @@ static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jl

// This method is deprecated!
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
return netty_epoll_native_epollWait(env, clazz, efd, address, len, 0);
}
// only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop.
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.BitSet;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Math.min;

Expand All @@ -52,8 +52,6 @@ class EpollEventLoop extends SingleThreadEventLoop {
Epoll.ensureAvailability();
}

// Pick a number that no task could have previously used.
private long prevDeadlineNanos = nanoTime() - 1;
private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
Expand All @@ -74,7 +72,12 @@ public int get() throws Exception {
return epollWaitNow();
}
};
private final AtomicInteger wakenUp = new AtomicInteger(1);

// nextWakeupNanos is:
// -1 when EL is awake
// Long.MAX_VALUE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(-1L);
private boolean pendingWakeup;
private volatile int ioRatio = 50;

Expand Down Expand Up @@ -178,12 +181,24 @@ NativeDatagramPacketArray cleanDatagramPacketArray() {

@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
if (!inEventLoop && nextWakeupNanos.getAndSet(-1L) != -1L) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L);
}
}

@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 case
return deadlineNanos < nextWakeupNanos.get();
}

@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 case
return deadlineNanos < nextWakeupNanos.get();
}

/**
* Register the given epoll with this {@link EventLoop}.
*/
Expand Down Expand Up @@ -288,22 +303,20 @@ public int registeredChannels() {
return channels.size();
}

private int epollWait() throws IOException {
int delaySeconds;
int delayNanos;
long curDeadlineNanos = deadlineNanos();
if (curDeadlineNanos == prevDeadlineNanos) {
delaySeconds = -1;
delayNanos = -1;
} else {
long totalDelay = delayNanos(System.nanoTime());
prevDeadlineNanos = curDeadlineNanos;
delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
private int epollWait(long deadlineNanos) throws IOException {
if (deadlineNanos == Long.MAX_VALUE) {
return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer
}
long totalDelay = deadlineToDelayNanos(deadlineNanos);
int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
}

private int epollWaitNoTimerChange() throws IOException {
return Native.epollWait(epollFd, events, false);
}

private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, true);
}
Expand All @@ -319,6 +332,7 @@ private int epollWaitTimeboxed() throws IOException {

@Override
protected void run() {
long prevDeadlineNanos = Long.MAX_VALUE;
for (;;) {
try {
processPendingChannelFlags();
Expand Down Expand Up @@ -349,15 +363,26 @@ protected void run() {
// fall-through
}

wakenUp.set(0);
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = Long.MAX_VALUE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = epollWait();
if (curDeadlineNanos == prevDeadlineNanos) {
// No timer activity needed
strategy = epollWaitNoTimerChange();
} else {
// Timerfd needs to be re-armed or disarmed
prevDeadlineNanos = curDeadlineNanos;
strategy = epollWait(curDeadlineNanos);
}
}
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
if (nextWakeupNanos.get() == -1L || nextWakeupNanos.getAndSet(-1L) == -1L) {
pendingWakeup = true;
}
}
Expand All @@ -368,8 +393,8 @@ protected void run() {
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processReady(events, strategy);
if (strategy > 0 && processReady(events, strategy)) {
prevDeadlineNanos = Long.MAX_VALUE;
}
} finally {
// Ensure we always run tasks.
Expand All @@ -379,8 +404,8 @@ protected void run() {
final long ioStartTime = System.nanoTime();

try {
if (strategy > 0) {
processReady(events, strategy);
if (strategy > 0 && processReady(events, strategy)) {
prevDeadlineNanos = Long.MAX_VALUE;
}
} finally {
// Ensure we always run tasks.
Expand Down Expand Up @@ -434,15 +459,15 @@ private void closeAll() {
}
}

private void processReady(EpollEventArray events, int ready) {
// Returns true if a timerFd event was encountered
private boolean processReady(EpollEventArray events, int ready) {
boolean timerFired = false;
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
pendingWakeup = false;
} else if (fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
timerFired = true;
} else {
final long ev = events.events(i);

Expand Down Expand Up @@ -496,6 +521,7 @@ private void processReady(EpollEventArray events, int ready) {
}
}
}
return timerFired;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ public static FileDescriptor newEpollCreate() {
@Deprecated
public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs) throws IOException {
if (timeoutSec == 0 && timeoutNs == 0) {
// Zero timeout => poll (aka return immediately)
return epollWait(epollFd, events, 0);
}
if (timeoutSec == Integer.MAX_VALUE) {
// Max timeout => wait indefinitely: disarm timerfd first
timeoutSec = 0;
timeoutNs = 0;
}
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs);
if (ready < 0) {
Expand Down