Skip to content

Commit

Permalink
Correctly propagate failures while update the flow-controller to the … (
Browse files Browse the repository at this point in the history
#9664)


Motivation:

We may fail to update the flow-controller and in this case need to notify the stream channel and close it.

Modifications:

Attach a future to the write of the update frame and in case of a failure propagate it to the channel and close it

Result:

Fixes #9663
  • Loading branch information
normanmaurer committed Oct 22, 2019
1 parent 3577a52 commit d71661f
Showing 1 changed file with 54 additions and 16 deletions.
Expand Up @@ -99,6 +99,28 @@ public Handle newHandle() {
private static final AtomicIntegerFieldUpdater<AbstractHttp2StreamChannel> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractHttp2StreamChannel.class, "unwritable");

private static void windowUpdateFrameWriteComplete(ChannelFuture future, Channel streamChannel) {
Throwable cause = future.cause();
if (cause != null) {
Throwable unwrappedCause;
// Unwrap if needed
if (cause instanceof Http2FrameStreamException && ((unwrappedCause = cause.getCause()) != null)) {
cause = unwrappedCause;
}

// Notify the child-channel and close it.
streamChannel.pipeline().fireExceptionCaught(cause);
streamChannel.unsafe().close(streamChannel.unsafe().voidPromise());
}
}

private final ChannelFutureListener windowUpdateFrameWriteListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
}
};

/**
* The current status of the read-processing for a {@link AbstractHttp2StreamChannel}.
*/
Expand Down Expand Up @@ -529,7 +551,7 @@ void fireChildRead(Http2Frame frame) {
// otherwise we would have drained it from the queue and processed it during the read cycle.
assert inboundBuffer == null || inboundBuffer.isEmpty();
final RecvByteBufAllocator.Handle allocHandle = unsafe.recvBufAllocHandle();
flowControlledBytes += unsafe.doRead0(frame, allocHandle);
unsafe.doRead0(frame, allocHandle);
// We currently don't need to check for readEOS because the parent channel and child channel are limited
// to the same EventLoop thread. There are a limited number of frame types that may come after EOS is
// read (unknown, reset) and the trade off is less conditionals for the hot path (headers/data) at the
Expand Down Expand Up @@ -650,7 +672,8 @@ public void close(final ChannelPromise promise) {

final boolean wasActive = isActive();

updateLocalWindowIfNeeded();
// There is no need to update the local window as once the stream is closed all the pending bytes will be
// given back to the connection window by the controller itself.

// Only ever send a reset frame if the connection is still alive and if the stream was created before
// as otherwise we may send a RST on a stream in an invalid state and cause a connection error.
Expand Down Expand Up @@ -784,7 +807,7 @@ void doBeginRead() {
allocHandle.reset(config());
boolean continueReading = false;
do {
flowControlledBytes += doRead0((Http2Frame) message, allocHandle);
doRead0((Http2Frame) message, allocHandle);
} while ((readEOS || (continueReading = allocHandle.continueReading()))
&& (message = pollQueuedMessage()) != null);

Expand All @@ -811,8 +834,17 @@ private void updateLocalWindowIfNeeded() {
if (flowControlledBytes != 0) {
int bytes = flowControlledBytes;
flowControlledBytes = 0;
write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
writeDoneAndNoFlush = true;
ChannelFuture future = write0(parentContext(), new DefaultHttp2WindowUpdateFrame(bytes).stream(stream));
// Add a listener which will notify and teardown the stream
// when a window update fails if needed or check the result of the future directly if it was completed
// already.
// See https://github.com/netty/netty/issues/9663
if (future.isDone()) {
windowUpdateFrameWriteComplete(future, AbstractHttp2StreamChannel.this);
} else {
future.addListener(windowUpdateFrameWriteListener);
writeDoneAndNoFlush = true;
}
}
}

Expand Down Expand Up @@ -845,20 +877,26 @@ void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceRe
}

@SuppressWarnings("deprecation")
int doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
pipeline().fireChannelRead(frame);
allocHandle.incMessagesRead(1);

void doRead0(Http2Frame frame, RecvByteBufAllocator.Handle allocHandle) {
final int bytes;
if (frame instanceof Http2DataFrame) {
final int numBytesToBeConsumed = ((Http2DataFrame) frame).initialFlowControlledBytes();
allocHandle.attemptedBytesRead(numBytesToBeConsumed);
allocHandle.lastBytesRead(numBytesToBeConsumed);
return numBytesToBeConsumed;
bytes = ((Http2DataFrame) frame).initialFlowControlledBytes();

// It is important that we increment the flowControlledBytes before we call fireChannelRead(...)
// as it may cause a read() that will call updateLocalWindowIfNeeded() and we need to ensure
// in this case that we accounted for it.
//
// See https://github.com/netty/netty/issues/9663
flowControlledBytes += bytes;
} else {
allocHandle.attemptedBytesRead(MIN_HTTP2_FRAME_SIZE);
allocHandle.lastBytesRead(MIN_HTTP2_FRAME_SIZE);
bytes = MIN_HTTP2_FRAME_SIZE;
}
return 0;
// Update before firing event through the pipeline to be consistent with other Channel implementation.
allocHandle.attemptedBytesRead(bytes);
allocHandle.lastBytesRead(bytes);
allocHandle.incMessagesRead(1);

pipeline().fireChannelRead(frame);
}

@Override
Expand Down

0 comments on commit d71661f

Please sign in to comment.