Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

<slf4j.version>1.7.21</slf4j.version>
<metrics.version>3.1.2</metrics.version>
<micrometer.version>1.0.0-rc.2</micrometer.version>
<logback.version>1.1.7</logback.version>
<commons-cli.version>1.1</commons-cli.version>
<junit.version>4.12</junit.version>
Expand Down Expand Up @@ -633,6 +634,12 @@
<version>${metrics.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
Expand Down
193 changes: 193 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package com.rabbitmq.client.impl;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MetricsCollector;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES;

// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

/**
* Micrometer implementation of {@link MetricsCollector}.
* Note transactions are not supported (see {@link MetricsCollector}.
* Micrometer provides out-of-the-box support for report backends like JMX,
* Graphite, Ganglia, Atlas, Datadog, etc. See Micrometer documentation for
* more details.
*
* Note Micrometer requires Java 8 or more, so does this {@link MetricsCollector}.
*
* @see MetricsCollector
* @since 4.3.0
*/
public class MicrometerMetricsCollector extends AbstractMetricsCollector {

private final AtomicLong connections;

private final AtomicLong channels;

private final Counter publishedMessages;

private final Counter consumedMessages;

private final Counter acknowledgedMessages;

private final Counter rejectedMessages;

public MicrometerMetricsCollector(MeterRegistry registry) {
this(registry, "rabbitmq");
}

public MicrometerMetricsCollector(final MeterRegistry registry, final String prefix) {
this(new MetricsCreator() {
@Override
public Object create(Metrics metric) {
return metric.create(registry, prefix);
}
});
}

public MicrometerMetricsCollector(MetricsCreator creator) {
this.connections = (AtomicLong) creator.create(CONNECTIONS);
this.channels = (AtomicLong) creator.create(CHANNELS);
this.publishedMessages = (Counter) creator.create(PUBLISHED_MESSAGES);
this.consumedMessages = (Counter) creator.create(CONSUMED_MESSAGES);
this.acknowledgedMessages = (Counter) creator.create(ACKNOWLEDGED_MESSAGES);
this.rejectedMessages = (Counter) creator.create(REJECTED_MESSAGES);
}

@Override
protected void incrementConnectionCount(Connection connection) {
connections.incrementAndGet();
}

@Override
protected void decrementConnectionCount(Connection connection) {
connections.decrementAndGet();
}

@Override
protected void incrementChannelCount(Channel channel) {
channels.incrementAndGet();
}

@Override
protected void decrementChannelCount(Channel channel) {
channels.decrementAndGet();
}

@Override
protected void markPublishedMessage() {
publishedMessages.increment();
}

@Override
protected void markConsumedMessage() {
consumedMessages.increment();
}

@Override
protected void markAcknowledgedMessage() {
acknowledgedMessages.increment();
}

@Override
protected void markRejectedMessage() {
rejectedMessages.increment();
}

public AtomicLong getConnections() {
return connections;
}

public AtomicLong getChannels() {
return channels;
}

public Counter getPublishedMessages() {
return publishedMessages;
}

public Counter getConsumedMessages() {
return consumedMessages;
}

public Counter getAcknowledgedMessages() {
return acknowledgedMessages;
}

public Counter getRejectedMessages() {
return rejectedMessages;
}

public enum Metrics {
CONNECTIONS {
@Override
Object create(MeterRegistry registry, String prefix) {
return registry.gauge(prefix + ".connections", new AtomicLong(0));
}
},
CHANNELS {
@Override
Object create(MeterRegistry registry, String prefix) {
return registry.gauge(prefix + ".channels", new AtomicLong(0));
}
},
PUBLISHED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix) {
return registry.counter(prefix + ".published");
}
},
CONSUMED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix) {
return registry.counter(prefix + ".consumed");
}
},
ACKNOWLEDGED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix) {
return registry.counter(prefix + ".acknowledged");
}
},
REJECTED_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix) {
return registry.counter(prefix + ".rejected");
}
};

abstract Object create(MeterRegistry registry, String prefix);
}

