Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/com/rabbitmq/client/MetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public interface MetricsCollector {

void basicPublish(Channel channel);

void basicPublishFailure(Channel channel, Throwable cause);

void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);

void consumedMessage(Channel channel, long deliveryTag, String consumerTag);
Expand All @@ -49,4 +51,5 @@ public interface MetricsCollector {
void basicConsume(Channel channel, String consumerTag, boolean autoAck);

void basicCancel(Channel channel, String consumerTag);

}
5 changes: 5 additions & 0 deletions src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public void basicPublish(Channel channel) {

}

@Override
public void basicPublishFailure(Channel channel, Throwable cause) {

}

@Override
public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ public void basicPublish(Channel channel) {
}
}

@Override
public void basicPublishFailure(Channel channel, Throwable cause) {
try {
markMessagePublishFailed();
} catch(Exception e) {
LOGGER.info("Error while computing metrics in basicPublishFailure: " + e.getMessage());
}
}

@Override
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
try {
Expand Down Expand Up @@ -331,6 +340,11 @@ private ChannelState(Channel channel) {
*/
protected abstract void markPublishedMessage();

/**
* Marks the event of a message publishing failure.
*/
protected abstract void markMessagePublishFailed();

/**
* Marks the event of a consumed message.
*/
Expand Down
25 changes: 14 additions & 11 deletions src/main/java/com/rabbitmq/client/impl/ChannelN.java
Original file line number Diff line number Diff line change
Expand Up @@ -684,22 +684,25 @@ public void basicPublish(String exchange, String routingKey,
unconfirmedSet.add(getNextPublishSeqNo());
nextPublishSeqNo++;
}
BasicProperties useProps = props;
if (props == null) {
useProps = MessageProperties.MINIMAL_BASIC;
props = MessageProperties.MINIMAL_BASIC;
}
AMQCommand command = new AMQCommand(
new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(), props, body);
try {
transmit(command);
} catch (IOException e) {
metricsCollector.basicPublishFailure(this, e);
throw e;
}
transmit(new AMQCommand(new Basic.Publish.Builder()
.exchange(exchange)
.routingKey(routingKey)
.mandatory(mandatory)
.immediate(immediate)
.build(),
useProps, body));
metricsCollector.basicPublish(this);
}



/** Public API - {@inheritDoc} */
@Override
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES;
import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.*;

/**
* Micrometer implementation of {@link MetricsCollector}.
Expand All @@ -54,6 +49,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {

private final Counter publishedMessages;

private final Counter failedToPublishMessages;

private final Counter consumedMessages;

private final Counter acknowledgedMessages;
Expand Down Expand Up @@ -83,6 +80,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
this.consumedMessages = (Counter) metricsCreator.apply(CONSUMED_MESSAGES);
this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES);
this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES);
this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES);
}

@Override
Expand Down Expand Up @@ -110,6 +108,11 @@ protected void markPublishedMessage() {
publishedMessages.increment();
}

@Override
protected void markMessagePublishFailed() {
failedToPublishMessages.increment();
}

@Override
protected void markConsumedMessage() {
consumedMessages.increment();
Expand Down Expand Up @@ -137,6 +140,10 @@ public Counter getPublishedMessages() {
return publishedMessages;
}

public Counter getFailedToPublishMessages() {
return failedToPublishMessages;
}

public Counter getConsumedMessages() {
return consumedMessages;
}
Expand Down Expand Up @@ -185,14 +192,19 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".rejected", tags);
}
},
FAILED_TO_PUBLISH_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
return registry.counter(prefix + ".failed_to_publish", tags);
}
};

/**
*
* @param registry
* @param prefix
* @deprecated will be removed in 6.0.0
* @return
*/
@Deprecated
Object create(MeterRegistry registry, String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
private final Meter consumedMessages;
private final Meter acknowledgedMessages;
private final Meter rejectedMessages;
private final Meter failedToPublishMessages;


public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.registry = registry;
this.connections = registry.counter(metricsPrefix+".connections");
this.channels = registry.counter(metricsPrefix+".channels");
this.publishedMessages = registry.meter(metricsPrefix+".published");
this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish");
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
Expand Down Expand Up @@ -86,6 +88,11 @@ protected void markPublishedMessage() {
publishedMessages.mark();
}

@Override
protected void markMessagePublishFailed() {
failedToPublishMessages.mark();
}

@Override
protected void markConsumedMessage() {
consumedMessages.mark();
Expand Down Expand Up @@ -130,4 +137,8 @@ public Meter getAcknowledgedMessages() {
public Meter getRejectedMessages() {
return rejectedMessages;
}

public Meter getFailedToPublishMessages() {
return failedToPublishMessages;
}
}
52 changes: 49 additions & 3 deletions src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.IOException;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
*
Expand Down Expand Up @@ -116,7 +119,34 @@ public void basicGetAndAck() {

metrics.basicAck(channel, 10, true);
assertThat(acknowledgedMessages(metrics), is(1L+2L+1L));
}

@Test public void publishingAndPublishingFailures() {
AbstractMetricsCollector metrics = factory.create();
Channel channel = mock(Channel.class);

assertThat(failedToPublishMessages(metrics), is(0L));
assertThat(publishedMessages(metrics), is(0L));

metrics.basicPublishFailure(channel, new IOException());
assertThat(failedToPublishMessages(metrics), is(1L));
assertThat(publishedMessages(metrics), is(0L));

metrics.basicPublish(channel);
assertThat(failedToPublishMessages(metrics), is(1L));
assertThat(publishedMessages(metrics), is(1L));

metrics.basicPublishFailure(channel, new IOException());
assertThat(failedToPublishMessages(metrics), is(2L));
assertThat(publishedMessages(metrics), is(1L));

metrics.basicPublish(channel);
assertThat(failedToPublishMessages(metrics), is(2L));
assertThat(publishedMessages(metrics), is(2L));

metrics.cleanStaleState();
assertThat(failedToPublishMessages(metrics), is(2L));
assertThat(publishedMessages(metrics), is(2L));
}

@Test public void cleanStaleState() {
Expand Down Expand Up @@ -159,6 +189,22 @@ public void basicGetAndAck() {
assertThat(channels(metrics), is(1L));
}

long publishedMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount();
} else {
return (long) ((MicrometerMetricsCollector) metrics).getPublishedMessages().count();
}
}

long failedToPublishMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getFailedToPublishMessages().getCount();
} else {
return (long) ((MicrometerMetricsCollector) metrics).getFailedToPublishMessages().count();
}
}

long consumedMessages(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount();
Expand Down