Skip to content

Commit

Permalink
Correctly buffer multiple outbound streams if needed.
Browse files Browse the repository at this point in the history
Motivation:

In Http2FrameCodec we made the incorrect assumption that we can only have 1 buffered outboundstream as maximum. This is not correct and we need to account for multiple buffered streams.

Modifications:

- Use a map to allow buffer multiple streams
- Add unit test.

Result:

Fixes #8692.
  • Loading branch information
normanmaurer committed Jan 14, 2019
1 parent fa84e2b commit e4a2d06
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -152,7 +154,8 @@ public class Http2FrameCodec extends Http2ConnectionHandler {

/** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
private int numBufferedStreams;
private DefaultHttp2FrameStream frameStreamToInitialize;
private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
new IntObjectHashMap<DefaultHttp2FrameStream>(8);

Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
Expand Down Expand Up @@ -358,12 +361,17 @@ private void writeHeadersFrame(
}
stream.id = streamId;

// TODO: This depends on the fact that the connection based API will create Http2Stream objects
// synchronously. We should investigate how to refactor this later on when we consolidate some layers.
assert frameStreamToInitialize == null;
frameStreamToInitialize = stream;
// Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
// stream in a field directly we may override the stored field before onStreamAdded(...) was called
// and so not correctly set the property for the buffered stream.
//
// See https://github.com/netty/netty/issues/8692
Object old = frameStreamToInitializeMap.put(streamId, stream);

// We should not re-use ids.
assert old == null;

// TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary.
// TODO(buchgr): Once Http2FrameStream and Http2Stream are merged this is no longer necessary.
final ChannelPromise writePromise = ctx.newPromise();

encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
Expand Down Expand Up @@ -399,17 +407,18 @@ private void onStreamActive0(Http2Stream stream) {
return;
}

DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
Http2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
onHttp2StreamStateChanged(ctx, stream2);
}

private final class ConnectionListener extends Http2ConnectionAdapter {

@Override
public void onStreamAdded(Http2Stream stream) {
if (frameStreamToInitialize != null && stream.id() == frameStreamToInitialize.id()) {
frameStreamToInitialize.setStreamAndProperty(streamKey, stream);
frameStreamToInitialize = null;
DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());

if (frameStream != null) {
frameStream.setStreamAndProperty(streamKey, stream);
}
}

Expand All @@ -420,15 +429,15 @@ public void onStreamActive(Http2Stream stream) {

@Override
public void onStreamClosed(Http2Stream stream) {
DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
Http2FrameStream stream2 = stream.getProperty(streamKey);
if (stream2 != null) {
onHttp2StreamStateChanged(ctx, stream2);
}
}

@Override
public void onStreamHalfClosed(Http2Stream stream) {
DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
Http2FrameStream stream2 = stream.getProperty(streamKey);
if (stream2 != null) {
onHttp2StreamStateChanged(ctx, stream2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,49 @@ public void newOutboundStreamsShouldBeBuffered() throws Exception {
assertTrue(promise2.syncUninterruptibly().isSuccess());
}

@Test
public void multipleNewOutboundStreamsShouldBeBuffered() throws Exception {
// We use a limit of 1 and then increase it step by step.
setUp(Http2FrameCodecBuilder.forServer().encoderEnforceMaxConcurrentStreams(true),
new Http2Settings().maxConcurrentStreams(1));

Http2FrameStream stream1 = frameCodec.newStream();
Http2FrameStream stream2 = frameCodec.newStream();
Http2FrameStream stream3 = frameCodec.newStream();

ChannelPromise promise1 = channel.newPromise();
ChannelPromise promise2 = channel.newPromise();
ChannelPromise promise3 = channel.newPromise();

channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream1), promise1);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream2), promise2);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream3), promise3);

assertTrue(isStreamIdValid(stream1.id()));
channel.runPendingTasks();
assertTrue(isStreamIdValid(stream2.id()));

assertTrue(promise1.syncUninterruptibly().isSuccess());
assertFalse(promise2.isDone());
assertFalse(promise3.isDone());

// Increase concurrent streams limit to 2
frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(2));
channel.flush();

// As we increased the limit to 2 we should have also succeed the second frame.
assertTrue(promise2.syncUninterruptibly().isSuccess());
assertFalse(promise3.isDone());

frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(3));
channel.flush();

// With the max streams of 3 all streams should be succeed now.
assertTrue(promise3.syncUninterruptibly().isSuccess());

assertFalse(channel.finishAndReleaseAll());
}

@Test
public void streamIdentifiersExhausted() throws Http2Exception {
int maxServerStreamId = Integer.MAX_VALUE - 1;
Expand Down

0 comments on commit e4a2d06

Please sign in to comment.