Skip to content

Commit

Permalink
This is PR #354
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jun 4, 2020
2 parents 5617568 + aebfc85 commit d34cfbf
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,23 @@ public interface AddressControl {
long getNumberOfMessages() throws Exception;

/**
* Returns the names of the queues bound to this address.
* Returns the names of the remote queue(s) bound to this address.
*/
@Attribute(desc = "names of the queue(s) bound to this address")
@Attribute(desc = "names of the remote queue(s) bound to this address")
String[] getRemoteQueueNames() throws Exception;

/**
* Returns the names of the local queue(s) bound to this address.
*/
@Attribute(desc = "names of the local queue(s) bound to this address")
String[] getQueueNames() throws Exception;

/**
* Returns the names of both the local & remote queue(s) bound to this address.
*/
@Attribute(desc = "names of both the local & remote queue(s) bound to this address")
String[] getAllQueueNames() throws Exception;

/**
* Returns the number of pages used by this address.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ public synchronized boolean destroyTopic(final String name, final boolean remove
checkInitialised();
AddressControl addressControl = (AddressControl) server.getManagementService().getResource(ResourceNames.ADDRESS + name);
if (addressControl != null) {
for (String queueName : addressControl.getQueueNames()) {
for (String queueName : addressControl.getAllQueueNames()) {
Binding binding = server.getPostOffice().getBinding(new SimpleString(queueName));
if (binding == null) {
ActiveMQJMSServerLogger.LOGGER.noQueueOnTopic(queueName, name);
Expand All @@ -821,7 +821,7 @@ public synchronized boolean destroyTopic(final String name, final boolean remove
}
}

if (addressControl.getQueueNames().length == 0) {
if (addressControl.getAllQueueNames().length == 0) {
try {
server.removeAddressInfo(SimpleString.toSimpleString(name), null);
} catch (ActiveMQAddressDoesNotExistException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
Expand Down Expand Up @@ -130,18 +131,36 @@ public String getRoutingTypesAsJSON() throws Exception {
}
}

@Override
public String[] getRemoteQueueNames() throws Exception {
return getQueueNames(SearchType.REMOTE);
}

@Override
public String[] getQueueNames() throws Exception {
return getQueueNames(SearchType.LOCAL);
}

@Override
public String[] getAllQueueNames() throws Exception {
return getQueueNames(SearchType.ALL);
}

enum SearchType {
LOCAL, REMOTE, ALL
}

private String[] getQueueNames(SearchType searchType) throws Exception {
if (AuditLogger.isEnabled()) {
AuditLogger.getQueueNames(this.addressInfo);
AuditLogger.getQueueNames(this.addressInfo, searchType);
}
clearIO();
try {
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(addressInfo.getName());
if (bindings != null) {
List<String> queueNames = new ArrayList<>();
for (Binding binding : bindings.getBindings()) {
if (binding instanceof QueueBinding) {
if (binding instanceof QueueBinding && ((searchType == SearchType.ALL) || (searchType == SearchType.LOCAL && binding.isLocal()) || (searchType == SearchType.REMOTE && binding instanceof RemoteQueueBinding))) {
queueNames.add(binding.getUniqueName().toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,17 @@ public void testClusteredSubscriptionCount() throws Exception {
AddressControl topicControl1 = ManagementControlHelper.createAddressControl(simpleTopicName, mBeanServer1);
AddressControl topicControl2 = ManagementControlHelper.createAddressControl(simpleTopicName, mBeanServer2);

assertTrue("There should be 3 subscriptions on the topic, 2 local and 1 remote.",
Wait.waitFor(() -> topicControl1.getQueueNames().length == 3, 2000));
assertTrue("There should be 2 local subscriptions on the topic.",
Wait.waitFor(() -> topicControl1.getQueueNames().length == 2, 2000));

assertTrue("There should be 3 subscriptions on the topic, 1 local and 2 remote.",
Wait.waitFor(() -> topicControl2.getQueueNames().length == 3, 2000));
assertTrue("There should be 1 remote subscription on the topic.",
Wait.waitFor(() -> topicControl1.getRemoteQueueNames().length == 1, 2000));

assertTrue("There should be 1 local subscription on the topic.",
Wait.waitFor(() -> topicControl2.getQueueNames().length == 1, 2000));

assertTrue("There should be 2 remote subscriptions on the topic.",
Wait.waitFor(() -> topicControl2.getRemoteQueueNames().length == 2, 2000));
}

jmsServer1.destroyTopic("t1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
Expand Down Expand Up @@ -99,13 +102,17 @@ public void testIsRetroactiveResource() throws Exception {
}

@Test
public void testGetQueueNames() throws Exception {
public void testGetLocalQueueNames() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString anotherQueue = RandomUtil.randomSimpleString();

session.createQueue(new QueueConfiguration(queue).setAddress(address));

// add a fake RemoteQueueBinding to simulate being in a cluster; we don't want this binding to be returned by getQueueNames()
RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), address, RandomUtil.randomSimpleString(), RandomUtil.randomSimpleString(), RandomUtil.randomLong(), null, null, RandomUtil.randomSimpleString(), RandomUtil.randomInt() + 1, MessageLoadBalancingType.OFF);
server.getPostOffice().addBinding(binding);

AddressControl addressControl = createManagementControl(address);
String[] queueNames = addressControl.getQueueNames();
Assert.assertEquals(1, queueNames.length);
Expand All @@ -124,6 +131,57 @@ public void testGetQueueNames() throws Exception {
session.deleteQueue(anotherQueue);
}

@Test
public void testGetRemoteQueueNames() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();

session.createAddress(address, RoutingType.MULTICAST, false);

// add a fake RemoteQueueBinding to simulate being in a cluster; this should be returned by getRemoteQueueNames()
RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), address, queue, RandomUtil.randomSimpleString(), RandomUtil.randomLong(), null, null, RandomUtil.randomSimpleString(), RandomUtil.randomInt() + 1, MessageLoadBalancingType.OFF);
server.getPostOffice().addBinding(binding);

AddressControl addressControl = createManagementControl(address);
String[] queueNames = addressControl.getRemoteQueueNames();
Assert.assertEquals(1, queueNames.length);
Assert.assertEquals(queue.toString(), queueNames[0]);
}

@Test
public void testGetAllQueueNames() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString anotherQueue = RandomUtil.randomSimpleString();
SimpleString remoteQueue = RandomUtil.randomSimpleString();

session.createQueue(new QueueConfiguration(queue).setAddress(address));

// add a fake RemoteQueueBinding to simulate being in a cluster
RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), address, remoteQueue, RandomUtil.randomSimpleString(), RandomUtil.randomLong(), null, null, RandomUtil.randomSimpleString(), RandomUtil.randomInt() + 1, MessageLoadBalancingType.OFF);
server.getPostOffice().addBinding(binding);

AddressControl addressControl = createManagementControl(address);
String[] queueNames = addressControl.getAllQueueNames();
Assert.assertEquals(2, queueNames.length);
Assert.assertTrue(Arrays.asList(queueNames).contains(queue.toString()));
Assert.assertTrue(Arrays.asList(queueNames).contains(remoteQueue.toString()));

session.createQueue(new QueueConfiguration(anotherQueue).setAddress(address).setDurable(false));
queueNames = addressControl.getAllQueueNames();
Assert.assertEquals(3, queueNames.length);
Assert.assertTrue(Arrays.asList(queueNames).contains(anotherQueue.toString()));

session.deleteQueue(queue);

queueNames = addressControl.getAllQueueNames();
Assert.assertEquals(2, queueNames.length);
Assert.assertTrue(Arrays.asList(queueNames).contains(anotherQueue.toString()));
Assert.assertFalse(Arrays.asList(queueNames).contains(queue.toString()));

session.deleteQueue(anotherQueue);
}

@Test
public void testGetBindingNames() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ public long getNumberOfMessages() throws Exception {
return (long) proxy.retrieveAttributeValue("numberOfMessages");
}

@Override
public String[] getRemoteQueueNames() throws Exception {
return (String[]) proxy.retrieveAttributeValue("remoteQueueNames", String.class);
}

@Override
public String[] getAllQueueNames() throws Exception {
return (String[]) proxy.retrieveAttributeValue("allQueueNames", String.class);
}

@Override
public String[] getQueueNames() throws Exception {
return (String[]) proxy.retrieveAttributeValue("queueNames", String.class);
Expand Down

0 comments on commit d34cfbf

Please sign in to comment.