diff --git a/src/main/java/reactor/ipc/netty/http/HttpOperations.java b/src/main/java/reactor/ipc/netty/http/HttpOperations.java index 68e37a43d8..d7be7bca4a 100644 --- a/src/main/java/reactor/ipc/netty/http/HttpOperations.java +++ b/src/main/java/reactor/ipc/netty/http/HttpOperations.java @@ -100,57 +100,63 @@ public boolean isWebsocket() { //@Override //TODO document this public NettyOutbound sendHeaders() { - if (markSentHeaders()) { - if (HttpUtil.isContentLengthSet(outboundHttpMessage())) { - outboundHttpMessage().headers() - .remove(HttpHeaderNames.TRANSFER_ENCODING); - } + if (hasSentHeaders()) { + return this; + } - HttpMessage message; - if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) - && (!HttpUtil.isContentLengthSet(outboundHttpMessage()) || - HttpUtil.getContentLength(outboundHttpMessage(), 0) == 0)) { - if(isKeepAlive() && markSentBody()){ - message = newFullEmptyBodyMessage(); + return then(FutureMono.deferFuture(() -> { + if (markSentHeaders()) { + if (HttpUtil.isContentLengthSet(outboundHttpMessage())) { + outboundHttpMessage().headers() + .remove(HttpHeaderNames.TRANSFER_ENCODING); + } + + HttpMessage message; + if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) + && (!HttpUtil.isContentLengthSet(outboundHttpMessage()) || + HttpUtil.getContentLength(outboundHttpMessage(), 0) == 0)) { + if(isKeepAlive() && markSentBody()){ + message = newFullEmptyBodyMessage(); + } + else { + markPersistent(false); + message = outboundHttpMessage(); + } } else { - markPersistent(false); message = outboundHttpMessage(); } + return channel().writeAndFlush(message); } else { - message = outboundHttpMessage(); + return channel().newSucceededFuture(); } - return then(FutureMono.deferFuture(() -> { - if(!channel().isActive()){ - throw new AbortedException(); - } - return channel().writeAndFlush(message); - })); - } - else { - return this; - } + })); } @Override public Mono then() { - if (markSentHeaders()) { - if (HttpUtil.isContentLengthSet(outboundHttpMessage())) { - outboundHttpMessage().headers() - .remove(HttpHeaderNames.TRANSFER_ENCODING); - } - - if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) - && !HttpUtil.isContentLengthSet(outboundHttpMessage())) { - markPersistent(false); - } - - return FutureMono.deferFuture(() -> channel().writeAndFlush(outboundHttpMessage())); - } - else { + if (hasSentHeaders()) { return Mono.empty(); } + + return FutureMono.deferFuture(() -> { + if (markSentHeaders()) { + if (HttpUtil.isContentLengthSet(outboundHttpMessage())) { + outboundHttpMessage().headers() + .remove(HttpHeaderNames.TRANSFER_ENCODING); + } + + if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) && !HttpUtil.isContentLengthSet( + outboundHttpMessage())) { + markPersistent(false); + } + return channel().writeAndFlush(outboundHttpMessage()); + } + else { + return channel().newSucceededFuture(); + } + }); } protected abstract HttpMessage newFullEmptyBodyMessage(); diff --git a/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java b/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java index c641522dad..cea7644b70 100644 --- a/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java +++ b/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java @@ -166,6 +166,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (!shouldKeepAlive()) { setKeepAlive(response, false); } + + if (isInformational(response)) { + ctx.write(msg, promise); + return; + } } if (msg instanceof LastHttpContent) { if (!shouldKeepAlive()) { diff --git a/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java b/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java index ebf0a7e875..a27f84e129 100644 --- a/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java +++ b/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java @@ -250,7 +250,13 @@ public Flux receiveObject() { // No need to notify the upstream handlers - just log. // If decoding a response, just throw an error. if (HttpUtil.is100ContinueExpected(nettyRequest)) { - return FutureMono.deferFuture(() -> channel().writeAndFlush(CONTINUE)) + return FutureMono.deferFuture(() -> { + if(!hasSentHeaders()) { + return channel().writeAndFlush(CONTINUE); + } + return channel().newSucceededFuture(); + }) + .thenMany(super.receiveObject()); } else { diff --git a/src/test/java/reactor/ipc/netty/http/HttpTests.java b/src/test/java/reactor/ipc/netty/http/HttpTests.java index e7e3f2833d..ba487d7350 100644 --- a/src/test/java/reactor/ipc/netty/http/HttpTests.java +++ b/src/test/java/reactor/ipc/netty/http/HttpTests.java @@ -22,11 +22,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.assertj.core.api.Assertions; import org.junit.Test; - -import io.netty.buffer.ByteBuf; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -284,4 +283,35 @@ public void webSocketRespondsToRequestsFromClients() { server.dispose(); } -} \ No newline at end of file + + @Test + public void test100Continue() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + NettyContext server = + HttpServer.create(0) + .newHandler((req, res) -> req.receive() + .aggregate() + .asString() + .flatMap(s -> { + latch.countDown(); + return res.sendString(Mono.just(s)) + .then(); + })) + .block(Duration.ofSeconds(30)); + + String content = + HttpClient.create(server.address().getPort()) + .post("/", req -> req.header("Expect", "100-continue") + .sendString(Flux.just("1", "2", "3", "4", "5"))) + .flatMap(res -> res.receive() + .aggregate() + .asString()) + .block(); + + System.out.println(content); + + Assertions.assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue(); + + server.dispose(); + } +}