Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly buffer multiple outbound streams if needed. #8694

Merged
merged 1 commit into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
Expand Down Expand Up @@ -152,7 +154,8 @@ public class Http2FrameCodec extends Http2ConnectionHandler {

/** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
private int numBufferedStreams;
private DefaultHttp2FrameStream frameStreamToInitialize;
private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
new IntObjectHashMap<DefaultHttp2FrameStream>(8);

Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
Expand Down Expand Up @@ -358,12 +361,17 @@ private void writeHeadersFrame(
}
stream.id = streamId;

// TODO: This depends on the fact that the connection based API will create Http2Stream objects
// synchronously. We should investigate how to refactor this later on when we consolidate some layers.
assert frameStreamToInitialize == null;
frameStreamToInitialize = stream;
// Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
// stream in a field directly we may override the stored field before onStreamAdded(...) was called
// and so not correctly set the property for the buffered stream.
//
// See https://github.com/netty/netty/issues/8692
Object old = frameStreamToInitializeMap.put(streamId, stream);

// We should not re-use ids.
assert old == null;

// TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary.
// TODO(buchgr): Once Http2FrameStream and Http2Stream are merged this is no longer necessary.
final ChannelPromise writePromise = ctx.newPromise();

encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
Expand Down Expand Up @@ -399,17 +407,18 @@ private void onStreamActive0(Http2Stream stream) {
return;
}

DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
Http2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
onHttp2StreamStateChanged(ctx, stream2);
}

private final class ConnectionListener extends Http2ConnectionAdapter {

@Override
public void onStreamAdded(Http2Stream stream) {
if (frameStreamToInitialize != null && stream.id() == frameStreamToInitialize.id()) {
frameStreamToInitialize.setStreamAndProperty(streamKey, stream);
frameStreamToInitialize = null;
DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());

if (frameStream != null) {
frameStream.setStreamAndProperty(streamKey, stream);
}
}

Expand All @@ -420,15 +429,15 @@ public void onStreamActive(Http2Stream stream) {

@Override
public void onStreamClosed(Http2Stream stream) {
DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
Http2FrameStream stream2 = stream.getProperty(streamKey);
if (stream2 != null) {
onHttp2StreamStateChanged(ctx, stream2);
}
}

@Override
public void onStreamHalfClosed(Http2Stream stream) {
DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
Http2FrameStream stream2 = stream.getProperty(streamKey);
if (stream2 != null) {
onHttp2StreamStateChanged(ctx, stream2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,49 @@ public void newOutboundStreamsShouldBeBuffered() throws Exception {
assertTrue(promise2.syncUninterruptibly().isSuccess());
}

@Test
public void multipleNewOutboundStreamsShouldBeBuffered() throws Exception {
// We use a limit of 1 and then increase it step by step.
setUp(Http2FrameCodecBuilder.forServer().encoderEnforceMaxConcurrentStreams(true),
new Http2Settings().maxConcurrentStreams(1));

Http2FrameStream stream1 = frameCodec.newStream();
Http2FrameStream stream2 = frameCodec.newStream();
Http2FrameStream stream3 = frameCodec.newStream();

ChannelPromise promise1 = channel.newPromise();
ChannelPromise promise2 = channel.newPromise();
ChannelPromise promise3 = channel.newPromise();

channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream1), promise1);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream2), promise2);
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream3), promise3);

assertTrue(isStreamIdValid(stream1.id()));
channel.runPendingTasks();
assertTrue(isStreamIdValid(stream2.id()));

assertTrue(promise1.syncUninterruptibly().isSuccess());
assertFalse(promise2.isDone());
assertFalse(promise3.isDone());

// Increase concurrent streams limit to 2
frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(2));
channel.flush();

// As we increased the limit to 2 we should have also succeed the second frame.
assertTrue(promise2.syncUninterruptibly().isSuccess());
assertFalse(promise3.isDone());

frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(3));
channel.flush();

// With the max streams of 3 all streams should be succeed now.
assertTrue(promise3.syncUninterruptibly().isSuccess());

assertFalse(channel.finishAndReleaseAll());
}

@Test
public void streamIdentifiersExhausted() throws Http2Exception {
int maxServerStreamId = Integer.MAX_VALUE - 1;
Expand Down