Skip to content

Commit

Permalink
Remove ChannelHandlerInvoker
Browse files Browse the repository at this point in the history
Motivation:

We tried to provide the ability for the user to change the semantics of the threading-model by delegate the invoking of the ChannelHandler to the ChannelHandlerInvoker. Unfortunually this not really worked out quite well and resulted in just more complexity and splitting of code that belongs together. We should remove the ChannelHandlerInvoker again and just do the same as in 4.0

Modifications:

Remove ChannelHandlerInvoker again and replace its usage in Http2MultiplexCodec

Result:

Easier code and less bad abstractions.
  • Loading branch information
normanmaurer committed May 17, 2016
1 parent a56ef03 commit 68cd670
Show file tree
Hide file tree
Showing 20 changed files with 900 additions and 1,569 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelHandlerInvokerUtil;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -83,6 +81,7 @@ public final class Http2MultiplexCodec extends ChannelDuplexHandler {
private final List<StreamInfo> streamsToFireChildReadComplete = new ArrayList<StreamInfo>();
private ChannelHandlerContext ctx;
private ChannelHandlerContext http2HandlerCtx;
private volatile Runnable flushTask;

/**
* Construct a new handler whose child channels run in the same event loop as this handler.
Expand Down Expand Up @@ -176,15 +175,74 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
}
}

// Override this to signal it will never throw an exception.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.fireExceptionCaught(cause);
}

// Override this to signal it will never throw an exception.
@Override
public void flush(ChannelHandlerContext ctx) {
ctx.flush();
}

void flushFromStreamChannel() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
flush(ctx);
} else {
Runnable task = flushTask;
if (task == null) {
task = flushTask = new Runnable() {
@Override
public void run() {
flush(ctx);
}
};
}
executor.execute(task);
}
}

void writeFromStreamChannel(final Object msg, final boolean flush) {
final ChannelPromise promise = ctx.newPromise();
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
writeFromStreamChannel0(msg, flush, promise);
} else {
try {
executor.execute(new OneTimeTask() {
@Override
public void run() {
writeFromStreamChannel0(msg, flush, promise);
}
});
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
}

private void writeFromStreamChannel0(Object msg, boolean flush, ChannelPromise promise) {
try {
write(ctx, msg, promise);
} catch (Throwable cause) {
promise.tryFailure(cause);
}
if (flush) {
flush(ctx);
}
}

/**
* Processes all {@link Http2Frame}s. {@link Http2StreamFrame}s may only originate in child
* streams.
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (!(msg instanceof Http2Frame)) {
super.write(ctx, msg, promise);
ctx.write(msg, promise);
return;
}
try {
Expand Down Expand Up @@ -280,7 +338,7 @@ public void onStreamActive(Http2Stream stream) {

@Override
public void onStreamClosed(Http2Stream stream) {
final StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
final StreamInfo streamInfo = stream.getProperty(streamInfoKey);
if (streamInfo != null) {
final EventLoop eventLoop = streamInfo.childChannel.eventLoop();
if (eventLoop.inEventLoop()) {
Expand Down Expand Up @@ -318,8 +376,18 @@ public boolean visit(Http2Stream stream) {
return true;
}
});
} catch (Throwable t) {
ctx.invoker().invokeExceptionCaught(ctx, t);
} catch (final Throwable t) {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
exceptionCaught(ctx, t);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
exceptionCaught(ctx, t);
}
});
}
}
ctx.fireUserEventTriggered(goAway.duplicate().retain());
}
Expand All @@ -339,7 +407,7 @@ protected void onStreamError(ChannelHandlerContext ctx, Throwable cause,
if (stream == null) {
return;
}
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
if (streamInfo == null) {
return;
}
Expand All @@ -355,7 +423,7 @@ class FrameListener extends Http2FrameAdapter {
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode)
throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
// Use a user event in order to circumvent read queue.
streamInfo.childChannel.pipeline().fireUserEventTriggered(new DefaultHttp2ResetFrame(errorCode));
}
Expand All @@ -371,15 +439,15 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int padding, boolean endOfStream) throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
fireChildReadAndRegister(streamInfo, new DefaultHttp2HeadersFrame(headers, endOfStream, padding));
}

@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
Http2Stream stream = http2Handler.connection().stream(streamId);
StreamInfo streamInfo = (StreamInfo) stream.getProperty(streamInfoKey);
StreamInfo streamInfo = stream.getProperty(streamInfoKey);
fireChildReadAndRegister(streamInfo, new DefaultHttp2DataFrame(data.retain(), endOfStream, padding));
// We return the bytes in bytesConsumed() once the stream channel consumed the bytes.
return 0;
Expand All @@ -389,7 +457,7 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
static final class StreamInfo {
final Http2StreamChannel childChannel;
/**
* {@code true} if stream is in {@link Http2MultiplexCodec#steamsToFireChildReadComplete}.
* {@code true} if stream is in {@link Http2MultiplexCodec#streamsToFireChildReadComplete}.
*/
boolean inStreamsToFireChildReadComplete;

Expand All @@ -413,9 +481,7 @@ final class Http2StreamChannel extends AbstractHttp2StreamChannel {
protected void doClose() throws Exception {
if (!onStreamClosedFired) {
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).setStream(this);
ChannelHandlerInvoker invoker = ctx.invoker();
invoker.invokeWrite(ctx, resetFrame, ctx.newPromise());
invoker.invokeFlush(ctx);
writeFromStreamChannel(resetFrame, true);
}
super.doClose();
}
Expand All @@ -432,12 +498,13 @@ protected void doWrite(Object msg) {
throw new IllegalArgumentException("Stream must be null on the frame");
}
frame.setStream(this);
ctx.invoker().invokeWrite(ctx, frame, ctx.newPromise());

writeFromStreamChannel(msg, false);
}

