Skip to content

Commit

Permalink
AMQP-847: Close channel in RabbitTemplate.receive
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-847

To avoid unacked messages race condition when client timeouts, but at
this moment the message becomes available in queue, physically close
a receive channel on the `TimeoutException` from the `Future.get()`

**Cherry-pick to 2.0.x & 1.7.x**
  • Loading branch information
artembilan authored and garyrussell committed Dec 11, 2018
1 parent d5a2306 commit 9d67546
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Expand Up @@ -137,6 +137,7 @@
* @author Artem Bilan
* @author Ernest Sadykov
* @author Mark Norkin
*
* @since 1.0
*/
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener,
Expand Down Expand Up @@ -1269,7 +1270,7 @@ private Delivery consumeDelivery(Channel channel, String queueName, long timeout
Thread.currentThread().interrupt();
}
catch (TimeoutException e) {
// no result in time
RabbitUtils.setPhysicalCloseRequired(channel, true);
}
finally {
if (!(exception instanceof ConsumerCancelledException) && channel.isOpen()) {
Expand Down
Expand Up @@ -48,6 +48,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
Expand Down Expand Up @@ -397,6 +398,14 @@ public void testReceiveBlockingNoTimeout() throws Exception {
}
}

@Test
public void testReceiveTimeoutRequeue() {
assertNull(this.template.receiveAndConvert(ROUTE, 1));
assertEquals(0,
TestUtils.getPropertyValue(this.connectionFactory, "cachedChannelsNonTransactional", List.class)
.size());
}

@Test
public void testReceiveBlockingTx() throws Exception {
this.template.convertAndSend(ROUTE, "blockTX");
Expand Down

0 comments on commit 9d67546

Please sign in to comment.