Skip to content

Commit

Permalink
Proper shutdown of HTTP2 encoder when channelInactive
Browse files Browse the repository at this point in the history
Motivation:

The problem is described in grpc/grpc-java#605. Basically, when using `StreamBufferingEncoder` there is a chance of creating zombie streams that never get closed.

Modifications:

Change `Http2ConnectionHandler`'s `channelInactive` handling logic to shutdown the encoder/decoder before shutting down the active streams.

Result:

Fixes grpc/grpc-java#605
  • Loading branch information
nmittler committed Jul 8, 2015
1 parent f3c3f3e commit 7ebc11e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,25 +181,21 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
public void channelActive(ChannelHandlerContext ctx) throws Exception { }

public void channelInactive(ChannelHandlerContext ctx) throws Exception {
try {
final Http2Connection connection = connection();
// Check if there are streams to avoid the overhead of creating the ChannelFuture.
if (connection.numActiveStreams() > 0) {
final ChannelFuture future = ctx.newSucceededFuture();
connection.forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
closeStream(stream, future);
return true;
}
});
}
} finally {
try {
encoder().close();
} finally {
decoder().close();
}
// Connection has terminated, close the encoder and decoder.
encoder().close();
decoder().close();

final Http2Connection connection = connection();
// Check if there are streams to avoid the overhead of creating the ChannelFuture.
if (connection.numActiveStreams() > 0) {
final ChannelFuture future = ctx.newSucceededFuture();
connection.forEachActiveStream(new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
closeStream(stream, future);
return true;
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import static io.netty.handler.codec.http2.Http2Exception.connectionError;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ByteString;
import io.netty.util.ReferenceCountUtil;

import java.util.ArrayDeque;
Expand Down Expand Up @@ -51,6 +53,15 @@
*/
public class StreamBufferingEncoder extends DecoratingHttp2ConnectionEncoder {

/**
* Thrown if buffered streams are terminated due to this encoder being closed.
*/
public static final class ChannelClosedException extends Http2Exception {
public ChannelClosedException() {
super(Http2Error.REFUSED_STREAM, "Connection closed");
}
}

/**
* Thrown by {@link StreamBufferingEncoder} if buffered streams are terminated due to
* receipt of a {@code GOAWAY}.
Expand All @@ -59,9 +70,9 @@ public static final class GoAwayException extends Http2Exception {
private static final long serialVersionUID = 1326785622777291198L;
private final int lastStreamId;
private final long errorCode;
private final ByteBuf debugData;
private final ByteString debugData;

public GoAwayException(int lastStreamId, long errorCode, ByteBuf debugData) {
public GoAwayException(int lastStreamId, long errorCode, ByteString debugData) {
super(Http2Error.STREAM_CLOSED);
this.lastStreamId = lastStreamId;
this.errorCode = errorCode;
Expand All @@ -76,7 +87,7 @@ public long errorCode() {
return errorCode;
}

public ByteBuf debugData() {
public ByteString debugData() {
return debugData;
}
}
Expand All @@ -87,6 +98,7 @@ public ByteBuf debugData() {
*/
private final TreeMap<Integer, PendingStream> pendingStreams = new TreeMap<Integer, PendingStream>();
private int maxConcurrentStreams;
private boolean closed;

public StreamBufferingEncoder(Http2ConnectionEncoder delegate) {
this(delegate, SMALLEST_MAX_CONCURRENT_STREAMS);
Expand Down Expand Up @@ -127,6 +139,10 @@ public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2
public ChannelFuture writeHeaders(ChannelHandlerContext ctx, int streamId, Http2Headers headers,
int streamDependency, short weight, boolean exclusive,
int padding, boolean endOfStream, ChannelPromise promise) {
if (closed) {
promise.setFailure(new ChannelClosedException());
return promise;
}
if (isExistingStream(streamId) || connection().goAwayReceived()) {
return super.writeHeaders(ctx, streamId, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise);
Expand Down Expand Up @@ -198,8 +214,20 @@ public void remoteSettings(Http2Settings settings) throws Http2Exception {

@Override
public void close() {
super.close();
cancelPendingStreams();
try {
if (!closed) {
closed = true;

// Fail all buffered streams.
ChannelClosedException e = new ChannelClosedException();
for(PendingStream pendingStream : pendingStreams.values()) {
pendingStream.close(e);
}
}
} finally {
pendingStreams.clear();
super.close();
}
}

private void tryCreatePendingStreams() {
Expand All @@ -210,17 +238,10 @@ private void tryCreatePendingStreams() {
}
}

private void cancelPendingStreams() {
Exception e = new Exception("Connection closed.");
while (!pendingStreams.isEmpty()) {
PendingStream stream = pendingStreams.pollFirstEntry().getValue();
stream.close(e);
}
}

private void cancelGoAwayStreams(int lastStreamId, long errorCode, ByteBuf debugData) {
Iterator<PendingStream> iter = pendingStreams.values().iterator();
Exception e = new GoAwayException(lastStreamId, errorCode, debugData);
ByteString goAwayData = new ByteString(ByteBufUtil.getBytes(debugData), false);
Exception e = new GoAwayException(lastStreamId, errorCode, goAwayData);
while (iter.hasNext()) {
PendingStream stream = iter.next();
if (stream.streamId > lastStreamId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.StreamBufferingEncoder.ChannelClosedException;
import io.netty.handler.codec.http2.StreamBufferingEncoder.GoAwayException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -208,7 +208,7 @@ public void receivingGoAwayFailsBufferedStreams() {
}
assertEquals(4, encoder.numBufferedStreams());

connection.goAwayReceived(11, 8, null);
connection.goAwayReceived(11, 8, EMPTY_BUFFER);

assertEquals(5, connection.numActiveStreams());
// The 4 buffered streams must have been failed.
Expand Down Expand Up @@ -410,6 +410,28 @@ public void closedBufferedStreamReleasesByteBuf() {
verify(data).release();
}

@Test
public void closeShouldCancelAllBufferedStreams() {
encoder.writeSettingsAck(ctx, promise);
connection.local().maxActiveStreams(0);

encoderWriteHeaders(3, promise);
encoderWriteHeaders(5, promise);
encoderWriteHeaders(7, promise);

encoder.close();
verify(promise, times(3)).setFailure(any(ChannelClosedException.class));
}

@Test
public void headersAfterCloseShouldImmediatelyFail() {
encoder.writeSettingsAck(ctx, promise);
encoder.close();

encoderWriteHeaders(3, promise);
verify(promise).setFailure(any(ChannelClosedException.class));
}

private void setMaxConcurrentStreams(int newValue) {
try {
encoder.remoteSettings(new Http2Settings().maxConcurrentStreams(newValue));
Expand Down

0 comments on commit 7ebc11e

Please sign in to comment.