Permalink
Browse files

AMQP-345: Allow Broker Queue Name Generation

JIRA: https://jira.springsource.org/browse/AMQP-345

* `RabbitAdmin#declareQueues`: add a check if provided queue name isn't equal to `declareOk.getQueue()`
and replace the name with generated one by Broker using `DFA`
* Add tests case
* Polishing mock tests for new state of `channel.queueDeclare`
* Gradle: remove `sourceSet` declaration as it works as is and previous state caused issue with IDE
configuration and tests classes weren't presented as classes, but just resources

AMQP-345: Polishing

* `declareQueue(Queue queue)` returns the name of the declared queue
* Add documentation
* Upgrade to Gradle 1.10
* Add `-options` to `-Xlint` Java arg to suppress bootstrap CP warning from Java 7 for Java 6 source code
* Add Gradle's `wrapper` `distributionUrl` to download `all` facet.
It allow to use Gradle source code from IDE.
Actually the IDE suggested to do that.

AMQP-345 Polishing

Doc polishing.
Add not null assertion for Queue.name (causes an NPE elsewhere otherwise).

Update version to 1.3.0.BUILD-SNAPSHOT
  • Loading branch information...
1 parent 8ad919d commit 89f396a6f011ef43f0232784fc6a204a645189c3 Artem Bilan committed with garyrussell Dec 15, 2013
View
16 build.gradle
@@ -73,14 +73,6 @@ subprojects { subproject ->
}
}
- sourceSets {
- test {
- resources {
- srcDirs = ['src/test/resources', 'src/test/java']
- }
- }
- }
-
// See http://www.gradle.org/docs/current/userguide/dependency_management.html#sub:configurations
// and http://www.gradle.org/docs/current/dsl/org.gradle.api.artifacts.ConfigurationContainer.html
configurations {
@@ -104,7 +96,7 @@ subprojects { subproject ->
}
// enable all compiler warnings; individual projects may customize further
- ext.xLintArg = '-Xlint:all'
+ ext.xLintArg = '-Xlint:all,-options'
[compileJava, compileTestJava]*.options*.compilerArgs = [xLintArg]
test {
@@ -211,6 +203,9 @@ project('spring-rabbit') {
}
+ // suppress deprecation warnings (@SuppressWarnings("deprecation") is not enough for javac)
+ compileJava.options.compilerArgs = ["${xLintArg},-deprecation"]
+
}
apply plugin: 'docbook-reference'
@@ -381,5 +376,6 @@ task dist(dependsOn: assemble) {
task wrapper(type: Wrapper) {
description = 'Generates gradlew[.bat] scripts'
- gradleVersion = '1.9'
+ gradleVersion = '1.10'
+ distributionUrl = "http://services.gradle.org/distributions/gradle-${gradleVersion}-all.zip"
}
View
2 gradle.properties
@@ -1 +1 @@
-version=1.2.1.BUILD-SNAPSHOT
+version=1.3.0.BUILD-SNAPSHOT
View
BIN gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
View
4 gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
-#Tue Dec 10 18:11:16 EET 2013
+#Thu Dec 19 12:04:12 EET 2013
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=http\://services.gradle.org/distributions/gradle-1.9-bin.zip
+distributionUrl=http\://services.gradle.org/distributions/gradle-1.10-all.zip
View
3 spring-amqp/src/main/java/org/springframework/amqp/core/AmqpAdmin.java
@@ -50,8 +50,9 @@
/**
* Declare the given queue
* @param queue the queue to declare
+ * @return the name of the queue.
*/
- void declareQueue(Queue queue);
+ String declareQueue(Queue queue);
/**
* Delete a queue, without regard for whether it is in use or has messages on it
View
6 spring-amqp/src/main/java/org/springframework/amqp/core/Queue.java
@@ -15,6 +15,8 @@
import java.util.Map;
+import org.springframework.util.Assert;
+
/**
* Simple container collecting information to describe a queue. Used in conjunction with AmqpAdmin.
*
@@ -67,15 +69,15 @@ public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete
/**
* Construct a new queue, given a name, durability flag, and auto-delete flag, and arguments.
- * @param name the name of the queue.
+ * @param name the name of the queue - must not be null; set to "" to have the broker generate the name.
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (the queue will only be used by the declarer's
* connection)
* @param autoDelete true if the server should delete the queue when it is no longer in use
* @param arguments the arguments used to declare the queue
*/
public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
- super();
+ Assert.notNull(name, "'name' cannot be null");
this.name = name;
this.durable = durable;
this.exclusive = exclusive;
View
26 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitAdmin.java
@@ -47,6 +47,7 @@
* @author Dave Syer
* @author Ed Scriven
* @author Gary Russell
+ * @author Artem Bilan
*/
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {
@@ -128,12 +129,17 @@ public Boolean doInRabbit(Channel channel) throws Exception {
// Queue operations
+ /**
+ * Declare the given queue.
+ * If the queue doesn't have a value for 'name' property,
+ * the queue name will be generated by Broker and returned from this method.
+ * But the 'name' property of the queue remains as is.
+ */
@ManagedOperation
- public void declareQueue(final Queue queue) {
- this.rabbitTemplate.execute(new ChannelCallback<Object>() {
- public Object doInRabbit(Channel channel) throws Exception {
- declareQueues(channel, queue);
- return null;
+ public String declareQueue(final Queue queue) {
+ return this.rabbitTemplate.execute(new ChannelCallback<String>() {
+ public String doInRabbit(Channel channel) throws Exception {
+ return declareQueues(channel, queue)[0].getQueue();
}
});
}
@@ -406,15 +412,18 @@ private void declareExchanges(final Channel channel, final Exchange... exchanges
}
}
- private void declareQueues(final Channel channel, final Queue... queues) throws IOException {
- for (Queue queue : queues) {
+ private DeclareOk[] declareQueues(final Channel channel, final Queue... queues) throws IOException {
+ DeclareOk[] declareOks = new DeclareOk[queues.length];
+ for (int i = 0; i < queues.length; i++) {
+ Queue queue = queues[i];
if (!queue.getName().startsWith("amq.")) {
if (logger.isDebugEnabled()) {
logger.debug("declaring Queue '" + queue.getName() + "'");
}
try {
- channel.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(),
+ DeclareOk declareOk = channel.queueDeclare(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(),
queue.getArguments());
+ declareOks[i] = declareOk;
}
catch (IOException e) {
if (this.ignoreDeclarationExceptions) {
@@ -430,6 +439,7 @@ private void declareQueues(final Channel channel, final Queue... queues) throws
logger.debug("Queue with name that starts with 'amq.' cannot be declared.");
}
}
+ return declareOks;
}
private void declareBindings(final Channel channel, final Binding... bindings) throws IOException {
View
27 ...abbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminDeclarationTests.java
@@ -25,6 +25,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -47,9 +48,11 @@
import org.springframework.context.support.GenericApplicationContext;
import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.impl.AMQImpl;
/**
* @author Gary Russell
+ * @author Artem Bilan
* @since 1.2
*
*/
@@ -62,6 +65,7 @@ public void testUnconditional() throws Exception {
Channel channel = mock(Channel.class);
when(cf.createConnection()).thenReturn(conn);
when(conn.createChannel(false)).thenReturn(channel);
+ when(channel.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
final AtomicReference<ConnectionListener> listener = new AtomicReference<ConnectionListener>();
doAnswer(new Answer<Object>() {
@@ -97,6 +101,7 @@ public void testUnconditionalWithExplicitFactory() throws Exception {
Channel channel = mock(Channel.class);
when(cf.createConnection()).thenReturn(conn);
when(conn.createChannel(false)).thenReturn(channel);
+ when(channel.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
final AtomicReference<ConnectionListener> listener = new AtomicReference<ConnectionListener>();
doAnswer(new Answer<Object>() {
@@ -135,6 +140,7 @@ public void testSkipBecauseDifferentFactory() throws Exception {
Channel channel = mock(Channel.class);
when(cf.createConnection()).thenReturn(conn);
when(conn.createChannel(false)).thenReturn(channel);
+ when(channel.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
final AtomicReference<ConnectionListener> listener = new AtomicReference<ConnectionListener>();
doAnswer(new Answer<Object>() {
@@ -174,6 +180,7 @@ public void testSkipBecauseShouldntDeclare() throws Exception {
Channel channel = mock(Channel.class);
when(cf.createConnection()).thenReturn(conn);
when(conn.createChannel(false)).thenReturn(channel);
+ when(channel.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
final AtomicReference<ConnectionListener> listener = new AtomicReference<ConnectionListener>();
doAnswer(new Answer<Object>() {
@@ -234,7 +241,7 @@ public void testAddRemove() {
assertEquals(0, queue.getDeclaringAdmins().size());
queue.setAdminsThatShouldDeclare(admin1, admin2);
assertEquals(2, queue.getDeclaringAdmins().size());
- queue.setAdminsThatShouldDeclare(new AmqpAdmin[0]);
+ queue.setAdminsThatShouldDeclare();
assertEquals(0, queue.getDeclaringAdmins().size());
queue.setAdminsThatShouldDeclare(admin1, admin2);
assertEquals(2, queue.getDeclaringAdmins().size());
@@ -245,7 +252,7 @@ public void testAddRemove() {
queue.setAdminsThatShouldDeclare((AmqpAdmin[]) null);
assertEquals(0, queue.getDeclaringAdmins().size());
try {
- queue.setAdminsThatShouldDeclare(new AmqpAdmin[] {null, admin1});
+ queue.setAdminsThatShouldDeclare(null, admin1);
fail("Expected Exception");
}
catch (IllegalArgumentException e) {}
@@ -267,10 +274,11 @@ public void testAddRemove() {
private static ConnectionListener listener2;
@Bean
- public ConnectionFactory cf1() {
+ public ConnectionFactory cf1() throws IOException {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
when(connectionFactory.createConnection()).thenReturn(conn1);
when(conn1.createChannel(false)).thenReturn(channel1);
+ when(channel1.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
doAnswer(new Answer<Object>() {
@Override
@@ -283,10 +291,11 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
@Bean
- public ConnectionFactory cf2() {
+ public ConnectionFactory cf2() throws IOException {
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
when(connectionFactory.createConnection()).thenReturn(conn2);
when(conn2.createChannel(false)).thenReturn(channel2);
+ when(channel2.queueDeclare("foo", true, false, false, null)).thenReturn(new AMQImpl.Queue.DeclareOk("foo", 0, 0));
doAnswer(new Answer<Object>() {
@Override
@@ -299,35 +308,35 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
@Bean
- public RabbitAdmin admin1() {
+ public RabbitAdmin admin1() throws IOException {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
- public RabbitAdmin admin2() {
+ public RabbitAdmin admin2() throws IOException {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}
@Bean
- public Queue queue() {
+ public Queue queue() throws IOException {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
- public Exchange exchange() {
+ public Exchange exchange() throws IOException {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
- public Binding binding() {
+ public Binding binding() throws IOException {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
View
40 ...abbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java
@@ -12,6 +12,7 @@
*/
package org.springframework.amqp.rabbit.core;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -38,6 +39,13 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+/**
+ * @author Dave Syer
+ * @author Ed Scriven
+ * @author Gary Russell
+ * @author Gunnar Hillert
+ * @author Artem Bilan
+ */
public class RabbitAdminIntegrationTests {
private final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
@@ -170,6 +178,31 @@ public void testQueueWithoutAutoDelete() throws Exception {
}
@Test
+ public void testQueueWithoutName() throws Exception {
+
+ final Queue queue = new Queue("", true, false, true);
+ String generatedName = rabbitAdmin.declareQueue(queue);
+
+ assertEquals("", queue.getName());
+ Queue queueWithGeneratedName = new Queue(generatedName, true, false, true);
+ assertTrue(queueExists(queueWithGeneratedName));
+
+ // Stop and broker retains queue (only verifiable in native API)
+ connectionFactory.destroy();
+ assertTrue(queueExists(queueWithGeneratedName));
+
+ // Start and queue still exists
+ connectionFactory.createConnection();
+ assertTrue(queueExists(queueWithGeneratedName));
+
+ // Queue manually deleted
+ assertTrue(rabbitAdmin.deleteQueue(generatedName));
+ assertFalse(queueExists(queueWithGeneratedName));
+
+ connectionFactory.destroy();
+ }
+
+ @Test
public void testDeclareExchangeWithDefaultExchange() throws Exception {
Exchange exchange = new DirectExchange(RabbitAdmin.DEFAULT_EXCHANGE_NAME);
@@ -290,7 +323,7 @@ public void testSpringWithDefaultExchangeNonImplicitBinding() throws Exception {
* Verify that a queue exists using the native Rabbit API to bypass all the connection and
* channel caching and callbacks in Spring AMQP.
*
- * @param Queue The queue to verify
+ * @param queue The queue to verify
* @return True if the queue exists
*/
private boolean queueExists(final Queue queue) throws Exception {
@@ -302,10 +335,7 @@ private boolean queueExists(final Queue queue) throws Exception {
DeclareOk result = channel.queueDeclarePassive(queue.getName());
return result != null;
} catch (IOException e) {
- if (e.getCause().getMessage().contains("RESOURCE_LOCKED")) {
- return true;
- }
- return false;
+ return e.getCause().getMessage().contains("RESOURCE_LOCKED");
} finally {
connection.close();
}
View
41 src/reference/docbook/amqp.xml
@@ -1082,7 +1082,7 @@ Object receiveAndConvert(String queueName) throws AmqpException;]]></programlist
Queue declareQueue();
- void declareQueue(Queue queue);
+ String declareQueue(Queue queue);
void deleteQueue(String queueName);
@@ -1094,14 +1094,38 @@ Object receiveAndConvert(String queueName) throws AmqpException;]]></programlist
void declareBinding(Binding binding);
+ void removeBinding(Binding binding);
+
+ Properties getQueueProperties(String queueName);
+
}]]></programlisting>
<para>The no-arg declareQueue() method defines a queue on the broker whose name
is automatically generated. The additional properties of this auto-generated
- queue are exclusive=true, autoDelete=true, and durable=false.<note>
- <para>Removing a binding was not introduced until the 0.9 version of
- the AMQP spec.</para>
- </note></para>
+ queue are <code>exclusive=true</code>, <code>autoDelete=true</code>, and <code>durable=false</code>.</para>
+
+ <para>The <code>declareQueue(Queue queue)</code> method takes a <classname>Queue</classname> object
+ and returns the name of the declared queue. This is useful if you wish the broker to generate the
+ queue's name. This is in contrast to an <classname>AnonymousQueue</classname> where the
+ framework generates a unique (<code>UUID</code>) name and sets <code>durable</code> to
+ <code>false</code> and <code>exlusive, autoDelete</code> to <code>true</code>.
+ If the provided <classname>Queue</classname>'s <code>name</code> property
+ is an empty String,
+ the Broker declares the queue with a generated name and that name is returned to the caller.
+ The <classname>Queue</classname> object itself is not changed.
+ This functionality
+ can only be used programmatically by invoking the <classname>RabbitAdmin</classname> directly.
+ It is not supported for auto-declaration by the admin by defining a queue declaratively in the
+ application context.
+ A <code>&lt;rabbit:queue/&gt;</code> with an empty, or missing, <code>name</code> will
+ always create an <classname>AnonymousQueue</classname>. This is because the name will
+ change if redeclared due to a connection failure. Declarative queues must have fixed names because they
+ might be referenced elsewhere in the context, for example, in a listener:
+ <programlisting language="xml"><![CDATA[<rabbit:listener-container>
+ <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
+</rabbit:listener-container>]]></programlisting>
+ See <xref linkend="automatic-declaration" />.
+ </para>
<para>The RabbitMQ implementation of this interface is RabbitAdmin which
when configured using Spring XML would look like this:
@@ -1904,7 +1928,7 @@ public RabbitTransactionManager rabbitTransactionManager() {
yourself) in the
<classname>SimpleMessageListenerContainer</classname>.</para>
- <section>
+ <section id="automatic-declaration">
<title>Automatic Declaration of Exchanges, Queues and
Bindings</title>
@@ -1919,6 +1943,11 @@ public RabbitTransactionManager rabbitTransactionManager() {
for any reason (e.g. broker death, network glitch, etc.) they
will be applied again the next time they are needed.</para>
+ <note>
+ Queues declared this way must have fixed names; either explicitly
+ declared, or generated by the framework for <classname>AnonymousQueue</classname>s.
+ Anonymous queues are non-durable, exclusive, and auto-delete.
+ </note>
</section>
<section>
View
11 src/reference/docbook/whats-new.xml
@@ -4,13 +4,22 @@
<title>What's New</title>
<section>
- <title>Changes in 1.2.1 Since 1.2.0</title>
+ <title>Changes in 1.3 Since 1.2</title>
<section>
+ <title>Consumer Priority</title>
<para>
The listener container now supports consumer arguments, allowing the
<code>x-priority</code> argument to be set. See <xref linkend="consumer-priority" />.
</para>
</section>
+ <section>
+ <title>Rabbit Admin</title>
+ <para>
+ It is now possible to have the Broker to generate the queue name, regardless
+ of durable, autoDelete and exclusive settings.
+ See <xref linkend="broker-configuration" />.
+ </para>
+ </section>
</section>
<section>

0 comments on commit 89f396a

Please sign in to comment.