Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Http2ConnectionHandler to allow decoupling close(..) from GOAWAY graceful close #9094

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.netty.handler.codec.http2;

import io.netty.channel.Channel;
import io.netty.handler.codec.http2.Http2HeadersEncoder.SensitivityDetector;
import io.netty.util.internal.UnstableApi;

Expand Down Expand Up @@ -83,6 +84,7 @@ public abstract class AbstractHttp2ConnectionHandlerBuilder<T extends Http2Conne
private Http2Settings initialSettings = Http2Settings.defaultSettings();
private Http2FrameListener frameListener;
private long gracefulShutdownTimeoutMillis = Http2CodecUtil.DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
private boolean decoupleCloseAndGoAway;

// The property that will prohibit connection() and codec() if set by server(),
// because this property is used only when this builder creates a Http2Connection.
Expand Down Expand Up @@ -401,6 +403,24 @@ protected boolean isAutoAckSettingsFrame() {
return autoAckSettingsFrame;
}

/**
* Determine if the {@link Channel#close()} should be coupled with goaway and graceful close.
* @param decoupleCloseAndGoAway {@code true} to make {@link Channel#close()} directly close the underlying
* transport, and not attempt graceful closure via GOAWAY.
* @return {@code this}.
*/
protected B decoupleCloseAndGoAway(boolean decoupleCloseAndGoAway) {
this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
return self();
}

/**
* Determine if the {@link Channel#close()} should be coupled with goaway and graceful close.
*/
protected boolean decoupleCloseAndGoAway() {
return decoupleCloseAndGoAway;
}

