Skip to content

Commit

Permalink
Merge #2821 into 1.1.8
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Jun 7, 2023
2 parents 7fef5e7 + c71390e commit 253e767
Showing 1 changed file with 105 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -157,6 +158,7 @@
import static reactor.netty.http.server.HttpServerFormDecoderProvider.DEFAULT_FORM_DECODER_SPEC;
import static reactor.netty.http.server.ConnectionInfo.DEFAULT_HOST_NAME;
import static reactor.netty.http.server.ConnectionInfo.DEFAULT_HTTP_PORT;
import static reactor.netty.resources.LoopResources.DEFAULT_SHUTDOWN_TIMEOUT;

/**
* @author Stephane Maldini
Expand All @@ -166,7 +168,8 @@ class HttpServerTests extends BaseHttpTest {
static SelfSignedCertificate ssc;
static final EventExecutor executor = new DefaultEventExecutor();
static final Logger log = Loggers.getLogger(HttpServerTests.class);

static final String DATA_STRING = String.join("", Collections.nCopies(128, "X"));
static final byte[] DATA = DATA_STRING.getBytes(Charset.defaultCharset());
ChannelGroup group;

/**
Expand Down Expand Up @@ -220,6 +223,35 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
}
}

/**
* Handler used to delay a bit outgoing HTTP/2 server responses. This handler will be placed
* at the head of the server channel pipeline.
*/
final static class DelayH2FlushHandler extends ChannelOutboundHandlerAdapter {
final static String NAME = "handler.h2flush_delay";
final static DelayH2FlushHandler INSTANCE = new DelayH2FlushHandler();

static void register(Connection cnx) {
Channel channel = cnx.channel();
assertThat(channel).isInstanceOf(Http2StreamChannel.class);
Channel parent = cnx.channel().parent();
if (parent.pipeline().get(NAME) == null) {
parent.pipeline().addFirst(NAME, INSTANCE);
}
}

@Override
public boolean isSharable() {
return true;
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void flush(ChannelHandlerContext ctx) {
ctx.executor().schedule(ctx::flush, 1, TimeUnit.MILLISECONDS);
}
}

@BeforeAll
static void createSelfSignedCertificate() throws CertificateException {
ssc = new SelfSignedCertificate();
Expand Down Expand Up @@ -3366,4 +3398,76 @@ void testHttpServerCancelledOnClientClose() throws InterruptedException {
assertThat(clientClosed.await(30, TimeUnit.SECONDS)).as("clientClosed await").isTrue();
}
}

@ParameterizedTest
@MethodSource("h2CompatibleCombinations")
void testIssue2760_H2(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx = Http2SslContextSpec.forClient()
.configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE));

testIssue2760(
server -> server.protocol(serverProtocols).secure(spec -> spec.sslContext(serverCtx)),
client -> client.protocol(clientProtocols).secure(spec -> spec.sslContext(clientCtx)));
}

@ParameterizedTest
@MethodSource("h2cCompatibleCombinations")
void testIssue2760_H2C(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols) {
testIssue2760(server -> server.protocol(serverProtocols), client -> client.protocol(clientProtocols));
}

private void testIssue2760(Function<HttpServer, HttpServer> serverCustomizer,
Function<HttpClient, HttpClient> clientCustomizer) {
ConnectionProvider provider = ConnectionProvider.create("testIssue2760", 1);
LoopResources loopServer = null;
LoopResources loopClient = null;

try {
loopClient = LoopResources.create("client", 1, false);
loopServer = LoopResources.create("server", 1, false);
doTestIssue2760(
serverCustomizer
.apply(createServer().runOn(loopServer)),
clientCustomizer
.apply(createClient(provider, () -> disposableServer.address()).runOn(loopClient)));
}
finally {
provider.disposeLater().block(Duration.ofSeconds(5));
if (loopServer != null) {
loopServer.disposeLater(Duration.ofSeconds(0), Duration.ofSeconds(DEFAULT_SHUTDOWN_TIMEOUT))
.block(Duration.ofSeconds(5));
}
if (loopClient != null) {
loopClient.disposeLater(Duration.ofSeconds(0), Duration.ofSeconds(DEFAULT_SHUTDOWN_TIMEOUT))
.block(Duration.ofSeconds(5));
}
}
}

private void doTestIssue2760(HttpServer server, HttpClient client) {
disposableServer = server
.doOnConnection(DelayH2FlushHandler::register)
.route(r -> r.get("/issue-2760", (req, res) -> res
.header("Content-Type", "text/plain")
.sendObject(ByteBufAllocator.DEFAULT.buffer().writeBytes(DATA))
))
.bindNow();

int messages = 100;
Flux.range(0, messages)
.flatMap(i -> client
.wiretap(false)
.get()
.uri("/issue-2760")
.responseSingle((res, bytes) -> bytes.asString()
.zipWith(Mono.just(res.status()))))
.collectList()
.as(StepVerifier::create)
.expectNextMatches(l -> l.size() == messages &&
l.stream().allMatch(tpl -> tpl.getT2().code() == 200 && tpl.getT1().equals(DATA_STRING))
)
.expectComplete()
.verify(Duration.ofSeconds(60));
}
}

0 comments on commit 253e767

Please sign in to comment.