Skip to content

Commit

Permalink
Revert changes in EpollEventLoop that were done recently and did caus…
Browse files Browse the repository at this point in the history
…e various problems in different testsuites.

Motivation:

Changes that were done to the EpollEventLoop to optimize some things broke some testsuites and caused timeouts. We need to investigate why this is the case, but for
now we should just revert so we can do a release.

Modifications:

- Partly revert 1fa7a5e and a22d4ba

Result:

Testsuites pass again.
  • Loading branch information
normanmaurer committed Sep 12, 2019
1 parent b409f8e commit 7f39142
Showing 1 changed file with 71 additions and 132 deletions.
Expand Up @@ -35,9 +35,7 @@
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static java.lang.Math.min;

Expand All @@ -46,23 +44,17 @@
*/
class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");

static {
// Ensure JNI is initialized by the time this class is loaded.
// We use unix-common methods in this class which are backed by JNI methods.
Epoll.ensureAvailability();
}

/**
* When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value
* means that the event loop is awake, which blocks rescheduling activity by other threads.
* It is restored to the real timerFd expiry time again prior to entering epollWait().
*
* Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting
* with the time source (e.g. calling System.nanoTime()) which can be expensive.
*/
private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L);
private final AtomicInteger wakenUp = new AtomicInteger();
// Pick a number that no task could have previously used.
private long prevDeadlineNanos = nanoTime() - 1;
private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
Expand All @@ -81,6 +73,12 @@ public int get() throws Exception {
return epollWaitNow();
}
};
@SuppressWarnings("unused") // AtomicIntegerFieldUpdater
private volatile int wakenUp;
private volatile int ioRatio = 50;

// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;

EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
Expand Down Expand Up @@ -177,88 +175,9 @@ NativeDatagramPacketArray cleanDatagramPacketArray() {
return datagramPacketArray;
}

@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
    // Submitting a scheduled task never needs an eager wakeup here; the timerFd is
    // (re)armed afterwards in afterScheduledTaskSubmitted(...) instead.
    return false;
}

@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
    try {
        // Arm (or shorten) the timerFd so the freshly scheduled deadline is honored
        // without having to wake the event loop via the eventFd.
        trySetTimerFd(deadlineNanos);
    } catch (IOException cause) {
        // Propagate as the rejection type callers of execute/schedule expect.
        throw new RejectedExecutionException(cause);
    }
    // No eventFd wakeup required; the timer itself will wake epoll_wait(...).
    return false;
}

@Override
protected boolean runAllTasks() {
    // Overridden so that shutdown (and every other "run everything" path in the base
    // class) also drains expired scheduled tasks, not only the plain task queue.
    return runScheduledAndExecutorTasks(4);
}

/**
 * Attempts to move the timerFd wakeup earlier so that it fires at {@code candidateNextDeadline}.
 * Called from outside the event loop after a scheduled task was submitted.
 *
 * No-op when the currently recorded deadline is already sooner-or-equal, or when it is
 * negative (which encodes "the event loop is awake" — see the field's javadoc); in the
 * latter case the event loop itself will pick up the new deadline before its next wait.
 */
private void trySetTimerFd(long candidateNextDeadline) throws IOException {
for (;;) {
long nextDeadline = nextDeadlineNanos.get();
if (nextDeadline <= candidateNextDeadline) {
// This includes case where nextDeadline is negative (event loop is awake)
return;
}
if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) {
// We must serialize calls to setTimerFd to avoid the set of a later deadline
// racing with a sooner one and overwriting it. A second check of nextDeadlineNanos
// is made within the sync block to avoid having the CAS within the sync
synchronized (nextDeadlineNanos) {
nextDeadline = nextDeadlineNanos.get();
// The second disjunct re-derives our candidate from the "awake" encoding produced by
// getAndAdd(Long.MAX_VALUE + 1) in the run() loop, i.e. the event loop went to sleep
// state and back while we held the CAS win — the timerFd update is still ours to do.
if (nextDeadline == candidateNextDeadline ||
(nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) {
setTimerFd(deadlineToDelayNanos(candidateNextDeadline));
}
}
return;
}
}
}

/**
 * Programs the timerFd to fire after {@code candidateNextDelayNanos}. Must only be called
 * while serialized on {@code nextDeadlineNanos} (see trySetTimerFd).
 */
private void setTimerFd(long candidateNextDelayNanos) throws IOException {
    if (candidateNextDelayNanos <= 0) {
        // Setting the timer to 0, 0 would disarm it, so we have a few options:
        // 1. Set the timer wakeup to 1ns (1 system call).
        // 2. Use the eventFd to force a wakeup and disarm the timer (2 system calls).
        // Option (1) needs fewer system calls, and the nextDeadlineNanos state is reset
        // correctly when the EventLoop processes the resulting timer wakeup.
        Native.timerFdSetTime(timerFd.intValue(), 0, 1);
        return;
    }
    // Split the delay into the separate seconds/nanoseconds pair that
    // timerfd_settime(2) expects, clamping each part to the int range.
    final int seconds = (int) min(candidateNextDelayNanos / 1000000000L, Integer.MAX_VALUE);
    final int nanos = (int) min(candidateNextDelayNanos - seconds * 1000000000L, Integer.MAX_VALUE);
    Native.timerFdSetTime(timerFd.intValue(), seconds, nanos);
}

/**
 * Called on the event loop immediately before blocking in epollWait(): publishes the
 * effective timerFd deadline back into {@code nextDeadlineNanos} (un-doing the negative
 * "awake" encoding) and, if the scheduled-task queue now holds a sooner deadline than the
 * timerFd is armed for, re-arms the timerFd.
 *
 * @param timerFdDeadline the deadline the timerFd is currently believed to be armed for
 * @return the (possibly shortened) deadline the timerFd is armed for after this call
 */