@Override
protected void doWriteComplete() {
ctx.invoker().invokeFlush(ctx);
flushFromStreamChannel();
}

@Override
Expand All @@ -464,7 +531,7 @@ private void bytesConsumed0(int bytes) {
try {
http2Handler.connection().local().flowController().consumeBytes(stream, bytes);
} catch (Throwable t) {
ChannelHandlerInvokerUtil.invokeExceptionCaughtNow(ctx, t);
exceptionCaught(ctx, t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.PlatformDependent;

import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Queue;

Expand Down Expand Up @@ -68,7 +69,7 @@ public class Http2MultiplexCodecTest {

@Before
public void setUp() throws Exception {
channel.connect(null);
channel.connect(new InetSocketAddress(0));
channel.pipeline().addLast(serverCodec);
http2HandlerCtx = channel.pipeline().context(serverCodec.connectionHandler());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -50,7 +49,7 @@ public EmbeddedChannelWriteReleaseHandlerContext(ByteBufAllocator alloc, Channel
this.alloc = checkNotNull(alloc, "alloc");
this.channel = checkNotNull(channel, "channel");
this.handler = checkNotNull(handler, "handler");
this.eventLoop = checkNotNull(channel.eventLoop(), "eventLoop");
eventLoop = checkNotNull(channel.eventLoop(), "eventLoop");
}

protected abstract void handleException(Throwable t);
Expand All @@ -75,11 +74,6 @@ public EventExecutor executor() {
return eventLoop;
}

@Override
public ChannelHandlerInvoker invoker() {
return eventLoop.asInvoker();
}

@Override
public String name() {
return HANDLER_NAME;
Expand Down
5 changes: 0 additions & 5 deletions transport/src/main/java/io/netty/channel/AbstractChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,6 @@ public RecvByteBufAllocator.Handle recvBufAllocHandle() {
return recvHandle;
}

@Override
public final ChannelHandlerInvoker invoker() {
return eventLoop().asInvoker();
}

@Override
public final ChannelOutboundBuffer outboundBuffer() {
return outboundBuffer;
Expand Down

0 comments on commit 68cd670

Please sign in to comment.