Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure FlowControlled data frames will be correctly removed from the … #8726

Merged
merged 3 commits into from
Jan 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ public void error(ChannelHandlerContext ctx, Throwable cause) {
queue.releaseAndFailAll(cause);
// Don't update dataSize because we need to ensure the size() method returns a consistent size even after
// error so we don't invalidate flow control when returning bytes to flow control.
//
// That said we will set dataSize and padding to 0 in the write(...) method if we cleared the queue
// because of an error.
lifecycleManager.onError(ctx, true, cause);
}

Expand All @@ -405,11 +408,21 @@ public void write(ChannelHandlerContext ctx, int allowedBytes) {
int queuedData = queue.readableBytes();
if (!endOfStream) {
if (queuedData == 0) {
// There's no need to write any data frames because there are only empty data frames in the queue
// and it is not end of stream yet. Just complete their promises by getting the buffer corresponding
// to 0 bytes and writing it to the channel (to preserve notification order).
ChannelPromise writePromise = ctx.newPromise().addListener(this);
ctx.write(queue.remove(0, writePromise), writePromise);
if (queue.isEmpty()) {
// When the queue is empty it means we did clear it because of an error(...) call
// (as otherwise we will have at least 1 entry in there), which will happen either when called
// explicit or when the write itself fails. In this case just set dataSize and padding to 0
// which will signal back that the whole frame was consumed.
//
// See https://github.com/netty/netty/issues/8707.
padding = dataSize = 0;
Copy link
Member

@Scottmitch Scottmitch Jan 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has potentially interesting implications on returning bytes to flow control. consider adding a unit test that involves the real flow control and that the flow control window is properly tracked.

Also consider writing a test that would demonstrate the original issue (infinite loop) to verify the scenario doesn't re-occur. The interactions between the encoder, flow controller, and pipeline events may lead to interesting combinations which is not obvious if this will resolve the original issue. Http2ConnectionRounttripTest#writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail hits the error condition, I wonder if we can enhance this test or do something similar to exercise the desired condition.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Scottmitch will check and see if I can add more tests. That said I think it should work as expected as we only reset after write is called and we failed it before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rkapsi @Scottmitch I was able to add a unit test that would result in the endless loop before the fix:

3d2cff4

} else {
// There's no need to write any data frames because there are only empty data frames in the
// queue and it is not end of stream yet. Just complete their promises by getting the buffer
// corresponding to 0 bytes and writing it to the channel (to preserve notification order).
ChannelPromise writePromise = ctx.newPromise().addListener(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder if we want to move the addListener() after the write(...) as well. Right now it's skewing the stack should the remove(...) complete the promise.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand... can you show me some code ?

Copy link
Member

@rkapsi rkapsi Jan 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like that...

ChannelPromise writePromise = ctx.newPromise();
ctx.write(queue.remove(0, writePromise), writePromise)
    .addListener(this);

Otherwise if queue.remove(0, writePromise) was to complete the promise it'd potentially notify the listener first, followed by calling ctx.write(...). Stack (traces) will look strange. Not sure there would be other side effects due to the order in which things execute but I think you want the listener to get notified after the write call returns.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it... I would like to investigate as a followup here. Trying to keep changes as minimal as possible atm

ctx.write(queue.remove(0, writePromise), writePromise);
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -126,6 +127,13 @@ public void setup() throws Exception {
when(channel.unsafe()).thenReturn(unsafe);
ChannelConfig config = new DefaultChannelConfig(channel);
when(channel.config()).thenReturn(config);
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock in) {
return newPromise().setFailure((Throwable) in.getArgument(0));
}
}).when(channel).newFailedFuture(any(Throwable.class));

when(writer.configuration()).thenReturn(writerConfig);
when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy);
when(frameSizePolicy.maxFrameSize()).thenReturn(64);
Expand Down Expand Up @@ -206,6 +214,36 @@ public ChannelFuture answer(InvocationOnMock in) throws Throwable {
encoder.lifecycleManager(lifecycleManager);
}

@Test
public void dataWithEndOfStreamWriteShouldSignalThatFrameWasConsumedOnError() throws Exception {
dataWriteShouldSignalThatFrameWasConsumedOnError0(true);
}

@Test
public void dataWriteShouldSignalThatFrameWasConsumedOnError() throws Exception {
dataWriteShouldSignalThatFrameWasConsumedOnError0(false);
}

private void dataWriteShouldSignalThatFrameWasConsumedOnError0(boolean endOfStream) throws Exception {
createStream(STREAM_ID, false);
final ByteBuf data = dummyData();
ChannelPromise p = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, endOfStream, p);

FlowControlled controlled = payloadCaptor.getValue();
assertEquals(8, controlled.size());
payloadCaptor.getValue().write(ctx, 4);
assertEquals(4, controlled.size());

Throwable error = new IllegalStateException();
payloadCaptor.getValue().error(ctx, error);
payloadCaptor.getValue().write(ctx, 8);
assertEquals(0, controlled.size());
assertEquals("abcd", writtenData.get(0));
assertEquals(0, data.refCnt());
assertSame(error, p.cause());
}

@Test
public void dataWriteShouldSucceed() throws Exception {
createStream(STREAM_ID, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
Expand All @@ -37,6 +38,7 @@
import io.netty.handler.codec.http2.Http2TestUtil.Http2Runnable;
import io.netty.util.AsciiString;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -682,7 +684,7 @@ public void writeOfEmptyReleasedBufferMultipleBuffersTrailersQueuedInFlowControl
writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(WriteEmptyBufferMode.SECOND_WITH_TRAILERS);
}

public void writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(final WriteEmptyBufferMode mode)
private void writeOfEmptyReleasedBufferQueuedInFlowControllerShouldFail(final WriteEmptyBufferMode mode)
throws Exception {
bootstrapEnv(1, 1, 2, 1);

Expand Down Expand Up @@ -728,6 +730,59 @@ public void run() throws Http2Exception {
}
}

@Test
public void writeFailureFlowControllerRemoveFrame()
throws Exception {
bootstrapEnv(1, 1, 2, 1);

final ChannelPromise dataPromise = newPromise();
final ChannelPromise assertPromise = newPromise();

runInChannel(clientChannel, new Http2Runnable() {
@Override
public void run() throws Http2Exception {
http2Client.encoder().writeHeaders(ctx(), 3, EmptyHttp2Headers.INSTANCE, 0, (short) 16, false, 0, false,
newPromise());
clientChannel.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ReferenceCountUtil.release(msg);

// Ensure we update the window size so we will try to write the rest of the frame while
// processing the flush.
http2Client.encoder().flowController().initialWindowSize(8);
promise.setFailure(new IllegalStateException());
}
});

http2Client.encoder().flowController().initialWindowSize(4);
http2Client.encoder().writeData(ctx(), 3, randomBytes(8), 0, false, dataPromise);
assertTrue(http2Client.encoder().flowController()
.hasFlowControlled(http2Client.connection().stream(3)));

http2Client.flush(ctx());

try {
// The Frame should have been removed after the write failed.
assertFalse(http2Client.encoder().flowController()
.hasFlowControlled(http2Client.connection().stream(3)));
assertPromise.setSuccess();
} catch (Throwable error) {
assertPromise.setFailure(error);
}
}
});

try {
dataPromise.get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause(), is(instanceOf(IllegalStateException.class)));
}

assertPromise.sync();
}

@Test
public void nonHttp2ExceptionInPipelineShouldNotCloseConnection() throws Exception {
bootstrapEnv(1, 1, 2, 1);
Expand Down