Skip to content

Commit

Permalink
Ensure ByteBuf#release is invoked for already sent HTTP/2 response
Browse files Browse the repository at this point in the history
Fix the observed exception:

java.lang.IllegalStateException: Stream 3 in unexpected state HALF_CLOSED_LOCAL
	at io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeData(DefaultHttp2ConnectionEncoder.java:135)
	at io.netty.handler.codec.http2.DecoratingHttp2FrameWriter.writeData(DecoratingHttp2FrameWriter.java:39)
	at io.netty.handler.codec.http2.Http2FrameCodec.write(Http2FrameCodec.java:310)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:895)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
	at io.netty.handler.codec.http2.AbstractHttp2StreamChannel.write0(AbstractHttp2StreamChannel.java:1192)
	at io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.writeHttp2StreamFrame(AbstractHttp2StreamChannel.java:1045)
	at io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.write(AbstractHttp2StreamChannel.java:1003)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:889)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
	at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:113)
	at io.netty.handler.codec.MessageToMessageCodec.write(MessageToMessageCodec.java:116)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:891)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
	at reactor.netty.http.server.Http2StreamBridgeServerHandler.write(Http2StreamBridgeServerHandler.java:181)
  • Loading branch information
violetagg committed May 9, 2024
1 parent 066a645 commit 7b05fe2
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -167,6 +167,16 @@ else if (!pendingResponse) {
@SuppressWarnings("FutureReturnValueIgnored")
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof ByteBuf) {
if (!pendingResponse) {
if (HttpServerOperations.log.isDebugEnabled()) {
HttpServerOperations.log.debug(
format(ctx.channel(), "Dropped HTTP content, since response has been sent already: {}"), msg);
}
((ByteBuf) msg).release();
promise.setSuccess();
return;
}

//"FutureReturnValueIgnored" this is deliberate
ctx.write(new DefaultHttpContent((ByteBuf) msg), promise);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -1047,7 +1048,8 @@ void testDropPublisherConnectionClose() throws Exception {
(req, out) -> {
req.addHeader("Connection", "close");
return out;
});
},
HttpProtocol.HTTP11);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
}
Expand All @@ -1062,12 +1064,14 @@ void testDropMessageConnectionClose() throws Exception {
(req, out) -> {
req.addHeader("Connection", "close");
return out;
});
},
HttpProtocol.HTTP11);
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
}

@Test
void testDropPublisher_1() throws Exception {
@ParameterizedTest
@EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"})
void testDropPublisher_1(HttpProtocol protocol) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ByteBuf data = ByteBufAllocator.DEFAULT.buffer();
data.writeCharSequence("test", Charset.defaultCharset());
Expand All @@ -1076,7 +1080,8 @@ void testDropPublisher_1() throws Exception {
.send(Flux.defer(() -> Flux.just(data, data.retain(), data.retain()))
.doFinally(s -> latch.countDown()))
.then(),
(req, out) -> out);
(req, out) -> out,
protocol);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
}
Expand All @@ -1089,7 +1094,8 @@ void testDropPublisher_2() throws Exception {
(req, res) -> res.header("Content-Length", "0")
.send(Mono.just(data))
.then(),
(req, out) -> out);
(req, out) -> out,
HttpProtocol.HTTP11);
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
}

Expand All @@ -1100,23 +1106,27 @@ void testDropMessage() throws Exception {
doTestDropData(
(req, res) -> res.header("Content-Length", "0")
.sendObject(data),
(req, out) -> out);
(req, out) -> out,
HttpProtocol.HTTP11);
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
}

private void doTestDropData(
BiFunction<? super HttpServerRequest, ? super
HttpServerResponse, ? extends Publisher<Void>> serverFn,
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> clientFn)
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> clientFn,
HttpProtocol protocol)
throws Exception {
disposableServer =
createServer()
.protocol(protocol)
.handle(serverFn)
.bindNow(Duration.ofSeconds(30));

CountDownLatch latch = new CountDownLatch(1);
String response =
createClient(disposableServer.port())
.protocol(protocol)
.doOnRequest((req, conn) -> conn.onTerminate()
.subscribe(null, null, latch::countDown))
.request(HttpMethod.GET)
Expand Down

0 comments on commit 7b05fe2

Please sign in to comment.