Skip to content

Commit

Permalink
Epoll: Avoid redundant EPOLL_CTL_MOD calls (#9397) (#9583)
Browse files Browse the repository at this point in the history
Motivation

Currently an epoll_ctl syscall is made every time there is a change to
the event interest flags (EPOLLIN, EPOLLOUT, etc) of a channel. These
are only done in the event loop so can be aggregated into 0 or 1 such
calls per channel prior to the next call to epoll_wait.

Modifications

I think further streamlining/simplification is possible but for now I've
tried to minimize structural changes and added the aggregation beneath
the existing flag manipulation logic.

A new AbstractChannel#activeFlags field records the flags last set on
the epoll fd for that channel. Calls to setFlag/clearFlag update the
flags field as before but instead of calling epoll_ctl immediately, just
set or clear a bit for the channel in a new bitset in the associated
EpollEventLoop to reflect whether there's any change to the last set
value.

Prior to calling epoll_wait the event loop makes the appropriate
epoll_ctl(EPOLL_CTL_MOD) call once for each channel who's bit is set.

Result

Fewer syscalls, particularly in some auto-read=false cases. Simplified
error handling from centralization of these calls.
  • Loading branch information
normanmaurer committed Sep 20, 2019
1 parent 338e1a9 commit 2b9f69a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
private volatile SocketAddress local;
private volatile SocketAddress remote;

protected int flags = Native.EPOLLET;
protected int flags = Native.EPOLLET | Native.EPOLLIN;
protected int activeFlags;
boolean inputClosedSeenErrorOnRead;
boolean epollInReadyRunnablePending;

Expand Down Expand Up @@ -109,17 +110,23 @@ static boolean isSoErrorZero(Socket fd) {
}
}

