Permalink
Browse files

Detecting actual Channel write idleness vs. slowness

Motivation

The IdleStateHandler tracks write() idleness on message granularity but does not take into consideration that the client may be just slow and has managed to consume a subset of the message's bytes in the configured period of time.

Modifications

Adding an optional configuration parameter to IdleStateHandler which tells it to observe ChannelOutboundBuffer's state.

Result

Fixes #6150
  • Loading branch information...
1 parent 56ddc47 commit 68a941c091f5a5b5d69715327db9ca77b53e1864 Roger Kapsi committed with Scottmitch Dec 28, 2016
@@ -17,11 +17,13 @@
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.Channel.Unsafe;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
@@ -101,11 +103,12 @@
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
- lastWriteTime = System.nanoTime();
+ lastWriteTime = ticksInNanos();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
};
+ private final boolean observeOutput;
private final long readerIdleTimeNanos;
private final long writerIdleTimeNanos;
private final long allIdleTimeNanos;
@@ -124,6 +127,10 @@ public void operationComplete(ChannelFuture future) throws Exception {
private byte state; // 0 - none, 1 - initialized, 2 - destroyed
private boolean reading;
+ private long lastChangeCheckTimeStamp;
+ private int lastMessageHashCode;
+ private long lastPendingWriteBytes;
+
/**
* Creates a new instance firing {@link IdleStateEvent}s.
*
@@ -150,8 +157,20 @@ public IdleStateHandler(
}
/**
+ * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
+ */
+ public IdleStateHandler(
+ long readerIdleTime, long writerIdleTime, long allIdleTime,
+ TimeUnit unit) {
+ this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
+ }
+
+ /**
* Creates a new instance firing {@link IdleStateEvent}s.
*
+ * @param observeOutput
+ * whether or not the consumption of {@code bytes} should be taken into
+ * consideration when assessing write idleness. The default is {@code false}.
* @param readerIdleTime
* an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
* will be triggered when no read was performed for the specified
@@ -168,13 +187,15 @@ public IdleStateHandler(
* the {@link TimeUnit} of {@code readerIdleTime},
* {@code writeIdleTime}, and {@code allIdleTime}
*/
- public IdleStateHandler(
+ public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
+ this.observeOutput = observeOutput;
+
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
@@ -269,7 +290,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
- lastReadTime = System.nanoTime();
+ lastReadTime = ticksInNanos();
reading = false;
}
ctx.fireChannelReadComplete();
@@ -297,27 +318,37 @@ private void initialize(ChannelHandlerContext ctx) {
}
state = 1;
+ initOutputChanged(ctx);
- EventExecutor loop = ctx.executor();
-
- lastReadTime = lastWriteTime = System.nanoTime();
+ lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
- readerIdleTimeout = loop.schedule(
- new ReaderIdleTimeoutTask(ctx),
+ readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
- writerIdleTimeout = loop.schedule(
- new WriterIdleTimeoutTask(ctx),
+ writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
- allIdleTimeout = loop.schedule(
- new AllIdleTimeoutTask(ctx),
+ allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
+ /**
+ * This method is visible for testing!
+ */
+ long ticksInNanos() {
+ return System.nanoTime();
+ }
+
+ /**
+ * This method is visible for testing!
+ */
+ ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
+ return ctx.executor().schedule(task, delay, unit);
+ }
+
private void destroy() {
state = 2;
@@ -355,15 +386,77 @@ protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
case WRITER_IDLE:
return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
default:
- throw new Error();
+ throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
}
}
- private final class ReaderIdleTimeoutTask implements Runnable {
+ /**
+ * @see #hasOutputChanged(ChannelHandlerContext, boolean)
+ */
+ private void initOutputChanged(ChannelHandlerContext ctx) {
+ if (observeOutput) {
+ Channel channel = ctx.channel();
+ Unsafe unsafe = channel.unsafe();
+ ChannelOutboundBuffer buf = unsafe.outboundBuffer();
+
+ if (buf != null) {
+ lastMessageHashCode = System.identityHashCode(buf.current());
+ lastPendingWriteBytes = buf.totalPendingWriteBytes();
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
+ * with {@link #observeOutput} enabled and there has been an observed change in the
+ * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
+ *
+ * https://github.com/netty/netty/issues/6150
+ */
+ private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
+ if (observeOutput) {
+
+ // We can take this shortcut if the ChannelPromises that got passed into write()
+ // appear to complete. It indicates "change" on message level and we simply assume
+ // that there's change happening on byte level. If the user doesn't observe channel
+ // writability events then they'll eventually OOME and there's clearly a different
+ // problem and idleness is least of their concerns.
+ if (lastChangeCheckTimeStamp != lastWriteTime) {
+ lastChangeCheckTimeStamp = lastWriteTime;
+
+ // But this applies only if it's the non-first call.
+ if (!first) {
+ return true;
+ }
+ }
+
+ Channel channel = ctx.channel();
+ Unsafe unsafe = channel.unsafe();
+ ChannelOutboundBuffer buf = unsafe.outboundBuffer();
+
+ if (buf != null) {
+ int messageHashCode = System.identityHashCode(buf.current());
+ long pendingWriteBytes = buf.totalPendingWriteBytes();
+
+ if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
+ lastMessageHashCode = messageHashCode;
+ lastPendingWriteBytes = pendingWriteBytes;
+
+ if (!first) {
+ return true;
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
+ private abstract static class AbstractIdleTask implements Runnable {
private final ChannelHandlerContext ctx;
- ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
+ AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@@ -373,106 +466,115 @@ public void run() {
return;
}
+ run(ctx);
+ }
+
+ protected abstract void run(ChannelHandlerContext ctx);
+ }
+
+ private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
+
+ ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
+ super(ctx);
+ }
+
+ @Override
+ protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
- nextDelay -= System.nanoTime() - lastReadTime;
+ nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
- readerIdleTimeout =
- ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
- try {
- IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, firstReaderIdleEvent);
- if (firstReaderIdleEvent) {
- firstReaderIdleEvent = false;
- }
+ readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
+ boolean first = firstReaderIdleEvent;
+ firstReaderIdleEvent = false;
+
+ try {
+ IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
- readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
+ readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
- private final class WriterIdleTimeoutTask implements Runnable {
-
- private final ChannelHandlerContext ctx;
+ private final class WriterIdleTimeoutTask extends AbstractIdleTask {
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
- this.ctx = ctx;
+ super(ctx);
}
@Override
- public void run() {
- if (!ctx.channel().isOpen()) {
- return;
- }
+ protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
- long nextDelay = writerIdleTimeNanos - (System.nanoTime() - lastWriteTime);
+ long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
- writerIdleTimeout = ctx.executor().schedule(
- this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
+ writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
+
+ boolean first = firstWriterIdleEvent;
+ firstWriterIdleEvent = false;
+
try {
- IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, firstWriterIdleEvent);
- if (firstWriterIdleEvent) {
- firstWriterIdleEvent = false;
+ if (hasOutputChanged(ctx, first)) {
+ return;
}
+ IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
- writerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
+ writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
- private final class AllIdleTimeoutTask implements Runnable {
-
- private final ChannelHandlerContext ctx;
+ private final class AllIdleTimeoutTask extends AbstractIdleTask {
AllIdleTimeoutTask(ChannelHandlerContext ctx) {
- this.ctx = ctx;
+ super(ctx);
}
@Override
- public void run() {
- if (!ctx.channel().isOpen()) {
- return;
- }
+ protected void run(ChannelHandlerContext ctx) {
long nextDelay = allIdleTimeNanos;
if (!reading) {
- nextDelay -= System.nanoTime() - Math.max(lastReadTime, lastWriteTime);
+ nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
- allIdleTimeout = ctx.executor().schedule(
- this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
+ allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
+
+ boolean first = firstAllIdleEvent;
+ firstAllIdleEvent = false;
+
try {
- IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, firstAllIdleEvent);
- if (firstAllIdleEvent) {
- firstAllIdleEvent = false;
+ if (hasOutputChanged(ctx, first)) {
+ return;
}
+ IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
- allIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
+ allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
Oops, something went wrong.

0 comments on commit 68a941c

Please sign in to comment.