Skip to content

Commit

Permalink
HTTP/2: Prevent memory leak when trying to create new streams on a co…
Browse files Browse the repository at this point in the history
…nnection that received a GOAWAY.

Motivation:

In netty#8692, `Http2FrameCodec` was
updated to keep track of all "being initialized" streams, allocating
memory before initialization begins, and releasing memory after
initialization completes successfully.

In some instances where stream initialization fails (e.g. because this
connection has received a GOAWAY frame), this memory is never released.

Modifications:

This change updates the `Http2FrameCodec` to use a separate promise
for monitoring the success of sending HTTP2 headers. When sending of
headers fails, we now make sure to release memory allocated for stream
initialization.

Result:

After this change, failures in writing HTTP2 Headers (e.g. because this
connection has received a GOAWAY frame) will no longer leak memory.
  • Loading branch information
millems committed Oct 15, 2019
1 parent 2e5dd28 commit 74f1219
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -207,6 +209,13 @@ public boolean visit(Http2Stream stream) {
}
}

/**
* Retrieve the number of streams currently in the process of being initialized.
*/
int numInitializingStreams() {
return frameStreamToInitializeMap.size();
}

@Override
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
Expand Down Expand Up @@ -411,8 +420,24 @@ private void writeHeadersFrame(
// We should not re-use ids.
assert old == null;

ChannelPromise writeHeadersPromise = ctx.newPromise();

// Clean up the stream being initialized if writing the headers fails.
writeHeadersPromise.addListener(new GenericFutureListener<Future<Void>>() {
@Override
public void operationComplete(Future<Void> operationFuture) {
if (!operationFuture.isSuccess()) {
frameStreamToInitializeMap.remove(streamId);
promise.setFailure(operationFuture.cause());
} else {
promise.setSuccess(operationFuture.getNow());
}
}
});

encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
headersFrame.isEndStream(), promise);
headersFrame.isEndStream(), writeHeadersPromise);

if (!promise.isDone()) {
numBufferedStreams++;
promise.addListener(bufferedStreamsListener);
Expand All @@ -431,15 +456,14 @@ private void onStreamActive0(Http2Stream stream) {
}

private final class ConnectionListener extends Http2ConnectionAdapter {

@Override
public void onStreamAdded(Http2Stream stream) {
DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());

if (frameStream != null) {
frameStream.setStreamAndProperty(streamKey, stream);
}
}
if (frameStream != null) {
frameStream.setStreamAndProperty(streamKey, stream);
}
}

@Override
public void onStreamActive(Http2Stream stream) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,33 @@ public void multipleNewOutboundStreamsShouldBeBuffered() throws Exception {
assertFalse(channel.finishAndReleaseAll());
}

@Test
public void doNotLeakOnFailedInitializationForChannels() throws Exception {
setUp(Http2FrameCodecBuilder.forServer(), new Http2Settings().maxConcurrentStreams(2));

Http2FrameStream stream1 = frameCodec.newStream();
Http2FrameStream stream2 = frameCodec.newStream();

ChannelPromise stream1HeaderPromise = channel.newPromise();
ChannelPromise stream2HeaderPromise = channel.newPromise();

channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream1),
stream1HeaderPromise);
channel.runPendingTasks();

frameInboundWriter.writeInboundGoAway(stream1.id(), 0L, Unpooled.EMPTY_BUFFER);

channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream2),
stream2HeaderPromise);
channel.runPendingTasks();

assertTrue(stream1HeaderPromise.syncUninterruptibly().isSuccess());
assertTrue(stream2HeaderPromise.isDone());

assertEquals(0, frameCodec.numInitializingStreams());
channel.finishAndReleaseAll();
}

@Test
public void streamIdentifiersExhausted() throws Http2Exception {
int maxServerStreamId = Integer.MAX_VALUE - 1;
Expand Down

0 comments on commit 74f1219

Please sign in to comment.