diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index 34bb61f577..40e5af3b6a 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -36,6 +36,8 @@ public interface MetricsCollector { void basicPublish(Channel channel); + void basicPublishFailure(Channel channel, Throwable cause); + void consumedMessage(Channel channel, long deliveryTag, boolean autoAck); void consumedMessage(Channel channel, long deliveryTag, String consumerTag); @@ -49,4 +51,5 @@ public interface MetricsCollector { void basicConsume(Channel channel, String consumerTag, boolean autoAck); void basicCancel(Channel channel, String consumerTag); + } diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index 3895f34013..6563ab14ce 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -70,6 +70,11 @@ public void basicPublish(Channel channel) { } + @Override + public void basicPublishFailure(Channel channel, Throwable cause) { + + } + @Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) { diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 71983fb457..03eb64f468 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -102,6 +102,15 @@ public void basicPublish(Channel channel) { } } + @Override + public void basicPublishFailure(Channel channel, Throwable cause) { + try { + markMessagePublishFailed(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in basicPublishFailure: " + e.getMessage()); + } + } + @Override public void basicConsume(Channel channel, String consumerTag, boolean autoAck) { try { @@ -331,6 +340,11 @@ private ChannelState(Channel channel) { */ protected abstract void markPublishedMessage(); + /** + * Marks the event of a message publishing failure. + */ + protected abstract void markMessagePublishFailed(); + /** * Marks the event of a consumed message. */ diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 11d812c407..1685c249a0 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -684,22 +684,25 @@ public void basicPublish(String exchange, String routingKey, unconfirmedSet.add(getNextPublishSeqNo()); nextPublishSeqNo++; } - BasicProperties useProps = props; if (props == null) { - useProps = MessageProperties.MINIMAL_BASIC; + props = MessageProperties.MINIMAL_BASIC; + } + AMQCommand command = new AMQCommand( + new Basic.Publish.Builder() + .exchange(exchange) + .routingKey(routingKey) + .mandatory(mandatory) + .immediate(immediate) + .build(), props, body); + try { + transmit(command); + } catch (IOException e) { + metricsCollector.basicPublishFailure(this, e); + throw e; } - transmit(new AMQCommand(new Basic.Publish.Builder() - .exchange(exchange) - .routingKey(routingKey) - .mandatory(mandatory) - .immediate(immediate) - .build(), - useProps, body)); metricsCollector.basicPublish(this); } - - /** Public API - {@inheritDoc} */ @Override public Exchange.DeclareOk exchangeDeclare(String exchange, String type, diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java index 1e2b8303f2..a7f65e9018 100644 --- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java @@ -27,12 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES; +import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.*; /** * Micrometer implementation of {@link MetricsCollector}. @@ -54,6 +49,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector { private final Counter publishedMessages; + private final Counter failedToPublishMessages; + private final Counter consumedMessages; private final Counter acknowledgedMessages; @@ -83,6 +80,7 @@ public MicrometerMetricsCollector(Function metricsCreator) { this.consumedMessages = (Counter) metricsCreator.apply(CONSUMED_MESSAGES); this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES); this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES); + this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES); } @Override @@ -110,6 +108,11 @@ protected void markPublishedMessage() { publishedMessages.increment(); } + @Override + protected void markMessagePublishFailed() { + failedToPublishMessages.increment(); + } + @Override protected void markConsumedMessage() { consumedMessages.increment(); @@ -137,6 +140,10 @@ public Counter getPublishedMessages() { return publishedMessages; } + public Counter getFailedToPublishMessages() { + return failedToPublishMessages; + } + public Counter getConsumedMessages() { return consumedMessages; } @@ -185,6 +192,12 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".rejected", tags); } + }, + FAILED_TO_PUBLISH_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".failed_to_publish", tags); + } }; /** @@ -192,7 +205,6 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { * @param registry * @param prefix * @deprecated will be removed in 6.0.0 - * @return */ @Deprecated Object create(MeterRegistry registry, String prefix) { diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index 08a3939f24..91f9342b6d 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -41,6 +41,7 @@ public class StandardMetricsCollector extends AbstractMetricsCollector { private final Meter consumedMessages; private final Meter acknowledgedMessages; private final Meter rejectedMessages; + private final Meter failedToPublishMessages; public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { @@ -48,6 +49,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.connections = registry.counter(metricsPrefix+".connections"); this.channels = registry.counter(metricsPrefix+".channels"); this.publishedMessages = registry.meter(metricsPrefix+".published"); + this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish"); this.consumedMessages = registry.meter(metricsPrefix+".consumed"); this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged"); this.rejectedMessages = registry.meter(metricsPrefix+".rejected"); @@ -86,6 +88,11 @@ protected void markPublishedMessage() { publishedMessages.mark(); } + @Override + protected void markMessagePublishFailed() { + failedToPublishMessages.mark(); + } + @Override protected void markConsumedMessage() { consumedMessages.mark(); @@ -130,4 +137,8 @@ public Meter getAcknowledgedMessages() { public Meter getRejectedMessages() { return rejectedMessages; } + + public Meter getFailedToPublishMessages() { + return failedToPublishMessages; + } } diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java index 5aa6a24cdc..d6b5095ed0 100644 --- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java @@ -26,9 +26,12 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import java.io.IOException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * @@ -116,7 +119,34 @@ public void basicGetAndAck() { metrics.basicAck(channel, 10, true); assertThat(acknowledgedMessages(metrics), is(1L+2L+1L)); + } + + @Test public void publishingAndPublishingFailures() { + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + + assertThat(failedToPublishMessages(metrics), is(0L)); + assertThat(publishedMessages(metrics), is(0L)); + metrics.basicPublishFailure(channel, new IOException()); + assertThat(failedToPublishMessages(metrics), is(1L)); + assertThat(publishedMessages(metrics), is(0L)); + + metrics.basicPublish(channel); + assertThat(failedToPublishMessages(metrics), is(1L)); + assertThat(publishedMessages(metrics), is(1L)); + + metrics.basicPublishFailure(channel, new IOException()); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(1L)); + + metrics.basicPublish(channel); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(2L)); + + metrics.cleanStaleState(); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(2L)); } @Test public void cleanStaleState() { @@ -159,6 +189,22 @@ public void basicGetAndAck() { assertThat(channels(metrics), is(1L)); } + long publishedMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getPublishedMessages().count(); + } + } + + long failedToPublishMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getFailedToPublishMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getFailedToPublishMessages().count(); + } + } + long consumedMessages(MetricsCollector metrics) { if (metrics instanceof StandardMetricsCollector) { return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount();