diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java index 801cb511a7..8c8ec9e671 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -28,6 +28,7 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -157,6 +158,7 @@ import static reactor.netty.http.server.HttpServerFormDecoderProvider.DEFAULT_FORM_DECODER_SPEC; import static reactor.netty.http.server.ConnectionInfo.DEFAULT_HOST_NAME; import static reactor.netty.http.server.ConnectionInfo.DEFAULT_HTTP_PORT; +import static reactor.netty.resources.LoopResources.DEFAULT_SHUTDOWN_TIMEOUT; /** * @author Stephane Maldini @@ -166,7 +168,8 @@ class HttpServerTests extends BaseHttpTest { static SelfSignedCertificate ssc; static final EventExecutor executor = new DefaultEventExecutor(); static final Logger log = Loggers.getLogger(HttpServerTests.class); - + static final String DATA_STRING = String.join("", Collections.nCopies(128, "X")); + static final byte[] DATA = DATA_STRING.getBytes(Charset.defaultCharset()); ChannelGroup group; /** @@ -220,6 +223,35 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { } } + /** + * Handler used to delay a bit outgoing HTTP/2 server responses. This handler will be placed + * at the head of the server channel pipeline. + */ + final static class DelayH2FlushHandler extends ChannelOutboundHandlerAdapter { + final static String NAME = "handler.h2flush_delay"; + final static DelayH2FlushHandler INSTANCE = new DelayH2FlushHandler(); + + static void register(Connection cnx) { + Channel channel = cnx.channel(); + assertThat(channel).isInstanceOf(Http2StreamChannel.class); + Channel parent = cnx.channel().parent(); + if (parent.pipeline().get(NAME) == null) { + parent.pipeline().addFirst(NAME, INSTANCE); + } + } + + @Override + public boolean isSharable() { + return true; + } + + @Override + @SuppressWarnings("FutureReturnValueIgnored") + public void flush(ChannelHandlerContext ctx) { + ctx.executor().schedule(ctx::flush, 1, TimeUnit.MILLISECONDS); + } + } + @BeforeAll static void createSelfSignedCertificate() throws CertificateException { ssc = new SelfSignedCertificate(); @@ -3366,4 +3398,76 @@ void testHttpServerCancelledOnClientClose() throws InterruptedException { assertThat(clientClosed.await(30, TimeUnit.SECONDS)).as("clientClosed await").isTrue(); } } + + @ParameterizedTest + @MethodSource("h2CompatibleCombinations") + void testIssue2760_H2(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) { + Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey()); + Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + + testIssue2760( + server -> server.protocol(serverProtocols).secure(spec -> spec.sslContext(serverCtx)), + client -> client.protocol(clientProtocols).secure(spec -> spec.sslContext(clientCtx))); + } + + @ParameterizedTest + @MethodSource("h2cCompatibleCombinations") + void testIssue2760_H2C(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) { + testIssue2760(server -> server.protocol(serverProtocols), client -> client.protocol(clientProtocols)); + } + + private void testIssue2760(Function serverCustomizer, + Function clientCustomizer) { + ConnectionProvider provider = ConnectionProvider.create("testIssue2760", 1); + LoopResources loopServer = null; + LoopResources loopClient = null; + + try { + loopClient = LoopResources.create("client", 1, false); + loopServer = LoopResources.create("server", 1, false); + doTestIssue2760( + serverCustomizer + .apply(createServer().runOn(loopServer)), + clientCustomizer + .apply(createClient(provider, () -> disposableServer.address()).runOn(loopClient))); + } + finally { + provider.disposeLater().block(Duration.ofSeconds(5)); + if (loopServer != null) { + loopServer.disposeLater(Duration.ofSeconds(0), Duration.ofSeconds(DEFAULT_SHUTDOWN_TIMEOUT)) + .block(Duration.ofSeconds(5)); + } + if (loopClient != null) { + loopClient.disposeLater(Duration.ofSeconds(0), Duration.ofSeconds(DEFAULT_SHUTDOWN_TIMEOUT)) + .block(Duration.ofSeconds(5)); + } + } + } + + private void doTestIssue2760(HttpServer server, HttpClient client) { + disposableServer = server + .doOnConnection(DelayH2FlushHandler::register) + .route(r -> r.get("/issue-2760", (req, res) -> res + .header("Content-Type", "text/plain") + .sendObject(ByteBufAllocator.DEFAULT.buffer().writeBytes(DATA)) + )) + .bindNow(); + + int messages = 100; + Flux.range(0, messages) + .flatMap(i -> client + .wiretap(false) + .get() + .uri("/issue-2760") + .responseSingle((res, bytes) -> bytes.asString() + .zipWith(Mono.just(res.status())))) + .collectList() + .as(StepVerifier::create) + .expectNextMatches(l -> l.size() == messages && + l.stream().allMatch(tpl -> tpl.getT2().code() == 200 && tpl.getT1().equals(DATA_STRING)) + ) + .expectComplete() + .verify(Duration.ofSeconds(60)); + } }