From a89dd9aaa47f9bfbb9066a00906996506d6f675e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 1 Aug 2018 12:49:41 +0200 Subject: [PATCH 1/4] Add filter to skip entities on topology recovery [#159461281] References #382 Fixes #383 --- .../rabbitmq/client/ConnectionFactory.java | 17 + .../client/impl/ConnectionParams.java | 10 + .../recovery/AutorecoveringConnection.java | 112 ++++--- .../client/impl/recovery/RecordedBinding.java | 4 + .../impl/recovery/TopologyRecoveryFilter.java | 52 +++ .../rabbitmq/client/test/BrokerTestCase.java | 16 + .../test/functional/ConnectionRecovery.java | 1 - .../test/functional/FunctionalTests.java | 3 +- .../functional/TopologyRecoveryFiltering.java | 306 ++++++++++++++++++ 9 files changed, 483 insertions(+), 38 deletions(-) create mode 100644 src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryFilter.java create mode 100644 src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index 5364da2b78..cbbc4338fe 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -29,6 +29,8 @@ import com.rabbitmq.client.impl.nio.NioParams; import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; + import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -172,6 +174,12 @@ public class ConnectionFactory implements Cloneable { */ private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT; + /** + * Filter to include/exclude entities from topology recovery. + * @since 4.8.0 + */ + private TopologyRecoveryFilter topologyRecoveryFilter; + /** @return the default host to use for connections */ public String getHost() { return host; @@ -1046,6 +1054,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) { result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType); result.setWorkPoolTimeout(workPoolTimeout); result.setErrorOnWriteListener(errorOnWriteListener); + result.setTopologyRecoveryFilter(topologyRecoveryFilter); return result; } @@ -1379,4 +1388,12 @@ public int getWorkPoolTimeout() { public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) { this.errorOnWriteListener = errorOnWriteListener; } + + /** + * Set filter to include/exclude entities from topology recovery. + * @since 4.8.0 + */ + public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) { + this.topologyRecoveryFilter = topologyRecoveryFilter; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java index 08c123bd81..093142b296 100644 --- a/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java +++ b/src/main/java/com/rabbitmq/client/impl/ConnectionParams.java @@ -19,6 +19,7 @@ import com.rabbitmq.client.RecoveryDelayHandler; import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler; import com.rabbitmq.client.SaslConfig; +import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -46,6 +47,7 @@ public class ConnectionParams { private boolean channelShouldCheckRpcResponseType; private ErrorOnWriteListener errorOnWriteListener; private int workPoolTimeout = -1; + private TopologyRecoveryFilter topologyRecoveryFilter; private ExceptionHandler exceptionHandler; private ThreadFactory threadFactory; @@ -235,4 +237,12 @@ public void setWorkPoolTimeout(int workPoolTimeout) { public int getWorkPoolTimeout() { return workPoolTimeout; } + + public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) { + this.topologyRecoveryFilter = topologyRecoveryFilter; + } + + public TopologyRecoveryFilter getTopologyRecoveryFilter() { + return topologyRecoveryFilter; + } } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java index 85fe95ba2a..28d64747d1 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java @@ -80,6 +80,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC private final Map consumers = Collections.synchronizedMap(new LinkedHashMap()); private final List consumerRecoveryListeners = Collections.synchronizedList(new ArrayList()); private final List queueRecoveryListeners = Collections.synchronizedList(new ArrayList()); + + private final TopologyRecoveryFilter topologyRecoveryFilter; // Used to block connection recovery attempts after close() is invoked. private volatile boolean manuallyClosed = false; @@ -103,6 +105,10 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, setupErrorOnWriteListenerForPotentialRecovery(); this.channels = new ConcurrentHashMap(); + + + this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ? + letAllPassFilter() : params.getTopologyRecoveryFilter(); } private void setupErrorOnWriteListenerForPotentialRecovery() { @@ -133,6 +139,31 @@ public void run() { }); } + private TopologyRecoveryFilter letAllPassFilter() { + return new TopologyRecoveryFilter() { + + @Override + public boolean filterExchange(RecordedExchange recordedExchange) { + return true; + } + + @Override + public boolean filterQueue(RecordedQueue recordedQueue) { + return true; + } + + @Override + public boolean filterBinding(RecordedBinding recordedBinding) { + return true; + } + + @Override + public boolean filterConsumer(RecordedConsumer recordedConsumer) { + return true; + } + }; + } + /** * Private API. * @throws IOException @@ -655,8 +686,10 @@ private void recoverTopology(final ExecutorService executor) { private void recoverExchange(final RecordedExchange x) { // recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK. try { - x.recover(); - LOGGER.debug("{} has recovered", x); + if (topologyRecoveryFilter.filterExchange(x)) { + x.recover(); + LOGGER.debug("{} has recovered", x); + } } catch (Exception cause) { final String message = "Caught an exception while recovering exchange " + x.getName() + ": " + cause.getMessage(); @@ -666,30 +699,33 @@ private void recoverExchange(final RecordedExchange x) { } private void recoverQueue(final String oldName, final RecordedQueue q) { - LOGGER.debug("Recovering {}", q); + try { - q.recover(); - String newName = q.getName(); - if (!oldName.equals(newName)) { - // make sure server-named queues are re-added with - // their new names. MK. - synchronized (this.recordedQueues) { - this.propagateQueueNameChangeToBindings(oldName, newName); - this.propagateQueueNameChangeToConsumers(oldName, newName); - // bug26552: - // remove old name after we've updated the bindings and consumers, - // plus only for server-named queues, both to make sure we don't lose - // anything to recover. MK. - if(q.isServerNamed()) { - deleteRecordedQueue(oldName); + if (topologyRecoveryFilter.filterQueue(q)) { + LOGGER.debug("Recovering {}", q); + q.recover(); + String newName = q.getName(); + if (!oldName.equals(newName)) { + // make sure server-named queues are re-added with + // their new names. MK. + synchronized (this.recordedQueues) { + this.propagateQueueNameChangeToBindings(oldName, newName); + this.propagateQueueNameChangeToConsumers(oldName, newName); + // bug26552: + // remove old name after we've updated the bindings and consumers, + // plus only for server-named queues, both to make sure we don't lose + // anything to recover. MK. + if(q.isServerNamed()) { + deleteRecordedQueue(oldName); + } + this.recordedQueues.put(newName, q); } - this.recordedQueues.put(newName, q); } + for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) { + qrl.queueRecovered(oldName, newName); + } + LOGGER.debug("{} has recovered", q); } - for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) { - qrl.queueRecovered(oldName, newName); - } - LOGGER.debug("{} has recovered", q); } catch (Exception cause) { final String message = "Caught an exception while recovering queue " + oldName + ": " + cause.getMessage(); @@ -700,8 +736,10 @@ private void recoverQueue(final String oldName, final RecordedQueue q) { private void recoverBinding(final RecordedBinding b) { try { - b.recover(); - LOGGER.debug("{} has recovered", b); + if (this.topologyRecoveryFilter.filterBinding(b)) { + b.recover(); + LOGGER.debug("{} has recovered", b); + } } catch (Exception cause) { String message = "Caught an exception while recovering binding between " + b.getSource() + " and " + b.getDestination() + ": " + cause.getMessage(); @@ -711,22 +749,24 @@ private void recoverBinding(final RecordedBinding b) { } private void recoverConsumer(final String tag, final RecordedConsumer consumer) { - LOGGER.debug("Recovering {}", consumer); try { - String newTag = consumer.recover(); - // make sure server-generated tags are re-added. MK. - if(tag != null && !tag.equals(newTag)) { - synchronized (this.consumers) { - this.consumers.remove(tag); - this.consumers.put(newTag, consumer); + if (this.topologyRecoveryFilter.filterConsumer(consumer)) { + LOGGER.debug("Recovering {}", consumer); + String newTag = consumer.recover(); + // make sure server-generated tags are re-added. MK. + if(tag != null && !tag.equals(newTag)) { + synchronized (this.consumers) { + this.consumers.remove(tag); + this.consumers.put(newTag, consumer); + } + consumer.getChannel().updateConsumerTag(tag, newTag); } - consumer.getChannel().updateConsumerTag(tag, newTag); - } - for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) { - crl.consumerRecovered(tag, newTag); + for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) { + crl.consumerRecovered(tag, newTag); + } + LOGGER.debug("{} has recovered", consumer); } - LOGGER.debug("{} has recovered", consumer); } catch (Exception cause) { final String message = "Caught an exception while recovering consumer " + tag + ": " + cause.getMessage(); diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedBinding.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedBinding.java index 578ce31dce..4b144ae00b 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecordedBinding.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecordedBinding.java @@ -59,6 +59,10 @@ public String getDestination() { return destination; } + public String getRoutingKey() { + return routingKey; + } + public Map getArguments() { return arguments; } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryFilter.java b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryFilter.java new file mode 100644 index 0000000000..443866fbb8 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/impl/recovery/TopologyRecoveryFilter.java @@ -0,0 +1,52 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.impl.recovery; + +/** + * Filter to know whether entities should be recovered or not. + * @since 4.8.0 + */ +public interface TopologyRecoveryFilter { + + /** + * Decides whether an exchange is recovered or not. + * @param recordedExchange + * @return true to recover the exchange, false otherwise + */ + boolean filterExchange(RecordedExchange recordedExchange); + + /** + * Decides whether a queue is recovered or not. + * @param recordedQueue + * @return true to recover the queue, false otherwise + */ + boolean filterQueue(RecordedQueue recordedQueue); + + /** + * Decides whether a binding is recovered or not. + * @param recordedBinding + * @return true to recover the binding, false otherwise + */ + boolean filterBinding(RecordedBinding recordedBinding); + + /** + * Decides whether a consumer is recovered or not. + * @param recordedConsumer + * @return true to recover the consumer, false otherwise + */ + boolean filterConsumer(RecordedConsumer recordedConsumer); + +} diff --git a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java index 7a0b508e9d..27ea0cbd0d 100644 --- a/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java +++ b/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java @@ -300,10 +300,26 @@ protected void deleteExchange(String x) throws IOException { channel.exchangeDelete(x); } + protected void deleteExchanges(String [] exchanges) throws IOException { + if (exchanges != null) { + for (String exchange : exchanges) { + deleteExchange(exchange); + } + } + } + protected void deleteQueue(String q) throws IOException { channel.queueDelete(q); } + protected void deleteQueues(String [] queues) throws IOException { + if (queues != null) { + for (String queue : queues) { + deleteQueue(queue); + } + } + } + protected void clearAllResourceAlarms() throws IOException, InterruptedException { clearResourceAlarm("memory"); clearResourceAlarm("disk"); diff --git a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java index 50a0c6350e..a89bb16995 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java +++ b/src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java @@ -32,7 +32,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; diff --git a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java index c88b2a7f19..aeadb303dd 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java +++ b/src/test/java/com/rabbitmq/client/test/functional/FunctionalTests.java @@ -77,7 +77,8 @@ BasicGet.class, Nack.class, ExceptionMessages.class, - Metrics.class + Metrics.class, + TopologyRecoveryFiltering.class }) public class FunctionalTests { diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java new file mode 100644 index 0000000000..9655aa42fc --- /dev/null +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java @@ -0,0 +1,306 @@ +// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +// +// This software, the RabbitMQ Java client library, is triple-licensed under the +// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2 +// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see +// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. + +package com.rabbitmq.client.test.functional; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoverableConnection; +import com.rabbitmq.client.RecoveryListener; +import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.impl.NetworkConnection; +import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; +import com.rabbitmq.client.impl.recovery.RecordedExchange; +import com.rabbitmq.client.impl.recovery.RecordedQueue; +import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; +import com.rabbitmq.client.test.BrokerTestCase; +import com.rabbitmq.client.test.TestUtils; +import com.rabbitmq.tools.Host; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.awaitility.Awaitility.waitAtMost; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * + */ +public class TopologyRecoveryFiltering extends BrokerTestCase { + + static { + System.setProperty("rabbitmqctl.bin", "/home/acogoluegnes/Downloads/rabbitmq_server-3.7.7/sbin/rabbitmqctl"); + System.setProperty("test-broker.A.nodename", "rabbit@acogoluegnes-xps"); + } + + String[] exchangesToDelete = new String[] { + "recovered.exchange", "filtered.exchange", "topology.recovery.exchange" + }; + String[] queuesToDelete = new String[] { + "topology.recovery.queue.1", "topology.recovery.queue.2" + }; + Connection c; + + private static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c) + throws IOException, TimeoutException, InterruptedException { + Channel ch = c.createChannel(); + try { + ch.confirmSelect(); + final CountDownLatch latch = new CountDownLatch(1); + ch.basicConsume(queue, true, new DefaultConsumer(ch) { + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + latch.countDown(); + } + }); + ch.basicPublish(exchange, routingKey, null, "".getBytes()); + ch.waitForConfirmsOrDie(5000); + return latch.await(5, TimeUnit.SECONDS); + } finally { + if (ch != null && ch.isOpen()) { + ch.close(); + } + } + } + + private static boolean resourceExists(Callable callback) throws Exception { + Channel declarePassiveChannel = null; + try { + declarePassiveChannel = callback.call(); + return true; + } catch (IOException e) { + if (e.getCause() instanceof ShutdownSignalException) { + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + if (cause.getReason() instanceof AMQP.Channel.Close) { + if (((AMQP.Channel.Close) cause.getReason()).getReplyCode() == 404) { + return false; + } else { + throw e; + } + } + return false; + } else { + throw e; + } + } finally { + if (declarePassiveChannel != null && declarePassiveChannel.isOpen()) { + declarePassiveChannel.close(); + } + } + } + + private static boolean queueExists(final String queue, final Connection connection) throws Exception { + return resourceExists(new Callable() { + + @Override + public Channel call() throws Exception { + Channel channel = connection.createChannel(); + channel.queueDeclarePassive(queue); + return channel; + } + }); + } + + private static boolean exchangeExists(final String exchange, final Connection connection) throws Exception { + return resourceExists(new Callable() { + + @Override + public Channel call() throws Exception { + Channel channel = connection.createChannel(); + channel.exchangeDeclarePassive(exchange); + return channel; + } + }); + } + + private static void closeAndWaitForRecovery(RecoverableConnection connection) throws IOException, InterruptedException { + CountDownLatch latch = prepareForRecovery(connection); + Host.closeConnection((NetworkConnection) connection); + wait(latch); + } + + private static CountDownLatch prepareForRecovery(Connection conn) { + final CountDownLatch latch = new CountDownLatch(1); + ((AutorecoveringConnection) conn).addRecoveryListener(new RecoveryListener() { + + @Override + public void handleRecovery(Recoverable recoverable) { + latch.countDown(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + // No-op + } + }); + return latch; + } + + private static void wait(CountDownLatch latch) throws InterruptedException { + assertTrue(latch.await(20, TimeUnit.SECONDS)); + } + + @Override + protected ConnectionFactory newConnectionFactory() { + ConnectionFactory connectionFactory = TestUtils.connectionFactory(); + connectionFactory.setTopologyRecoveryFilter(new SimpleTopologyRecoveryFilter()); + connectionFactory.setNetworkRecoveryInterval(1000); + return connectionFactory; + } + + @Override + protected void createResources() throws IOException, TimeoutException { + super.createResources(); + c = connectionFactory.newConnection(); + deleteExchanges(exchangesToDelete); + deleteQueues(queuesToDelete); + } + + @Override + protected void releaseResources() throws IOException { + super.releaseResources(); + c.close(); + deleteExchanges(exchangesToDelete); + deleteQueues(queuesToDelete); + } + + @Test + public void topologyRecoveryFilteringExchangesAndQueues() throws Exception { + Channel ch = c.createChannel(); + ch.exchangeDeclare("recovered.exchange", "direct"); + ch.exchangeDeclare("filtered.exchange", "direct"); + ch.queueDeclare("recovered.queue", false, true, true, null); + ch.queueDeclare("filtered.queue", false, true, true, null); + + // to check whether the other connection recovers them or not + channel.exchangeDelete("recovered.exchange"); + channel.exchangeDelete("filtered.exchange"); + + closeAndWaitForRecovery((RecoverableConnection) c); + + assertTrue(exchangeExists("recovered.exchange", c)); + assertFalse(exchangeExists("filtered.exchange", c)); + + assertTrue(queueExists("recovered.queue", c)); + assertFalse(queueExists("filtered.queue", c)); + } + + @Test + public void topologyRecoveryFilteringBindings() throws Exception { + Channel ch = c.createChannel(); + + ch.exchangeDeclare("topology.recovery.exchange", "direct"); + ch.queueDeclare("topology.recovery.queue.1", false, false, false, null); + ch.queueDeclare("topology.recovery.queue.2", false, false, false, null); + ch.queueBind("topology.recovery.queue.1", "topology.recovery.exchange", "recovered.binding"); + ch.queueBind("topology.recovery.queue.2", "topology.recovery.exchange", "filtered.binding"); + + // to check whether the other connection recovers them or not + channel.queueUnbind("topology.recovery.queue.1", "topology.recovery.exchange", "recovered.binding"); + channel.queueUnbind("topology.recovery.queue.2", "topology.recovery.exchange", "filtered.binding"); + + closeAndWaitForRecovery((RecoverableConnection) c); + + assertTrue("The message should have been received by now", sendAndConsumeMessage( + "topology.recovery.exchange", "recovered.binding", "topology.recovery.queue.1", c + )); + assertFalse("Binding shouldn't recover, no messages should have been received", sendAndConsumeMessage( + "topology.recovery.exchange", "filtered.binding", "topology.recovery.queue.2", c + )); + } + + @Test + public void topologyRecoveryFilteringConsumers() throws Exception { + Channel ch = c.createChannel(); + + ch.exchangeDeclare("topology.recovery.exchange", "direct"); + ch.queueDeclare("topology.recovery.queue.1", false, false, false, null); + ch.queueDeclare("topology.recovery.queue.2", false, false, false, null); + ch.queueBind("topology.recovery.queue.1", "topology.recovery.exchange", "recovered.consumer"); + ch.queueBind("topology.recovery.queue.2", "topology.recovery.exchange", "filtered.consumer"); + + final AtomicInteger recoveredConsumerMessageCount = new AtomicInteger(0); + ch.basicConsume("topology.recovery.queue.1", true, "recovered.consumer", new DefaultConsumer(ch) { + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + recoveredConsumerMessageCount.incrementAndGet(); + } + }); + ch.basicPublish("topology.recovery.exchange", "recovered.consumer", null, "".getBytes()); + waitAtMost(5, TimeUnit.SECONDS).untilAtomic(recoveredConsumerMessageCount, is(1)); + + final AtomicInteger filteredConsumerMessageCount = new AtomicInteger(0); + final CountDownLatch filteredConsumerLatch = new CountDownLatch(2); + ch.basicConsume("topology.recovery.queue.2", true, "filtered.consumer", new DefaultConsumer(ch) { + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + filteredConsumerMessageCount.incrementAndGet(); + filteredConsumerLatch.countDown(); + } + }); + ch.basicPublish("topology.recovery.exchange", "filtered.consumer", null, "".getBytes()); + waitAtMost(5, TimeUnit.SECONDS).untilAtomic(filteredConsumerMessageCount, is(1)); + + closeAndWaitForRecovery((RecoverableConnection) c); + + int initialCount = recoveredConsumerMessageCount.get(); + ch.basicPublish("topology.recovery.exchange", "recovered.consumer", null, "".getBytes()); + waitAtMost(5, TimeUnit.SECONDS).untilAtomic(recoveredConsumerMessageCount, is(initialCount + 1)); + + ch.basicPublish("topology.recovery.exchange", "filtered.consumer", null, "".getBytes()); + assertFalse("Consumer shouldn't recover, no extra messages should have been received", + filteredConsumerLatch.await(5, TimeUnit.SECONDS)); + } + + private static class SimpleTopologyRecoveryFilter implements TopologyRecoveryFilter { + + @Override + public boolean filterExchange(RecordedExchange recordedExchange) { + return !recordedExchange.getName().contains("filtered"); + } + + @Override + public boolean filterQueue(RecordedQueue recordedQueue) { + return !recordedQueue.getName().contains("filtered"); + } + + @Override + public boolean filterBinding(RecordedBinding recordedBinding) { + return !recordedBinding.getRoutingKey().contains("filtered"); + } + + @Override + public boolean filterConsumer(RecordedConsumer recordedConsumer) { + return !recordedConsumer.getConsumerTag().contains("filtered"); + } + } +} From 16fb6251624d6f8c7a4b16ebb7b60b63405d0fa1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 1 Aug 2018 12:52:35 +0200 Subject: [PATCH 2/4] Remove local testing code --- .../client/test/functional/TopologyRecoveryFiltering.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java index 9655aa42fc..803fcf3cc9 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java +++ b/src/test/java/com/rabbitmq/client/test/functional/TopologyRecoveryFiltering.java @@ -54,11 +54,6 @@ */ public class TopologyRecoveryFiltering extends BrokerTestCase { - static { - System.setProperty("rabbitmqctl.bin", "/home/acogoluegnes/Downloads/rabbitmq_server-3.7.7/sbin/rabbitmqctl"); - System.setProperty("test-broker.A.nodename", "rabbit@acogoluegnes-xps"); - } - String[] exchangesToDelete = new String[] { "recovered.exchange", "filtered.exchange", "topology.recovery.exchange" }; From 305af33df9923313527b86643be09429a60fc8ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 1 Aug 2018 13:11:23 +0200 Subject: [PATCH 3/4] Add test case to avoid broken RpcClient after broken restart [#159461281] References #383 Fixes #382 --- .../com/rabbitmq/client/test/RpcTest.java | 117 ++++++++++++++++-- 1 file changed, 108 insertions(+), 9 deletions(-) diff --git a/src/test/java/com/rabbitmq/client/test/RpcTest.java b/src/test/java/com/rabbitmq/client/test/RpcTest.java index 4c919d4ed9..b7a11ce9a5 100644 --- a/src/test/java/com/rabbitmq/client/test/RpcTest.java +++ b/src/test/java/com/rabbitmq/client/test/RpcTest.java @@ -13,12 +13,25 @@ // If you have any questions regarding licensing, please contact us at // info@rabbitmq.com. - package com.rabbitmq.client.test; -import com.rabbitmq.client.*; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.QueueingConsumer; +import com.rabbitmq.client.Recoverable; +import com.rabbitmq.client.RecoveryListener; +import com.rabbitmq.client.RpcClient; +import com.rabbitmq.client.RpcServer; +import com.rabbitmq.client.ShutdownSignalException; import com.rabbitmq.client.impl.NetworkConnection; import com.rabbitmq.client.impl.recovery.AutorecoveringConnection; +import com.rabbitmq.client.impl.recovery.RecordedBinding; +import com.rabbitmq.client.impl.recovery.RecordedConsumer; +import com.rabbitmq.client.impl.recovery.RecordedExchange; +import com.rabbitmq.client.impl.recovery.RecordedQueue; +import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; import com.rabbitmq.tools.Host; import org.junit.After; import org.junit.Before; @@ -32,6 +45,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class RpcTest { @@ -40,7 +54,8 @@ public class RpcTest { String queue = "rpc.queue"; RpcServer rpcServer; - @Before public void init() throws Exception { + @Before + public void init() throws Exception { clientConnection = TestUtils.connectionFactory().newConnection(); clientChannel = clientConnection.createChannel(); serverConnection = TestUtils.connectionFactory().newConnection(); @@ -48,11 +63,12 @@ public class RpcTest { serverChannel.queueDeclare(queue, false, false, false, null); } - @After public void tearDown() throws Exception { - if(rpcServer != null) { + @After + public void tearDown() throws Exception { + if (rpcServer != null) { rpcServer.terminateMainloop(); } - if(serverChannel != null) { + if (serverChannel != null) { serverChannel.queueDelete(queue); } clientConnection.close(); @@ -63,6 +79,7 @@ public class RpcTest { public void rpc() throws Exception { rpcServer = new TestRpcServer(serverChannel, queue); new Thread(new Runnable() { + @Override public void run() { try { @@ -81,10 +98,12 @@ public void run() { client.close(); } - @Test public void brokenAfterBrokerRestart() throws Exception { + @Test + public void givenConsumerNotRecoveredCanCreateNewClientOnSameChannelAfterConnectionFailure() throws Exception { // see https://github.com/rabbitmq/rabbitmq-java-client/issues/382 rpcServer = new TestRpcServer(serverChannel, queue); new Thread(new Runnable() { + @Override public void run() { try { @@ -96,8 +115,8 @@ public void run() { }).start(); ConnectionFactory cf = TestUtils.connectionFactory(); - cf.setTopologyRecoveryEnabled(false); - cf.setNetworkRecoveryInterval(2000); + cf.setTopologyRecoveryFilter(new NoDirectReplyToConsumerTopologyRecoveryFilter()); + cf.setNetworkRecoveryInterval(1000); Connection connection = null; try { connection = cf.newConnection(); @@ -107,10 +126,12 @@ public void run() { assertEquals("*** hello ***", new String(response.getBody())); final CountDownLatch recoveryLatch = new CountDownLatch(1); ((AutorecoveringConnection) connection).addRecoveryListener(new RecoveryListener() { + @Override public void handleRecovery(Recoverable recoverable) { recoveryLatch.countDown(); } + @Override public void handleRecoveryStarted(Recoverable recoverable) { @@ -126,7 +147,62 @@ public void handleRecoveryStarted(Recoverable recoverable) { connection.close(); } } + } + + @Test + public void givenConsumerIsRecoveredCanNotCreateNewClientOnSameChannelAfterConnectionFailure() throws Exception { + // see https://github.com/rabbitmq/rabbitmq-java-client/issues/382 + rpcServer = new TestRpcServer(serverChannel, queue); + new Thread(new Runnable() { + + @Override + public void run() { + try { + rpcServer.mainloop(); + } catch (Exception e) { + // safe to ignore when loops ends/server is canceled + } + } + }).start(); + + ConnectionFactory cf = TestUtils.connectionFactory(); + cf.setNetworkRecoveryInterval(1000); + Connection connection = null; + try { + connection = cf.newConnection(); + Channel channel = connection.createChannel(); + RpcClient client = new RpcClient(channel, "", queue, 1000); + RpcClient.Response response = client.doCall(null, "hello".getBytes()); + assertEquals("*** hello ***", new String(response.getBody())); + final CountDownLatch recoveryLatch = new CountDownLatch(1); + ((AutorecoveringConnection) connection).addRecoveryListener(new RecoveryListener() { + @Override + public void handleRecovery(Recoverable recoverable) { + recoveryLatch.countDown(); + } + + @Override + public void handleRecoveryStarted(Recoverable recoverable) { + + } + }); + Host.closeConnection((NetworkConnection) connection); + assertTrue("Connection should have recovered by now", recoveryLatch.await(10, TimeUnit.SECONDS)); + try { + new RpcClient(channel, "", queue, 1000); + fail("Cannot create RPC client on same channel, an exception should have been thrown"); + } catch (IOException e) { + assertTrue(e.getCause() instanceof ShutdownSignalException); + ShutdownSignalException cause = (ShutdownSignalException) e.getCause(); + assertTrue(cause.getReason() instanceof AMQP.Channel.Close); + assertEquals(406, ((AMQP.Channel.Close) cause.getReason()).getReplyCode()); + } + } finally { + if (connection != null) { + connection.close(); + } + } } private static class TestRpcServer extends RpcServer { @@ -157,4 +233,27 @@ protected AMQP.BasicProperties postprocessReplyProperties(QueueingConsumer.Deliv return builder.build(); } } + + private static class NoDirectReplyToConsumerTopologyRecoveryFilter implements TopologyRecoveryFilter { + + @Override + public boolean filterExchange(RecordedExchange recordedExchange) { + return true; + } + + @Override + public boolean filterQueue(RecordedQueue recordedQueue) { + return true; + } + + @Override + public boolean filterBinding(RecordedBinding recordedBinding) { + return true; + } + + @Override + public boolean filterConsumer(RecordedConsumer recordedConsumer) { + return !"amq.rabbitmq.reply-to".equals(recordedConsumer.getQueue()); + } + } } From 47dc236f292153a74922e34b57f26cc536ce2310 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 1 Aug 2018 18:29:39 +0300 Subject: [PATCH 4/4] Drive-by change: correct a path in RUNNING_TESTS.md --- RUNNING_TESTS.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/RUNNING_TESTS.md b/RUNNING_TESTS.md index 2619d9699f..a449a0d099 100644 --- a/RUNNING_TESTS.md +++ b/RUNNING_TESTS.md @@ -41,7 +41,7 @@ For details on running specific tests, see below. ## Running a Specific Test Suite -To run a specific test suite you should execute one of the following in the +To run a specific test suite, execute one of the following in the top-level directory of the source tree: * To run the client unit tests: @@ -59,7 +59,14 @@ top-level directory of the source tree: * To run a single test: ``` -./mvnw -Ddeps.dir=$(pwd)/deps/deps verify -Dit.test=DeadLetterExchange +./mvnw -Ddeps.dir=$(pwd)/deps verify -Dit.test=DeadLetterExchange +``` + +When running from the repository cloned as part of the [RabbitMQ public umbrella](https://github.com/rabbitmq/rabbitmq-public-umbrella), +the `deps.dir` property path may have to change, e.g. + +``` +./mvnw -Ddeps.dir=$(pwd)/.. verify -Dit.test=ConnectionRecovery ``` For example, to run the client tests: @@ -175,4 +182,4 @@ mvn verify -P '!setup-test-cluster' ``` Note that by doing so some tests will fail as they require `rabbitmqctl` to -control the running nodes. \ No newline at end of file +control the running nodes.