Skip to content

Commit

Permalink
Multiple tweaks related to http request/response lifecycle:
Browse files Browse the repository at this point in the history
- Correctly defer cleanHandlerTerminate in HttpServerHandler
- Manage markPersistent(true) in a single HttpServerOp method
- Override sendObject for client to force chunked transfer on send if:
- Not head verb, not header sent, no length defined
- Delegate sendHeader() implementation to then()
- Try to create FullHttpMessage if contentLength defined and 0
  • Loading branch information
Stephane Maldini authored and smaldini committed Feb 27, 2018
1 parent cba5f9c commit 95b22e0
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 45 deletions.
43 changes: 14 additions & 29 deletions src/main/java/reactor/ipc/netty/http/HttpOperations.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,34 +103,7 @@ public NettyOutbound sendHeaders() {
return this;
}

return then(FutureMono.deferFuture(() -> {
if (markSentHeaders()) {
if (HttpUtil.isContentLengthSet(outboundHttpMessage())) {
outboundHttpMessage().headers()
.remove(HttpHeaderNames.TRANSFER_ENCODING);
}

HttpMessage message;
if (!HttpUtil.isTransferEncodingChunked(outboundHttpMessage())
&& (!HttpUtil.isContentLengthSet(outboundHttpMessage()) ||
HttpUtil.getContentLength(outboundHttpMessage(), 0) == 0)) {
if(isKeepAlive() && markSentBody()){
message = newFullEmptyBodyMessage();
}
else {
markPersistent(false);
message = outboundHttpMessage();
}
}
else {
message = outboundHttpMessage();
}
return channel().writeAndFlush(message);
}
else {
return channel().newSucceededFuture();
}
}));
return then(Mono.empty());
}

