From 0eacae101eae1fc18b60e2482aece831e3939045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Tue, 10 Apr 2018 12:44:02 +0200 Subject: [PATCH 01/20] #354 | track publishing failures --- .../com/rabbitmq/client/MetricsCollector.java | 3 +++ .../com/rabbitmq/client/impl/ChannelN.java | 25 +++++++++++-------- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index 34bb61f577..c4e8ca72c0 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -36,6 +36,8 @@ public interface MetricsCollector { void basicPublish(Channel channel); + void basicPublishFailure(Channel channel); + void consumedMessage(Channel channel, long deliveryTag, boolean autoAck); void consumedMessage(Channel channel, long deliveryTag, String consumerTag); @@ -49,4 +51,5 @@ public interface MetricsCollector { void basicConsume(Channel channel, String consumerTag, boolean autoAck); void basicCancel(Channel channel, String consumerTag); + } diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 11d812c407..da0a9c9492 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -684,22 +684,25 @@ public void basicPublish(String exchange, String routingKey, unconfirmedSet.add(getNextPublishSeqNo()); nextPublishSeqNo++; } - BasicProperties useProps = props; if (props == null) { - useProps = MessageProperties.MINIMAL_BASIC; + props = MessageProperties.MINIMAL_BASIC; + } + AMQCommand command = new AMQCommand( + new Basic.Publish.Builder() + .exchange(exchange) + .routingKey(routingKey) + .mandatory(mandatory) + .immediate(immediate) + .build(), props, body); + try { + transmit(command); + } catch (IOException e) { + metricsCollector.basicPublishFailure(this); + throw e; } - transmit(new AMQCommand(new Basic.Publish.Builder() - .exchange(exchange) - .routingKey(routingKey) - .mandatory(mandatory) - .immediate(immediate) - .build(), - useProps, body)); metricsCollector.basicPublish(this); } - - /** Public API - {@inheritDoc} */ @Override public Exchange.DeclareOk exchangeDeclare(String exchange, String type, From 6e8395c3d910b2a6fdc336087c85aa3d3f73c379 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Tue, 10 Apr 2018 12:49:41 +0200 Subject: [PATCH 02/20] #354 | no-op implementation --- src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index 3895f34013..3f25492ba6 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -70,6 +70,11 @@ public void basicPublish(Channel channel) { } + @Override + public void basicPublishFailure(Channel channel) { + + } + @Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) { From ce842ff83eb34701a183bd87a5a68e96aa932572 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Tue, 10 Apr 2018 14:59:00 +0200 Subject: [PATCH 03/20] #354 | abstract metrics implementation --- .../client/impl/AbstractMetricsCollector.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 71983fb457..be128aa161 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -102,6 +102,15 @@ public void basicPublish(Channel channel) { } } + @Override + public void basicPublishFailure(Channel channel) { + try { + markMessagePublishFailed(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in basicPublishFailure: " + e.getMessage()); + } + } + @Override public void basicConsume(Channel channel, String consumerTag, boolean autoAck) { try { @@ -331,6 +340,11 @@ private ChannelState(Channel channel) { */ protected abstract void markPublishedMessage(); + /** + * Marks the event of a message publishing failure. + */ + protected abstract void markMessagePublishFailed(); + /** * Marks the event of a consumed message. */ From 40567a02043dd35c9c93517ae311bcf5ecb1e5f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Tue, 10 Apr 2018 14:59:11 +0200 Subject: [PATCH 04/20] #354 | micrometer metrics implementation --- .../impl/MicrometerMetricsCollector.java | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java index 1e2b8303f2..a7f65e9018 100644 --- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java @@ -27,12 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.ACKNOWLEDGED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CHANNELS; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONNECTIONS; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.CONSUMED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.PUBLISHED_MESSAGES; -import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.REJECTED_MESSAGES; +import static com.rabbitmq.client.impl.MicrometerMetricsCollector.Metrics.*; /** * Micrometer implementation of {@link MetricsCollector}. @@ -54,6 +49,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector { private final Counter publishedMessages; + private final Counter failedToPublishMessages; + private final Counter consumedMessages; private final Counter acknowledgedMessages; @@ -83,6 +80,7 @@ public MicrometerMetricsCollector(Function metricsCreator) { this.consumedMessages = (Counter) metricsCreator.apply(CONSUMED_MESSAGES); this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES); this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES); + this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES); } @Override @@ -110,6 +108,11 @@ protected void markPublishedMessage() { publishedMessages.increment(); } + @Override + protected void markMessagePublishFailed() { + failedToPublishMessages.increment(); + } + @Override protected void markConsumedMessage() { consumedMessages.increment(); @@ -137,6 +140,10 @@ public Counter getPublishedMessages() { return publishedMessages; } + public Counter getFailedToPublishMessages() { + return failedToPublishMessages; + } + public Counter getConsumedMessages() { return consumedMessages; } @@ -185,6 +192,12 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".rejected", tags); } + }, + FAILED_TO_PUBLISH_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".failed_to_publish", tags); + } }; /** @@ -192,7 +205,6 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { * @param registry * @param prefix * @deprecated will be removed in 6.0.0 - * @return */ @Deprecated Object create(MeterRegistry registry, String prefix) { From 403d50a09d683a699da6cbc69af6b662f846f915 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Tue, 10 Apr 2018 15:00:26 +0200 Subject: [PATCH 05/20] #354 | standard metrics implementation --- .../client/impl/StandardMetricsCollector.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index 08a3939f24..91f9342b6d 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -41,6 +41,7 @@ public class StandardMetricsCollector extends AbstractMetricsCollector { private final Meter consumedMessages; private final Meter acknowledgedMessages; private final Meter rejectedMessages; + private final Meter failedToPublishMessages; public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { @@ -48,6 +49,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.connections = registry.counter(metricsPrefix+".connections"); this.channels = registry.counter(metricsPrefix+".channels"); this.publishedMessages = registry.meter(metricsPrefix+".published"); + this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish"); this.consumedMessages = registry.meter(metricsPrefix+".consumed"); this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged"); this.rejectedMessages = registry.meter(metricsPrefix+".rejected"); @@ -86,6 +88,11 @@ protected void markPublishedMessage() { publishedMessages.mark(); } + @Override + protected void markMessagePublishFailed() { + failedToPublishMessages.mark(); + } + @Override protected void markConsumedMessage() { consumedMessages.mark(); @@ -130,4 +137,8 @@ public Meter getAcknowledgedMessages() { public Meter getRejectedMessages() { return rejectedMessages; } + + public Meter getFailedToPublishMessages() { + return failedToPublishMessages; + } } From c215793f146cda779442606c82ebb8db7db6ce2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Mon, 23 Apr 2018 14:05:35 +0200 Subject: [PATCH 06/20] #354 | cause is passed to the metrics collector --- src/main/java/com/rabbitmq/client/MetricsCollector.java | 2 +- src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java | 2 +- .../java/com/rabbitmq/client/impl/AbstractMetricsCollector.java | 2 +- src/main/java/com/rabbitmq/client/impl/ChannelN.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index c4e8ca72c0..40e5af3b6a 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -36,7 +36,7 @@ public interface MetricsCollector { void basicPublish(Channel channel); - void basicPublishFailure(Channel channel); + void basicPublishFailure(Channel channel, Throwable cause); void consumedMessage(Channel channel, long deliveryTag, boolean autoAck); diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index 3f25492ba6..6563ab14ce 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -71,7 +71,7 @@ public void basicPublish(Channel channel) { } @Override - public void basicPublishFailure(Channel channel) { + public void basicPublishFailure(Channel channel, Throwable cause) { } diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index be128aa161..03eb64f468 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -103,7 +103,7 @@ public void basicPublish(Channel channel) { } @Override - public void basicPublishFailure(Channel channel) { + public void basicPublishFailure(Channel channel, Throwable cause) { try { markMessagePublishFailed(); } catch(Exception e) { diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index da0a9c9492..1685c249a0 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -697,7 +697,7 @@ public void basicPublish(String exchange, String routingKey, try { transmit(command); } catch (IOException e) { - metricsCollector.basicPublishFailure(this); + metricsCollector.basicPublishFailure(this, e); throw e; } metricsCollector.basicPublish(this); From 843db2ba4180a3ae0c7be59505a47a6d0a8520ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Mon, 23 Apr 2018 14:39:10 +0200 Subject: [PATCH 07/20] #354 | test for metrics for publishes and failures to publish --- .../client/test/MetricsCollectorTest.java | 52 +++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java index 5aa6a24cdc..d6b5095ed0 100644 --- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java @@ -26,9 +26,12 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import java.io.IOException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * @@ -116,7 +119,34 @@ public void basicGetAndAck() { metrics.basicAck(channel, 10, true); assertThat(acknowledgedMessages(metrics), is(1L+2L+1L)); + } + + @Test public void publishingAndPublishingFailures() { + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + + assertThat(failedToPublishMessages(metrics), is(0L)); + assertThat(publishedMessages(metrics), is(0L)); + metrics.basicPublishFailure(channel, new IOException()); + assertThat(failedToPublishMessages(metrics), is(1L)); + assertThat(publishedMessages(metrics), is(0L)); + + metrics.basicPublish(channel); + assertThat(failedToPublishMessages(metrics), is(1L)); + assertThat(publishedMessages(metrics), is(1L)); + + metrics.basicPublishFailure(channel, new IOException()); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(1L)); + + metrics.basicPublish(channel); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(2L)); + + metrics.cleanStaleState(); + assertThat(failedToPublishMessages(metrics), is(2L)); + assertThat(publishedMessages(metrics), is(2L)); } @Test public void cleanStaleState() { @@ -159,6 +189,22 @@ public void basicGetAndAck() { assertThat(channels(metrics), is(1L)); } + long publishedMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getPublishedMessages().count(); + } + } + + long failedToPublishMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getFailedToPublishMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getFailedToPublishMessages().count(); + } + } + long consumedMessages(MetricsCollector metrics) { if (metrics instanceof StandardMetricsCollector) { return ((StandardMetricsCollector) metrics).getConsumedMessages().getCount(); From deb987a729f8384c1a3166a8429fcf128b78d606 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Mon, 23 Apr 2018 21:00:43 +0200 Subject: [PATCH 08/20] #354 | add implementation for ack, nack and unrouted publishes --- .../com/rabbitmq/client/MetricsCollector.java | 6 +++ .../rabbitmq/client/NoOpMetricsCollector.java | 15 ++++++ .../client/impl/AbstractMetricsCollector.java | 40 +++++++++++++- .../impl/MicrometerMetricsCollector.java | 54 +++++++++++++++++++ .../client/impl/StandardMetricsCollector.java | 34 +++++++++++- 5 files changed, 147 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index 40e5af3b6a..a67918a4b2 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -38,6 +38,12 @@ public interface MetricsCollector { void basicPublishFailure(Channel channel, Throwable cause); + void basicPublishAck(Channel channel); + + void basicPublishNack(Channel channel); + + void basicPublishUnrouted(Channel channel); + void consumedMessage(Channel channel, long deliveryTag, boolean autoAck); void consumedMessage(Channel channel, long deliveryTag, String consumerTag); diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index 6563ab14ce..6dc4688dd2 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -75,6 +75,21 @@ public void basicPublishFailure(Channel channel, Throwable cause) { } + @Override + public void basicPublishAck(Channel channel) { + + } + + @Override + public void basicPublishNack(Channel channel) { + + } + + @Override + public void basicPublishUnrouted(Channel channel) { + + } + @Override public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) { diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 03eb64f468..6894d09ca4 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -111,6 +111,33 @@ public void basicPublishFailure(Channel channel, Throwable cause) { } } + @Override + public void basicPublishAck(Channel channel) { + try { + markMessagePublishAcknowledged(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in basicPublishAck: " + e.getMessage()); + } + } + + @Override + public void basicPublishNack(Channel channel) { + try { + markMessagePublishNotAcknowledged(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in basicPublishNack: " + e.getMessage()); + } + } + + @Override + public void basicPublishUnrouted(Channel channel) { + try { + markPublishedMessageNotRouted(); + } catch(Exception e) { + LOGGER.info("Error while computing metrics in markPublishedMessageNotRouted: " + e.getMessage()); + } + } + @Override public void basicConsume(Channel channel, String consumerTag, boolean autoAck) { try { @@ -360,6 +387,17 @@ private ChannelState(Channel channel) { */ protected abstract void markRejectedMessage(); + /** + * Marks the event of a message publishing acknowledgement. + */ + protected abstract void markMessagePublishAcknowledged(); - + /** + * Marks the event of a message publishing not being acknowledged. + */ + protected abstract void markMessagePublishNotAcknowledged(); + /** + * Marks the event of a published message not being routed. + */ + protected abstract void markPublishedMessageNotRouted(); } diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java index a7f65e9018..ee17194538 100644 --- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java @@ -51,6 +51,12 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector { private final Counter failedToPublishMessages; + private final Counter ackedPublishedMessages; + + private final Counter nackedPublishedMessages; + + private final Counter unroutedPublishedMessages; + private final Counter consumedMessages; private final Counter acknowledgedMessages; @@ -81,6 +87,9 @@ public MicrometerMetricsCollector(Function metricsCreator) { this.acknowledgedMessages = (Counter) metricsCreator.apply(ACKNOWLEDGED_MESSAGES); this.rejectedMessages = (Counter) metricsCreator.apply(REJECTED_MESSAGES); this.failedToPublishMessages = (Counter) metricsCreator.apply(FAILED_TO_PUBLISH_MESSAGES); + this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES); + this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES); + this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES); } @Override @@ -128,6 +137,21 @@ protected void markRejectedMessage() { rejectedMessages.increment(); } + @Override + protected void markMessagePublishAcknowledged() { + ackedPublishedMessages.increment(); + } + + @Override + protected void markMessagePublishNotAcknowledged() { + nackedPublishedMessages.increment(); + } + + @Override + protected void markPublishedMessageNotRouted() { + unroutedPublishedMessages.increment(); + } + public AtomicLong getConnections() { return connections; } @@ -144,6 +168,18 @@ public Counter getFailedToPublishMessages() { return failedToPublishMessages; } + public Counter getAckedPublishedMessages() { + return ackedPublishedMessages; + } + + public Counter getNackedPublishedMessages() { + return nackedPublishedMessages; + } + + public Counter getUnroutedPublishedMessages() { + return unroutedPublishedMessages; + } + public Counter getConsumedMessages() { return consumedMessages; } @@ -198,6 +234,24 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".failed_to_publish", tags); } + }, + ACKED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".acknowledged_published", tags); + } + }, + NACKED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".not_acknowledged_published", tags); + } + }, + UNROUTED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".unrouted_published", tags); + } }; /** diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index 91f9342b6d..a4062eb3b7 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -42,6 +42,9 @@ public class StandardMetricsCollector extends AbstractMetricsCollector { private final Meter acknowledgedMessages; private final Meter rejectedMessages; private final Meter failedToPublishMessages; + private final Meter publishAcknowledgedMessages; + private final Meter publishNacknowledgedMessages; + private final Meter publishUnroutedMessages; public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { @@ -50,6 +53,9 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.channels = registry.counter(metricsPrefix+".channels"); this.publishedMessages = registry.meter(metricsPrefix+".published"); this.failedToPublishMessages = registry.meter(metricsPrefix+".failed_to_publish"); + this.publishAcknowledgedMessages = registry.meter(metricsPrefix+".publish_ack"); + this.publishNacknowledgedMessages = registry.meter(metricsPrefix+".publish_nack"); + this.publishUnroutedMessages = registry.meter(metricsPrefix+".publish_unrouted"); this.consumedMessages = registry.meter(metricsPrefix+".consumed"); this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged"); this.rejectedMessages = registry.meter(metricsPrefix+".rejected"); @@ -108,8 +114,21 @@ protected void markRejectedMessage() { rejectedMessages.mark(); } + @Override + protected void markMessagePublishAcknowledged() { + acknowledgedMessages.mark(); + } + + @Override + protected void markMessagePublishNotAcknowledged() { + publishNacknowledgedMessages.mark(); + } + + @Override + protected void markPublishedMessageNotRouted() { + publishUnroutedMessages.mark(); + } - public MetricRegistry getMetricRegistry() { return registry; } @@ -141,4 +160,17 @@ public Meter getRejectedMessages() { public Meter getFailedToPublishMessages() { return failedToPublishMessages; } + + public Meter getPublishAcknowledgedMessages() { + return publishAcknowledgedMessages; + } + + public Meter getPublishNotAcknowledgedMessages() { + return publishNacknowledgedMessages; + } + + public Meter getPublishUnroutedMessages() { + return publishUnroutedMessages; + } + } From 704fa1e284865673e84ff0e48a882c7c1d2c348c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Mon, 23 Apr 2018 21:58:43 +0200 Subject: [PATCH 09/20] #354 | call metrics (acks and nacks have to be corrected for multiples) --- src/main/java/com/rabbitmq/client/MetricsCollector.java | 4 ++-- src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java | 4 ++-- .../com/rabbitmq/client/impl/AbstractMetricsCollector.java | 4 ++-- src/main/java/com/rabbitmq/client/impl/ChannelN.java | 6 ++++++ 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index a67918a4b2..101948e6b9 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -38,9 +38,9 @@ public interface MetricsCollector { void basicPublishFailure(Channel channel, Throwable cause); - void basicPublishAck(Channel channel); + void basicPublishAck(Channel channel, long deliveryTag, boolean multiple); - void basicPublishNack(Channel channel); + void basicPublishNack(Channel channel, long deliveryTag, boolean multiple); void basicPublishUnrouted(Channel channel); diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index 6dc4688dd2..30e0d83402 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -76,12 +76,12 @@ public void basicPublishFailure(Channel channel, Throwable cause) { } @Override - public void basicPublishAck(Channel channel) { + public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) { } @Override - public void basicPublishNack(Channel channel) { + public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) { } diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 6894d09ca4..fd1553acaf 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -112,7 +112,7 @@ public void basicPublishFailure(Channel channel, Throwable cause) { } @Override - public void basicPublishAck(Channel channel) { + public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) { try { markMessagePublishAcknowledged(); } catch(Exception e) { @@ -121,7 +121,7 @@ public void basicPublishAck(Channel channel) { } @Override - public void basicPublishNack(Channel channel) { + public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) { try { markMessagePublishNotAcknowledged(); } catch(Exception e) { diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 1685c249a0..17bee9ffe7 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -481,6 +481,8 @@ private void callReturnListeners(Command command, Basic.Return basicReturn) { } } catch (Throwable ex) { getConnection().getExceptionHandler().handleReturnListenerException(this, ex); + } finally { + metricsCollector.basicPublishUnrouted(this); } } @@ -491,6 +493,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B } } catch (Throwable ex) { getConnection().getExceptionHandler().handleConfirmListenerException(this, ex); + } finally { + metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple()); } } @@ -501,6 +505,8 @@ private void callConfirmListeners(@SuppressWarnings("unused") Command command, B } } catch (Throwable ex) { getConnection().getExceptionHandler().handleConfirmListenerException(this, ex); + } finally { + metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple()); } } From 5bff5dd0bab1e333c59889c77ab9a1d86ddf3999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Tue, 24 Apr 2018 12:37:16 +0200 Subject: [PATCH 10/20] #354 | ignore acks and nacks for multiple messages Metrics for multiple acks and nacks are unsupported in this naive approach. --- .../rabbitmq/client/impl/AbstractMetricsCollector.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index fd1553acaf..99a5b7a138 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -113,6 +113,10 @@ public void basicPublishFailure(Channel channel, Throwable cause) { @Override public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) { + if (multiple) { + // this is a naive approach, as it does not handle multiple nacks + return; + } try { markMessagePublishAcknowledged(); } catch(Exception e) { @@ -122,6 +126,10 @@ public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) @Override public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) { + if (multiple) { + // this is a naive approach, as it does not handle multiple nacks + return; + } try { markMessagePublishNotAcknowledged(); } catch(Exception e) { From cbf70455018e5b6f467b4d997922640d2e07b101 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Tue, 24 Apr 2018 14:11:46 +0200 Subject: [PATCH 11/20] #354 | fix acks --- .../java/com/rabbitmq/client/impl/StandardMetricsCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index a4062eb3b7..29240bcc48 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -116,7 +116,7 @@ protected void markRejectedMessage() { @Override protected void markMessagePublishAcknowledged() { - acknowledgedMessages.mark(); + publishAcknowledgedMessages.mark(); } @Override From b173c45d9fa267866b5e3f9cc16c7537db9839cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sat, 2 Jun 2018 10:28:59 +0200 Subject: [PATCH 12/20] #354 | add unit tests for publishing acks, nacks and unrouted --- .../client/test/MetricsCollectorTest.java | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java index d6b5095ed0..2afe01ed37 100644 --- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java @@ -147,6 +147,66 @@ public void basicGetAndAck() { metrics.cleanStaleState(); assertThat(failedToPublishMessages(metrics), is(2L)); assertThat(publishedMessages(metrics), is(2L)); + + long anyDeliveryTag = 123L; + metrics.basicNack(channel, anyDeliveryTag); + } + + + @Test public void publishingAcknowledgements() { + long anyDeliveryTag = 123L; + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + // begins with no messages acknowledged + assertThat(publishAck(metrics), is(0)); + // first acknowledgement gets tracked + metrics.basicPublishAck(channel, anyDeliveryTag, false); + assertThat(publishAck(metrics), is(1)); + // second acknowledgement gets tracked + metrics.basicPublishAck(channel, anyDeliveryTag, false); + assertThat(publishAck(metrics), is(2)); + // multiple deliveries aren't tracked + metrics.basicPublishAck(channel, anyDeliveryTag, true); + assertThat(publishAck(metrics), is(2)); + // cleaning stale state doesn't affect the metric + metrics.cleanStaleState(); + assertThat(publishAck(metrics), is(2)); + } + + @Test public void publishingNotAcknowledgements() { + long anyDeliveryTag = 123L; + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + // begins with no messages not-acknowledged + assertThat(publishNack(metrics), is(0)); + // first not-acknowledgement gets tracked + metrics.basicPublishNack(channel, anyDeliveryTag, false); + assertThat(publishNack(metrics), is(1)); + // second not-acknowledgement gets tracked + metrics.basicPublishNack(channel, anyDeliveryTag, false); + assertThat(publishNack(metrics), is(2)); + // multiple deliveries aren't tracked + metrics.basicPublishNack(channel, anyDeliveryTag, true); + assertThat(publishNack(metrics), is(2)); + // cleaning stale state doesn't affect the metric + metrics.cleanStaleState(); + assertThat(publishNack(metrics), is(2)); + } + + @Test public void publishingUnrouted() { + AbstractMetricsCollector metrics = factory.create(); + Channel channel = mock(Channel.class); + // begins with no messages not-acknowledged + assertThat(publishUnrouted(metrics), is(0)); + // first unrouted gets tracked + metrics.basicPublishUnrouted(channel); + assertThat(publishUnrouted(metrics), is(1)); + // second unrouted gets tracked + metrics.basicPublishUnrouted(channel); + assertThat(publishUnrouted(metrics), is(2)); + // cleaning stale state doesn't affect the metric + metrics.cleanStaleState(); + assertThat(publishUnrouted(metrics), is(2)); } @Test public void cleanStaleState() { @@ -189,6 +249,31 @@ public void basicGetAndAck() { assertThat(channels(metrics), is(1L)); } + + long publishAck(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishAcknowledgedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getAckedPublishedMessages().count(); + } + } + + long publishNack(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishNotAcknowledgedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getNackedPublishedMessages().count(); + } + } + + long publishUnrouted(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getPublishUnroutedMessages().getCount(); + } else { + return (long) ((MicrometerMetricsCollector) metrics).getUnroutedPublishedMessages().count(); + } + } + long publishedMessages(MetricsCollector metrics) { if (metrics instanceof StandardMetricsCollector) { return ((StandardMetricsCollector) metrics).getPublishedMessages().getCount(); From aae9e7be811548be13fd66347bb593df926dbec6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sat, 2 Jun 2018 13:05:12 +0200 Subject: [PATCH 13/20] #354 | compare to longs rather than ints --- .../client/test/MetricsCollectorTest.java | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java index 2afe01ed37..e07628149a 100644 --- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java @@ -147,9 +147,6 @@ public void basicGetAndAck() { metrics.cleanStaleState(); assertThat(failedToPublishMessages(metrics), is(2L)); assertThat(publishedMessages(metrics), is(2L)); - - long anyDeliveryTag = 123L; - metrics.basicNack(channel, anyDeliveryTag); } @@ -158,19 +155,19 @@ public void basicGetAndAck() { AbstractMetricsCollector metrics = factory.create(); Channel channel = mock(Channel.class); // begins with no messages acknowledged - assertThat(publishAck(metrics), is(0)); + assertThat(publishAck(metrics), is(0L)); // first acknowledgement gets tracked metrics.basicPublishAck(channel, anyDeliveryTag, false); - assertThat(publishAck(metrics), is(1)); + assertThat(publishAck(metrics), is(1L)); // second acknowledgement gets tracked metrics.basicPublishAck(channel, anyDeliveryTag, false); - assertThat(publishAck(metrics), is(2)); + assertThat(publishAck(metrics), is(2L)); // multiple deliveries aren't tracked metrics.basicPublishAck(channel, anyDeliveryTag, true); - assertThat(publishAck(metrics), is(2)); + assertThat(publishAck(metrics), is(2L)); // cleaning stale state doesn't affect the metric metrics.cleanStaleState(); - assertThat(publishAck(metrics), is(2)); + assertThat(publishAck(metrics), is(2L)); } @Test public void publishingNotAcknowledgements() { @@ -178,35 +175,35 @@ public void basicGetAndAck() { AbstractMetricsCollector metrics = factory.create(); Channel channel = mock(Channel.class); // begins with no messages not-acknowledged - assertThat(publishNack(metrics), is(0)); + assertThat(publishNack(metrics), is(0L)); // first not-acknowledgement gets tracked metrics.basicPublishNack(channel, anyDeliveryTag, false); - assertThat(publishNack(metrics), is(1)); + assertThat(publishNack(metrics), is(1L)); // second not-acknowledgement gets tracked metrics.basicPublishNack(channel, anyDeliveryTag, false); - assertThat(publishNack(metrics), is(2)); + assertThat(publishNack(metrics), is(2L)); // multiple deliveries aren't tracked metrics.basicPublishNack(channel, anyDeliveryTag, true); - assertThat(publishNack(metrics), is(2)); + assertThat(publishNack(metrics), is(2L)); // cleaning stale state doesn't affect the metric metrics.cleanStaleState(); - assertThat(publishNack(metrics), is(2)); + assertThat(publishNack(metrics), is(2L)); } @Test public void publishingUnrouted() { AbstractMetricsCollector metrics = factory.create(); Channel channel = mock(Channel.class); // begins with no messages not-acknowledged - assertThat(publishUnrouted(metrics), is(0)); + assertThat(publishUnrouted(metrics), is(0L)); // first unrouted gets tracked metrics.basicPublishUnrouted(channel); - assertThat(publishUnrouted(metrics), is(1)); + assertThat(publishUnrouted(metrics), is(1L)); // second unrouted gets tracked metrics.basicPublishUnrouted(channel); - assertThat(publishUnrouted(metrics), is(2)); + assertThat(publishUnrouted(metrics), is(2L)); // cleaning stale state doesn't affect the metric metrics.cleanStaleState(); - assertThat(publishUnrouted(metrics), is(2)); + assertThat(publishUnrouted(metrics), is(2L)); } @Test public void cleanStaleState() { From c3ac0113d8d42d589f91493bf8a432a38007d87c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sat, 2 Jun 2018 13:05:40 +0200 Subject: [PATCH 14/20] #354 | remove unnecessary newline --- src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java index e07628149a..74f85ac668 100644 --- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java @@ -149,7 +149,6 @@ public void basicGetAndAck() { assertThat(publishedMessages(metrics), is(2L)); } - @Test public void publishingAcknowledgements() { long anyDeliveryTag = 123L; AbstractMetricsCollector metrics = factory.create(); From 5998ffaa6ceb72c6328d1e3c85970c66d44ad7c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sat, 2 Jun 2018 15:25:21 +0200 Subject: [PATCH 15/20] #354 | Implement integration tests --- .../client/test/functional/Metrics.java | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 70400627f8..27a77c42a4 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -139,6 +139,49 @@ protected void releaseResources() throws IOException { } } + @Test public void metricsPublisherUnrouted() throws IOException, TimeoutException, InterruptedException { + StandardMetricsCollector metrics = new StandardMetricsCollector(); + connectionFactory.setMetricsCollector(metrics); + Connection connection = null; + try { + connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel(); + channel.confirmSelect(); + assertThat(metrics.getPublishUnroutedMessages(), is(1L)); + // when + channel.basicPublish( + "any-exchange", + "any-routing-key", + MessageProperties.MINIMAL_BASIC, + "any-message".getBytes() + ); + channel.waitForConfirms(30 * 60 * 1000); + // then + assertThat(metrics.getPublishUnroutedMessages(), is(1L)); + } finally { + safeClose(connection); + } + } + + @Test public void metricsPublisherAck() throws IOException, TimeoutException, InterruptedException { + StandardMetricsCollector metrics = new StandardMetricsCollector(); + connectionFactory.setMetricsCollector(metrics); + Connection connection = null; + try { + connection = connectionFactory.newConnection(); + Channel channel = connection.createChannel(); + channel.confirmSelect(); + assertThat(metrics.getPublishAcknowledgedMessages(), is(0L)); + channel.basicConsume(QUEUE, false, new MultipleAckConsumer(channel, false)); + // when + sendMessage(channel); + channel.waitForConfirms(30 * 60 * 1000); + // then + assertThat(metrics.getPublishAcknowledgedMessages(), is(1L)); + } finally { + safeClose(connection); + } + } @Test public void metricsAck() throws IOException, TimeoutException { StandardMetricsCollector metrics = new StandardMetricsCollector(); From 1f1821f067bdd228456e5629eb7b0aeafea984b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sat, 2 Jun 2018 15:32:06 +0200 Subject: [PATCH 16/20] #354 | Add missing import --- src/test/java/com/rabbitmq/client/test/functional/Metrics.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 27a77c42a4..dd5a37d0a2 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -22,6 +22,7 @@ import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.GetResponse; +import com.rabbitmq.client.MessageProperties; import com.rabbitmq.client.Recoverable; import com.rabbitmq.client.RecoveryListener; import com.rabbitmq.client.impl.StandardMetricsCollector; From 3f546aab4811b6c10edf304b7397f93dad6a2f02 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sat, 2 Jun 2018 16:24:44 +0200 Subject: [PATCH 17/20] #354 | Assert counts --- .../java/com/rabbitmq/client/test/functional/Metrics.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index dd5a37d0a2..26f0a5aafc 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -148,7 +148,7 @@ protected void releaseResources() throws IOException { connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.confirmSelect(); - assertThat(metrics.getPublishUnroutedMessages(), is(1L)); + assertThat(metrics.getPublishUnroutedMessages().getCount(), is(1L)); // when channel.basicPublish( "any-exchange", @@ -158,7 +158,7 @@ protected void releaseResources() throws IOException { ); channel.waitForConfirms(30 * 60 * 1000); // then - assertThat(metrics.getPublishUnroutedMessages(), is(1L)); + assertThat(metrics.getPublishUnroutedMessages().getCount(), is(1L)); } finally { safeClose(connection); } @@ -172,13 +172,13 @@ protected void releaseResources() throws IOException { connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.confirmSelect(); - assertThat(metrics.getPublishAcknowledgedMessages(), is(0L)); + assertThat(metrics.getPublishAcknowledgedMessages().getCount(), is(0L)); channel.basicConsume(QUEUE, false, new MultipleAckConsumer(channel, false)); // when sendMessage(channel); channel.waitForConfirms(30 * 60 * 1000); // then - assertThat(metrics.getPublishAcknowledgedMessages(), is(1L)); + assertThat(metrics.getPublishAcknowledgedMessages().getCount(), is(1L)); } finally { safeClose(connection); } From c45ad735495d4cce992b052fbf7098a6666c5f62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sun, 3 Jun 2018 01:13:08 +0200 Subject: [PATCH 18/20] #354 | Declare the exchange and wait until unrouted --- .../java/com/rabbitmq/client/test/functional/Metrics.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 26f0a5aafc..06c73c026a 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -150,14 +150,18 @@ protected void releaseResources() throws IOException { channel.confirmSelect(); assertThat(metrics.getPublishUnroutedMessages().getCount(), is(1L)); // when + channel.exchangeDeclare("any-exchange", "direct"); channel.basicPublish( "any-exchange", "any-routing-key", MessageProperties.MINIMAL_BASIC, "any-message".getBytes() ); - channel.waitForConfirms(30 * 60 * 1000); // then + waitAtMost(timeout()).until( + () -> metrics.getPublishUnroutedMessages().getCount(), + equalTo(1L) + ); assertThat(metrics.getPublishUnroutedMessages().getCount(), is(1L)); } finally { safeClose(connection); From d85920516c595cb7e97e32d87b65223288778948 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sun, 3 Jun 2018 10:10:35 +0200 Subject: [PATCH 19/20] #354 | Message has to be mandatory for basic.return to be sent back --- src/test/java/com/rabbitmq/client/test/functional/Metrics.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 06c73c026a..4c8d6e485d 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -154,6 +154,7 @@ protected void releaseResources() throws IOException { channel.basicPublish( "any-exchange", "any-routing-key", + /* basic.return will be sent back only if the message is mandatory */ true, MessageProperties.MINIMAL_BASIC, "any-message".getBytes() ); @@ -162,7 +163,6 @@ protected void releaseResources() throws IOException { () -> metrics.getPublishUnroutedMessages().getCount(), equalTo(1L) ); - assertThat(metrics.getPublishUnroutedMessages().getCount(), is(1L)); } finally { safeClose(connection); } From b3d2733c13c30938a66143a249156705d5e3cafd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Turek?= Date: Sun, 3 Jun 2018 11:38:36 +0200 Subject: [PATCH 20/20] #354 | Fix pre-condition --- .../com/rabbitmq/client/test/functional/Metrics.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java index 4c8d6e485d..783d9e197f 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/Metrics.java +++ b/src/test/java/com/rabbitmq/client/test/functional/Metrics.java @@ -140,7 +140,7 @@ protected void releaseResources() throws IOException { } } - @Test public void metricsPublisherUnrouted() throws IOException, TimeoutException, InterruptedException { + @Test public void metricsPublisherUnrouted() throws IOException, TimeoutException { StandardMetricsCollector metrics = new StandardMetricsCollector(); connectionFactory.setMetricsCollector(metrics); Connection connection = null; @@ -148,12 +148,11 @@ protected void releaseResources() throws IOException { connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.confirmSelect(); - assertThat(metrics.getPublishUnroutedMessages().getCount(), is(1L)); + assertThat(metrics.getPublishUnroutedMessages().getCount(), is(0L)); // when - channel.exchangeDeclare("any-exchange", "direct"); channel.basicPublish( - "any-exchange", - "any-routing-key", + "amq.direct", + "any-unroutable-routing-key", /* basic.return will be sent back only if the message is mandatory */ true, MessageProperties.MINIMAL_BASIC, "any-message".getBytes()