Skip to content

Commit

Permalink
Let CombinedChannelDuplexHandler correctly handle exceptionCaught. Re…
Browse files Browse the repository at this point in the history
…lated to [#4528]

Motivation:

ChannelInboundHandler and ChannelOutboundHandler both can implement exceptionCaught(...) method and so we need to dispatch to both of them.

Modifications:

- Correctly first dispatch exceptionCaught to the ChannelInboundHandler but also make sure the next handler it will be dispatched to will be the ChannelOutboundHandler
- Add removeInboundHandler() and removeOutboundHandler() which allows to remove one of the combined handlers
- Let *Codec extends it and not ChannelHandlerAppender
- Remove ChannelHandlerAppender

Result:

Correctly handle events and also have same behavior as in 4.0
  • Loading branch information
normanmaurer committed Jan 18, 2016
1 parent 9e76b53 commit e969b69
Show file tree
Hide file tree
Showing 11 changed files with 781 additions and 296 deletions.
Expand Up @@ -17,11 +17,10 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.util.internal.OneTimeTask;

import java.util.ArrayDeque;
import java.util.List;
Expand All @@ -42,7 +41,8 @@
*
* @see HttpServerCodec
*/
public final class HttpClientCodec extends ChannelHandlerAppender implements HttpClientUpgradeHandler.SourceCodec {
public final class HttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>
implements HttpClientUpgradeHandler.SourceCodec {

/** A queue that is used for correlating a request and a response. */
private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
Expand Down Expand Up @@ -83,8 +83,7 @@ public HttpClientCodec(
public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
boolean validateHeaders) {
add(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders));
add(new Encoder());
init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new Encoder());
this.failOnMissingResponse = failOnMissingResponse;
}

Expand All @@ -95,36 +94,15 @@ public HttpClientCodec(
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
final ChannelPipeline p = ctx.pipeline();
// Remove the decoder later so that the decoder can enter the 'UPGRADED' state and forward the remaining data.
ctx.executor().execute(new OneTimeTask() {
@Override
public void run() {
p.remove(decoder());
}
});
p.remove(encoder());
}

/**
* Returns the encoder of this codec.
*/
public HttpRequestEncoder encoder() {
return handlerAt(1);
}

/**
* Returns the decoder of this codec.
*/
public HttpResponseDecoder decoder() {
return handlerAt(0);
p.remove(this);
}

public void setSingleDecode(boolean singleDecode) {
decoder().setSingleDecode(singleDecode);
inboundHandler().setSingleDecode(singleDecode);
}

public boolean isSingleDecode() {
return decoder().isSingleDecode();
return inboundHandler().isSingleDecode();
}

private final class Encoder extends HttpRequestEncoder {
Expand Down
Expand Up @@ -15,18 +15,17 @@
*/
package io.netty.handler.codec.http;

import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.CombinedChannelDuplexHandler;

/**
* A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder}
* which enables easier server side HTTP implementation.
*
* @see HttpClientCodec
*/
public final class HttpServerCodec extends ChannelHandlerAppender implements
HttpServerUpgradeHandler.SourceCodec {
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec {

/**
* Creates a new instance with the default decoder options
Expand Down Expand Up @@ -58,21 +57,6 @@ public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunk
*/
@Override
public void upgradeFrom(ChannelHandlerContext ctx) {
ctx.pipeline().remove(HttpRequestDecoder.class);
ctx.pipeline().remove(HttpResponseEncoder.class);
}

/**
* Returns the encoder of this codec.
*/
public HttpResponseEncoder encoder() {
return handlerAt(1);
}

/**
* Returns the decoder of this codec.
*/
public HttpRequestDecoder decoder() {
return handlerAt(0);
ctx.pipeline().remove(this);
}
}
Expand Up @@ -15,12 +15,12 @@
*/
package io.netty.handler.codec.spdy;

import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.CombinedChannelDuplexHandler;

/**
* A combination of {@link SpdyHttpDecoder} and {@link SpdyHttpEncoder}
*/
public final class SpdyHttpCodec extends ChannelHandlerAppender {
public final class SpdyHttpCodec extends CombinedChannelDuplexHandler<SpdyHttpDecoder, SpdyHttpEncoder> {
/**
* Creates a new instance with the specified decoder options.
*/
Expand Down
Expand Up @@ -16,8 +16,8 @@
package io.netty.handler.codec.memcache.binary;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.handler.codec.memcache.LastMemcacheContent;

Expand All @@ -35,7 +35,8 @@
* content, which defaults to 8192. This chunk size is the maximum, so if smaller chunks arrive they
* will be passed up the pipeline and not queued up to the chunk size.
*/
public final class BinaryMemcacheClientCodec extends ChannelHandlerAppender {
public final class BinaryMemcacheClientCodec extends
CombinedChannelDuplexHandler<BinaryMemcacheResponseDecoder, BinaryMemcacheRequestEncoder> {

private final boolean failOnMissingResponse;
private final AtomicLong requestResponseCounter = new AtomicLong();
Expand Down Expand Up @@ -64,8 +65,7 @@ public BinaryMemcacheClientCodec(int decodeChunkSize) {
*/
public BinaryMemcacheClientCodec(int decodeChunkSize, boolean failOnMissingResponse) {
this.failOnMissingResponse = failOnMissingResponse;
add(new Decoder(decodeChunkSize));
add(new Encoder());
init(new Decoder(decodeChunkSize), new Encoder());
}

private final class Encoder extends BinaryMemcacheRequestEncoder {
Expand Down
Expand Up @@ -15,7 +15,7 @@
*/
package io.netty.handler.codec.memcache.binary;

import io.netty.channel.ChannelHandlerAppender;
import io.netty.channel.CombinedChannelDuplexHandler;

/**
* The full server codec that combines the correct encoder and decoder.
Expand All @@ -24,14 +24,14 @@
* Internally, it combines the {@link BinaryMemcacheRequestDecoder} and the
* {@link BinaryMemcacheResponseEncoder} to request decoding and response encoding.
*/
public class BinaryMemcacheServerCodec extends ChannelHandlerAppender {
public class BinaryMemcacheServerCodec extends
CombinedChannelDuplexHandler<BinaryMemcacheRequestDecoder, BinaryMemcacheResponseEncoder> {

public BinaryMemcacheServerCodec() {
this(AbstractBinaryMemcacheDecoder.DEFAULT_MAX_CHUNK_SIZE);
}

public BinaryMemcacheServerCodec(int decodeChunkSize) {
add(new BinaryMemcacheRequestDecoder(decodeChunkSize));
add(new BinaryMemcacheResponseEncoder());
super(new BinaryMemcacheRequestDecoder(decodeChunkSize), new BinaryMemcacheResponseEncoder());
}
}
Expand Up @@ -101,12 +101,12 @@ protected void addCodec(ChannelHandlerContext ctx) throws Exception {

@Override
protected void removeEncoder(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(codec.encoder());
codec.removeOutboundHandler();
}

@Override
protected void removeDecoder(ChannelHandlerContext ctx) throws Exception {
ctx.pipeline().remove(codec.decoder());
codec.removeInboundHandler();
}

@Override
Expand Down
Expand Up @@ -83,7 +83,7 @@ private boolean authenticate(ChannelHandlerContext ctx, FullHttpRequest req) {
}

ctx.pipeline().remove(HttpObjectAggregator.class);
ctx.pipeline().remove(HttpRequestDecoder.class);
ctx.pipeline().get(HttpServerCodec.class).removeInboundHandler();

boolean authzSuccess = false;
if (username != null) {
Expand Down Expand Up @@ -128,7 +128,7 @@ protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) thr
}

ctx.write(res);
ctx.pipeline().remove(HttpResponseEncoder.class);
ctx.pipeline().get(HttpServerCodec.class).removeOutboundHandler();
return true;
}

Expand Down Expand Up @@ -158,7 +158,7 @@ protected boolean handleProxyProtocol(ChannelHandlerContext ctx, Object msg) thr
}

ctx.write(res);
ctx.pipeline().remove(HttpResponseEncoder.class);
ctx.pipeline().get(HttpServerCodec.class).removeOutboundHandler();

if (sendGreeting) {
ctx.write(Unpooled.copiedBuffer("0\n", CharsetUtil.US_ASCII));
Expand Down

0 comments on commit e969b69

Please sign in to comment.