diff --git a/pom.xml b/pom.xml
index 149af55524..e97b1fb15b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
1.7.21
3.1.2
+ 1.0.0-rc.2
1.1.7
1.1
4.12
@@ -633,6 +634,12 @@
${metrics.version}
true
+
+ io.micrometer
+ micrometer-core
+ ${micrometer.version}
+ true
+
commons-cli
commons-cli
diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
new file mode 100644
index 0000000000..9d837036f7
--- /dev/null
+++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
@@ -0,0 +1,193 @@
+package com.rabbitmq.client.impl;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.MetricsCollector;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES;
+import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS;
+import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS;
+import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES;
+import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES;
+import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES;
+
+// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
+//
+// This software, the RabbitMQ Java client library, is triple-licensed under the
+// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
+// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
+// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
+// please see LICENSE-APACHE2.
+//
+// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
+// either express or implied. See the LICENSE file for specific language governing
+// rights and limitations of this software.
+//
+// If you have any questions regarding licensing, please contact us at
+// info@rabbitmq.com.
+
+/**
+ * Micrometer implementation of {@link MetricsCollector}.
+ * Note transactions are not supported (see {@link MetricsCollector}.
+ * Micrometer provides out-of-the-box support for report backends like JMX,
+ * Graphite, Ganglia, Atlas, Datadog, etc. See Micrometer documentation for
+ * more details.
+ *
+ * Note Micrometer requires Java 8 or more, so does this {@link MetricsCollector}.
+ *
+ * @see MetricsCollector
+ * @since 4.3.0
+ */
+public class MicrometerMetricsCollector extends AbstractMetricsCollector {
+
+ private final AtomicLong connections;
+
+ private final AtomicLong channels;
+
+ private final Counter publishedMessages;
+
+ private final Counter consumedMessages;
+
+ private final Counter acknowledgedMessages;
+
+ private final Counter rejectedMessages;
+
+ public MicrometerMetricsCollector(MeterRegistry registry) {
+ this(registry, "rabbitmq");
+ }
+
+ public MicrometerMetricsCollector(final MeterRegistry registry, final String prefix) {
+ this(new MetricsCreator() {
+ @Override
+ public Object create(Metrics metric) {
+ return metric.create(registry, prefix);
+ }
+ });
+ }
+
+ public MicrometerMetricsCollector(MetricsCreator creator) {
+ this.connections = (AtomicLong) creator.create(CONNECTIONS);
+ this.channels = (AtomicLong) creator.create(CHANNELS);
+ this.publishedMessages = (Counter) creator.create(PUBLISHED_MESSAGES);
+ this.consumedMessages = (Counter) creator.create(CONSUMED_MESSAGES);
+ this.acknowledgedMessages = (Counter) creator.create(ACKNOWLEDGED_MESSAGES);
+ this.rejectedMessages = (Counter) creator.create(REJECTED_MESSAGES);
+ }
+
+ @Override
+ protected void incrementConnectionCount(Connection connection) {
+ connections.incrementAndGet();
+ }
+
+ @Override
+ protected void decrementConnectionCount(Connection connection) {
+ connections.decrementAndGet();
+ }
+
+ @Override
+ protected void incrementChannelCount(Channel channel) {
+ channels.incrementAndGet();
+ }
+
+ @Override
+ protected void decrementChannelCount(Channel channel) {
+ channels.decrementAndGet();
+ }
+
+ @Override
+ protected void markPublishedMessage() {
+ publishedMessages.increment();
+ }
+
+ @Override
+ protected void markConsumedMessage() {
+ consumedMessages.increment();
+ }
+
+ @Override
+ protected void markAcknowledgedMessage() {
+ acknowledgedMessages.increment();
+ }
+
+ @Override
+ protected void markRejectedMessage() {
+ rejectedMessages.increment();
+ }
+
+ public AtomicLong getConnections() {
+ return connections;
+ }
+
+ public AtomicLong getChannels() {
+ return channels;
+ }
+
+ public Counter getPublishedMessages() {
+ return publishedMessages;
+ }
+
+ public Counter getConsumedMessages() {
+ return consumedMessages;
+ }
+
+ public Counter getAcknowledgedMessages() {
+ return acknowledgedMessages;
+ }
+
+ public Counter getRejectedMessages() {
+ return rejectedMessages;
+ }
+
+ public enum Metrics {
+ CONNECTIONS {
+ @Override
+ Object create(MeterRegistry registry, String prefix) {
+ return registry.gauge(prefix + ".connections", new AtomicLong(0));
+ }
+ },
+ CHANNELS {
+ @Override
+ Object create(MeterRegistry registry, String prefix) {
+ return registry.gauge(prefix + ".channels", new AtomicLong(0));
+ }
+ },
+ PUBLISHED_MESSAGES {
+ @Override
+ Object create(MeterRegistry registry, String prefix) {
+ return registry.counter(prefix + ".published");
+ }
+ },
+ CONSUMED_MESSAGES {
+ @Override
+ Object create(MeterRegistry registry, String prefix) {
+ return registry.counter(prefix + ".consumed");
+ }
+ },
+ ACKNOWLEDGED_MESSAGES {
+ @Override
+ Object create(MeterRegistry registry, String prefix) {
+ return registry.counter(prefix + ".acknowledged");
+ }
+ },
+ REJECTED_MESSAGES {
+ @Override
+ Object create(MeterRegistry registry, String prefix) {
+ return registry.counter(prefix + ".rejected");
+ }
+ };
+
+ abstract Object create(MeterRegistry registry, String prefix);
+ }
+
+ public interface MetricsCreator {
+
+ Object create(Metrics metric);
+
+ }
+
+}
diff --git a/src/test/java/com/rabbitmq/client/test/ClientTests.java b/src/test/java/com/rabbitmq/client/test/ClientTests.java
index 2507f80952..1162bfa357 100644
--- a/src/test/java/com/rabbitmq/client/test/ClientTests.java
+++ b/src/test/java/com/rabbitmq/client/test/ClientTests.java
@@ -43,7 +43,7 @@
JSONReadWriteTest.class,
SharedThreadPoolTest.class,
DnsRecordIpAddressResolverTests.class,
- StandardMetricsCollectorTest.class,
+ MetricsCollectorTest.class,
DnsSrvRecordAddressResolverTest.class,
JavaNioTest.class,
ConnectionFactoryTest.class,
diff --git a/src/test/java/com/rabbitmq/client/test/StandardMetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
similarity index 56%
rename from src/test/java/com/rabbitmq/client/test/StandardMetricsCollectorTest.java
rename to src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
index 0616baddb9..5aa6a24cdc 100644
--- a/src/test/java/com/rabbitmq/client/test/StandardMetricsCollectorTest.java
+++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
@@ -17,8 +17,14 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.MetricsCollector;
+import com.rabbitmq.client.impl.AbstractMetricsCollector;
+import com.rabbitmq.client.impl.MicrometerMetricsCollector;
import com.rabbitmq.client.impl.StandardMetricsCollector;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
@@ -27,11 +33,23 @@
/**
*
*/
-public class StandardMetricsCollectorTest {
+@RunWith(Parameterized.class)
+public class MetricsCollectorTest {
+
+ @Parameterized.Parameters
+ public static Object[] data() {
+ // need to resort to a factory, as this method is called only once
+ // if creating the collector instance, it's reused across the test methods
+ // and this doesn't work (it cannot be reset)
+ return new Object[] { new StandardMetricsCollectorFactory(), new MicrometerMetricsCollectorFactory() };
+ }
+
+ @Parameterized.Parameter
+ public MetricsCollectorFactory factory;
@Test
public void basicGetAndAck() {
- StandardMetricsCollector metrics = new StandardMetricsCollector();
+ AbstractMetricsCollector metrics = factory.create();
Connection connection = mock(Connection.class);
when(connection.getId()).thenReturn("connection-1");
Channel channel = mock(Channel.class);
@@ -49,20 +67,20 @@ public void basicGetAndAck() {
metrics.consumedMessage(channel, 6, false);
metrics.basicAck(channel, 6, false);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L));
+ assertThat(acknowledgedMessages(metrics), is(1L));
metrics.basicAck(channel, 3, true);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L));
+ assertThat(acknowledgedMessages(metrics), is(1L+2L));
metrics.basicAck(channel, 6, true);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
+ assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
metrics.basicAck(channel, 10, true);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
+ assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
}
@Test public void basicConsumeAndAck() {
- StandardMetricsCollector metrics = new StandardMetricsCollector();
+ AbstractMetricsCollector metrics = factory.create();
Connection connection = mock(Connection.class);
when(connection.getId()).thenReturn("connection-1");
Channel channel = mock(Channel.class);
@@ -78,8 +96,8 @@ public void basicGetAndAck() {
metrics.basicConsume(channel, consumerTagWithManualAck, false);
metrics.consumedMessage(channel, 1, consumerTagWithAutoAck);
- assertThat(metrics.getConsumedMessages().getCount(), is(1L));
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(0L));
+ assertThat(consumedMessages(metrics), is(1L));
+ assertThat(acknowledgedMessages(metrics), is(0L));
metrics.consumedMessage(channel, 2, consumerTagWithManualAck);
metrics.consumedMessage(channel, 3, consumerTagWithManualAck);
@@ -88,21 +106,21 @@ public void basicGetAndAck() {
metrics.consumedMessage(channel, 6, consumerTagWithManualAck);
metrics.basicAck(channel, 6, false);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L));
+ assertThat(acknowledgedMessages(metrics), is(1L));
metrics.basicAck(channel, 3, true);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L));
+ assertThat(acknowledgedMessages(metrics), is(1L+2L));
metrics.basicAck(channel, 6, true);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
+ assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
metrics.basicAck(channel, 10, true);
- assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
+ assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
}
@Test public void cleanStaleState() {
- StandardMetricsCollector metrics = new StandardMetricsCollector();
+ AbstractMetricsCollector metrics = factory.create();
Connection openConnection = mock(Connection.class);
when(openConnection.getId()).thenReturn("connection-1");
when(openConnection.isOpen()).thenReturn(true);
@@ -132,13 +150,63 @@ public void basicGetAndAck() {
metrics.newChannel(closedChannel);
metrics.newChannel(openChannelInClosedConnection);
- assertThat(metrics.getConnections().getCount(), is(2L));
- assertThat(metrics.getChannels().getCount(), is(2L+1L));
+ assertThat(connections(metrics), is(2L));
+ assertThat(channels(metrics), is(2L+1L));
metrics.cleanStaleState();
- assertThat(metrics.getConnections().getCount(), is(1L));
- assertThat(metrics.getChannels().getCount(), is(1L));
+ assertThat(connections(metrics), is(1L));
+ assertThat(channels(metrics), is(1L));
+ }
+
+ long consumedMessages(MetricsCollector metrics) {
+ if (metrics instanceof StandardMetricsCollector) {
+ return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount();
+ } else {
+ return (long) ((MicrometerMetricsCollector) metrics).getConsumedMessages().count();
+ }
+ }
+
+ long acknowledgedMessages(MetricsCollector metrics) {
+ if (metrics instanceof StandardMetricsCollector) {
+ return ((StandardMetricsCollector) metrics).getAcknowledgedMessages().getCount();
+ } else {
+ return (long) ((MicrometerMetricsCollector) metrics).getAcknowledgedMessages().count();
+ }
+ }
+
+ long connections(MetricsCollector metrics) {
+ if (metrics instanceof StandardMetricsCollector) {
+ return ((StandardMetricsCollector) metrics).getConnections().getCount();
+ } else {
+ return ((MicrometerMetricsCollector) metrics).getConnections().get();
+ }
+ }
+
+ long channels(MetricsCollector metrics) {
+ if (metrics instanceof StandardMetricsCollector) {
+ return ((StandardMetricsCollector) metrics).getChannels().getCount();
+ } else {
+ return ((MicrometerMetricsCollector) metrics).getChannels().get();
+ }
+ }
+
+ interface MetricsCollectorFactory {
+ AbstractMetricsCollector create();
+ }
+
+ static class StandardMetricsCollectorFactory implements MetricsCollectorFactory {
+ @Override
+ public AbstractMetricsCollector create() {
+ return new StandardMetricsCollector();
+ }
+ }
+
+ static class MicrometerMetricsCollectorFactory implements MetricsCollectorFactory {
+ @Override
+ public AbstractMetricsCollector create() {
+ return new MicrometerMetricsCollector(new SimpleMeterRegistry());
+ }
}
}