From d06babf02a716ef01f6c41afc55f743e4c26422a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 14 Jan 2019 08:25:45 +0100 Subject: [PATCH] Correctly buffer multiple outbound streams if needed. (#8694) Motivation: In Http2FrameCodec we made the incorrect assumption that we can only have 1 buffered outboundstream as maximum. This is not correct and we need to account for multiple buffered streams. Modifications: - Use a map to allow buffer multiple streams - Add unit test. Result: Fixes https://github.com/netty/netty/issues/8692. --- .../handler/codec/http2/Http2FrameCodec.java | 33 ++++++++------ .../codec/http2/Http2FrameCodecTest.java | 43 +++++++++++++++++++ 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java index b9075144f3e..cf756cab25c 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2FrameCodec.java @@ -29,6 +29,8 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import io.netty.util.collection.IntObjectHashMap; +import io.netty.util.collection.IntObjectMap; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -152,7 +154,8 @@ public class Http2FrameCodec extends Http2ConnectionHandler { /** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/ private int numBufferedStreams; - private DefaultHttp2FrameStream frameStreamToInitialize; + private final IntObjectMap frameStreamToInitializeMap = + new IntObjectHashMap(8); Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) { super(decoder, encoder, initialSettings); @@ -358,12 +361,17 @@ private void writeHeadersFrame( } stream.id = streamId; - // TODO: This depends on the fact that the connection based API will create Http2Stream objects - // synchronously. We should investigate how to refactor this later on when we consolidate some layers. - assert frameStreamToInitialize == null; - frameStreamToInitialize = stream; + // Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the + // stream in a field directly we may override the stored field before onStreamAdded(...) was called + // and so not correctly set the property for the buffered stream. + // + // See https://github.com/netty/netty/issues/8692 + Object old = frameStreamToInitializeMap.put(streamId, stream); + + // We should not re-use ids. + assert old == null; - // TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary. + // TODO(buchgr): Once Http2FrameStream and Http2Stream are merged this is no longer necessary. final ChannelPromise writePromise = ctx.newPromise(); encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(), @@ -399,7 +407,7 @@ private void onStreamActive0(Http2Stream stream) { return; } - DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream); + Http2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream); onHttp2StreamStateChanged(ctx, stream2); } @@ -407,9 +415,10 @@ private final class ConnectionListener extends Http2ConnectionAdapter { @Override public void onStreamAdded(Http2Stream stream) { - if (frameStreamToInitialize != null && stream.id() == frameStreamToInitialize.id()) { - frameStreamToInitialize.setStreamAndProperty(streamKey, stream); - frameStreamToInitialize = null; + DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id()); + + if (frameStream != null) { + frameStream.setStreamAndProperty(streamKey, stream); } } @@ -420,7 +429,7 @@ public void onStreamActive(Http2Stream stream) { @Override public void onStreamClosed(Http2Stream stream) { - DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey); + Http2FrameStream stream2 = stream.getProperty(streamKey); if (stream2 != null) { onHttp2StreamStateChanged(ctx, stream2); } @@ -428,7 +437,7 @@ public void onStreamClosed(Http2Stream stream) { @Override public void onStreamHalfClosed(Http2Stream stream) { - DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey); + Http2FrameStream stream2 = stream.getProperty(streamKey); if (stream2 != null) { onHttp2StreamStateChanged(ctx, stream2); } diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java index a3ccf038d01..27d13cf50b3 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2FrameCodecTest.java @@ -619,6 +619,49 @@ public void newOutboundStreamsShouldBeBuffered() throws Exception { assertTrue(promise2.syncUninterruptibly().isSuccess()); } + @Test + public void multipleNewOutboundStreamsShouldBeBuffered() throws Exception { + // We use a limit of 1 and then increase it step by step. + setUp(Http2FrameCodecBuilder.forServer().encoderEnforceMaxConcurrentStreams(true), + new Http2Settings().maxConcurrentStreams(1)); + + Http2FrameStream stream1 = frameCodec.newStream(); + Http2FrameStream stream2 = frameCodec.newStream(); + Http2FrameStream stream3 = frameCodec.newStream(); + + ChannelPromise promise1 = channel.newPromise(); + ChannelPromise promise2 = channel.newPromise(); + ChannelPromise promise3 = channel.newPromise(); + + channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream1), promise1); + channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream2), promise2); + channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream3), promise3); + + assertTrue(isStreamIdValid(stream1.id())); + channel.runPendingTasks(); + assertTrue(isStreamIdValid(stream2.id())); + + assertTrue(promise1.syncUninterruptibly().isSuccess()); + assertFalse(promise2.isDone()); + assertFalse(promise3.isDone()); + + // Increase concurrent streams limit to 2 + frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(2)); + channel.flush(); + + // As we increased the limit to 2 we should have also succeed the second frame. + assertTrue(promise2.syncUninterruptibly().isSuccess()); + assertFalse(promise3.isDone()); + + frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(3)); + channel.flush(); + + // With the max streams of 3 all streams should be succeed now. + assertTrue(promise3.syncUninterruptibly().isSuccess()); + + assertFalse(channel.finishAndReleaseAll()); + } + @Test public void streamIdentifiersExhausted() throws Http2Exception { int maxServerStreamId = Integer.MAX_VALUE - 1;