void setFlag(int flag) throws IOException {
void setFlag(int flag) {
if (!isFlagSet(flag)) {
flags |= flag;
modifyEvents();
updatePendingFlagsSet();
}
}

void clearFlag(int flag) throws IOException {
void clearFlag(int flag) {
if (isFlagSet(flag)) {
flags &= ~flag;
modifyEvents();
updatePendingFlagsSet();
}
}

private void updatePendingFlagsSet() {
if (isRegistered()) {
((EpollEventLoop) eventLoop()).updatePendingFlagsSet(this);
}
}

Expand Down Expand Up @@ -246,33 +253,33 @@ private static boolean isAllowHalfClosure(ChannelConfig config) {
((SocketChannelConfig) config).isAllowHalfClosure();
}

private Runnable clearEpollInTask;

final void clearEpollIn() {
// Only clear if registered with an EventLoop as otherwise
if (isRegistered()) {
final EventLoop loop = eventLoop();
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
if (loop.inEventLoop()) {
unsafe.clearEpollIn0();
} else {
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
loop.execute(new Runnable() {
@Override
public void run() {
if (!unsafe.readPending && !config().isAutoRead()) {
// Still no read triggered so clear it now
unsafe.clearEpollIn0();
}
final EventLoop loop = isRegistered() ? eventLoop() : null;
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
if (loop == null || loop.inEventLoop()) {
unsafe.clearEpollIn0();
return;
}
// schedule a task to clear the EPOLLIN as it is not safe to modify it directly
Runnable clearFlagTask = clearEpollInTask;
if (clearFlagTask == null) {
clearEpollInTask = clearFlagTask = new Runnable() {
@Override
public void run() {
if (!unsafe.readPending && !config().isAutoRead()) {
// Still no read triggered so clear it now
unsafe.clearEpollIn0();
}
});
}
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
flags &= ~Native.EPOLLIN;
}
};
}
loop.execute(clearFlagTask);
}

private void modifyEvents() throws IOException {
void modifyEvents() throws IOException {
if (isOpen() && isRegistered()) {
((EpollEventLoop) eventLoop()).modify(this);
}
Expand Down Expand Up @@ -416,7 +423,7 @@ final void epollInFinally(ChannelConfig config) {
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
clearEpollIn();
clearEpollIn0();
}
}

Expand Down Expand Up @@ -446,19 +453,7 @@ final void epollRdHupReady() {
}

// Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
clearEpollRdHup();
}

/**
* Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
*/
private void clearEpollRdHup() {
try {
clearFlag(Native.EPOLLRDHUP);
} catch (IOException e) {
pipeline().fireExceptionCaught(e);
close(voidPromise());
}
clearFlag(Native.EPOLLRDHUP);
}

/**
Expand All @@ -478,7 +473,7 @@ void shutdownInput(boolean rdHup) {
// We attempted to shutdown and failed, which means the input has already effectively been
// shutdown.
}
clearEpollIn();
clearEpollIn0();
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
Expand Down Expand Up @@ -534,16 +529,9 @@ final void epollOutReady() {
}

protected final void clearEpollIn0() {
assert eventLoop().inEventLoop();
try {
readPending = false;
clearFlag(Native.EPOLLIN);
} catch (IOException e) {
// When this happens there is something completely wrong with either the filedescriptor or epoll,
// so fire the exception through the pipeline and close the Channel.
pipeline().fireExceptionCaught(e);
unsafe().close(unsafe().voidPromise());
}
assert !isRegistered() || eventLoop().inEventLoop();
readPending = false;
clearFlag(Native.EPOLLIN);
}

@Override
Expand Down Expand Up @@ -668,7 +656,7 @@ private void finishConnect() {
/**
* Finish the connect
*/
private boolean doFinishConnect() throws Exception {
private boolean doFinishConnect() throws IOException {
if (socket.finishConnect()) {
clearFlag(Native.EPOLLOUT);
if (requestedRemoteAddress instanceof InetSocketAddress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ public EpollChannelConfig setEpollMode(EpollMode mode) {
if (mode == null) {
throw new NullPointerException("mode");
}
try {
switch (mode) {
switch (mode) {
case EDGE_TRIGGERED:
checkChannelNotRegistered();
((AbstractEpollChannel) channel).setFlag(Native.EPOLLET);
Expand All @@ -162,9 +161,6 @@ public EpollChannelConfig setEpollMode(EpollMode mode) {
break;
default:
throw new Error();
}
} catch (IOException e) {
throw new ChannelException(e);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.util.BitSet;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand All @@ -59,6 +60,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
private final BitSet pendingFlagChannels = new BitSet();

private final boolean allowGrowing;
private final EpollEventArray events;

Expand Down Expand Up @@ -190,6 +193,7 @@ void add(AbstractEpollChannel ch) throws IOException {
assert inEventLoop();
int fd = ch.socket.intValue();
Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
ch.activeFlags = ch.flags;
AbstractEpollChannel old = channels.put(fd, ch);

// We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
Expand All @@ -203,6 +207,28 @@ void add(AbstractEpollChannel ch) throws IOException {
void modify(AbstractEpollChannel ch) throws IOException {
assert inEventLoop();
Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
ch.activeFlags = ch.flags;
}

void updatePendingFlagsSet(AbstractEpollChannel ch) {
pendingFlagChannels.set(ch.socket.intValue(), ch.flags != ch.activeFlags);
}

private void processPendingChannelFlags() {
// Call epollCtlMod for any channels that require event interest changes before epollWaiting
if (!pendingFlagChannels.isEmpty()) {
for (int fd = 0; (fd = pendingFlagChannels.nextSetBit(fd)) >= 0; pendingFlagChannels.clear(fd)) {
AbstractEpollChannel ch = channels.get(fd);
if (ch != null) {
try {
ch.modifyEvents();
} catch (IOException e) {
ch.pipeline().fireExceptionCaught(e);
ch.close();
}
}
}
}
}

/**
Expand All @@ -219,10 +245,14 @@ void remove(AbstractEpollChannel ch) throws IOException {

// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
assert !ch.isOpen();
} else if (ch.isOpen()) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd.intValue(), fd);
} else {
ch.activeFlags = 0;
pendingFlagChannels.clear(fd);
if (ch.isOpen()) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd.intValue(), fd);
}
}
}

Expand Down Expand Up @@ -288,6 +318,7 @@ private int epollBusyWait() throws IOException {
protected void run() {
for (;;) {
try {
processPendingChannelFlags();
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
Expand Down

0 comments on commit 2b9f69a

Please sign in to comment.