/**
* Create a new {@link Http2ConnectionHandler}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,22 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
private final Http2ConnectionDecoder decoder;
private final Http2ConnectionEncoder encoder;
private final Http2Settings initialSettings;
private final boolean decoupleCloseAndGoAway;
private ChannelFutureListener closeListener;
private BaseDecoder byteDecoder;
private long gracefulShutdownTimeoutMillis;

protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
this(decoder, encoder, initialSettings, false);
}

protected Http2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings, boolean decoupleCloseAndGoAway) {
this.initialSettings = checkNotNull(initialSettings, "initialSettings");
this.decoder = checkNotNull(decoder, "decoder");
this.encoder = checkNotNull(encoder, "encoder");
this.decoupleCloseAndGoAway = decoupleCloseAndGoAway;
if (encoder.connection() != decoder.connection()) {
throw new IllegalArgumentException("Encoder and Decoder do not share the same connection object");
}
Expand Down Expand Up @@ -449,6 +456,10 @@ public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (decoupleCloseAndGoAway) {
ctx.close(promise);
return;
}
promise = promise.unvoid();
// Avoid NotYetConnectedException
if (!ctx.channel().isActive()) {
Expand All @@ -461,22 +472,36 @@ public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exce
// a GO_AWAY has been sent we send a empty buffer just so we can wait to close until all other data has been
// flushed to the OS.
// https://github.com/netty/netty/issues/5307
final ChannelFuture future = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null);
ChannelFuture f = connection().goAwaySent() ? ctx.write(EMPTY_BUFFER) : goAway(ctx, null, ctx.newPromise());
ctx.flush();
doGracefulShutdown(ctx, future, promise);
doGracefulShutdown(ctx, f, promise);
}

private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, ChannelPromise promise) {
private void doGracefulShutdown(ChannelHandlerContext ctx, ChannelFuture future, final ChannelPromise promise) {
if (isGracefulShutdownComplete()) {
// If there are no active streams, close immediately after the GO_AWAY write completes.
future.addListener(new ClosingChannelFutureListener(ctx, promise));
} else {
// If there are active streams we should wait until they are all closed before closing the connection.
if (gracefulShutdownTimeoutMillis < 0) {
closeListener = new ClosingChannelFutureListener(ctx, promise);
} else {
closeListener = new ClosingChannelFutureListener(ctx, promise,
gracefulShutdownTimeoutMillis, MILLISECONDS);
final ClosingChannelFutureListener tmp = gracefulShutdownTimeoutMillis < 0 ?
new ClosingChannelFutureListener(ctx, promise) :
new ClosingChannelFutureListener(ctx, promise, gracefulShutdownTimeoutMillis, MILLISECONDS);
// The ClosingChannelFutureListener will cascade promise completion. We need to always notify the
// new ClosingChannelFutureListener when the graceful close completes if the promise is not null.
if (closeListener == null) {
closeListener = tmp;
} else if (promise != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To confirm I understand: this is a bug fix that was just noticed when messing with this PR. It isn't necessary for the new feature (any more than previously).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A null promise was possible but wasn't likely (e.g. would have to come from the pipeline, or from the ChanntHandlerContext promise factory). However now we are explicitly passing null when we don't care about being notified when the operation completes (e.g. after we write a GOAWAY we don't really care about knowing when graceful close completes).

final ChannelFutureListener oldCloseListener = closeListener;
closeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
try {
oldCloseListener.operationComplete(future);
} finally {
tmp.operationComplete(future);
}
}
};
}
}
}
Expand Down Expand Up @@ -636,14 +661,11 @@ protected void onConnectionError(ChannelHandlerContext ctx, boolean outbound,
}

ChannelPromise promise = ctx.newPromise();
ChannelFuture future = goAway(ctx, http2Ex);
switch (http2Ex.shutdownHint()) {
case GRACEFUL_SHUTDOWN:
ChannelFuture future = goAway(ctx, http2Ex, ctx.newPromise());
if (http2Ex.shutdownHint() == Http2Exception.ShutdownHint.GRACEFUL_SHUTDOWN) {
doGracefulShutdown(ctx, future, promise);
break;
default:
} else {
future.addListener(new ClosingChannelFutureListener(ctx, promise));
break;
}
}

Expand Down Expand Up @@ -814,6 +836,12 @@ public void operationComplete(ChannelFuture future) throws Exception {
}
});
}
// if closeListener != null this means we have already initiated graceful closure. doGracefulShutdown will apply
// the gracefulShutdownTimeoutMillis on each invocation, however we only care to apply the timeout on the
// start of graceful shutdown.
if (errorCode == NO_ERROR.code() && closeListener == null) {
doGracefulShutdown(ctx, future, null);
}

return future;
}
Expand Down Expand Up @@ -842,10 +870,10 @@ private void checkCloseConnection(ChannelFuture future) {
* Close the remote endpoint with with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
* immediately, this is the responsibility of the caller.
*/
private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) {
private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause, ChannelPromise promise) {
long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();
int lastKnownStream = connection().remote().lastStreamCreated();
return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), ctx.newPromise());
return goAway(ctx, lastKnownStream, errorCode, Http2CodecUtil.toByteBuf(ctx, cause), promise);
}

private void processRstStreamWriteResult(ChannelHandlerContext ctx, Http2Stream stream, ChannelFuture future) {
Expand Down Expand Up @@ -917,17 +945,25 @@ private static final class ClosingChannelFutureListener implements ChannelFuture
timeoutTask = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.close(promise);
doClose();
}
}, timeout, unit);
}

