diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index 34bb61f577..101948e6b9 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -36,6 +36,14 @@ public interface MetricsCollector { void basicPublish(Channel channel); + void basicPublishFailure(Channel channel, Throwable cause); + + void basicPublishAck(Channel channel, long deliveryTag, boolean multiple); + + void basicPublishNack(Channel channel, long deliveryTag, boolean multiple); + + void basicPublishUnrouted(Channel channel); + void consumedMessage(Channel channel, long deliveryTag, boolean autoAck); void consumedMessage(Channel channel, long deliveryTag, String consumerTag); @@ -49,4 +57,5 @@ public interface MetricsCollector { void basicConsume(Channel channel, String consumerTag, boolean autoAck); void basicCancel(Channel channel, String consumerTag); + } diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index 3895f34013..30e0d83402 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -70,6 +70,26 @@ public void basicPublish(Channel channel) { } + @Override + public void basicPublishFailure(Channel channel, Throwable cause) { + + } + + @Override + public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) { + + } + + @Override + public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) { + + } + + @Override + public void basicPublishUnrouted(Channel channel) { + + } + @Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) { diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 71983fb457..99a5b7a138 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -102,6 +102,50 @@ public void basicPublish(Channel channel) { } } + @Override + public void basicPublishFailure(Channel channel, Throwable cause) { + try { + markMessagePublishFailed(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in basicPublishFailure: " + e.getMessage()); + } + } + + @Override + public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) { + if (multiple) { + // this is a naive approach, as it does not handle multiple nacks + return; + } + try { + markMessagePublishAcknowledged(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in basicPublishAck: " + e.getMessage()); + } + } + + @Override + public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) { + if (multiple) { + // this is a naive approach, as it does not handle multiple nacks + return; + } + try { + markMessagePublishNotAcknowledged(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in basicPublishNack: " + e.getMessage()); + } + } + + @Override + public void basicPublishUnrouted(Channel channel) { + try { + markPublishedMessageNotRouted(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in markPublishedMessageNotRouted: " + e.getMessage()); + } + } + @Override public void basicConsume(Channel channel, String consumerTag, boolean autoAck) { try { @@ -331,6 +375,11 @@ private ChannelState(Channel channel) { */ protected abstract void markPublishedMessage(); + /** + * Marks the event of a message publishing failure. + */ + protected abstract void markMessagePublishFailed(); + /** * Marks the event of a consumed message. */ @@ -346,6 +395,17 @@ private ChannelState(Channel channel) { */ protected abstract void markRejectedMessage(); + /** + * Marks the event of a message publishing acknowledgement. + */ + protected abstract void markMessagePublishAcknowledged(); - + /** + * Marks the event of a message publishing not being acknowledged. + */ + protected abstract void markMessagePublishNotAcknowledged(); + /** + * Marks the event of a published message not being routed. + */ + protected abstract void markPublishedMessageNotRouted(); } diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 11d812c407..17bee9ffe7 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -481,6 +481,8 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) { } } catch (Throwable ex) { getConnection().getExceptionHandler().handleReturnListenerException(this, ex); + } finally { + metricsCollector.basicPublishUnrouted(this); } } @@ -491,6 +493,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B } } catch (Throwable ex) { getConnection().getExceptionHandler().handleConfirmListenerException(this, ex); + } finally { + metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple()); } } @@ -501,6 +505,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B } } catch (Throwable ex) { getConnection().getExceptionHandler().handleConfirmListenerException(this, ex); + } finally { + metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple()); } } @@ -684,22 +690,25 @@ public void basicPublish(String exchange, String routingKey, unconfirmedSet.add(getNextPublishSeqNo()); nextPublishSeqNo++; } - BasicProperties useProps = props; if (props == null) { - useProps = MessageProperties.MINIMAL_BASIC; + props = MessageProperties.MINIMAL_BASIC; + } + AMQCommand command = new AMQCommand( + new Basic.Publish.Builder() + .exchange(exchange) + .routingKey(routingKey) + .mandatory(mandatory) + .immediate(immediate) + .build(), props, body); + try { + transmit(command); + } catch (IOException e) { + metricsCollector.basicPublishFailure(this, e); + throw e; } - transmit(new AMQCommand(new Basic.Publish.Builder() - .exchange(exchange) - .routingKey(routingKey) - .mandatory(mandatory) - .immediate(immediate) - .build(), - useProps, body)); metricsCollector.basicPublish(this); } - - /** Public API - {@inheritDoc} */ @Override public Exchange.DeclareOk exchangeDeclare(String exchange, String type, diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java index 1e2b8303f2..ee17194538 100644 --- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java @@ -27,12 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES; +import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.*; /** * Micrometer implementation of {@link MetricsCollector}. @@ -54,6 +49,14 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector { private final Counter publishedMessages; + private final Counter failedToPublishMessages; + + private final Counter ackedPublishedMessages; + + private final Counter nackedPublishedMessages; + + private final Counter unroutedPublishedMessages; + private final Counter consumedMessages; private final Counter acknowledgedMessages; @@ -83,6 +86,10 @@ public MicrometerMetricsCollector(Function metricsCreator) { this.consumedMessages = (Counter) metricsCreator.apply(CONSUMED_MESSAGES); this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES); this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES); + this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES); + this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES); + this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES); + this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES); } @Override @@ -110,6 +117,11 @@ protected void markPublishedMessage() { publishedMessages.increment(); } + @Override + protected void markMessagePublishFailed() { + failedToPublishMessages.increment(); + } + @Override protected void markConsumedMessage() { consumedMessages.increment(); @@ -125,6 +137,21 @@ protected void markRejectedMessage() { rejectedMessages.increment(); } + @Override + protected void markMessagePublishAcknowledged() { + ackedPublishedMessages.increment(); + } + + @Override + protected void markMessagePublishNotAcknowledged() { + nackedPublishedMessages.increment(); + } + + @Override + protected void markPublishedMessageNotRouted() { + unroutedPublishedMessages.increment(); + } + public AtomicLong getConnections() { return connections; } @@ -137,6 +164,22 @@ public Counter getPublishedMessages() { return publishedMessages; } + public Counter getFailedToPublishMessages() { + return failedToPublishMessages; + } + + public Counter getAckedPublishedMessages() { + return ackedPublishedMessages; + } + + public Counter getNackedPublishedMessages() { + return nackedPublishedMessages; + } + + public Counter getUnroutedPublishedMessages() { + return unroutedPublishedMessages; + } + public Counter getConsumedMessages() { return consumedMessages; } @@ -185,6 +228,30 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".rejected", tags); } + }, + FAILED_TO_PUBLISH_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".failed_to_publish", tags); + } + }, + ACKED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".acknowledged_published", tags); + } + }, + NACKED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".not_acknowledged_published", tags); + } + }, + UNROUTED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".unrouted_published", tags); + } }; /** @@ -192,7 +259,6 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { * @param registry * @param prefix * @deprecated will be removed in 6.0.0 - * @return */ @Deprecated Object create(MeterRegistry registry, String prefix) { diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index 08a3939f24..29240bcc48 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -41,6 +41,10 @@ public class StandardMetricsCollector extends AbstractMetricsCollector { private final Meter consumedMessages; private final Meter acknowledgedMessages; private final Meter rejectedMessages; + private final Meter failedToPublishMessages; + private final Meter publishAcknowledgedMessages; + private final Meter publishNacknowledgedMessages; + private final Meter publishUnroutedMessages; public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { @@ -48,6 +52,10 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.connections = registry.counter(metricsPrefix+".connections"); this.channels = registry.counter(metricsPrefix+".channels"); this.publishedMessages = registry.meter(metricsPrefix+".published"); + this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish"); + this.publishAcknowledgedMessages = registry.meter(metricsPrefix+".publish_ack"); + this.publishNacknowledgedMessages = registry.meter(metricsPrefix+".publish_nack"); + this.publishUnroutedMessages = registry.meter(metricsPrefix+".publish_unrouted"); this.consumedMessages = registry.meter(metricsPrefix+".consumed"); this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged"); this.rejectedMessages = registry.meter(metricsPrefix+".rejected"); @@ -86,6 +94,11 @@ protected void markPublishedMessage() { publishedMessages.mark(); } + @Override + protected void markMessagePublishFailed() { + failedToPublishMessages.mark(); + } + @Override protected void markConsumedMessage() { consumedMessages.mark(); @@ -101,8 +114,21 @@ protected void markRejectedMessage() { rejectedMessages.mark(); } + @Override + protected void markMessagePublishAcknowledged() { + publishAcknowledgedMessages.mark(); + } + + @Override + protected void markMessagePublishNotAcknowledged() { + publishNacknowledgedMessages.mark(); + } + + @Override + protected void markPublishedMessageNotRouted() { + publishUnroutedMessages.mark(); + } - public MetricRegistry getMetricRegistry() { return registry; } @@ -130,4 +156,21 @@ public Meter getAcknowledgedMessages() { public Meter getRejectedMessages() { return rejectedMessages; } + + public Meter getFailedToPublishMessages() { + return failedToPublishMessages; + } + + public Meter getPublishAcknowledgedMessages() { + return publishAcknowledgedMessages; + } + + public Meter getPublishNotAcknowledgedMessages() { + return publishNacknowledgedMessages; + } + + public Meter getPublishUnroutedMessages() { + return publishUnroutedMessages; + } + } diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java index 5aa6a24cdc..74f85ac668 100644 --- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java @@ -26,9 +26,12 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import java.io.IOException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * @@ -116,7 +119,90 @@ public void basicGetAndAck() { metrics.basicAck(channel, 10, true); assertThat(acknowledgedMessages(metrics), is(1L+2L+1L)); + } + + @Test public void publishingAndPublishingFailures() { + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + + assertThat(failedToPublishMessages(metrics), is(0L)); + assertThat(publishedMessages(metrics), is(0L)); + + metrics.basicPublishFailure(channel, new IOException()); + assertThat(failedToPublishMessages(metrics), is(1L)); + assertThat(publishedMessages(metrics), is(0L)); + + metrics.basicPublish(channel); + assertThat(failedToPublishMessages(metrics), is(1L)); + assertThat(publishedMessages(metrics), is(1L)); + + metrics.basicPublishFailure(channel, new IOException()); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(1L)); + + metrics.basicPublish(channel); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(2L)); + + metrics.cleanStaleState(); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(2L)); + } + @Test public void publishingAcknowledgements() { + long anyDeliveryTag = 123L; + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + // begins with no messages acknowledged + assertThat(publishAck(metrics), is(0L)); + // first acknowledgement gets tracked + metrics.basicPublishAck(channel, anyDeliveryTag, false); + assertThat(publishAck(metrics), is(1L)); + // second acknowledgement gets tracked + metrics.basicPublishAck(channel, anyDeliveryTag, false); + assertThat(publishAck(metrics), is(2L)); + // multiple deliveries aren't tracked + metrics.basicPublishAck(channel, anyDeliveryTag, true); + assertThat(publishAck(metrics), is(2L)); + // cleaning stale state doesn't affect the metric + metrics.cleanStaleState(); + assertThat(publishAck(metrics), is(2L)); + } + + @Test public void publishingNotAcknowledgements() { + long anyDeliveryTag = 123L; + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + // begins with no messages not-acknowledged + assertThat(publishNack(metrics), is(0L)); + // first not-acknowledgement gets tracked + metrics.basicPublishNack(channel, anyDeliveryTag, false); + assertThat(publishNack(metrics), is(1L)); + // second not-acknowledgement gets tracked + metrics.basicPublishNack(channel, anyDeliveryTag, false); + assertThat(publishNack(metrics), is(2L)); + // multiple deliveries aren't tracked + metrics.basicPublishNack(channel, anyDeliveryTag, true); + assertThat(publishNack(metrics), is(2L)); + // cleaning stale state doesn't affect the metric + metrics.cleanStaleState(); + assertThat(publishNack(metrics), is(2L)); + } + + @Test public void publishingUnrouted() { + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + // begins with no messages not-acknowledged + assertThat(publishUnrouted(metrics), is(0L)); + // first unrouted gets tracked + metrics.basicPublishUnrouted(channel); + assertThat(publishUnrouted(metrics), is(1L)); + // second unrouted gets tracked + metrics.basicPublishUnrouted(channel); + assertThat(publishUnrouted(metrics), is(2L)); + // cleaning stale state doesn't affect the metric + metrics.cleanStaleState(); + assertThat(publishUnrouted(metrics), is(2L)); } @Test public void cleanStaleState() { @@ -159,6 +245,47 @@ public void basicGetAndAck() { assertThat(channels(metrics), is(1L)); } + + long publishAck(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishAcknowledgedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getAckedPublishedMessages().count(); + } + } + + long publishNack(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishNotAcknowledgedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getNackedPublishedMessages().count(); + } + } + + long publishUnrouted(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishUnroutedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getUnroutedPublishedMessages().count(); + } + } + + long publishedMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getPublishedMessages().count(); + } + } + + long failedToPublishMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getFailedToPublishMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getFailedToPublishMessages().count(); + } + } + long consumedMessages(MetricsCollector metrics) { if (metrics instanceof StandardMetricsCollector) { return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount(); diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 70400627f8..783d9e197f 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -22,6 +22,7 @@ import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.MessageProperties; import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoveryListener; import com.rabbitmq.client.impl.StandardMetricsCollector; @@ -139,6 +140,52 @@ protected void releaseResources() throws IOException { } } + @Test public void metricsPublisherUnrouted() throws IOException, TimeoutException { + StandardMetricsCollector metrics = new StandardMetricsCollector(); + connectionFactory.setMetricsCollector(metrics); + Connection connection = null; + try { + connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel(); + channel.confirmSelect(); + assertThat(metrics.getPublishUnroutedMessages().getCount(), is(0L)); + // when + channel.basicPublish( + "amq.direct", + "any-unroutable-routing-key", + /* basic.return will be sent back only if the message is mandatory */ true, + MessageProperties.MINIMAL_BASIC, + "any-message".getBytes() + ); + // then + waitAtMost(timeout()).until( + () -> metrics.getPublishUnroutedMessages().getCount(), + equalTo(1L) + ); + } finally { + safeClose(connection); + } + } + + @Test public void metricsPublisherAck() throws IOException, TimeoutException, InterruptedException { + StandardMetricsCollector metrics = new StandardMetricsCollector(); + connectionFactory.setMetricsCollector(metrics); + Connection connection = null; + try { + connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel(); + channel.confirmSelect(); + assertThat(metrics.getPublishAcknowledgedMessages().getCount(), is(0L)); + channel.basicConsume(QUEUE, false, new MultipleAckConsumer(channel, false)); + // when + sendMessage(channel); + channel.waitForConfirms(30 * 60 * 1000); + // then + assertThat(metrics.getPublishAcknowledgedMessages().getCount(), is(1L)); + } finally { + safeClose(connection); + } + } @Test public void metricsAck() throws IOException, TimeoutException { StandardMetricsCollector metrics = new StandardMetricsCollector();