Skip to content

Commit

Permalink
Merge #3072 into 1.1.17
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Feb 26, 2024
2 parents e58544f + 9e8700c commit 2e55760
Show file tree
Hide file tree
Showing 15 changed files with 529 additions and 333 deletions.
5 changes: 4 additions & 1 deletion reactor-netty-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,17 @@ dependencies {
testImplementation "ch.qos.logback:logback-classic:$logbackVersion"
testImplementation "io.projectreactor.tools:blockhound-junit-platform:$blockHoundVersion"
testImplementation "io.micrometer:micrometer-core:$micrometerVersion"
testImplementation "io.micrometer:micrometer-test:$micrometerVersion"
testImplementation "org.reflections:reflections:$reflectionsVersion"

testRuntimeOnly "org.junit.platform:junit-platform-launcher:$junitPlatformLauncherVersion"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitVersion"
testRuntimeOnly "org.slf4j:jcl-over-slf4j:$slf4jVersion"

// JSR-305 annotations
testFixturesCompileOnly "com.google.code.findbugs:jsr305:$jsr305Version"
testFixturesApi("ch.qos.logback:logback-classic:$logbackVersion")
testFixturesApi("io.micrometer:micrometer-core:$micrometerVersion")
testFixturesApi("org.assertj:assertj-core:$assertJVersion")

for (dependency in project.configurations.shaded.dependencies) {
compileOnly(dependency)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,16 +30,14 @@
import static reactor.netty.Metrics.TCP_SERVER_PREFIX;
import static reactor.netty.Metrics.TLS_HANDSHAKE_TIME;
import static reactor.netty.Metrics.URI;
import static reactor.netty.micrometer.CounterAssert.assertCounter;
import static reactor.netty.micrometer.DistributionSummaryAssert.assertDistributionSummary;
import static reactor.netty.micrometer.GaugeAssert.assertGauge;
import static reactor.netty.micrometer.TimerAssert.assertTimer;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.tck.MeterRegistryAssert;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -72,7 +70,7 @@ class TcpMetricsTests {
TcpClient tcpClient;
Connection connection;
private ConnectionProvider provider;
private MeterRegistry registry;
MeterRegistry registry;

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -241,18 +239,28 @@ private void checkExpectationsPositive() {
String[] totalConnectionsTags = new String[] {URI, "tcp", LOCAL_ADDRESS, serverAddress};

checkTlsTimer(SERVER_TLS_HANDSHAKE_TIME, timerTags, true);
checkDistributionSummary(SERVER_DATA_SENT, summaryTags, 1, 5, true);
checkDistributionSummary(SERVER_DATA_RECEIVED, summaryTags, 1, 5, true);
checkCounter(SERVER_ERRORS, summaryTags, 0, false);
assertDistributionSummary(registry, SERVER_DATA_SENT, summaryTags)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertDistributionSummary(registry, SERVER_DATA_RECEIVED, summaryTags)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertCounter(registry, SERVER_ERRORS, summaryTags).isNull();

timerTags = new String[] {REMOTE_ADDRESS, serverAddress, STATUS, "SUCCESS"};
summaryTags = new String[] {REMOTE_ADDRESS, serverAddress, URI, "tcp"};
checkTimer(CLIENT_CONNECT_TIME, timerTags, true);
assertTimer(registry, CLIENT_CONNECT_TIME, timerTags)
.hasCountEqualTo(1)
.hasTotalTimeGreaterThanOrEqualTo(0);
checkTlsTimer(CLIENT_TLS_HANDSHAKE_TIME, timerTags, true);
checkDistributionSummary(CLIENT_DATA_SENT, summaryTags, 1, 5, true);
checkDistributionSummary(CLIENT_DATA_RECEIVED, summaryTags, 1, 5, true);
checkCounter(CLIENT_ERRORS, summaryTags, 0, false);
checkGauge(SERVER_CONNECTIONS_TOTAL, totalConnectionsTags, 1, true);
assertDistributionSummary(registry, CLIENT_DATA_SENT, summaryTags)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertDistributionSummary(registry, CLIENT_DATA_RECEIVED, summaryTags)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertCounter(registry, CLIENT_ERRORS, summaryTags).isNull();
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, totalConnectionsTags).hasValueEqualTo(1);
}

private void checkExpectationsNegative(int port) {
Expand All @@ -261,11 +269,13 @@ private void checkExpectationsNegative(int port) {
String[] timerTags2 = new String[] {REMOTE_ADDRESS, address, STATUS, "SUCCESS"};
String[] summaryTags = new String[] {REMOTE_ADDRESS, address, URI, "tcp"};

checkTimer(CLIENT_CONNECT_TIME, timerTags1, true);
assertTimer(registry, CLIENT_CONNECT_TIME, timerTags1)
.hasCountEqualTo(1)
.hasTotalTimeGreaterThanOrEqualTo(0);
checkTlsTimer(CLIENT_TLS_HANDSHAKE_TIME, timerTags2, false);
checkDistributionSummary(CLIENT_DATA_SENT, summaryTags, 0, 0, false);
checkDistributionSummary(CLIENT_DATA_RECEIVED, summaryTags, 0, 0, false);
checkCounter(CLIENT_ERRORS, summaryTags, 0, false);
assertDistributionSummary(registry, CLIENT_DATA_SENT, summaryTags).isNull();
assertDistributionSummary(registry, CLIENT_DATA_RECEIVED, summaryTags).isNull();
assertCounter(registry, CLIENT_ERRORS, summaryTags).isNull();
}


Expand All @@ -281,55 +291,6 @@ protected void checkTlsTimer(String name, String[] tags, boolean exists) {
//no-op
}


void checkTimer(String name, String[] tags, boolean exists) {
Timer timer = registry.find(name).tags(tags).timer();
if (exists) {
MeterRegistryAssert.assertThat(registry).hasTimerWithNameAndTags(name, Tags.of(tags));

assertThat(timer).isNotNull();
assertThat(timer.count()).isEqualTo(1);
assertThat(timer.totalTime(TimeUnit.NANOSECONDS) >= 0).isTrue();
}
else {
assertThat(timer).isNull();
}
}

void checkDistributionSummary(String name, String[] tags, long expectedCount, int expectedAmount, boolean exists) {
DistributionSummary summary = registry.find(name).tags(tags).summary();
if (exists) {
assertThat(summary).isNotNull();
assertThat(summary.count()).isEqualTo(expectedCount);
assertThat(summary.totalAmount() >= expectedAmount).isTrue();
}
else {
assertThat(summary).isNull();
}
}

void checkCounter(String name, String[] tags, double expectedCount, boolean exists) {
Counter counter = registry.find(name).tags(tags).counter();
if (exists) {
assertThat(counter).isNotNull();
assertThat(counter.count() >= expectedCount).isTrue();
}
else {
assertThat(counter).isNull();
}
}

void checkGauge(String name, String[] tags, double expectedCount, boolean exists) {
Gauge counter = registry.find(name).tags(tags).gauge();
if (exists) {
assertThat(counter).isNotNull();
assertThat(counter.value() == expectedCount).isTrue();
}
else {
assertThat(counter).isNull();
}
}

static final String SERVER_CONNECTIONS_TOTAL = TCP_SERVER_PREFIX + CONNECTIONS_TOTAL;
static final String SERVER_DATA_SENT = TCP_SERVER_PREFIX + DATA_SENT;
static final String SERVER_DATA_RECEIVED = TCP_SERVER_PREFIX + DATA_RECEIVED;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -34,6 +34,9 @@
import static reactor.netty.Metrics.REMOTE_ADDRESS;
import static reactor.netty.Metrics.STATUS;
import static reactor.netty.Metrics.URI;
import static reactor.netty.micrometer.CounterAssert.assertCounter;
import static reactor.netty.micrometer.DistributionSummaryAssert.assertDistributionSummary;
import static reactor.netty.micrometer.TimerAssert.assertTimer;

/**
* This test class verifies TCP metrics functionality.
Expand Down Expand Up @@ -78,7 +81,14 @@ protected TcpClient customizeClientOptions(TcpClient tcpClient) {

@Override
protected void checkTlsTimer(String name, String[] tags, boolean exists) {
checkTimer(name, tags, exists);
if (exists) {
assertTimer(registry, name, tags)
.hasCountEqualTo(1)
.hasTotalTimeGreaterThanOrEqualTo(0);
}
else {
assertTimer(registry, name, tags).isNull();
}
}

@Test
Expand Down Expand Up @@ -111,18 +121,22 @@ private void checkExpectationsNegative() {
String[] summaryTags = new String[] {REMOTE_ADDRESS, clientAddress, URI, "tcp"};

checkTlsTimer(SERVER_TLS_HANDSHAKE_TIME, timerTags, true);
checkDistributionSummary(SERVER_DATA_SENT, summaryTags, 0, 0, false);
checkDistributionSummary(SERVER_DATA_RECEIVED, summaryTags, 0, 0, false);
checkCounter(SERVER_ERRORS, summaryTags, 1, true);
assertDistributionSummary(registry, SERVER_DATA_SENT, summaryTags).isNull();
assertDistributionSummary(registry, SERVER_DATA_RECEIVED, summaryTags).isNull();
assertCounter(registry, SERVER_ERRORS, summaryTags).hasCountGreaterThanOrEqualTo(1);

InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress();
String serverAddress = sa.getHostString() + ":" + sa.getPort();
timerTags = new String[] {REMOTE_ADDRESS, serverAddress, STATUS, "SUCCESS"};
summaryTags = new String[] {REMOTE_ADDRESS, serverAddress, URI, "tcp"};

checkTimer(CLIENT_CONNECT_TIME, timerTags, true);
checkDistributionSummary(CLIENT_DATA_SENT, summaryTags, 1, 5, true);
checkDistributionSummary(CLIENT_DATA_RECEIVED, summaryTags, 0, 0, false);
checkCounter(CLIENT_ERRORS, summaryTags, 0, false);
assertTimer(registry, CLIENT_CONNECT_TIME, timerTags)
.hasCountEqualTo(1)
.hasTotalTimeGreaterThanOrEqualTo(0);
assertDistributionSummary(registry, CLIENT_DATA_SENT, summaryTags)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertDistributionSummary(registry, CLIENT_DATA_RECEIVED, summaryTags).isNull();
assertCounter(registry, CLIENT_ERRORS, summaryTags).isNull();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,6 @@
*/
package reactor.netty.transport;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
Expand All @@ -41,6 +40,7 @@
import static reactor.netty.Metrics.EVENT_LOOP_PREFIX;
import static reactor.netty.Metrics.NAME;
import static reactor.netty.Metrics.PENDING_TASKS;
import static reactor.netty.micrometer.GaugeAssert.assertGauge;

/**
* Tests for event loop metrics.
Expand Down Expand Up @@ -86,7 +86,7 @@ void testEventLoopMetrics() throws InterruptedException {
String[] tags = new String[]{
NAME, singleThreadEventExecutor.threadProperties().name(),
};
assertThat(getGaugeValue(EVENT_LOOP_PREFIX + PENDING_TASKS, tags)).isEqualTo(10);
assertGauge(registry, EVENT_LOOP_PREFIX + PENDING_TASKS, tags).hasValueEqualTo(10);
latch.countDown();
}
})
Expand Down Expand Up @@ -162,14 +162,4 @@ public Meter.Id map(Meter.Id id) {
}
}
}

private double getGaugeValue(String name, String... tags) {
Gauge gauge = registry.find(name).tags(tags).gauge();
double result = -1;
if (gauge != null) {
result = gauge.value();
}
return result;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,14 +15,9 @@
*/
package reactor.netty.udp;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.tck.MeterRegistryAssert;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
Expand Down Expand Up @@ -53,6 +48,9 @@
import static reactor.netty.Metrics.UDP_CLIENT_PREFIX;
import static reactor.netty.Metrics.UDP_SERVER_PREFIX;
import static reactor.netty.Metrics.URI;
import static reactor.netty.micrometer.CounterAssert.assertCounter;
import static reactor.netty.micrometer.DistributionSummaryAssert.assertDistributionSummary;
import static reactor.netty.micrometer.TimerAssert.assertTimer;

/**
* This test class verifies UDP metrics functionality.
Expand Down Expand Up @@ -197,45 +195,26 @@ private void checkExpectationsPositive() {
String[] summaryTags1 = new String[] {REMOTE_ADDRESS, clientAddress, URI, "udp"};
String[] summaryTags2 = new String[] {REMOTE_ADDRESS, serverAddress, URI, "udp"};

checkDistributionSummary(SERVER_DATA_SENT, summaryTags1);
checkDistributionSummary(SERVER_DATA_RECEIVED, summaryTags1);
checkCounter(SERVER_ERRORS, summaryTags1, false);

checkClientConnectTime(timerTags);
checkDistributionSummary(CLIENT_DATA_SENT, summaryTags2);
checkDistributionSummary(CLIENT_DATA_RECEIVED, summaryTags2);
checkCounter(CLIENT_ERRORS, summaryTags2, false);
}


private void checkClientConnectTime(String[] tags) {
MeterRegistryAssert.assertThat(registry).hasTimerWithNameAndTags(CLIENT_CONNECT_TIME, Tags.of(tags));

Timer timer = registry.find(CLIENT_CONNECT_TIME).tags(tags).timer();
assertThat(timer).isNotNull();
assertThat(timer.count()).isEqualTo(1);
assertThat(timer.totalTime(TimeUnit.NANOSECONDS) > 0).isTrue();
}

private void checkDistributionSummary(String name, String[] tags) {
DistributionSummary summary = registry.find(name).tags(tags).summary();
assertThat(summary).isNotNull();
assertThat(summary.count()).isEqualTo(1);
assertThat(summary.totalAmount() >= 5).isTrue();
}

private void checkCounter(String name, String[] tags, boolean exists) {
Counter counter = registry.find(name).tags(tags).counter();
if (exists) {
assertThat(counter).isNotNull();
assertThat(counter.count()).isEqualTo(0);
}
else {
assertThat(counter).isNull();
}
assertDistributionSummary(registry, SERVER_DATA_SENT, summaryTags1)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertDistributionSummary(registry, SERVER_DATA_RECEIVED, summaryTags1)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertCounter(registry, SERVER_ERRORS, summaryTags1).isNull();

assertTimer(registry, CLIENT_CONNECT_TIME, timerTags)
.hasCountEqualTo(1)
.hasTotalTimeGreaterThan(0);
assertDistributionSummary(registry, CLIENT_DATA_SENT, summaryTags2)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertDistributionSummary(registry, CLIENT_DATA_RECEIVED, summaryTags2)
.hasCountEqualTo(1)
.hasTotalAmountGreaterThanOrEqualTo(5);
assertCounter(registry, CLIENT_ERRORS, summaryTags2).isNull();
}


private static final String SERVER_DATA_SENT = UDP_SERVER_PREFIX + DATA_SENT;
private static final String SERVER_DATA_RECEIVED = UDP_SERVER_PREFIX + DATA_RECEIVED;
private static final String SERVER_ERRORS = UDP_SERVER_PREFIX + ERRORS;
Expand Down
Loading

0 comments on commit 2e55760

Please sign in to comment.