Skip to content

Commit

Permalink
Merge #2919 into 1.1.12
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Oct 3, 2023
2 parents 53d87b1 + c780aae commit e93489b
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,27 @@ public final boolean isInboundCancelled() {
}

/**
* Return true if inbound traffic is not incoming or expected anymore
* Return true if inbound traffic is not incoming or expected anymore.
* The buffered data is consumed.
*
* @return true if inbound traffic is not incoming or expected anymore
* @return true if inbound traffic is not incoming or expected anymore.
* The buffered data is consumed
*/
public final boolean isInboundDisposed() {
return inbound.isDisposed();
}

/**
* Return true if inbound traffic is not incoming or expected anymore.
* The buffered data might still not be consumed.
*
* @return true if inbound traffic is not incoming or expected anymore.
* The buffered data might still not be consumed.
*/
protected final boolean isInboundComplete() {
return inbound.inboundDone;
}

/**
* React on inbound {@link Channel#read}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
final ServerCookieEncoder cookieEncoder;
final ServerCookies cookieHolder;
final HttpServerFormDecoderProvider formDecoderProvider;
final boolean is100ContinueExpected;
final boolean isHttp2;
final BiFunction<? super Mono<Void>, ? super Connection, ? extends Mono<Void>> mapHandle;
final HttpRequest nettyRequest;
Expand Down Expand Up @@ -145,6 +146,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
this.cookieHolder = replaced.cookieHolder;
this.currentContext = replaced.currentContext;
this.formDecoderProvider = replaced.formDecoderProvider;
this.is100ContinueExpected = replaced.is100ContinueExpected;
this.isHttp2 = replaced.isHttp2;
this.mapHandle = replaced.mapHandle;
this.nettyRequest = replaced.nettyRequest;
Expand Down Expand Up @@ -199,6 +201,7 @@ class HttpServerOperations extends HttpOperations<HttpServerRequest, HttpServerR
this.cookieHolder = ServerCookies.newServerRequestHolder(nettyRequest.headers(), decoder);
this.currentContext = Context.empty();
this.formDecoderProvider = formDecoderProvider;
this.is100ContinueExpected = HttpUtil.is100ContinueExpected(nettyRequest);
this.isHttp2 = isHttp2;
this.mapHandle = mapHandle;
this.nettyRequest = nettyRequest;
Expand Down Expand Up @@ -421,7 +424,7 @@ public Flux<?> receiveObject() {
// and discard the traffic or close the connection.
// No need to notify the upstream handlers - just log.
// If decoding a response, just throw an error.
if (HttpUtil.is100ContinueExpected(nettyRequest)) {
if (is100ContinueExpected) {
return FutureMono.deferFuture(() -> {
if (!hasSentHeaders()) {
return channel().writeAndFlush(CONTINUE);
Expand Down Expand Up @@ -705,7 +708,12 @@ protected void afterMarkSentHeaders() {

@Override
protected void beforeMarkSentHeaders() {
//noop
if (is100ContinueExpected && !isInboundComplete()) {
int code = status().code();
if (code < 200 || code > 299) {
keepAlive(false);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
import reactor.netty.BaseHttpTest;
import reactor.netty.ByteBufFlux;
import reactor.netty.ByteBufMono;
import reactor.netty.Connection;
import reactor.netty.LogTracker;
import reactor.netty.NettyPipeline;
Expand All @@ -63,6 +65,8 @@
import reactor.netty.http.client.PrematureCloseException;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerConfig;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import reactor.netty.http.server.logging.AccessLog;
import reactor.netty.resources.ConnectionProvider;
import reactor.test.StepVerifier;
Expand All @@ -83,6 +87,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

Expand Down Expand Up @@ -888,6 +893,95 @@ void test100Continue(HttpServer server, HttpClient client) throws Exception {
assertThat(content.getT2()).isEqualTo(200);
}

@ParameterizedCompatibleCombinationsCustomPoolTest
void test100ContinueConnectionClose(HttpServer server, HttpClient client) throws Exception {
doTest100ContinueConnection(server, client,
(req, res) -> res.status(400).sendString(Mono.just("ERROR")),
ByteBufFlux.fromString(Flux.just("1", "2", "3", "4", "5").delaySubscription(Duration.ofMillis(100))),
false);
}

@ParameterizedCompatibleCombinationsCustomPoolTest
void test100ContinueConnectionKeepAlive(HttpServer server, HttpClient client) throws Exception {
doTest100ContinueConnection(server, client,
(req, res) -> res.status(400).sendString(Mono.just("ERROR").delaySubscription(Duration.ofMillis(100))),
ByteBufMono.fromString(Mono.just("12345")),
true);
}

private void doTest100ContinueConnection(
HttpServer server,
HttpClient client,
BiFunction<? super HttpServerRequest, ? super HttpServerResponse, ? extends Publisher<Void>> postHandler,
Publisher<ByteBuf> sendBody,
boolean isKeepAlive) throws Exception {
HttpProtocol[] serverProtocols = server.configuration().protocols();
HttpProtocol[] clientProtocols = client.configuration().protocols();

CountDownLatch latch = new CountDownLatch(2);
AtomicReference<List<Channel>> channels = new AtomicReference<>(new ArrayList<>(2));
disposableServer =
server.doOnConnection(conn -> {
channels.get().add(conn.channel());
conn.onTerminate().subscribe(null, t -> latch.countDown(), latch::countDown);
})
.route(r ->
r.post("/post", postHandler)
.get("/get", (req, res) -> res.sendString(Mono.just("OK"))))
.bindNow();

Mono<Tuple2<String, HttpClientResponse>> content1 =
client.port(disposableServer.port())
.headers(h -> h.add(HttpHeaderNames.EXPECT, HttpHeaderValues.CONTINUE))
.post()
.uri("/post")
.send(sendBody)
.responseSingle((res, bytes) -> bytes.asString().zipWith(Mono.just(res)));

Mono<Tuple2<String, HttpClientResponse>> content2 =
client.port(disposableServer.port())
.get()
.uri("/get")
.responseSingle((res, bytes) -> bytes.asString().zipWith(Mono.just(res)));

List<Tuple2<String, HttpClientResponse>> responses =
Flux.concat(content1, content2)
.collectList()
.block(Duration.ofSeconds(5));

assertThat(latch.await(30, TimeUnit.SECONDS)).as("latch await").isTrue();

assertThat(responses).isNotNull();
assertThat(responses.size()).isEqualTo(2);
assertThat(responses.get(0).getT1()).isEqualTo("ERROR");
assertThat(responses.get(0).getT2().status().code()).isEqualTo(400);
assertThat(responses.get(1).getT1()).isEqualTo("OK");
assertThat(responses.get(1).getT2().status().code()).isEqualTo(200);

assertThat(channels.get().size()).isEqualTo(2);
if ((serverProtocols.length == 1 && serverProtocols[0] == HttpProtocol.HTTP11) ||
(clientProtocols.length == 1 && clientProtocols[0] == HttpProtocol.HTTP11)) {
if (isKeepAlive) {
assertThat(channels.get().get(0)).isEqualTo(channels.get().get(1));

assertThat(responses.get(0).getT2().responseHeaders().get(HttpHeaderNames.CONNECTION))
.isNull();
}
else {
assertThat(channels.get()).doesNotHaveDuplicates();

assertThat(responses.get(0).getT2().responseHeaders().get(HttpHeaderNames.CONNECTION))
.isEqualTo(HttpHeaderValues.CLOSE.toString());
}
}
else {
assertThat(channels.get()).doesNotHaveDuplicates();

assertThat(responses.get(0).getT2().responseHeaders().get(HttpHeaderNames.CONNECTION))
.isNull();
}
}

static final class IdleTimeoutTestChannelInboundHandler extends ChannelInboundHandlerAdapter {

final CountDownLatch latch = new CountDownLatch(1);
Expand Down

0 comments on commit e93489b

Please sign in to comment.