private long checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException {
assert nextDeadlineNanos.get() < 0;
final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (nextTaskDeadlineNanos == -1 || nextTaskDeadlineNanos >= timerFdDeadline) {
// Just restore to preexisting timerFd value, update not needed
nextDeadlineNanos.lazySet(timerFdDeadline);
} else {
synchronized (nextDeadlineNanos) {
// Shorter delay required than current timerFd setting, update it
nextDeadlineNanos.lazySet(timerFdDeadline = nextTaskDeadlineNanos);
setTimerFd(deadlineToDelayNanos(timerFdDeadline));
}
}
return timerFdDeadline;
// Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside
// the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm
// the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer.
}

@Override
protected void wakeup(boolean inEventLoop) {
// NOTE(review): the next two lines are the removed/added variants of the SAME statement
// left over from a diff view (AtomicInteger field vs. AtomicIntegerFieldUpdater); only one
// of them can exist in the real source file — confirm against the repository before use.
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L);
}
Expand Down Expand Up @@ -318,21 +237,47 @@ private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

/**
 * Returns the configured I/O ratio: the percentage of event-loop time that is intended
 * to be spent on I/O processing as opposed to task execution.
 */
public int getIoRatio() {
    return this.ioRatio;
}

/**
 * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
 * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
 *
 * @throws IllegalArgumentException if {@code ioRatio} is not in the range {@code (0, 100]}
 */
public void setIoRatio(int ioRatio) {
    final boolean inRange = ioRatio > 0 && ioRatio <= 100;
    if (!inRange) {
        throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
    }
    this.ioRatio = ioRatio;
}

@Override
public int registeredChannels() {
    // channels holds every Channel currently registered with this event loop.
    return this.channels.size();
}

// NOTE(review): this method body is a diff artifact — it concatenates the REMOVED body
// (the first comment block + first return, which delegated timeout handling to hasTasks())
// with the ADDED body (the timerFd-based delay computation + second return). As written it
// cannot compile (unreachable code after the first return); restore exactly one variant
// from the repository before use.
private int epollWait() throws IOException {
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
return Native.epollWait(epollFd, events, hasTasks());
int delaySeconds;
int delayNanos;
long curDeadlineNanos = deadlineNanos();
if (curDeadlineNanos == prevDeadlineNanos) {
// Deadline unchanged since the previous wait: -1/-1 tells the native side to keep
// the currently armed timerFd value instead of re-arming it.
delaySeconds = -1;
delayNanos = -1;
} else {
long totalDelay = delayNanos(System.nanoTime());
prevDeadlineNanos = curDeadlineNanos;
// Split the delay into seconds plus a nanosecond remainder capped at 999999999,
// as required by timerfd_settime(2).
delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
}
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
}

// NOTE(review): diff artifact — the two return statements below are the removed/new
// variants of the same non-blocking wait call; keep exactly one (confirm against the
// repository). Both perform an epoll_wait that returns immediately.
private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, true);
return Native.epollWait(epollFd, events, timerFd, 0, 0);
}

private int epollBusyWait() throws IOException {
Expand All @@ -341,7 +286,6 @@ private int epollBusyWait() throws IOException {

@Override
protected void run() {
long timerFdDeadline = Long.MAX_VALUE;
for (;;) {
try {
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
Expand All @@ -354,38 +298,38 @@ protected void run() {
break;

case SelectStrategy.SELECT:
if (wakenUp.get() == 1) {
wakenUp.set(0);
if (wakenUp == 1) {
wakenUp = 0;
}
if (!hasTasks()) {
// When we are in the EventLoop we don't bother setting the timerFd for each
// scheduled task, but instead defer the processing until the end of the EventLoop
// (next wait) to reduce the timerFd modifications.
timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline);
try {
strategy = epollWait();
} finally {
// This getAndAdd will change the raw value of nextDeadlineNanos to be negative
// which will block any *new* timerFd mods by other threads while also "preserving"
// its last value to avoid disrupting a possibly-concurrent setTimerFd call
// (so that we can know the timerFd really did/will get updated to the read value).
timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1);
// The value of nextDeadlineNanos is now guaranteed to be negative
}
strategy = epollWait();
}
// fallthrough
default:
}

try {
if (processReady(events, strategy)) {
// Polled events include timerFd expiry; conservatively assume that no timer is set
timerFdDeadline = Long.MAX_VALUE;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processReady(events, strategy);
}
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();

try {
if (strategy > 0) {
processReady(events, strategy);
}
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} finally {
runAllTasks();
// No need to drainScheduledQueue() after the fact, because all in event loop scheduling results
// in direct addition to the scheduled priority queue.
}
if (allowGrowing && strategy == events.length()) {
//increase the size of the array as we needed the whole space for the events
Expand Down Expand Up @@ -439,17 +383,13 @@ private void closeAll() {
}
}

// Returns true if a timerFd event was encountered
private boolean processReady(EpollEventArray events, int ready) {
boolean timerFired = false;
for (int i = 0; i < ready; ++i) {
private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
if (fd == eventFd.intValue() || fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
} else if (fd == timerFd.intValue()) {
timerFired = true;
} else {
final long ev = events.events(i);

Expand Down Expand Up @@ -503,7 +443,6 @@ private boolean processReady(EpollEventArray events, int ready) {
}
}
}
return timerFired;
}

@Override
Expand Down

0 comments on commit 7f39142

Please sign in to comment.