Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

AMQP-383 Polishing - Conditional Declaration

JIRA: https://jira.spring.io/browse/AMQP-383

The previous solution did not work for conditional declaration
queues because the internal `RabbitAdmin` was not in the list
of admins that should declare the element.

Add a setter for the `RabbitAdmin` and provide an attribute
in the namespace.

Add a test case for conditional declaration.

Add documentation, what's new.
  • Loading branch information...
commit 6d9630e56169f7687b24ba58a8ade32299aacf08 1 parent e446c79
@garyrussell garyrussell authored
View
5 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java
@@ -199,6 +199,11 @@ private void parseListener(Element listenerEle, Element containerEle, ParserCont
containerDef.getPropertyValues().add("consumerArguments", args);
}
+ String admin = listenerEle.getAttribute("admin");
+ if (StringUtils.hasText(admin)) {
+ containerDef.getPropertyValues().add("rabbitAdmin", new RuntimeBeanReference(admin));
+ }
+
// Register the listener and fire event
parserContext.registerBeanComponent(new BeanComponentDefinition(containerDef, containerBeanName));
}
View
25 spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java
@@ -142,7 +142,7 @@
private final Map<String, Object> consumerArgs = new HashMap<String, Object>();
- private RabbitAdmin rabbitAdmin;
+ private volatile RabbitAdmin rabbitAdmin;
public interface ContainerDelegate {
void invokeListener(Channel channel, Message message) throws Exception;
@@ -443,6 +443,22 @@ public void setConsumerArguments(Map<String, Object> args) {
}
}
+ protected RabbitAdmin getRabbitAdmin() {
+ return rabbitAdmin;
+ }
+
+ /**
+ * Set the {@link RabbitAdmin}, used to declare any auto-delete queues, bindings
+ * etc when the container is started. Only needed if those queues use conditional
+ * declaration (have a 'declared-by' attribute). If not specified, an internal
+ * admin will be used which will attempt to declare all elements not having a
+ * 'declared-by' attribute.
+ * @param rabbitAdmin The admin.
+ */
+ public void setRabbitAdmin(RabbitAdmin rabbitAdmin) {
+ this.rabbitAdmin = rabbitAdmin;
+ }
+
@Override
public void setQueueNames(String... queueName) {
super.setQueueNames(queueName);
@@ -579,8 +595,11 @@ protected void doInitialize() throws Exception {
logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
}
initializeProxy();
- this.rabbitAdmin = new RabbitAdmin(this.getConnectionFactory());
- this.rabbitAdmin.setApplicationContext(this.getApplicationContext());
+ if (this.rabbitAdmin == null) {
+ RabbitAdmin rabbitAdmin = new RabbitAdmin(this.getConnectionFactory());
+ rabbitAdmin.setApplicationContext(this.getApplicationContext());
+ this.rabbitAdmin = rabbitAdmin;
+ }
}
@ManagedMetric(metricType = MetricType.GAUGE)
View
15 spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.3.xsd
@@ -714,6 +714,21 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="admin" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Reference to a 'RabbitAdmin'. Required if the listener is using auto-delete
+ queues and those queues are configured for conditional declaration. This
+ is the admin that will (re)declare those queues when the container is
+ (re)started. See the reference documentation for more information.
+ ]]></xsd:documentation>
+ <xsd:appinfo>
+ <tool:annotation kind="ref">
+ <tool:expected-type type="org.springframework.amqp.rabbit.core.RabbitAdmin" />
+ </tool:annotation>
+ </xsd:appinfo>
+ </xsd:annotation>
+ </xsd:attribute>
</xsd:complexType>
<xsd:element name="admin">
View
29 spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ListenFromAutoDeleteQueueTests.java
@@ -49,7 +49,9 @@
@Rule
public BrokerRunning brokerIsRunning = BrokerRunning.isRunning();
- private SimpleMessageListenerContainer listenerContainer;
+ private SimpleMessageListenerContainer listenerContainer1;
+
+ private SimpleMessageListenerContainer listenerContainer2;
private ConfigurableApplicationContext context;
@@ -59,7 +61,8 @@
public void setup() {
this.context = new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-context.xml",
this.getClass());
- this.listenerContainer = context.getBean(SimpleMessageListenerContainer.class);
+ this.listenerContainer1 = context.getBean("container1$listener1", SimpleMessageListenerContainer.class);
+ this.listenerContainer2 = context.getBean("container2$listener2", SimpleMessageListenerContainer.class);
}
@After
@@ -74,13 +77,27 @@ public void testStopStart() throws Exception {
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("testContainerWithAutoDeleteQueues", "anon", "foo");
assertNotNull(queue.poll(10, TimeUnit.SECONDS));
- this.listenerContainer.stop();
- RabbitAdmin admin = spy(TestUtils.getPropertyValue(this.listenerContainer, "rabbitAdmin", RabbitAdmin.class));
- new DirectFieldAccessor(this.listenerContainer).setPropertyValue("rabbitAdmin", admin);
- this.listenerContainer.start();
+ this.listenerContainer1.stop();
+ RabbitAdmin admin = spy(TestUtils.getPropertyValue(this.listenerContainer1, "rabbitAdmin", RabbitAdmin.class));
+ new DirectFieldAccessor(this.listenerContainer1).setPropertyValue("rabbitAdmin", admin);
+ this.listenerContainer1.start();
template.convertAndSend("testContainerWithAutoDeleteQueues", "anon", "foo");
assertNotNull(queue.poll(10, TimeUnit.SECONDS));
verify(admin, times(1)).initialize(); // should only be called by one of the consumers
+ this.listenerContainer1.stop();
+ }
+
+ @Test
+ public void testStopStartConditionalDeclarations() throws Exception {
+ RabbitTemplate template = context.getBean(RabbitTemplate.class);
+ this.listenerContainer2.start();
+ template.convertAndSend("otherExchange", "otherAnon", "foo");
+ assertNotNull(queue.poll(10, TimeUnit.SECONDS));
+ this.listenerContainer2.stop();
+ this.listenerContainer2.start();
+ template.convertAndSend("otherExchange", "otherAnon", "foo");
+ assertNotNull(queue.poll(10, TimeUnit.SECONDS));
+ this.listenerContainer2.stop();
}
public static class Listener implements MessageListener {
View
20 ...t/src/test/resources/org/springframework/amqp/rabbit/listener/ListenFromAutoDeleteQueueTests-context.xml
@@ -15,12 +15,28 @@
</rabbit:bindings>
</rabbit:direct-exchange>
- <rabbit:listener-container concurrency="2">
- <rabbit:listener ref="foo" queues="anon" />
+ <rabbit:listener-container id="container1" concurrency="2">
+ <rabbit:listener id="listener1" ref="foo" queues="anon" />
+ </rabbit:listener-container>
+
+ <!-- With Conditional Declarations -->
+
+ <rabbit:queue id="otherAnon" declared-by="containerAdmin" />
+
+ <rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
+ <rabbit:bindings>
+ <rabbit:binding queue="otherAnon" key="otherAnon" />
+ </rabbit:bindings>
+ </rabbit:direct-exchange>
+
+ <rabbit:listener-container id="container2" auto-startup="false">
+ <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin connection-factory="rabbitConnectionFactory" />
+ <rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory" auto-startup="false" />
+
<bean id="foo" class="org.springframework.amqp.rabbit.listener.ListenFromAutoDeleteQueueTests$Listener" />
<rabbit:template id="template" connection-factory="rabbitConnectionFactory"/>
View
58 src/reference/docbook/amqp.xml
@@ -924,6 +924,47 @@ container.setConsumerArguments(Collections. <String, Object> singletonMap("x-pri
Starting with <emphasis>version 1.3</emphasis> the queue(s) on which the container is listening can
be modified at runtime; see <xref linkend="listener-queues" />.
</para>
+ <section id="lc-auto-delete">
+ <title>'auto-delete' Queues</title>
+ <para>
+ When a container is configured to listen to <code>auto-delete</code> queue(s), the
+ queue is removed by the broker when the container is stopped (last consumer is cancelled).
+ Before <emphasis>version 1.3</emphasis>, the container could not be restarted because
+ the queue was missing; the <classname>RabbitAdmin</classname> only automatically
+ redeclares queues etc, when the connection is closed/opens, which does not happen
+ when the container is stopped/started.
+ </para>
+ <para>
+ Starting with <emphasis>version 1.3</emphasis>, the container will now use a
+ <classname>RabbitAdmin</classname> to redeclare any missing queues during startup.
+ </para>
+ <para>
+ You can also use conditional declaration (<xref linkend="conditional-declaration"/>)
+ together with an <code>auto-startup="false"</code> admin to defer queue declaration
+ until the container is started.
+ </para>
+ <programlisting language="xml"><![CDATA[<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
+
+<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
+ <rabbit:bindings>
+ <rabbit:binding queue="otherAnon" key="otherAnon" />
+ </rabbit:bindings>
+</rabbit:direct-exchange>
+
+<rabbit:listener-container id="container2" auto-startup="false">
+ <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
+</rabbit:listener-container>
+
+<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
+ auto-startup="false" />]]></programlisting>
+ <para>
+ In this case, the queue and exchange are declared by <code>containerAdmin</code>
+ which has <code>auto-startup="false"</code> so the elements are not declared
+ during context initialization. Also, the container is not started for the
+ same reason. When the container is later started, it uses it's reference to
+ <code>containerAdmin</code> to declare the elements.
+ </para>
+ </section>
</section>
</section>
@@ -2160,6 +2201,23 @@ public RabbitTransactionManager rabbitTransactionManager() {
appears on the &lt;rabbit:listener/&gt; element along with the queue names.
Default 'false'.</entry>
</row>
+ <row>
+ <entry><literallayout>rabbitAdmin
+(admin)</literallayout></entry>
+
+ <entry>When a listener container listens to at least one auto-delete
+ queue and it is found to be missing during startup, the container uses
+ a <classname>RabbitAdmin</classname> to declare the queue and any
+ related bindings and exchanges. If such elements are configured to
+ use conditional declaration (see <xref linkend="conditional-declaration"/>),
+ the container must use the admin that was configured to declare those
+ elements. Specify that admin here; only required when using auto-delete
+ queues with conditional declaration. If you do not wish the auto-delete
+ queue(s) to be declared until the container is started, set
+ <code>auto-startup</code> to <code>false</code> on the admin.
+ Defaults to a <classname>RabbitAdmin</classname> that
+ will declare all non-conditional elements.</entry>
+ </row>
</tbody>
</tgroup>
</table></para>
View
4 src/reference/docbook/whats-new.xml
@@ -21,6 +21,10 @@
of its configured queues is available for use.
See <xref linkend="listener-queues" />
</para>
+ <para>
+ This listener container will now redeclare any auto-delete queues during
+ startup. See <xref linkend="lc-auto-delete"/>.
+ </para>
</section>
<section>
<title>Consumer Priority</title>
Please sign in to comment.
Something went wrong with that request. Please try again.