@Override
public void operationComplete(ChannelFuture sentGoAwayFuture) throws Exception {
public void operationComplete(ChannelFuture sentGoAwayFuture) {
if (timeoutTask != null) {
timeoutTask.cancel(false);
}
ctx.close(promise);
doClose();
}

private void doClose() {
if (promise == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can promise now be null? I don't see why this is done.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see it now, on line 844.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yip this class (e.g. ClosingChannelFutureListener) is a bit overloaded as it manages 3 things: promise propagation (optional), timer management (optional), and connection closure. I debated breaking it up ... but decided to just leave it as is for now due to its internal nature and easy to change if necessary.

ctx.close();
} else {
ctx.close(promise);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ public Http2ConnectionHandlerBuilder initialHuffmanDecodeCapacity(int initialHuf
return super.initialHuffmanDecodeCapacity(initialHuffmanDecodeCapacity);
}

@Override
public Http2ConnectionHandlerBuilder decoupleCloseAndGoAway(boolean decoupleCloseAndGoAway) {
return super.decoupleCloseAndGoAway(decoupleCloseAndGoAway);
}

@Override
public Http2ConnectionHandler build() {
return super.build();
Expand All @@ -100,6 +105,6 @@ public Http2ConnectionHandler build() {
@Override
protected Http2ConnectionHandler build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
return new Http2ConnectionHandler(decoder, encoder, initialSettings);
return new Http2ConnectionHandler(decoder, encoder, initialSettings, decoupleCloseAndGoAway());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
new IntObjectHashMap<DefaultHttp2FrameStream>(8);

Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings,
boolean decoupleCloseAndGoAway) {
super(decoder, encoder, initialSettings, decoupleCloseAndGoAway);

decoder.frameListener(new FrameListener());
connection().addListener(new ConnectionListener());
Expand Down Expand Up @@ -502,7 +503,7 @@ protected final void onStreamError(ChannelHandlerContext ctx, boolean outbound,
void onHttp2UnknownStreamError(@SuppressWarnings("unused") ChannelHandlerContext ctx, Throwable cause,
Http2Exception.StreamException streamException) {
// Just log....
LOG.warn("Stream exception thrown for unkown stream {}.", streamException.streamId(), cause);
LOG.warn("Stream exception thrown for unknown stream {}.", streamException.streamId(), cause);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class Http2FrameCodecBuilder extends

Http2FrameCodecBuilder(boolean server) {
server(server);
// For backwards compatibility we should disable to timeout by default at this layer.
gracefulShutdownTimeoutMillis(0);
}

/**
Expand Down Expand Up @@ -139,6 +141,11 @@ public Http2FrameCodecBuilder initialHuffmanDecodeCapacity(int initialHuffmanDec
return super.initialHuffmanDecodeCapacity(initialHuffmanDecodeCapacity);
}

@Override
public Http2FrameCodecBuilder decoupleCloseAndGoAway(boolean decoupleCloseAndGoAway) {
return super.decoupleCloseAndGoAway(decoupleCloseAndGoAway);
}

/**
* Build a {@link Http2FrameCodec} object.
*/
Expand Down Expand Up @@ -173,6 +180,8 @@ public Http2FrameCodec build() {
@Override
protected Http2FrameCodec build(
Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) {
return new Http2FrameCodec(encoder, decoder, initialSettings);
Http2FrameCodec codec = new Http2FrameCodec(encoder, decoder, initialSettings, decoupleCloseAndGoAway());
codec.gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis());
return codec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ public Handle newHandle() {
Http2ConnectionDecoder decoder,
Http2Settings initialSettings,
ChannelHandler inboundStreamHandler,
ChannelHandler upgradeStreamHandler) {
super(encoder, decoder, initialSettings);
ChannelHandler upgradeStreamHandler, boolean decoupleCloseAndGoAway) {
super(encoder, decoder, initialSettings, decoupleCloseAndGoAway);
this.inboundStreamHandler = inboundStreamHandler;
this.upgradeStreamHandler = upgradeStreamHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class Http2MultiplexCodecBuilder
Http2MultiplexCodecBuilder(boolean server, ChannelHandler childHandler) {
server(server);
this.childHandler = checkSharable(checkNotNull(childHandler, "childHandler"));
// For backwards compatibility we should disable to timeout by default at this layer.
gracefulShutdownTimeoutMillis(0);
}

private static ChannelHandler checkSharable(ChannelHandler handler) {
Expand Down Expand Up @@ -71,6 +73,14 @@ public static Http2MultiplexCodecBuilder forServer(ChannelHandler childHandler)
return new Http2MultiplexCodecBuilder(true, childHandler);
}

public Http2MultiplexCodecBuilder withUpgradeStreamHandler(ChannelHandler upgradeStreamHandler) {
if (this.isServer()) {
throw new IllegalArgumentException("Server codecs don't use an extra handler for the upgrade stream");
}
this.upgradeStreamHandler = upgradeStreamHandler;
return this;
}

@Override
public Http2Settings initialSettings() {
return super.initialSettings();
Expand All @@ -91,14 +101,6 @@ public Http2MultiplexCodecBuilder gracefulShutdownTimeoutMillis(long gracefulShu
return super.gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
}

public Http2MultiplexCodecBuilder withUpgradeStreamHandler(ChannelHandler upgradeStreamHandler) {
if (this.isServer()) {
throw new IllegalArgumentException("Server codecs don't use an extra handler for the upgrade stream");
}
this.upgradeStreamHandler = upgradeStreamHandler;
return this;
}

@Override
public boolean isServer() {
return super.isServer();
Expand Down Expand Up @@ -170,6 +172,11 @@ public Http2MultiplexCodecBuilder autoAckSettingsFrame(boolean autoAckSettings)
return super.autoAckSettingsFrame(autoAckSettings);
}

@Override
public Http2MultiplexCodecBuilder decoupleCloseAndGoAway(boolean decoupleCloseAndGoAway) {
return super.decoupleCloseAndGoAway(decoupleCloseAndGoAway);
}

@Override
public Http2MultiplexCodec build() {
Http2FrameWriter frameWriter = this.frameWriter;
Expand Down Expand Up @@ -201,6 +208,9 @@ public Http2MultiplexCodec build() {
@Override
protected Http2MultiplexCodec build(
Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) {
return new Http2MultiplexCodec(encoder, decoder, initialSettings, childHandler, upgradeStreamHandler);
Http2MultiplexCodec codec = new Http2MultiplexCodec(encoder, decoder, initialSettings, childHandler,
upgradeStreamHandler, decoupleCloseAndGoAway());
codec.gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis());
return codec;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ protected HttpToHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2Conn
this.validateHeaders = validateHeaders;
}

protected HttpToHttp2ConnectionHandler(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings, boolean validateHeaders,
boolean decoupleCloseAndGoAway) {
super(decoder, encoder, initialSettings, decoupleCloseAndGoAway);
this.validateHeaders = validateHeaders;
}

/**
* Get the next stream id either from the {@link HttpHeaders} object or HTTP/2 codec
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public HttpToHttp2ConnectionHandlerBuilder initialHuffmanDecodeCapacity(int init
return super.initialHuffmanDecodeCapacity(initialHuffmanDecodeCapacity);
}

@Override
public HttpToHttp2ConnectionHandlerBuilder decoupleCloseAndGoAway(boolean decoupleCloseAndGoAway) {
return super.decoupleCloseAndGoAway(decoupleCloseAndGoAway);
}

@Override
public HttpToHttp2ConnectionHandler build() {
return super.build();
Expand All @@ -92,6 +97,7 @@ public HttpToHttp2ConnectionHandler build() {
@Override
protected HttpToHttp2ConnectionHandler build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
return new HttpToHttp2ConnectionHandler(decoder, encoder, initialSettings, isValidateHeaders());
return new HttpToHttp2ConnectionHandler(decoder, encoder, initialSettings, isValidateHeaders(),
decoupleCloseAndGoAway());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ public void gracefulShutdownTimeoutTest() throws Exception {
final long expectedMillis = 1234;
handler.gracefulShutdownTimeoutMillis(expectedMillis);
handler.close(ctx, promise);
verify(executor).schedule(any(Runnable.class), eq(expectedMillis), eq(TimeUnit.MILLISECONDS));
verify(executor, atLeastOnce()).schedule(any(Runnable.class), eq(expectedMillis), eq(TimeUnit.MILLISECONDS));
}

@Test
Expand Down
Loading