Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions RUNNING_TESTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ For details on running specific tests, see below.

## Running a Specific Test Suite

To run a specific test suite you should execute one of the following in the
To run a specific test suite, execute one of the following in the
top-level directory of the source tree:

* To run the client unit tests:
Expand All @@ -59,7 +59,14 @@ top-level directory of the source tree:
* To run a single test:

```
./mvnw -Ddeps.dir=$(pwd)/deps/deps verify -Dit.test=DeadLetterExchange
./mvnw -Ddeps.dir=$(pwd)/deps verify -Dit.test=DeadLetterExchange
```

When running from the repository cloned as part of the [RabbitMQ public umbrella](https://github.com/rabbitmq/rabbitmq-public-umbrella),
the `deps.dir` property path may have to change, e.g.

```
./mvnw -Ddeps.dir=$(pwd)/.. verify -Dit.test=ConnectionRecovery
```

For example, to run the client tests:
Expand Down Expand Up @@ -175,4 +182,4 @@ mvn verify -P '!setup-test-cluster'
```

Note that by doing so some tests will fail as they require `rabbitmqctl` to
control the running nodes.
control the running nodes.
17 changes: 17 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -172,6 +174,12 @@ public class ConnectionFactory implements Cloneable {
*/
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;

/**
* Filter to include/exclude entities from topology recovery.
* @since 4.8.0
*/
private TopologyRecoveryFilter topologyRecoveryFilter;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -1046,6 +1054,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
result.setWorkPoolTimeout(workPoolTimeout);
result.setErrorOnWriteListener(errorOnWriteListener);
result.setTopologyRecoveryFilter(topologyRecoveryFilter);
return result;
}

Expand Down Expand Up @@ -1379,4 +1388,12 @@ public int getWorkPoolTimeout() {
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
this.errorOnWriteListener = errorOnWriteListener;
}

/**
* Set filter to include/exclude entities from topology recovery.
* @since 4.8.0
*/
public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
this.topologyRecoveryFilter = topologyRecoveryFilter;
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter;

import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -46,6 +47,7 @@ public class ConnectionParams {
private boolean channelShouldCheckRpcResponseType;
private ErrorOnWriteListener errorOnWriteListener;
private int workPoolTimeout = -1;
private TopologyRecoveryFilter topologyRecoveryFilter;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -235,4 +237,12 @@ public void setWorkPoolTimeout(int workPoolTimeout) {
public int getWorkPoolTimeout() {
return workPoolTimeout;
}

public void setTopologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter) {
this.topologyRecoveryFilter = topologyRecoveryFilter;
}

public TopologyRecoveryFilter getTopologyRecoveryFilter() {
return topologyRecoveryFilter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
private final Map<String, RecordedConsumer> consumers = Collections.synchronizedMap(new LinkedHashMap<String, RecordedConsumer>());
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = Collections.synchronizedList(new ArrayList<ConsumerRecoveryListener>());
private final List<QueueRecoveryListener> queueRecoveryListeners = Collections.synchronizedList(new ArrayList<QueueRecoveryListener>());

private final TopologyRecoveryFilter topologyRecoveryFilter;

// Used to block connection recovery attempts after close() is invoked.
private volatile boolean manuallyClosed = false;
Expand All @@ -103,6 +105,10 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
setupErrorOnWriteListenerForPotentialRecovery();

this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();


this.topologyRecoveryFilter = params.getTopologyRecoveryFilter() == null ?
letAllPassFilter() : params.getTopologyRecoveryFilter();
}

private void setupErrorOnWriteListenerForPotentialRecovery() {
Expand Down Expand Up @@ -133,6 +139,31 @@ public void run() {
});
}

private TopologyRecoveryFilter letAllPassFilter() {
return new TopologyRecoveryFilter() {

@Override
public boolean filterExchange(RecordedExchange recordedExchange) {
return true;
}

@Override
public boolean filterQueue(RecordedQueue recordedQueue) {
return true;
}

@Override
public boolean filterBinding(RecordedBinding recordedBinding) {
return true;
}

@Override
public boolean filterConsumer(RecordedConsumer recordedConsumer) {
return true;
}
};
}

/**
* Private API.
* @throws IOException
Expand Down Expand Up @@ -655,8 +686,10 @@ private void recoverTopology(final ExecutorService executor) {
private void recoverExchange(final RecordedExchange x) {
// recorded exchanges are guaranteed to be non-predefined (we filter out predefined ones in exchangeDeclare). MK.
try {
x.recover();
LOGGER.debug("{} has recovered", x);
if (topologyRecoveryFilter.filterExchange(x)) {
x.recover();
LOGGER.debug("{} has recovered", x);
}
} catch (Exception cause) {
final String message = "Caught an exception while recovering exchange " + x.getName() +
": " + cause.getMessage();
Expand All @@ -666,30 +699,33 @@ private void recoverExchange(final RecordedExchange x) {
}

private void recoverQueue(final String oldName, final RecordedQueue q) {
LOGGER.debug("Recovering {}", q);

try {
q.recover();
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
// their new names. MK.
synchronized (this.recordedQueues) {
this.propagateQueueNameChangeToBindings(oldName, newName);
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
if (topologyRecoveryFilter.filterQueue(q)) {
LOGGER.debug("Recovering {}", q);
q.recover();
String newName = q.getName();
if (!oldName.equals(newName)) {
// make sure server-named queues are re-added with
// their new names. MK.
synchronized (this.recordedQueues) {
this.propagateQueueNameChangeToBindings(oldName, newName);
this.propagateQueueNameChangeToConsumers(oldName, newName);
// bug26552:
// remove old name after we've updated the bindings and consumers,
// plus only for server-named queues, both to make sure we don't lose
// anything to recover. MK.
if(q.isServerNamed()) {
deleteRecordedQueue(oldName);
}
this.recordedQueues.put(newName, q);
}
this.recordedQueues.put(newName, q);
}
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
qrl.queueRecovered(oldName, newName);
}
LOGGER.debug("{} has recovered", q);
}
for (QueueRecoveryListener qrl : Utility.copy(this.queueRecoveryListeners)) {
qrl.queueRecovered(oldName, newName);
}
LOGGER.debug("{} has recovered", q);
} catch (Exception cause) {
final String message = "Caught an exception while recovering queue " + oldName +
": " + cause.getMessage();
Expand All @@ -700,8 +736,10 @@ private void recoverQueue(final String oldName, final RecordedQueue q) {

private void recoverBinding(final RecordedBinding b) {
try {
b.recover();
LOGGER.debug("{} has recovered", b);
if (this.topologyRecoveryFilter.filterBinding(b)) {
b.recover();
LOGGER.debug("{} has recovered", b);
}
} catch (Exception cause) {
String message = "Caught an exception while recovering binding between " + b.getSource() +
" and " + b.getDestination() + ": " + cause.getMessage();
Expand All @@ -711,22 +749,24 @@ private void recoverBinding(final RecordedBinding b) {
}

private void recoverConsumer(final String tag, final RecordedConsumer consumer) {
LOGGER.debug("Recovering {}", consumer);
try {
String newTag = consumer.recover();
// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
this.consumers.remove(tag);
this.consumers.put(newTag, consumer);
if (this.topologyRecoveryFilter.filterConsumer(consumer)) {
LOGGER.debug("Recovering {}", consumer);
String newTag = consumer.recover();
// make sure server-generated tags are re-added. MK.
if(tag != null && !tag.equals(newTag)) {
synchronized (this.consumers) {
this.consumers.remove(tag);
this.consumers.put(newTag, consumer);
}
consumer.getChannel().updateConsumerTag(tag, newTag);
}
consumer.getChannel().updateConsumerTag(tag, newTag);
}

for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
crl.consumerRecovered(tag, newTag);
for (ConsumerRecoveryListener crl : Utility.copy(this.consumerRecoveryListeners)) {
crl.consumerRecovered(tag, newTag);
}
LOGGER.debug("{} has recovered", consumer);
}
LOGGER.debug("{} has recovered", consumer);
} catch (Exception cause) {
final String message = "Caught an exception while recovering consumer " + tag +
": " + cause.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public String getDestination() {
return destination;
}

public String getRoutingKey() {
return routingKey;
}

public Map<String, Object> getArguments() {
return arguments;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// info@rabbitmq.com.

package com.rabbitmq.client.impl.recovery;

/**
* Filter to know whether entities should be recovered or not.
* @since 4.8.0
*/
public interface TopologyRecoveryFilter {

/**
* Decides whether an exchange is recovered or not.
* @param recordedExchange
* @return true to recover the exchange, false otherwise
*/
boolean filterExchange(RecordedExchange recordedExchange);

/**
* Decides whether a queue is recovered or not.
* @param recordedQueue
* @return true to recover the queue, false otherwise
*/
boolean filterQueue(RecordedQueue recordedQueue);

/**
* Decides whether a binding is recovered or not.
* @param recordedBinding
* @return true to recover the binding, false otherwise
*/
boolean filterBinding(RecordedBinding recordedBinding);

/**
* Decides whether a consumer is recovered or not.
* @param recordedConsumer
* @return true to recover the consumer, false otherwise
*/
boolean filterConsumer(RecordedConsumer recordedConsumer);

}
16 changes: 16 additions & 0 deletions src/test/java/com/rabbitmq/client/test/BrokerTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,26 @@ protected void deleteExchange(String x) throws IOException {
channel.exchangeDelete(x);
}

protected void deleteExchanges(String [] exchanges) throws IOException {
if (exchanges != null) {
for (String exchange : exchanges) {
deleteExchange(exchange);
}
}
}

protected void deleteQueue(String q) throws IOException {
channel.queueDelete(q);
}

protected void deleteQueues(String [] queues) throws IOException {
if (queues != null) {
for (String queue : queues) {
deleteQueue(queue);
}
}
}

protected void clearAllResourceAlarms() throws IOException, InterruptedException {
clearResourceAlarm("memory");
clearResourceAlarm("disk");
Expand Down
Loading