diff --git a/pom.xml b/pom.xml
index 40375441af..16b353d719 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,7 @@
1.7.36
4.2.10
1.9.2
+ 1.16.0
2.13.3
1.2.11
4.13.2
@@ -704,6 +705,12 @@
${micrometer.version}
true
+
+ io.opentelemetry
+ opentelemetry-api
+ ${opentelemetry.version}
+ true
+
com.fasterxml.jackson.core
jackson-databind
@@ -752,6 +759,12 @@
${netcrusher.version}
test
+
+ io.opentelemetry
+ opentelemetry-sdk-testing
+ ${opentelemetry.version}
+ test
+
diff --git a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java
new file mode 100644
index 0000000000..58ac454fb0
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java
@@ -0,0 +1,195 @@
+// Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+package com.rabbitmq.client.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.MetricsCollector;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * OpenTelemetry implementation of {@link MetricsCollector}.
+ *
+ * @see MetricsCollector
+ * @since 5.15.1
+ */
+public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
+
+ private final Attributes attributes;
+
+ private final AtomicLong connections = new AtomicLong(0L);
+ private final AtomicLong channels = new AtomicLong(0L);
+
+ private final LongCounter publishedMessagesCounter;
+ private final LongCounter consumedMessagesCounter;
+ private final LongCounter acknowledgedMessagesCounter;
+ private final LongCounter rejectedMessagesCounter;
+ private final LongCounter failedToPublishMessagesCounter;
+ private final LongCounter ackedPublishedMessagesCounter;
+ private final LongCounter nackedPublishedMessagesCounter;
+ private final LongCounter unroutedPublishedMessagesCounter;
+
+ public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
+ this(openTelemetry, "rabbitmq");
+ }
+
+ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final String prefix) {
+ this(openTelemetry, prefix, Attributes.empty());
+ }
+
+ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final String prefix, final Attributes attributes) {
+ // initialize meter
+ Meter meter = openTelemetry.getMeter("amqp-client");
+
+ // attributes
+ this.attributes = attributes;
+
+ // connections
+ meter.gaugeBuilder(prefix + ".connections")
+ .setUnit("{connections}")
+ .setDescription("The number of connections to the RabbitMQ server")
+ .ofLongs()
+ .buildWithCallback(measurement -> measurement.record(connections.get(), attributes));
+
+ // channels
+ meter.gaugeBuilder(prefix + ".channels")
+ .setUnit("{channels}")
+ .setDescription("The number of channels to the RabbitMQ server")
+ .ofLongs()
+ .buildWithCallback(measurement -> measurement.record(channels.get(), attributes));
+
+ // publishedMessages
+ this.publishedMessagesCounter = meter.counterBuilder(prefix + ".published")
+ .setUnit("{messages}")
+ .setDescription("The number of messages published to the RabbitMQ server")
+ .build();
+
+ // consumedMessages
+ this.consumedMessagesCounter = meter.counterBuilder(prefix + ".consumed")
+ .setUnit("{messages}")
+ .setDescription("The number of messages consumed from the RabbitMQ server")
+ .build();
+
+ // acknowledgedMessages
+ this.acknowledgedMessagesCounter = meter.counterBuilder(prefix + ".acknowledged")
+ .setUnit("{messages}")
+ .setDescription("The number of messages acknowledged from the RabbitMQ server")
+ .build();
+
+ // rejectedMessages
+ this.rejectedMessagesCounter = meter.counterBuilder(prefix + ".rejected")
+ .setUnit("{messages}")
+ .setDescription("The number of messages rejected from the RabbitMQ server")
+ .build();
+
+ // failedToPublishMessages
+ this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
+ .setUnit("{messages}")
+ .setDescription("The number of messages failed to publish to the RabbitMQ server")
+ .build();
+
+ // ackedPublishedMessages
+ this.ackedPublishedMessagesCounter = meter.counterBuilder(prefix + ".acknowledged_published")
+ .setUnit("{messages}")
+ .setDescription("The number of published messages acknowledged by the RabbitMQ server")
+ .build();
+
+ // nackedPublishedMessages
+ this.nackedPublishedMessagesCounter = meter.counterBuilder(prefix + ".not_acknowledged_published")
+ .setUnit("{messages}")
+ .setDescription("The number of published messages not acknowledged by the RabbitMQ server")
+ .build();
+
+ // unroutedPublishedMessages
+ this.unroutedPublishedMessagesCounter = meter.counterBuilder(prefix + ".unrouted_published")
+ .setUnit("{messages}")
+ .setDescription("The number of un-routed published messages to the RabbitMQ server")
+ .build();
+ }
+
+ @Override
+ protected void incrementConnectionCount(Connection connection) {
+ connections.incrementAndGet();
+ }
+
+ @Override
+ protected void decrementConnectionCount(Connection connection) {
+ connections.decrementAndGet();
+ }
+
+ @Override
+ protected void incrementChannelCount(Channel channel) {
+ channels.incrementAndGet();
+ }
+
+ @Override
+ protected void decrementChannelCount(Channel channel) {
+ channels.decrementAndGet();
+ }
+
+ @Override
+ protected void markPublishedMessage() {
+ publishedMessagesCounter.add(1L, attributes);
+ }
+
+ @Override
+ protected void markMessagePublishFailed() {
+ failedToPublishMessagesCounter.add(1L, attributes);
+ }
+
+ @Override
+ protected void markConsumedMessage() {
+ consumedMessagesCounter.add(1L, attributes);
+ }
+
+ @Override
+ protected void markAcknowledgedMessage() {
+ acknowledgedMessagesCounter.add(1L, attributes);
+ }
+
+ @Override
+ protected void markRejectedMessage() {
+ rejectedMessagesCounter.add(1L, attributes);
+ }
+
+ @Override
+ protected void markMessagePublishAcknowledged() {
+ ackedPublishedMessagesCounter.add(1L, attributes);
+ }
+
+ @Override
+ protected void markMessagePublishNotAcknowledged() {
+ nackedPublishedMessagesCounter.add(1L, attributes);
+ }
+
+ @Override
+ protected void markPublishedMessageUnrouted() {
+ unroutedPublishedMessagesCounter.add(1L, attributes);
+ }
+
+ public AtomicLong getConnections() {
+ return connections;
+ }
+
+ public AtomicLong getChannels() {
+ return channels;
+ }
+}
diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
index 9c18b2ac1f..9062fe56d1 100644
--- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
+++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
@@ -20,13 +20,20 @@
import com.rabbitmq.client.MetricsCollector;
import com.rabbitmq.client.impl.AbstractMetricsCollector;
import com.rabbitmq.client.impl.MicrometerMetricsCollector;
+import com.rabbitmq.client.impl.OpenTelemetryMetricsCollector;
import com.rabbitmq.client.impl.StandardMetricsCollector;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@@ -38,17 +45,26 @@
@RunWith(Parameterized.class)
public class MetricsCollectorTest {
+ @ClassRule
+ public static final OpenTelemetryRule openTelemetryRule = OpenTelemetryRule.create();
+
@Parameterized.Parameters
public static Object[] data() {
// need to resort to a factory, as this method is called only once
// if creating the collector instance, it's reused across the test methods
// and this doesn't work (it cannot be reset)
- return new Object[] { new StandardMetricsCollectorFactory(), new MicrometerMetricsCollectorFactory() };
+ return new Object[]{new StandardMetricsCollectorFactory(), new MicrometerMetricsCollectorFactory(), new OpenTelemetryMetricsCollectorFactory()};
}
@Parameterized.Parameter
public MetricsCollectorFactory factory;
+ @Before
+ public void reset() {
+ // reset metrics
+ openTelemetryRule.clearMetrics();
+ }
+
@Test
public void basicGetAndAck() {
AbstractMetricsCollector metrics = factory.create();
@@ -248,73 +264,109 @@ public void basicGetAndAck() {
long publishAck(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getPublishAcknowledgedMessages().getCount();
- } else {
- return (long) ((MicrometerMetricsCollector) metrics).getAckedPublishedMessages().count();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getAckedPublishedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.acknowledged_published");
}
}
long publishNack(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getPublishNotAcknowledgedMessages().getCount();
- } else {
- return (long) ((MicrometerMetricsCollector) metrics).getNackedPublishedMessages().count();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getNackedPublishedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.not_acknowledged_published");
}
}
long publishUnrouted(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getPublishUnroutedMessages().getCount();
- } else {
- return (long) ((MicrometerMetricsCollector) metrics).getUnroutedPublishedMessages().count();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getUnroutedPublishedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.unrouted_published");
}
}
long publishedMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount();
- } else {
- return (long) ((MicrometerMetricsCollector) metrics).getPublishedMessages().count();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getPublishedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.published");
}
}
long failedToPublishMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getFailedToPublishMessages().getCount();
- } else {
- return (long) ((MicrometerMetricsCollector) metrics).getFailedToPublishMessages().count();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getFailedToPublishMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.failed_to_publish");
}
}
long consumedMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount();
- } else {
- return (long) ((MicrometerMetricsCollector) metrics).getConsumedMessages().count();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getConsumedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.consumed");
}
}
long acknowledgedMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getAcknowledgedMessages().getCount();
- } else {
- return (long) ((MicrometerMetricsCollector) metrics).getAcknowledgedMessages().count();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getAcknowledgedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.acknowledged");
}
}
long connections(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getConnections().getCount();
- } else {
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
return ((MicrometerMetricsCollector) metrics).getConnections().get();
}
+ else {
+ return ((OpenTelemetryMetricsCollector)metrics).getConnections().get();
+ }
}
long channels(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getChannels().getCount();
- } else {
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
return ((MicrometerMetricsCollector) metrics).getChannels().get();
}
+ else {
+ return ((OpenTelemetryMetricsCollector)metrics).getChannels().get();
+ }
}
interface MetricsCollectorFactory {
@@ -335,4 +387,23 @@ public AbstractMetricsCollector create() {
}
}
+ static class OpenTelemetryMetricsCollectorFactory implements MetricsCollectorFactory {
+ @Override
+ public AbstractMetricsCollector create() {
+ return new OpenTelemetryMetricsCollector(openTelemetryRule.getOpenTelemetry());
+ }
+ }
+
+ static long getOpenTelemetryCounterMeterValue(String name) {
+ // open telemetry metrics
+ List metrics = openTelemetryRule.getMetrics();
+ // metric value
+ return metrics.stream()
+ .filter(metric -> metric.getName().equals(name))
+ .flatMap(metric -> metric.getData().getPoints().stream())
+ .map(point -> (LongPointData)point)
+ .map(LongPointData::getValue)
+ .mapToLong(value -> value)
+ .sum();
+ }
}