diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index c19da6fec48..bc0680bea76 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -17,11 +17,10 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerAppender; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import io.netty.channel.CombinedChannelDuplexHandler; import io.netty.handler.codec.PrematureChannelClosureException; -import io.netty.util.internal.OneTimeTask; import java.util.ArrayDeque; import java.util.List; @@ -42,7 +41,8 @@ * * @see HttpServerCodec */ -public final class HttpClientCodec extends ChannelHandlerAppender implements HttpClientUpgradeHandler.SourceCodec { +public final class HttpClientCodec extends CombinedChannelDuplexHandler + implements HttpClientUpgradeHandler.SourceCodec { /** A queue that is used for correlating a request and a response. */ private final Queue queue = new ArrayDeque(); @@ -83,8 +83,7 @@ public HttpClientCodec( public HttpClientCodec( int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse, boolean validateHeaders) { - add(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders)); - add(new Encoder()); + init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new Encoder()); this.failOnMissingResponse = failOnMissingResponse; } @@ -95,36 +94,15 @@ public HttpClientCodec( @Override public void upgradeFrom(ChannelHandlerContext ctx) { final ChannelPipeline p = ctx.pipeline(); - // Remove the decoder later so that the decoder can enter the 'UPGRADED' state and forward the remaining data. - ctx.executor().execute(new OneTimeTask() { - @Override - public void run() { - p.remove(decoder()); - } - }); - p.remove(encoder()); - } - - /** - * Returns the encoder of this codec. - */ - public HttpRequestEncoder encoder() { - return handlerAt(1); - } - - /** - * Returns the decoder of this codec. - */ - public HttpResponseDecoder decoder() { - return handlerAt(0); + p.remove(this); } public void setSingleDecode(boolean singleDecode) { - decoder().setSingleDecode(singleDecode); + inboundHandler().setSingleDecode(singleDecode); } public boolean isSingleDecode() { - return decoder().isSingleDecode(); + return inboundHandler().isSingleDecode(); } private final class Encoder extends HttpRequestEncoder { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java index ac9086f502d..0a33d3c897f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpServerCodec.java @@ -15,9 +15,8 @@ */ package io.netty.handler.codec.http; -import io.netty.channel.ChannelHandlerAppender; import io.netty.channel.ChannelHandlerContext; - +import io.netty.channel.CombinedChannelDuplexHandler; /** * A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder} @@ -25,8 +24,8 @@ * * @see HttpClientCodec */ -public final class HttpServerCodec extends ChannelHandlerAppender implements - HttpServerUpgradeHandler.SourceCodec { +public final class HttpServerCodec extends CombinedChannelDuplexHandler + implements HttpServerUpgradeHandler.SourceCodec { /** * Creates a new instance with the default decoder options @@ -58,21 +57,6 @@ public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunk */ @Override public void upgradeFrom(ChannelHandlerContext ctx) { - ctx.pipeline().remove(HttpRequestDecoder.class); - ctx.pipeline().remove(HttpResponseEncoder.class); - } - - /** - * Returns the encoder of this codec. - */ - public HttpResponseEncoder encoder() { - return handlerAt(1); - } - - /** - * Returns the decoder of this codec. - */ - public HttpRequestDecoder decoder() { - return handlerAt(0); + ctx.pipeline().remove(this); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java index 56c0a3066c6..1575b0ddded 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyHttpCodec.java @@ -15,12 +15,12 @@ */ package io.netty.handler.codec.spdy; -import io.netty.channel.ChannelHandlerAppender; +import io.netty.channel.CombinedChannelDuplexHandler; /** * A combination of {@link SpdyHttpDecoder} and {@link SpdyHttpEncoder} */ -public final class SpdyHttpCodec extends ChannelHandlerAppender { +public final class SpdyHttpCodec extends CombinedChannelDuplexHandler { /** * Creates a new instance with the specified decoder options. */ diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheClientCodec.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheClientCodec.java index b7b67eb9fe9..920cd20d2d5 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheClientCodec.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheClientCodec.java @@ -16,8 +16,8 @@ package io.netty.handler.codec.memcache.binary; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerAppender; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.CombinedChannelDuplexHandler; import io.netty.handler.codec.PrematureChannelClosureException; import io.netty.handler.codec.memcache.LastMemcacheContent; @@ -35,7 +35,8 @@ * content, which defaults to 8192. This chunk size is the maximum, so if smaller chunks arrive they * will be passed up the pipeline and not queued up to the chunk size. */ -public final class BinaryMemcacheClientCodec extends ChannelHandlerAppender { +public final class BinaryMemcacheClientCodec extends + CombinedChannelDuplexHandler { private final boolean failOnMissingResponse; private final AtomicLong requestResponseCounter = new AtomicLong(); @@ -64,8 +65,7 @@ public BinaryMemcacheClientCodec(int decodeChunkSize) { */ public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) { this.failOnMissingResponse = failOnMissingResponse; - add(new Decoder(decodeChunkSize)); - add(new Encoder()); + init(new Decoder(decodeChunkSize), new Encoder()); } private final class Encoder extends BinaryMemcacheRequestEncoder { diff --git a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheServerCodec.java b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheServerCodec.java index 1d74710de3a..ffe29284d8f 100644 --- a/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheServerCodec.java +++ b/codec-memcache/src/main/java/io/netty/handler/codec/memcache/binary/BinaryMemcacheServerCodec.java @@ -15,7 +15,7 @@ */ package io.netty.handler.codec.memcache.binary; -import io.netty.channel.ChannelHandlerAppender; +import io.netty.channel.CombinedChannelDuplexHandler; /** * The full server codec that combines the correct encoder and decoder. @@ -24,14 +24,14 @@ * Internally, it combines the {@link BinaryMemcacheRequestDecoder} and the * {@link BinaryMemcacheResponseEncoder} to request decoding and response encoding. */ -public class BinaryMemcacheServerCodec extends ChannelHandlerAppender { +public class BinaryMemcacheServerCodec extends + CombinedChannelDuplexHandler { public BinaryMemcacheServerCodec() { this(AbstractBinaryMemcacheDecoder.DEFAULT_MAX_CHUNK_SIZE); } public BinaryMemcacheServerCodec(int decodeChunkSize) { - add(new BinaryMemcacheRequestDecoder(decodeChunkSize)); - add(new BinaryMemcacheResponseEncoder()); + super(new BinaryMemcacheRequestDecoder(decodeChunkSize), new BinaryMemcacheResponseEncoder()); } } diff --git a/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java b/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java index 2008f3b1e01..376b319c02a 100644 --- a/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java +++ b/handler-proxy/src/main/java/io/netty/handler/proxy/HttpProxyHandler.java @@ -101,12 +101,12 @@ protected void addCodec(ChannelHandlerContext ctx) throws Exception { @Override protected void removeEncoder(ChannelHandlerContext ctx) throws Exception { - ctx.pipeline().remove(codec.encoder()); + codec.removeOutboundHandler(); } @Override protected void removeDecoder(ChannelHandlerContext ctx) throws Exception { - ctx.pipeline().remove(codec.decoder()); + codec.removeInboundHandler(); } @Override diff --git a/handler-proxy/src/test/java/io/netty/handler/proxy/HttpProxyServer.java b/handler-proxy/src/test/java/io/netty/handler/proxy/HttpProxyServer.java index 69071ffdf14..1b1d55c32e5 100644 --- a/handler-proxy/src/test/java/io/netty/handler/proxy/HttpProxyServer.java +++ b/handler-proxy/src/test/java/io/netty/handler/proxy/HttpProxyServer.java @@ -83,7 +83,7 @@ private boolean authenticate(ChannelHandlerContext ctx, FullHttpRequest req) { } ctx.pipeline().remove(HttpObjectAggregator.class); - ctx.pipeline().remove(HttpRequestDecoder.class); + ctx.pipeline().get(HttpServerCodec.class).removeInboundHandler(); boolean authzSuccess = false; if (username != null) { @@ -128,7 +128,7 @@ protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) thr } ctx.write(res); - ctx.pipeline().remove(HttpResponseEncoder.class); + ctx.pipeline().get(HttpServerCodec.class).removeOutboundHandler(); return true; } @@ -158,7 +158,7 @@ protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) thr } ctx.write(res); - ctx.pipeline().remove(HttpResponseEncoder.class); + ctx.pipeline().get(HttpServerCodec.class).removeOutboundHandler(); if (sendGreeting) { ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII)); diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java deleted file mode 100644 index 798a8c29f82..00000000000 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAppender.java +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel; - -import java.util.ArrayList; -import java.util.List; - -/** - * A {@link ChannelHandler} that appends the specified {@link ChannelHandler}s right next to itself. - * By default, it removes itself from the {@link ChannelPipeline} once the specified {@link ChannelHandler}s - * are added. Optionally, you can keep it in the {@link ChannelPipeline} by specifying a {@code boolean} - * parameter at construction time. - */ -public class ChannelHandlerAppender extends ChannelInboundHandlerAdapter { - - private static final class Entry { - final String name; - final ChannelHandler handler; - - Entry(String name, ChannelHandler handler) { - this.name = name; - this.handler = handler; - } - } - - private final boolean selfRemoval; - private final List handlers = new ArrayList(); - private boolean added; - - /** - * Creates a new uninitialized instance. A class that extends this handler must invoke - * {@link #add(ChannelHandler...)} before adding this handler into a {@link ChannelPipeline}. - */ - protected ChannelHandlerAppender() { - this(true); - } - - /** - * Creates a new uninitialized instance. A class that extends this handler must invoke - * {@link #add(ChannelHandler...)} before adding this handler into a {@link ChannelPipeline}. - * - * @param selfRemoval {@code true} to remove itself from the {@link ChannelPipeline} after appending - * the {@link ChannelHandler}s specified via {@link #add(ChannelHandler...)}. - */ - protected ChannelHandlerAppender(boolean selfRemoval) { - this.selfRemoval = selfRemoval; - } - - /** - * Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself. - */ - public ChannelHandlerAppender(Iterable handlers) { - this(true, handlers); - } - - /** - * Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself. - */ - public ChannelHandlerAppender(ChannelHandler... handlers) { - this(true, handlers); - } - - /** - * Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself. - * - * @param selfRemoval {@code true} to remove itself from the {@link ChannelPipeline} after appending - * the specified {@link ChannelHandler}s - */ - public ChannelHandlerAppender(boolean selfRemoval, Iterable handlers) { - this.selfRemoval = selfRemoval; - add(handlers); - } - - /** - * Creates a new instance that appends the specified {@link ChannelHandler}s right next to itself. - * - * @param selfRemoval {@code true} to remove itself from the {@link ChannelPipeline} after appending - * the specified {@link ChannelHandler}s - */ - public ChannelHandlerAppender(boolean selfRemoval, ChannelHandler... handlers) { - this.selfRemoval = selfRemoval; - add(handlers); - } - - /** - * Adds the specified handler to the list of the appended handlers. - * - * @param name the name of the appended handler. {@code null} to auto-generate - * @param handler the handler to append - * - * @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already - */ - protected final ChannelHandlerAppender add(String name, ChannelHandler handler) { - if (handler == null) { - throw new NullPointerException("handler"); - } - - if (added) { - throw new IllegalStateException("added to the pipeline already"); - } - - handlers.add(new Entry(name, handler)); - return this; - } - - /** - * Adds the specified handler to the list of the appended handlers with the auto-generated handler name. - * - * @param handler the handler to append - * - * @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already - */ - protected final ChannelHandlerAppender add(ChannelHandler handler) { - return add(null, handler); - } - - /** - * Adds the specified handlers to the list of the appended handlers. The handlers' names are auto-generated. - * - * @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already - */ - protected final ChannelHandlerAppender add(Iterable handlers) { - if (handlers == null) { - throw new NullPointerException("handlers"); - } - - for (ChannelHandler h: handlers) { - if (h == null) { - break; - } - add(h); - } - - return this; - } - - /** - * Adds the specified handlers to the list of the appended handlers. The handlers' names are auto-generated. - * - * @throws IllegalStateException if {@link ChannelHandlerAppender} has been added to the pipeline already - */ - protected final ChannelHandlerAppender add(ChannelHandler... handlers) { - if (handlers == null) { - throw new NullPointerException("handlers"); - } - - for (ChannelHandler h: handlers) { - if (h == null) { - break; - } - - add(h); - } - - return this; - } - - /** - * Returns the {@code index}-th appended handler. - */ - @SuppressWarnings("unchecked") - protected final T handlerAt(int index) { - return (T) handlers.get(index).handler; - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - added = true; - - AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx; - DefaultChannelPipeline pipeline = (DefaultChannelPipeline) dctx.pipeline(); - String name = dctx.name(); - try { - for (Entry e: handlers) { - String oldName = name; - if (e.name == null) { - name = pipeline.generateName(e.handler); - } else { - name = e.name; - } - - // Note that we do not use dctx.invoker() because it raises an IllegalStateExxception - // if the Channel is not registered yet. - pipeline.addAfter(dctx.invoker, oldName, name, e.handler); - } - } finally { - if (selfRemoval) { - pipeline.remove(this); - } - } - } -} diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java index 95ebac6dfcb..4abc4884687 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java @@ -15,15 +15,28 @@ */ package io.netty.channel; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.internal.OneTimeTask; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + import java.net.SocketAddress; /** - * @deprecated Use {@link ChannelHandlerAppender} instead. + * Combines a {@link ChannelInboundHandler} and a {@link ChannelOutboundHandler} into one {@link ChannelHandler}. */ -@Deprecated public class CombinedChannelDuplexHandler extends ChannelDuplexHandler { + private static final InternalLogger logger = InternalLoggerFactory.getInstance(CombinedChannelDuplexHandler.class); + + private DelegatingChannelHandlerContext inboundCtx; + private DelegatingChannelHandlerContext outboundCtx; + private volatile boolean handlerAdded; + private I inboundHandler; private O outboundHandler; @@ -88,6 +101,28 @@ protected final O outboundHandler() { return outboundHandler; } + private void checkAdded() { + if (!handlerAdded) { + throw new IllegalStateException("handler not added to pipeline yet"); + } + } + + /** + * Removes the {@link ChannelInboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}. + */ + public final void removeInboundHandler() { + checkAdded(); + inboundCtx.remove(); + } + + /** + * Removes the {@link ChannelOutboundHandler} that was combined in this {@link CombinedChannelDuplexHandler}. + */ + public final void removeOutboundHandler() { + checkAdded(); + outboundCtx.remove(); + } + @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (inboundHandler == null) { @@ -96,67 +131,151 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { " if " + CombinedChannelDuplexHandler.class.getSimpleName() + " was constructed with the default constructor."); } + + outboundCtx = new DelegatingChannelHandlerContext(ctx, outboundHandler); + inboundCtx = new DelegatingChannelHandlerContext(ctx, inboundHandler) { + @SuppressWarnings("deprecation") + @Override + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + if (!outboundCtx.removed) { + try { + // We directly delegate to the ChannelOutboundHandler as this may override exceptionCaught(...) + // as well + outboundHandler.exceptionCaught(outboundCtx, cause); + } catch (Throwable error) { + if (logger.isWarnEnabled()) { + logger.warn( + "An exception was thrown by a user handler's " + + "exceptionCaught() method while handling the following exception:", error); + } + } + } else { + super.fireExceptionCaught(cause); + } + return this; + } + }; + + // The inboundCtx and outboundCtx were created and set now it's safe to call removeInboundHandler() and + // removeOutboundHandler(). + handlerAdded = true; + try { - inboundHandler.handlerAdded(ctx); + inboundHandler.handlerAdded(inboundCtx); } finally { - outboundHandler.handlerAdded(ctx); + outboundHandler.handlerAdded(outboundCtx); } } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { try { - inboundHandler.handlerRemoved(ctx); + inboundCtx.remove(); } finally { - outboundHandler.handlerRemoved(ctx); + outboundCtx.remove(); } } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - inboundHandler.channelRegistered(ctx); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.channelRegistered(inboundCtx); + } else { + inboundCtx.fireChannelRegistered(); + } } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { - inboundHandler.channelUnregistered(ctx); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.channelUnregistered(inboundCtx); + } else { + inboundCtx.fireChannelUnregistered(); + } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - inboundHandler.channelActive(ctx); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.channelActive(inboundCtx); + } else { + inboundCtx.fireChannelActive(); + } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - inboundHandler.channelInactive(ctx); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.channelInactive(inboundCtx); + } else { + inboundCtx.fireChannelInactive(); + } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - inboundHandler.exceptionCaught(ctx, cause); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.exceptionCaught(inboundCtx, cause); + } else { + inboundCtx.fireExceptionCaught(cause); + } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - inboundHandler.userEventTriggered(ctx, evt); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.userEventTriggered(inboundCtx, evt); + } else { + inboundCtx.fireUserEventTriggered(evt); + } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - inboundHandler.channelRead(ctx, msg); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.channelRead(inboundCtx, msg); + } else { + inboundCtx.fireChannelRead(msg); + } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - inboundHandler.channelReadComplete(ctx); + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.channelReadComplete(inboundCtx); + } else { + inboundCtx.fireChannelReadComplete(); + } + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + assert ctx == inboundCtx.ctx; + if (!inboundCtx.removed) { + inboundHandler.channelWritabilityChanged(inboundCtx); + } else { + inboundCtx.fireChannelWritabilityChanged(); + } } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { - outboundHandler.bind(ctx, localAddress, promise); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.bind(outboundCtx, localAddress, promise); + } else { + outboundCtx.bind(localAddress, promise); + } } @Override @@ -164,41 +283,326 @@ public void connect( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { - outboundHandler.connect(ctx, remoteAddress, localAddress, promise); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.connect(outboundCtx, remoteAddress, localAddress, promise); + } else { + outboundCtx.connect(localAddress, promise); + } } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - outboundHandler.disconnect(ctx, promise); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.disconnect(outboundCtx, promise); + } else { + outboundCtx.disconnect(promise); + } } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - outboundHandler.close(ctx, promise); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.close(outboundCtx, promise); + } else { + outboundCtx.close(promise); + } } @Override public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { - outboundHandler.deregister(ctx, promise); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.deregister(outboundCtx, promise); + } else { + outboundCtx.deregister(promise); + } } @Override public void read(ChannelHandlerContext ctx) throws Exception { - outboundHandler.read(ctx); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.read(outboundCtx); + } else { + outboundCtx.read(); + } } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - outboundHandler.write(ctx, msg, promise); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.write(outboundCtx, msg, promise); + } else { + outboundCtx.write(msg, promise); + } } @Override public void flush(ChannelHandlerContext ctx) throws Exception { - outboundHandler.flush(ctx); + assert ctx == outboundCtx.ctx; + if (!outboundCtx.removed) { + outboundHandler.flush(outboundCtx); + } else { + outboundCtx.flush(); + } } - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - inboundHandler.channelWritabilityChanged(ctx); + private static class DelegatingChannelHandlerContext implements ChannelHandlerContext { + + private final ChannelHandlerContext ctx; + private final ChannelHandler handler; + boolean removed; + + DelegatingChannelHandlerContext(ChannelHandlerContext ctx, ChannelHandler handler) { + this.ctx = ctx; + this.handler = handler; + } + + @Override + public Channel channel() { + return ctx.channel(); + } + + @Override + public EventExecutor executor() { + return ctx.executor(); + } + + @Override + public String name() { + return ctx.name(); + } + + @Override + public ChannelHandler handler() { + return ctx.handler(); + } + + @Override + public boolean isRemoved() { + return removed || ctx.isRemoved(); + } + + @Override + public ChannelHandlerContext fireChannelRegistered() { + ctx.fireChannelRegistered(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelUnregistered() { + ctx.fireChannelUnregistered(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelActive() { + ctx.fireChannelActive(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelInactive() { + ctx.fireChannelInactive(); + return this; + } + + @Override + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + ctx.fireExceptionCaught(cause); + return this; + } + + @Override + public ChannelHandlerContext fireUserEventTriggered(Object event) { + ctx.fireUserEventTriggered(event); + return this; + } + + @Override + public ChannelHandlerContext fireChannelRead(Object msg) { + ctx.fireChannelRead(msg); + return this; + } + + @Override + public ChannelHandlerContext fireChannelReadComplete() { + ctx.fireChannelReadComplete(); + return this; + } + + @Override + public ChannelHandlerContext fireChannelWritabilityChanged() { + ctx.fireChannelWritabilityChanged(); + return this; + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return ctx.bind(localAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return ctx.connect(remoteAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return ctx.connect(remoteAddress, localAddress); + } + + @Override + public ChannelFuture disconnect() { + return ctx.disconnect(); + } + + @Override + public ChannelFuture close() { + return ctx.close(); + } + + @Override + public ChannelFuture deregister() { + return ctx.deregister(); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return ctx.bind(localAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + return ctx.connect(remoteAddress, promise); + } + + @Override + public ChannelFuture connect( + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + return ctx.connect(remoteAddress, localAddress, promise); + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + return ctx.disconnect(promise); + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + return ctx.close(promise); + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return ctx.deregister(); + } + + @Override + public ChannelHandlerContext read() { + ctx.read(); + return this; + } + + @Override + public ChannelFuture write(Object msg) { + return ctx.write(msg); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + return ctx.write(msg, promise); + } + + @Override + public ChannelHandlerContext flush() { + ctx.flush(); + return this; + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return ctx.writeAndFlush(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return ctx.writeAndFlush(msg); + } + + @Override + public ChannelPipeline pipeline() { + return ctx.pipeline(); + } + + @Override + public ByteBufAllocator alloc() { + return ctx.alloc(); + } + + @Override + public ChannelPromise newPromise() { + return ctx.newPromise(); + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return ctx.newProgressivePromise(); + } + + @Override + public ChannelFuture newSucceededFuture() { + return ctx.newSucceededFuture(); + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return ctx.newFailedFuture(cause); + } + + @Override + public ChannelPromise voidPromise() { + return ctx.voidPromise(); + } + + @Override + public Attribute attr(AttributeKey key) { + return ctx.attr(key); + } + + @Override + public boolean hasAttr(AttributeKey key) { + return ctx.hasAttr(key); + } + + final void remove() { + EventExecutor executor = executor(); + if (executor.inEventLoop()) { + remove0(); + } else { + executor.execute(new OneTimeTask() { + @Override + public void run() { + remove0(); + } + }); + } + } + + private void remove0() { + if (!removed) { + removed = true; + try { + handler.handlerRemoved(this); + } catch (Throwable cause) { + fireExceptionCaught(new ChannelPipelineException( + handler.getClass().getName() + ".handlerRemoved() has thrown an exception.", cause)); + } + } + } } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 6a45822b6e4..921a7e33dab 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -337,7 +337,7 @@ private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) { return invoker; } - String generateName(ChannelHandler handler) { + private String generateName(ChannelHandler handler) { Map, String> cache = nameCaches.get(); Class handlerType = handler.getClass(); String name = cache.get(handlerType); @@ -416,7 +416,7 @@ public void run() { return context; } - void remove0(AbstractChannelHandlerContext ctx) { + private void remove0(AbstractChannelHandlerContext ctx) { AbstractChannelHandlerContext prev = ctx.prev; AbstractChannelHandlerContext next = ctx.next; prev.next = next; diff --git a/transport/src/test/java/io/netty/channel/CombinedChannelDuplexHandlerTest.java b/transport/src/test/java/io/netty/channel/CombinedChannelDuplexHandlerTest.java new file mode 100644 index 00000000000..0bcf0a7a156 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/CombinedChannelDuplexHandlerTest.java @@ -0,0 +1,324 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayDeque; +import java.util.Queue; + +import static org.junit.Assert.*; + +public class CombinedChannelDuplexHandlerTest { + + private static final Object MSG = new Object(); + private static final SocketAddress ADDRESS = new InetSocketAddress(0); + + private enum Event { + REGISTERED, + UNREGISTERED, + ACTIVE, + INACTIVE, + CHANNEL_READ, + CHANNEL_READ_COMPLETE, + EXCEPTION_CAUGHT, + USER_EVENT_TRIGGERED, + CHANNEL_WRITABILITY_CHANGED, + HANDLER_ADDED, + HANDLER_REMOVED, + BIND, + CONNECT, + WRITE, + FLUSH, + READ, + REGISTER, + DEREGISTER, + CLOSE, + DISCONNECT + } + + @Test(expected = IllegalStateException.class) + public void testInboundRemoveBeforeAdded() { + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter()); + handler.removeInboundHandler(); + } + + @Test(expected = IllegalStateException.class) + public void testOutboundRemoveBeforeAdded() { + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + new ChannelInboundHandlerAdapter(), new ChannelOutboundHandlerAdapter()); + handler.removeOutboundHandler(); + } + + @Test(expected = IllegalArgumentException.class) + public void testInboundHandlerImplementsOutboundHandler() { + new CombinedChannelDuplexHandler( + new ChannelDuplexHandler(), new ChannelOutboundHandlerAdapter()); + } + + @Test(expected = IllegalArgumentException.class) + public void testOutboundHandlerImplementsInbboundHandler() { + new CombinedChannelDuplexHandler( + new ChannelInboundHandlerAdapter(), new ChannelDuplexHandler()); + } + + @Test(expected = IllegalStateException.class) + public void testInitNotCalledBeforeAdded() throws Exception { + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler() { }; + handler.handlerAdded(null); + } + + @Test + public void testExceptionCaughtBothCombinedHandlers() { + final Exception exception = new Exception(); + final Queue queue = new ArrayDeque(); + + ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + assertSame(exception, cause); + queue.add(this); + ctx.fireExceptionCaught(cause); + } + }; + ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + assertSame(exception, cause); + queue.add(this); + ctx.fireExceptionCaught(cause); + } + }; + ChannelInboundHandler lastHandler = new ChannelInboundHandlerAdapter() { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + assertSame(exception, cause); + queue.add(this); + } + }; + EmbeddedChannel channel = new EmbeddedChannel( + new CombinedChannelDuplexHandler( + inboundHandler, outboundHandler), lastHandler); + channel.pipeline().fireExceptionCaught(exception); + assertFalse(channel.finish()); + assertSame(inboundHandler, queue.poll()); + assertSame(outboundHandler, queue.poll()); + assertSame(lastHandler, queue.poll()); + assertTrue(queue.isEmpty()); + } + + @Test + public void testInboundEvents() { + final Queue queue = new ArrayDeque(); + + ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_ADDED); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_REMOVED); + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.REGISTERED); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.UNREGISTERED); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.ACTIVE); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.INACTIVE); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + queue.add(Event.CHANNEL_READ); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.CHANNEL_READ_COMPLETE); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + queue.add(Event.USER_EVENT_TRIGGERED); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.CHANNEL_WRITABILITY_CHANGED); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + queue.add(Event.EXCEPTION_CAUGHT); + } + }; + + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + inboundHandler, new ChannelOutboundHandlerAdapter()); + + EmbeddedChannel channel = new EmbeddedChannel(handler); + channel.pipeline().fireChannelWritabilityChanged(); + channel.pipeline().fireUserEventTriggered(MSG); + channel.pipeline().fireChannelRead(MSG); + channel.pipeline().fireChannelReadComplete(); + + assertEquals(Event.HANDLER_ADDED, queue.poll()); + assertEquals(Event.REGISTERED, queue.poll()); + assertEquals(Event.ACTIVE, queue.poll()); + assertEquals(Event.CHANNEL_WRITABILITY_CHANGED, queue.poll()); + assertEquals(Event.USER_EVENT_TRIGGERED, queue.poll()); + assertEquals(Event.CHANNEL_READ, queue.poll()); + assertEquals(Event.CHANNEL_READ_COMPLETE, queue.poll()); + + handler.removeInboundHandler(); + assertEquals(Event.HANDLER_REMOVED, queue.poll()); + + // These should not be handled by the inboundHandler anymore as it was removed before + channel.pipeline().fireChannelWritabilityChanged(); + channel.pipeline().fireUserEventTriggered(MSG); + channel.pipeline().fireChannelRead(MSG); + channel.pipeline().fireChannelReadComplete(); + + // Should have not received any more events as it was removed before via removeInboundHandler() + assertTrue(queue.isEmpty()); + assertTrue(channel.finish()); + assertTrue(queue.isEmpty()); + } + + @Test + public void testOutboundEvents() { + final Queue queue = new ArrayDeque(); + + ChannelInboundHandler inboundHandler = new ChannelInboundHandlerAdapter(); + ChannelOutboundHandler outboundHandler = new ChannelOutboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_ADDED); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.HANDLER_REMOVED); + } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) + throws Exception { + queue.add(Event.BIND); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, + SocketAddress localAddress, ChannelPromise promise) throws Exception { + queue.add(Event.CONNECT); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + queue.add(Event.DISCONNECT); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + queue.add(Event.CLOSE); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + queue.add(Event.DEREGISTER); + } + + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.READ); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + queue.add(Event.WRITE); + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + queue.add(Event.FLUSH); + } + }; + + CombinedChannelDuplexHandler handler = + new CombinedChannelDuplexHandler( + inboundHandler, outboundHandler); + + EmbeddedChannel channel = new EmbeddedChannel(); + channel.pipeline().addFirst(handler); + + doOutboundOperations(channel); + + assertEquals(Event.HANDLER_ADDED, queue.poll()); + assertEquals(Event.BIND, queue.poll()); + assertEquals(Event.CONNECT, queue.poll()); + assertEquals(Event.WRITE, queue.poll()); + assertEquals(Event.FLUSH, queue.poll()); + assertEquals(Event.READ, queue.poll()); + assertEquals(Event.CLOSE, queue.poll()); + assertEquals(Event.CLOSE, queue.poll()); + assertEquals(Event.DEREGISTER, queue.poll()); + + handler.removeOutboundHandler(); + assertEquals(Event.HANDLER_REMOVED, queue.poll()); + + // These should not be handled by the inboundHandler anymore as it was removed before + doOutboundOperations(channel); + + // Should have not received any more events as it was removed before via removeInboundHandler() + assertTrue(queue.isEmpty()); + assertTrue(channel.finish()); + assertTrue(queue.isEmpty()); + } + + private static void doOutboundOperations(Channel channel) { + channel.pipeline().bind(ADDRESS); + channel.pipeline().connect(ADDRESS); + channel.pipeline().write(MSG); + channel.pipeline().flush(); + channel.pipeline().read(); + channel.pipeline().disconnect(); + channel.pipeline().close(); + channel.pipeline().deregister(); + } +}