@Override
Expand All @@ -141,14 +114,26 @@ public Mono<Void> then() {

return FutureMono.deferFuture(() -> {
if (markSentHeaders()) {
HttpMessage msg;

if (HttpUtil.isContentLengthSet(outboundHttpMessage())) {
outboundHttpMessage().headers()
.remove(HttpHeaderNames.TRANSFER_ENCODING);
if (HttpUtil.getContentLength(outboundHttpMessage(), 0) == 0) {
msg = newFullEmptyBodyMessage();
}
else {
msg = outboundHttpMessage();
}
}
else {
msg = outboundHttpMessage();
}


checkIfNotPersistent();

return channel().writeAndFlush(outboundHttpMessage());
return channel().writeAndFlush(msg);
}
else {
return channel().newSucceededFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,17 @@ protected void onInboundClose() {
super.onInboundError(new IOException("Connection closed prematurely"));
}

@Override
public NettyOutbound sendObject(Publisher<?> dataStream) {
if (!HttpUtil.isTransferEncodingChunked(nettyRequest) &&
!HttpUtil.isContentLengthSet(nettyRequest) &&
!method().equals(HttpMethod.HEAD) &&
!hasSentHeaders()) {
HttpUtil.setTransferEncodingChunked(nettyRequest, true);
}
return super.sendObject(dataStream);
}

@Override
public HttpClientRequest header(CharSequence name, CharSequence value) {
if (!hasSentHeaders()) {
Expand Down
20 changes: 18 additions & 2 deletions src/main/java/reactor/ipc/netty/http/server/HttpServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.util.Queue;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -184,8 +186,8 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
return;
}

ctx.write(msg, promise);
HttpServerOperations.cleanHandlerTerminate(ctx.channel());
ctx.write(msg, promise)
.addListener(new TerminateHttpHandler(ctx.channel()));

if (!persistentConnection) {
return;
Expand Down Expand Up @@ -293,4 +295,18 @@ static boolean isMultipart(HttpResponse response) {
0,
MULTIPART_PREFIX.length());
}

static final class TerminateHttpHandler implements ChannelFutureListener {

final Channel channel;

TerminateHttpHandler(Channel channel) {
this.channel = channel;
}

@Override
public void operationComplete(ChannelFuture future) throws Exception {
HttpServerOperations.cleanHandlerTerminate(channel);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ protected HttpMessage newFullEmptyBodyMessage() {
else {
res.headers().set(responseHeaders);
}
markPersistent(true);
return res;
}

Expand Down Expand Up @@ -162,8 +163,6 @@ public HttpServerResponse chunkedTransfer(boolean chunked) {
responseHeaders.remove(HttpHeaderNames.TRANSFER_ENCODING);
HttpUtil.setTransferEncodingChunked(nettyResponse, chunked);
}

markPersistent(chunked);
return this;
}

Expand Down Expand Up @@ -281,10 +280,7 @@ public HttpHeaders responseHeaders() {
public Mono<Void> send() {
if (markSentHeaderAndBody()) {
HttpMessage response = newFullEmptyBodyMessage();
return FutureMono.deferFuture(() -> {
markPersistent(true);
return channel().writeAndFlush(response);
});
return FutureMono.deferFuture(() -> channel().writeAndFlush(response));
}
else {
return Mono.empty();
Expand Down Expand Up @@ -413,7 +409,6 @@ protected void onOutboundComplete() {
log.debug("No sendHeaders() called before complete, sending " + "zero-length header");
}

markPersistent(true);
f = channel().writeAndFlush(newFullEmptyBodyMessage());
}
else if (markSentBody()) {
Expand Down
29 changes: 22 additions & 7 deletions src/test/java/reactor/ipc/netty/http/client/HttpClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import reactor.ipc.netty.FutureMono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.channel.AbortedException;
import reactor.ipc.netty.http.HttpResources;
import reactor.ipc.netty.http.server.HttpServer;
import reactor.ipc.netty.options.ClientProxyOptions.Proxy;
import reactor.ipc.netty.resources.PoolResources;
Expand Down Expand Up @@ -111,6 +112,8 @@ public void abort() throws Exception {
resp.dispose();

x.dispose();

pool.dispose();
}

DefaultFullHttpResponse response() {
Expand Down Expand Up @@ -196,6 +199,7 @@ public void pipelined() throws Exception {
}

x.dispose();
pool.dispose();
Assert.fail("Not aborted");
}

Expand Down Expand Up @@ -401,7 +405,7 @@ public void disableChunkForced() throws Exception {
c -> c.chunkedTransfer(false)
.failOnClientError(false)
.sendString(Flux.just("hello")))
.block(Duration.ofSeconds(30));
.block();

FutureMono.from(r.context()
.channel()
Expand Down Expand Up @@ -452,23 +456,33 @@ public void disableChunkImplicit() throws Exception {
Assert.assertTrue(r.status() == HttpResponseStatus.NOT_FOUND);
r.dispose();
r2.dispose();
p.dispose();
}

@Test
public void disableChunkImplicitDefault() throws Exception {
HttpClientResponse r = HttpClient.create("google.com")
.get("/unsupportedURI",
PoolResources p = PoolResources.fixed("test", 1);

HttpClientResponse r = HttpClient.create(opts -> opts.poolResources(p))
.get("http://google.com/unsupportedURI",
c -> c.chunkedTransfer(false)
.failOnClientError(false))
.block(Duration.ofSeconds(30));

FutureMono.from(r.context()
.channel()
.closeFuture())
.block(Duration.ofSeconds(5));
HttpClientResponse r2 = HttpClient.create(opts -> opts.poolResources(p))
.get("http://google.com/unsupportedURI",
c -> c.chunkedTransfer(false)
.failOnClientError(false))
.block(Duration.ofSeconds(30));

Assert.assertTrue(r.context()
.channel() == r2.context()
.channel());

Assert.assertTrue(r.status() == HttpResponseStatus.NOT_FOUND);
r.dispose();
r2.dispose();
p.dispose();
}

@Test
Expand All @@ -491,6 +505,7 @@ public void contentHeader() throws Exception {
Assert.assertTrue(r.status() == HttpResponseStatus.BAD_REQUEST);
r.dispose();
r1.dispose();
fixed.dispose();
}

@Test
Expand Down

0 comments on commit 95b22e0

Please sign in to comment.