Skip to content

Commit

Permalink
Ensure content will not be sent over the network when Content-Length:…
Browse files Browse the repository at this point in the history
…0 is specified

Release the message when the compressionPredicate throws RuntimeException
Additional fixes to #892
  • Loading branch information
violetagg committed Nov 27, 2019
1 parent 974f70f commit d308b09
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
31 changes: 29 additions & 2 deletions src/main/java/reactor/netty/http/HttpOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import static reactor.netty.ReactorNetty.format;
import static reactor.netty.ReactorNetty.toPrettyHexDump;

/**
* An HTTP ready {@link ChannelOperations} with state management for status and headers
* (first HTTP response packet).
Expand Down Expand Up @@ -104,6 +107,12 @@ public NettyOutbound send(Publisher<? extends ByteBuf> source) {
ReferenceCountUtil.release(msg);
throw e;
}
if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0) {
log.debug(format(channel(), "Dropped HTTP content, " +
"since response has Content-Length: 0 {}"), toPrettyHexDump(msg));
msg.release();
return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER)));
}
return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(msg)));
}
return FutureMono.from(channel().writeAndFlush(msg));
Expand All @@ -121,7 +130,19 @@ public NettyOutbound sendObject(Object message) {
ByteBuf b = (ByteBuf) message;
return new PostHeadersNettyOutbound(FutureMono.deferFuture(() -> {
if (markSentHeaderAndBody()) {
preSendHeadersAndStatus();
try {
preSendHeadersAndStatus();
}
catch (RuntimeException e) {
b.release();
throw e;
}
if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0) {
log.debug(format(channel(), "Dropped HTTP content, " +
"since response has Content-Length: 0 {}"), toPrettyHexDump(b));
b.release();
return channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER));
}
return channel().writeAndFlush(newFullBodyMessage(b));
}
return channel().writeAndFlush(b);
Expand Down Expand Up @@ -157,7 +178,13 @@ public Mono<Void> then() {
msg = outboundHttpMessage();
}

preSendHeadersAndStatus();
try {
preSendHeadersAndStatus();
}
catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}

return channel().writeAndFlush(msg);
}
Expand Down
19 changes: 17 additions & 2 deletions src/test/java/reactor/netty/http/server/HttpServerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -863,14 +863,29 @@ public void testDropMessageConnectionClose() throws Exception {
}

@Test
public void testDropPublisher() throws Exception {
public void testDropPublisher_1() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
ByteBuf data = ByteBufAllocator.DEFAULT.buffer();
data.writeCharSequence("test", Charset.defaultCharset());
doTestDropData(
(req, res) -> res.header("Content-Length", "0")
.send(Flux.defer(() -> Flux.just(data, data.retain(), data.retain())))
.send(Flux.defer(() -> Flux.just(data, data.retain(), data.retain()))
.doFinally(s -> latch.countDown()))
.then(),
(req, out) -> out);
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
}

@Test
public void testDropPublisher_2() throws Exception {
ByteBuf data = ByteBufAllocator.DEFAULT.buffer();
data.writeCharSequence("test", Charset.defaultCharset());
doTestDropData(
(req, res) -> res.header("Content-Length", "0")
.send(Mono.just(data))
.then(),
(req, out) -> out);
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
}

Expand Down

0 comments on commit d308b09

Please sign in to comment.