Skip to content

Commit

Permalink
Merge pull request #38161 from cescoffier/metrics-connection-rejection
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier committed Jan 15, 2024
2 parents 4b1295f + 31474fd commit e642b6a
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.quarkus.micrometer.deployment.binder;

import static org.awaitility.Awaitility.await;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.vertx.mutiny.ext.web.Router;

/**
* Verify that we report connection metrics when the connection limit it set.
*/
public class VertxConnectionMetricsTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withConfigurationResource("test-logging.properties")
.overrideConfigKey("quarkus.redis.devservices.enabled", "false")
// Only allows 2 concurrent connections
.overrideConfigKey("quarkus.http.limits.max-connections", "2")
// Close the connection after 1s of inactivity, otherwise, the connection are kept open for 30min
.overrideConfigKey("quarkus.http.idle-timeout", "1s")
.withApplicationRoot(jar -> jar.addClasses(App.class));

@Inject
App app;

private ExecutorService executor;
private int concurrency;

@BeforeEach
public void init() {
concurrency = 10;
executor = Executors.newFixedThreadPool(10); // More than the connection limit
}

@AfterEach
public void cleanup() {
executor.shutdown();
}

@Test
void testConnectionMetrics() throws InterruptedException {
AtomicInteger rejected = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.submit(() -> {
try {
RestAssured.get("/ok").statusCode();
} catch (Exception e) {
// RestAssured considers the rejection as an error.
rejected.incrementAndGet();
} finally {
latch.countDown();
}
});
}

Assertions.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
Gauge max = Metrics.globalRegistry.find("vertx.http.connections.max").gauge();
Gauge current = Metrics.globalRegistry.find("vertx.http.connections.current").gauge();
Assertions.assertThat(max).isNotNull();
Assertions.assertThat(current).isNotNull();

Assertions.assertThat(max.value()).isEqualTo(2);

// All requests are done, and connection closed (idle timeout)
await().untilAsserted(() -> Assertions.assertThat(current.value()).isEqualTo(0));

if (rejected.get() > 0) {
Counter counter = Metrics.globalRegistry.find("vertx.http.connections.rejected").counter();
Assertions.assertThat(counter).isNotNull();
Assertions.assertThat(counter.count()).isGreaterThan(0);
}
}

@ApplicationScoped
public static class App {

public void start(@Observes StartupEvent ev, Router router, io.vertx.core.Vertx vertx) {
router.get("/ok").handler(rc -> {
// Keep the connection open for 100ms.
vertx.setTimer(250, l -> rc.endAndForget("ok"));
});
}

}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package io.quarkus.micrometer.runtime.binder.vertx;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.quarkus.micrometer.runtime.binder.HttpBinderConfiguration;
import io.quarkus.vertx.http.runtime.ExtendedQuarkusVertxHttpMetrics;
import io.vertx.core.VertxOptions;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.http.HttpClientOptions;
Expand All @@ -25,7 +31,8 @@
import io.vertx.core.spi.metrics.TCPMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;

public class VertxMeterBinderAdapter extends MetricsOptions implements VertxMetricsFactory, VertxMetrics {
public class VertxMeterBinderAdapter extends MetricsOptions
implements VertxMetricsFactory, VertxMetrics, ExtendedQuarkusVertxHttpMetrics {
private static final Logger log = Logger.getLogger(VertxMeterBinderAdapter.class);
public static final String METRIC_NAME_SEPARATOR = "|";

Expand Down Expand Up @@ -184,4 +191,39 @@ private String extractClientName(String mn) {
return null;
}

@Override
public ConnectionTracker getHttpConnectionTracker() {
return new ConnectionTracker() {

private final Counter counter = Counter.builder("vertx.http.connections.rejected")
.description("Number of rejected HTTP connections")
.register(Metrics.globalRegistry);

@Override
public void onConnectionRejected() {
counter.increment();
}

@Override
public void initialize(int maxConnections, AtomicInteger current) {
Gauge.builder("vertx.http.connections.current", new Supplier<Number>() {
@Override
public Number get() {
return current.get();
}
})
.description("Current number of active HTTP connections")
.register(Metrics.globalRegistry);

Gauge.builder("vertx.http.connections.max", new Supplier<Number>() {
@Override
public Number get() {
return maxConnections;
}
})
.description("Max number of HTTP connections")
.register(Metrics.globalRegistry);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.vertx.http.runtime;

import java.util.concurrent.atomic.AtomicInteger;

/**
* An interface extending the metrics exposed for the Vert.x HTTP server.
* <p>
* The Vert.x HTTP metrics are managed by Vert.x, and are exposed by the Vert.x metrics SPI.
* However, some of the metrics are not exposed by the SPI, and are only available through the Vert.x HTTP SPI.
* <p>
* Thus, we need to extend the Vert.x HTTP metrics SPI to expose these metrics.
*/
public interface ExtendedQuarkusVertxHttpMetrics {

/**
* A no-op connection tracker.
*/
ConnectionTracker NOOP_CONNECTION_TRACKER = new ConnectionTracker() {
@Override
public void onConnectionRejected() {
}

@Override
public void initialize(int maxConnections, AtomicInteger current) {
}
};

/**
* Gets a tracker to report the number of active HTTP connection, the number of rejected connections, etc.
*
* @return the tracker.
*/
ConnectionTracker getHttpConnectionTracker();

interface ConnectionTracker {

void onConnectionRejected();

void initialize(int maxConnections, AtomicInteger current);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,14 @@ private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOp

private void setupTcpHttpServer(HttpServer httpServer, HttpServerOptions options, boolean https,
Promise<Void> startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount) {

if (quarkusConfig.limits.maxConnections.isPresent() && quarkusConfig.limits.maxConnections.getAsInt() > 0) {
var tracker = vertx.isMetricsEnabled()
? ((ExtendedQuarkusVertxHttpMetrics) ((VertxInternal) vertx).metricsSPI()).getHttpConnectionTracker()
: ExtendedQuarkusVertxHttpMetrics.NOOP_CONNECTION_TRACKER;

final int maxConnections = quarkusConfig.limits.maxConnections.getAsInt();
tracker.initialize(maxConnections, currentConnectionCount);
httpServer.connectionHandler(new Handler<HttpConnection>() {

@Override
Expand All @@ -1135,6 +1141,7 @@ public void handle(HttpConnection event) {
if (current == maxConnections) {
//just close the connection
LOGGER.debug("Rejecting connection as there are too many active connections");
tracker.onConnectionRejected();
event.close();
return;
}
Expand All @@ -1143,7 +1150,7 @@ public void handle(HttpConnection event) {
@Override
public void handle(Void event) {
LOGGER.debug("Connection closed");
connectionCount.decrementAndGet();
currentConnectionCount.decrementAndGet();
}
});
}
Expand Down

0 comments on commit e642b6a

Please sign in to comment.