Skip to content
Permalink
Browse files

GH-712: Add RabbitTemplate.noLocalReplyConsumer

Fixes #712

For controlling `noLocal` flag for reply consumers in the `sendAndReceive()`
operations introduce a `noLocalReplyConsumer` option for the `RabbitTemplate`.
Make it `false` by default since it does not have any value in RabbitMQ,
but matters in QPid for temporary queues
  • Loading branch information
artembilan authored and garyrussell committed Jul 10, 2018
1 parent f72f2e1 commit 5989d03f03c7b024c47276bec47d29abecd3c964
@@ -246,6 +246,8 @@

private boolean usePublisherConnection;

private boolean noLocalReplyConsumer;

/**
* Convenient constructor for use with setter injection. Don't forget to set the connection factory.
*/
@@ -692,6 +694,17 @@ public void setUsePublisherConnection(boolean usePublisherConnection) {
this.usePublisherConnection = usePublisherConnection;
}

/**
* Set to true for a no-local consumer. Defaults to false.
* @param noLocalReplyConsumer true for a no-local consumer.
* @since 2.1
* @see AbstractMessageListenerContainer#setNoLocal(boolean)
* @see Channel#basicConsume(String, boolean, String, boolean, boolean, Map, com.rabbitmq.client.Consumer)
*/
public void setNoLocalReplyConsumer(boolean noLocalReplyConsumer) {
this.noLocalReplyConsumer = noLocalReplyConsumer;
}

/**
* Invoked by the container during startup so it can verify the queue is correctly
* configured (if a simple reply queue name is used instead of exchange/routingKey.
@@ -1573,7 +1586,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
}

};
channel.basicConsume(replyTo, true, consumerTag, true, true, null, consumer);
channel.basicConsume(replyTo, true, consumerTag, this.noLocalReplyConsumer, true, null, consumer);
Message reply = null;
try {
reply = exchangeMessages(exchange, routingKey, message, correlationData, channel, pendingReply,
@@ -1629,6 +1642,7 @@ private Message doSendAndReceiveWithDirect(String exchange, String routingKey, M
container.setAfterReceivePostProcessors(this.afterReceivePostProcessors
.toArray(new MessagePostProcessor[this.afterReceivePostProcessors.size()]));
}
container.setNoLocal(this.noLocalReplyConsumer);
container.start();
this.directReplyToContainers.put(connectionFactory, container);
this.replyAddress = Address.AMQ_RABBITMQ_REPLY_TO;
@@ -3077,13 +3077,15 @@ Similar request/reply methods are also available where the `MessageConverter` is
Those methods are named `convertSendAndReceive`.
See the Javadoc of `AmqpTemplate` for more detail.

Starting with _version 1.5.0_, each of the `sendAndReceive` method variants has an overloaded version that takes `CorrelationData`.
Starting with version 1.5.0, each of the `sendAndReceive` method variants has an overloaded version that takes `CorrelationData`.
Together with a properly configured connection factory, this enables the receipt of publisher confirms for the send side of the operation.
See <<template-confirms>> and the javadoc for `RabbitOperations` for more information.

Starting with __version 2.0__, there are variants of these methods (`convertSendAndReceiveAsType`) that take an additional `ParameterizedTypeReference` argument to convert complex returned types.
Starting with version 2.0, there are variants of these methods (`convertSendAndReceiveAsType`) that take an additional `ParameterizedTypeReference` argument to convert complex returned types.
The template must be configured with a `SmartMessageConverter`; see <<json-complex>> for more information.

Starting with version 2.1, the `RabbitTemplate` can be configured with the `noLocalReplyConsumer` option to control a `noLocal` flag for reply consumers.
This is `false` now by default.

[[reply-timeout]]
===== Reply Timeout
@@ -3103,26 +3105,26 @@ Also, you must not have registered your own `ReturnCallback` with the `RabbitTem
[[direct-reply-to]]
===== RabbitMQ Direct reply-to

IMPORTANT: Starting with _version 3.4.0_, the RabbitMQ server now supports http://www.rabbitmq.com/direct-reply-to.html[Direct reply-to]; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request).
IMPORTANT: Starting with version 3.4.0, the RabbitMQ server now supports http://www.rabbitmq.com/direct-reply-to.html[Direct reply-to]; this eliminates the main reason for a fixed reply queue (to avoid the need to create a temporary queue for each request).
Starting with *Spring AMQP version 1.4.1* Direct reply-to will be used by default (if supported by the server) instead of creating temporary reply queues.
When no `replyQueue` is provided (or it is set with the name `amq.rabbitmq.reply-to`), the `RabbitTemplate` will automatically detect whether Direct reply-to is supported and either use it or fall back to using a temporary reply queue.
When using Direct reply-to, a `reply-listener` is not required and should not be configured.

Reply listeners are still supported with named queues (other than `amq.rabbitmq.reply-to`), allowing control of reply concurrency etc.

Starting with _version 1.6_ if, for some reason, you wish to use a temporary, exclusive, auto-delete queue for each
Starting with version 1.6 if, for some reason, you wish to use a temporary, exclusive, auto-delete queue for each
reply, set the `useTemporaryReplyQueues` property to `true`.
This property is ignored if you you set a `replyAddress`.

The decision whether or not to use direct reply-to can be changed to use different criteria by subclassing
`RabbitTemplate` and overriding `useDirectReplyTo()`.
The method is called once only; when the first request is sent.

With versions earlier than _verion 2.0_, the `RabbitTemplate` created a new consumer for each request and canceled the consumer when the reply was received (or timed out).
With versions earlier than version 2.0, the `RabbitTemplate` created a new consumer for each request and canceled the consumer when the reply was received (or timed out).
Now, the template uses a `DirectReplyToMessageListenerContainer` instead, allowing the consumers to be reused; the template still takes care of correlating the replies so there is no danger of a late reply going to a different sender.
If you want to revert to the previous behavior, set property `useDirectReplyToContainer` (`direct-reply-to-container` when using XML configuration) to false.

The `AsynRabbitTemplate` has no such option - it always used a `DirectReplyToContainer` for replies when direct replyTo is being used.
The `AsyncRabbitTemplate` has no such option - it always used a `DirectReplyToContainer` for replies when direct replyTo is being used.

===== Message Correlation With A Reply Queue

@@ -31,10 +31,15 @@ See <<template-confirms>> for more information.
The listener container factories can now be used to create any listener container, not just those for use with `@RabbitListener` s or the `@RabbitListenerEndpointRegistry`.
See <<using-container-factories>> for more information.

`ChannelAwareMessageListener` now inherits from `MesssageListener`.
`ChannelAwareMessageListener` now inherits from `MessageListener`.

===== RabbitAdmin Changes

The `RabbitAdmin` will discover of type `Declarables`, which is a container for `Declarable` (`Queue`, `Exchange`, `Binding`) objects, and declare the objects on the broker.
Users are discouraged from using the old mechanism of declaring `<Collection<Queue>>` etc and should use `Declarables` beans instead.
See <<collection-declaration>> for more information.

===== RabbitTemplate Changes

The `RabbitTemplate` now can be configured with the `noLocalReplyConsumer` option to control a `noLocal` flag for reply consumers in the `sendAndReceive()` operations.
See <<request-reply>> for more information.

0 comments on commit 5989d03

Please sign in to comment.
You can’t perform that action at this time.