public interface MetricsCreator {

Object create(Metrics metric);

}

}
2 changes: 1 addition & 1 deletion src/test/java/com/rabbitmq/client/test/ClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
JSONReadWriteTest.class,
SharedThreadPoolTest.class,
DnsRecordIpAddressResolverTests.class,
StandardMetricsCollectorTest.class,
MetricsCollectorTest.class,
DnsSrvRecordAddressResolverTest.class,
JavaNioTest.class,
ConnectionFactoryTest.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,14 @@

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MetricsCollector;
import com.rabbitmq.client.impl.AbstractMetricsCollector;
import com.rabbitmq.client.impl.MicrometerMetricsCollector;
import com.rabbitmq.client.impl.StandardMetricsCollector;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
Expand All @@ -27,11 +33,23 @@
/**
*
*/
public class StandardMetricsCollectorTest {
@RunWith(Parameterized.class)
public class MetricsCollectorTest {

@Parameterized.Parameters
public static Object[] data() {
// need to resort to a factory, as this method is called only once
// if creating the collector instance, it's reused across the test methods
// and this doesn't work (it cannot be reset)
return new Object[] { new StandardMetricsCollectorFactory(), new MicrometerMetricsCollectorFactory() };
}

@Parameterized.Parameter
public MetricsCollectorFactory factory;

@Test
public void basicGetAndAck() {
StandardMetricsCollector metrics = new StandardMetricsCollector();
AbstractMetricsCollector metrics = factory.create();
Connection connection = mock(Connection.class);
when(connection.getId()).thenReturn("connection-1");
Channel channel = mock(Channel.class);
Expand All @@ -49,20 +67,20 @@ public void basicGetAndAck() {
metrics.consumedMessage(channel, 6, false);

metrics.basicAck(channel, 6, false);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L));
assertThat(acknowledgedMessages(metrics), is(1L));

metrics.basicAck(channel, 3, true);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L));
assertThat(acknowledgedMessages(metrics), is(1L+2L));

metrics.basicAck(channel, 6, true);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));

metrics.basicAck(channel, 10, true);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
}

@Test public void basicConsumeAndAck() {
StandardMetricsCollector metrics = new StandardMetricsCollector();
AbstractMetricsCollector metrics = factory.create();
Connection connection = mock(Connection.class);
when(connection.getId()).thenReturn("connection-1");
Channel channel = mock(Channel.class);
Expand All @@ -78,8 +96,8 @@ public void basicGetAndAck() {
metrics.basicConsume(channel, consumerTagWithManualAck, false);

metrics.consumedMessage(channel, 1, consumerTagWithAutoAck);
assertThat(metrics.getConsumedMessages().getCount(), is(1L));
assertThat(metrics.getAcknowledgedMessages().getCount(), is(0L));
assertThat(consumedMessages(metrics), is(1L));
assertThat(acknowledgedMessages(metrics), is(0L));

metrics.consumedMessage(channel, 2, consumerTagWithManualAck);
metrics.consumedMessage(channel, 3, consumerTagWithManualAck);
Expand All @@ -88,21 +106,21 @@ public void basicGetAndAck() {
metrics.consumedMessage(channel, 6, consumerTagWithManualAck);

metrics.basicAck(channel, 6, false);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L));
assertThat(acknowledgedMessages(metrics), is(1L));

metrics.basicAck(channel, 3, true);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L));
assertThat(acknowledgedMessages(metrics), is(1L+2L));

metrics.basicAck(channel, 6, true);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));

metrics.basicAck(channel, 10, true);
assertThat(metrics.getAcknowledgedMessages().getCount(), is(1L+2L+1L));
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));

}

@Test public void cleanStaleState() {
StandardMetricsCollector metrics = new StandardMetricsCollector();
AbstractMetricsCollector metrics = factory.create();
Connection openConnection = mock(Connection.class);
when(openConnection.getId()).thenReturn("connection-1");
when(openConnection.isOpen()).thenReturn(true);
Expand Down Expand Up @@ -132,13 +150,63 @@ public void basicGetAndAck() {
metrics.newChannel(closedChannel);
metrics.newChannel(openChannelInClosedConnection);

assertThat(metrics.getConnections().getCount(), is(2L));
assertThat(metrics.getChannels().getCount(), is(2L+1L));
assertThat(connections(metrics), is(2L));
assertThat(channels(metrics), is(2L+1L));

metrics.cleanStaleState();

assertThat(metrics.getConnections().getCount(), is(1L));
assertThat(metrics.getChannels().getCount(), is(1L));
assertThat(connections(metrics), is(1L));
assertThat(channels(metrics), is(1L));
}

long consumedMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount();
} else {
return (long) ((MicrometerMetricsCollector) metrics).getConsumedMessages().count();
}
}

long acknowledgedMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getAcknowledgedMessages().getCount();
} else {
return (long) ((MicrometerMetricsCollector) metrics).getAcknowledgedMessages().count();
}
}

long connections(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getConnections().getCount();
} else {
return ((MicrometerMetricsCollector) metrics).getConnections().get();
}
}

long channels(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getChannels().getCount();
} else {
return ((MicrometerMetricsCollector) metrics).getChannels().get();
}
}

interface MetricsCollectorFactory {
AbstractMetricsCollector create();
}

static class StandardMetricsCollectorFactory implements MetricsCollectorFactory {
@Override
public AbstractMetricsCollector create() {
return new StandardMetricsCollector();
}
}

static class MicrometerMetricsCollectorFactory implements MetricsCollectorFactory {
@Override
public AbstractMetricsCollector create() {
return new MicrometerMetricsCollector(new SimpleMeterRegistry());
}
}

}