diff --git a/src/main/java/reactor/ipc/netty/http/HttpOperations.java b/src/main/java/reactor/ipc/netty/http/HttpOperations.java index 7e595b8abe..e95097d136 100644 --- a/src/main/java/reactor/ipc/netty/http/HttpOperations.java +++ b/src/main/java/reactor/ipc/netty/http/HttpOperations.java @@ -103,34 +103,7 @@ public NettyOutbound sendHeaders() { return this; } - return then(FutureMono.deferFuture(() -> { - if (markSentHeaders()) { - if (HttpUtil.isContentLengthSet(outboundHttpMessage())) { - outboundHttpMessage().headers() - .remove(HttpHeaderNames.TRANSFER_ENCODING); - } - - HttpMessage message; - if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage()) - && (!HttpUtil.isContentLengthSet(outboundHttpMessage()) || - HttpUtil.getContentLength(outboundHttpMessage(), 0) == 0)) { - if(isKeepAlive() && markSentBody()){ - message = newFullEmptyBodyMessage(); - } - else { - markPersistent(false); - message = outboundHttpMessage(); - } - } - else { - message = outboundHttpMessage(); - } - return channel().writeAndFlush(message); - } - else { - return channel().newSucceededFuture(); - } - })); + return then(Mono.empty()); } @Override @@ -141,14 +114,26 @@ public Mono then() { return FutureMono.deferFuture(() -> { if (markSentHeaders()) { + HttpMessage msg; + if (HttpUtil.isContentLengthSet(outboundHttpMessage())) { outboundHttpMessage().headers() .remove(HttpHeaderNames.TRANSFER_ENCODING); + if (HttpUtil.getContentLength(outboundHttpMessage(), 0) == 0) { + msg = newFullEmptyBodyMessage(); + } + else { + msg = outboundHttpMessage(); + } } + else { + msg = outboundHttpMessage(); + } + checkIfNotPersistent(); - return channel().writeAndFlush(outboundHttpMessage()); + return channel().writeAndFlush(msg); } else { return channel().newSucceededFuture(); diff --git a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java index 356dc45419..358e48f0ab 100644 --- a/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java +++ b/src/main/java/reactor/ipc/netty/http/client/HttpClientOperations.java @@ -271,6 +271,17 @@ protected void onInboundClose() { super.onInboundError(new IOException("Connection closed prematurely")); } + @Override + public NettyOutbound sendObject(Publisher dataStream) { + if (!HttpUtil.isTransferEncodingChunked(nettyRequest) && + !HttpUtil.isContentLengthSet(nettyRequest) && + !method().equals(HttpMethod.HEAD) && + !hasSentHeaders()) { + HttpUtil.setTransferEncodingChunked(nettyRequest, true); + } + return super.sendObject(dataStream); + } + @Override public HttpClientRequest header(CharSequence name, CharSequence value) { if (!hasSentHeaders()) { diff --git a/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java b/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java index c0e9b3db39..acc20a2dd0 100644 --- a/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java +++ b/src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java @@ -18,7 +18,9 @@ import java.util.Queue; +import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -184,8 +186,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) return; } - ctx.write(msg, promise); - HttpServerOperations.cleanHandlerTerminate(ctx.channel()); + ctx.write(msg, promise) + .addListener(new TerminateHttpHandler(ctx.channel())); if (!persistentConnection) { return; @@ -293,4 +295,18 @@ static boolean isMultipart(HttpResponse response) { 0, MULTIPART_PREFIX.length()); } + + static final class TerminateHttpHandler implements ChannelFutureListener { + + final Channel channel; + + TerminateHttpHandler(Channel channel) { + this.channel = channel; + } + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + HttpServerOperations.cleanHandlerTerminate(channel); + } + } } diff --git a/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java b/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java index 2cb0929ef0..8cf9c7e83d 100644 --- a/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java +++ b/src/main/java/reactor/ipc/netty/http/server/HttpServerOperations.java @@ -131,6 +131,7 @@ protected HttpMessage newFullEmptyBodyMessage() { else { res.headers().set(responseHeaders); } + markPersistent(true); return res; } @@ -162,8 +163,6 @@ public HttpServerResponse chunkedTransfer(boolean chunked) { responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING); HttpUtil.setTransferEncodingChunked(nettyResponse, chunked); } - - markPersistent(chunked); return this; } @@ -281,10 +280,7 @@ public HttpHeaders responseHeaders() { public Mono send() { if (markSentHeaderAndBody()) { HttpMessage response = newFullEmptyBodyMessage(); - return FutureMono.deferFuture(() -> { - markPersistent(true); - return channel().writeAndFlush(response); - }); + return FutureMono.deferFuture(() -> channel().writeAndFlush(response)); } else { return Mono.empty(); @@ -413,7 +409,6 @@ protected void onOutboundComplete() { log.debug("No sendHeaders() called before complete, sending " + "zero-length header"); } - markPersistent(true); f = channel().writeAndFlush(newFullEmptyBodyMessage()); } else if (markSentBody()) { diff --git a/src/test/java/reactor/ipc/netty/http/client/HttpClientTest.java b/src/test/java/reactor/ipc/netty/http/client/HttpClientTest.java index 8acba6f394..24984346b7 100644 --- a/src/test/java/reactor/ipc/netty/http/client/HttpClientTest.java +++ b/src/test/java/reactor/ipc/netty/http/client/HttpClientTest.java @@ -52,6 +52,7 @@ import reactor.ipc.netty.FutureMono; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.channel.AbortedException; +import reactor.ipc.netty.http.HttpResources; import reactor.ipc.netty.http.server.HttpServer; import reactor.ipc.netty.options.ClientProxyOptions.Proxy; import reactor.ipc.netty.resources.PoolResources; @@ -111,6 +112,8 @@ public void abort() throws Exception { resp.dispose(); x.dispose(); + + pool.dispose(); } DefaultFullHttpResponse response() { @@ -196,6 +199,7 @@ public void pipelined() throws Exception { } x.dispose(); + pool.dispose(); Assert.fail("Not aborted"); } @@ -401,7 +405,7 @@ public void disableChunkForced() throws Exception { c -> c.chunkedTransfer(false) .failOnClientError(false) .sendString(Flux.just("hello"))) - .block(Duration.ofSeconds(30)); + .block(); FutureMono.from(r.context() .channel() @@ -452,23 +456,33 @@ public void disableChunkImplicit() throws Exception { Assert.assertTrue(r.status() == HttpResponseStatus.NOT_FOUND); r.dispose(); r2.dispose(); + p.dispose(); } @Test public void disableChunkImplicitDefault() throws Exception { - HttpClientResponse r = HttpClient.create("google.com") - .get("/unsupportedURI", + PoolResources p = PoolResources.fixed("test", 1); + + HttpClientResponse r = HttpClient.create(opts -> opts.poolResources(p)) + .get("http://google.com/unsupportedURI", c -> c.chunkedTransfer(false) .failOnClientError(false)) .block(Duration.ofSeconds(30)); - FutureMono.from(r.context() - .channel() - .closeFuture()) - .block(Duration.ofSeconds(5)); + HttpClientResponse r2 = HttpClient.create(opts -> opts.poolResources(p)) + .get("http://google.com/unsupportedURI", + c -> c.chunkedTransfer(false) + .failOnClientError(false)) + .block(Duration.ofSeconds(30)); + + Assert.assertTrue(r.context() + .channel() == r2.context() + .channel()); Assert.assertTrue(r.status() == HttpResponseStatus.NOT_FOUND); r.dispose(); + r2.dispose(); + p.dispose(); } @Test @@ -491,6 +505,7 @@ public void contentHeader() throws Exception { Assert.assertTrue(r.status() == HttpResponseStatus.BAD_REQUEST); r.dispose(); r1.dispose(); + fixed.dispose(); } @Test