Skip to content

Commit

Permalink
Add metrics when the connections limit is set
Browse files Browse the repository at this point in the history
When the HTTP connection limit is set:

- expose a new gauge with the max number of connections
- expose a new gauge with the current number of connections
- expose a new counter with the number of rejected connections
  • Loading branch information
cescoffier committed Jan 15, 2024
1 parent e3dd81b commit 31474fd
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.quarkus.micrometer.deployment.binder;

import static org.awaitility.Awaitility.await;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.vertx.mutiny.ext.web.Router;

/**
* Verify that we report connection metrics when the connection limit it set.
*/
public class VertxConnectionMetricsTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withConfigurationResource("test-logging.properties")
.overrideConfigKey("quarkus.redis.devservices.enabled", "false")
// Only allows 2 concurrent connections
.overrideConfigKey("quarkus.http.limits.max-connections", "2")
// Close the connection after 1s of inactivity, otherwise, the connection are kept open for 30min
.overrideConfigKey("quarkus.http.idle-timeout", "1s")
.withApplicationRoot(jar -> jar.addClasses(App.class));

@Inject
App app;

private ExecutorService executor;
private int concurrency;

@BeforeEach
public void init() {
concurrency = 10;
executor = Executors.newFixedThreadPool(10); // More than the connection limit
}

@AfterEach
public void cleanup() {
executor.shutdown();
}

@Test
void testConnectionMetrics() throws InterruptedException {
AtomicInteger rejected = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(concurrency);
for (int i = 0; i < concurrency; i++) {
executor.submit(() -> {
try {
RestAssured.get("/ok").statusCode();
} catch (Exception e) {
// RestAssured considers the rejection as an error.
rejected.incrementAndGet();
} finally {
latch.countDown();
}
});
}

Assertions.assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
Gauge max = Metrics.globalRegistry.find("vertx.http.connections.max").gauge();
Gauge current = Metrics.globalRegistry.find("vertx.http.connections.current").gauge();
Assertions.assertThat(max).isNotNull();
Assertions.assertThat(current).isNotNull();

Assertions.assertThat(max.value()).isEqualTo(2);

// All requests are done, and connection closed (idle timeout)
await().untilAsserted(() -> Assertions.assertThat(current.value()).isEqualTo(0));

if (rejected.get() > 0) {
Counter counter = Metrics.globalRegistry.find("vertx.http.connections.rejected").counter();
Assertions.assertThat(counter).isNotNull();
Assertions.assertThat(counter.count()).isGreaterThan(0);
}
}

@ApplicationScoped
public static class App {

public void start(@Observes StartupEvent ev, Router router, io.vertx.core.Vertx vertx) {
router.get("/ok").handler(rc -> {
// Keep the connection open for 100ms.
vertx.setTimer(250, l -> rc.endAndForget("ok"));
});
}

}

}
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package io.quarkus.micrometer.runtime.binder.vertx;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.quarkus.micrometer.runtime.binder.HttpBinderConfiguration;
import io.quarkus.vertx.http.runtime.ExtendedQuarkusVertxHttpMetrics;
import io.vertx.core.VertxOptions;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.http.HttpClientOptions;
Expand All @@ -25,7 +31,8 @@
import io.vertx.core.spi.metrics.TCPMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;

public class VertxMeterBinderAdapter extends MetricsOptions implements VertxMetricsFactory, VertxMetrics {
public class VertxMeterBinderAdapter extends MetricsOptions
implements VertxMetricsFactory, VertxMetrics, ExtendedQuarkusVertxHttpMetrics {
private static final Logger log = Logger.getLogger(VertxMeterBinderAdapter.class);
public static final String METRIC_NAME_SEPARATOR = "|";

Expand Down Expand Up @@ -184,4 +191,39 @@ private String extractClientName(String mn) {
return null;
}

@Override
public ConnectionTracker getHttpConnectionTracker() {
return new ConnectionTracker() {

private final Counter counter = Counter.builder("vertx.http.connections.rejected")
.description("Number of rejected HTTP connections")
.register(Metrics.globalRegistry);

@Override
public void onConnectionRejected() {
counter.increment();
}

@Override
public void initialize(int maxConnections, AtomicInteger current) {
Gauge.builder("vertx.http.connections.current", new Supplier<Number>() {
@Override
public Number get() {
return current.get();
}
})
.description("Current number of active HTTP connections")
.register(Metrics.globalRegistry);

Gauge.builder("vertx.http.connections.max", new Supplier<Number>() {
@Override
public Number get() {
return maxConnections;
}
})
.description("Max number of HTTP connections")
.register(Metrics.globalRegistry);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.vertx.http.runtime;

import java.util.concurrent.atomic.AtomicInteger;

/**
* An interface extending the metrics exposed for the Vert.x HTTP server.
* <p>
* The Vert.x HTTP metrics are managed by Vert.x, and are exposed by the Vert.x metrics SPI.
* However, some of the metrics are not exposed by the SPI, and are only available through the Vert.x HTTP SPI.
* <p>
* Thus, we need to extend the Vert.x HTTP metrics SPI to expose these metrics.
*/
public interface ExtendedQuarkusVertxHttpMetrics {

/**
* A no-op connection tracker.
*/
ConnectionTracker NOOP_CONNECTION_TRACKER = new ConnectionTracker() {
@Override
public void onConnectionRejected() {
}

@Override
public void initialize(int maxConnections, AtomicInteger current) {
}
};

/**
* Gets a tracker to report the number of active HTTP connection, the number of rejected connections, etc.
*
* @return the tracker.
*/
ConnectionTracker getHttpConnectionTracker();

interface ConnectionTracker {

void onConnectionRejected();

void initialize(int maxConnections, AtomicInteger current);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1123,8 +1123,14 @@ private void setupUnixDomainSocketHttpServer(HttpServer httpServer, HttpServerOp

private void setupTcpHttpServer(HttpServer httpServer, HttpServerOptions options, boolean https,
Promise<Void> startFuture, AtomicInteger remainingCount, AtomicInteger currentConnectionCount) {

if (quarkusConfig.limits.maxConnections.isPresent() && quarkusConfig.limits.maxConnections.getAsInt() > 0) {
var tracker = vertx.isMetricsEnabled()
? ((ExtendedQuarkusVertxHttpMetrics) ((VertxInternal) vertx).metricsSPI()).getHttpConnectionTracker()
: ExtendedQuarkusVertxHttpMetrics.NOOP_CONNECTION_TRACKER;

final int maxConnections = quarkusConfig.limits.maxConnections.getAsInt();
tracker.initialize(maxConnections, currentConnectionCount);
httpServer.connectionHandler(new Handler<HttpConnection>() {

@Override
Expand All @@ -1135,6 +1141,7 @@ public void handle(HttpConnection event) {
if (current == maxConnections) {
//just close the connection
LOGGER.debug("Rejecting connection as there are too many active connections");
tracker.onConnectionRejected();
event.close();
return;
}
Expand All @@ -1143,7 +1150,7 @@ public void handle(HttpConnection event) {
@Override
public void handle(Void event) {
LOGGER.debug("Connection closed");
connectionCount.decrementAndGet();
currentConnectionCount.decrementAndGet();
}
});
}
Expand Down

0 comments on commit 31474fd

Please sign in to comment.