From d90b1754576f96c96dbcdfb4d25a99dae0a6a23a Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 28 Feb 2024 17:35:02 -0500 Subject: [PATCH] GH-2640: Fix leak for non-confirmed channel Fixes: #2640 Rabbit server was unstable for a while. Once restored, we were unable to publish new confirmed messages to it (the max number of channel on connection was reached and the existing channels were ignored). Essentially `PublisherCallbackChannel` instances ara waiting for acks on their confirms which never going to happen. Therefore, these channels are not closed and cache state is not reset. * Fix `CachingConnectionFactory.CachedChannelInvocationHandler.returnToCache()` to schedule `waitForConfirms()` in the separate thread. If `TimeoutException` happens, perform `physicalClose()` to avoid any possible memory leaks * Adjust `RabbitTemplatePublisherCallbacksIntegrationTests.testPublisherConfirmNotReceived()` to ensure that "unconfirmed" channel is closed and `CachingConnectionFactory` can produce a new channel (cherry picked from commit 9a8d741faa51030adcfba5bc745cb84554ec8cc2) --- .../connection/CachingConnectionFactory.java | 37 ++++++++++++++++--- ...atePublisherCallbacksIntegrationTests.java | 22 +++++++++-- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java index dcf0dc3ce..8b211ecff 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1250,13 +1250,38 @@ private void logicalClose(ChannelProxy proxy) throws IOException, TimeoutExcepti } private void returnToCache(ChannelProxy proxy) { - if (CachingConnectionFactory.this.active && this.publisherConfirms - && proxy instanceof PublisherCallbackChannel) { + if (CachingConnectionFactory.this.active + && this.publisherConfirms + && proxy instanceof PublisherCallbackChannel publisherCallbackChannel) { this.theConnection.channelsAwaitingAcks.put(this.target, proxy); - ((PublisherCallbackChannel) proxy) - .setAfterAckCallback(c -> - doReturnToCache(this.theConnection.channelsAwaitingAcks.remove(c))); + AtomicBoolean ackCallbackCalledImmediately = new AtomicBoolean(); + publisherCallbackChannel + .setAfterAckCallback(c -> { + ackCallbackCalledImmediately.set(true); + doReturnToCache(this.theConnection.channelsAwaitingAcks.remove(c)); + }); + + if (!ackCallbackCalledImmediately.get()) { + getChannelsExecutor() + .execute(() -> { + try { + publisherCallbackChannel.waitForConfirms(ASYNC_CLOSE_TIMEOUT); + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + catch (TimeoutException ex) { + // The channel didn't handle confirms, so close it altogether to avoid + // memory leaks for pending confirms + try { + physicalClose(this.theConnection.channelsAwaitingAcks.remove(this.target)); + } + catch (@SuppressWarnings(UNUSED) Exception e) { + } + } + }); + } } else { doReturnToCache(proxy); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java index 9c158ebd8..1e179c5ee 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests.java @@ -20,6 +20,7 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.willAnswer; @@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -326,6 +328,14 @@ public void testPublisherConfirmNotReceived() throws Exception { given(mockChannel.isOpen()).willReturn(true); given(mockChannel.getNextPublishSeqNo()).willReturn(1L); + CountDownLatch timeoutExceptionLatch = new CountDownLatch(1); + + given(mockChannel.waitForConfirms(anyLong())) + .willAnswer(invocation -> { + timeoutExceptionLatch.await(10, TimeUnit.SECONDS); + throw new TimeoutException(); + }); + given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection); given(mockConnection.isOpen()).willReturn(true); @@ -334,20 +344,26 @@ public void testPublisherConfirmNotReceived() throws Exception { .createChannel(); CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory); - ccf.setExecutor(mock(ExecutorService.class)); + ccf.setExecutor(Executors.newCachedThreadPool()); ccf.setPublisherConfirmType(ConfirmType.CORRELATED); + ccf.setChannelCacheSize(1); + ccf.setChannelCheckoutTimeout(10000); final RabbitTemplate template = new RabbitTemplate(ccf); final AtomicBoolean confirmed = new AtomicBoolean(); template.setConfirmCallback((correlationData, ack, cause) -> confirmed.set(true)); template.convertAndSend(ROUTE, (Object) "message", new CorrelationData("abc")); - Thread.sleep(5); + assertThat(template.getUnconfirmedCount()).isEqualTo(1); Collection unconfirmed = template.getUnconfirmed(-1); assertThat(template.getUnconfirmedCount()).isEqualTo(0); assertThat(unconfirmed).hasSize(1); assertThat(unconfirmed.iterator().next().getId()).isEqualTo("abc"); assertThat(confirmed.get()).isFalse(); + + timeoutExceptionLatch.countDown(); + + assertThat(ccf.createConnection().createChannel(false)).isNotNull(); } @Test @@ -563,7 +579,7 @@ public void testPublisherConfirmMultipleWithTwoListeners() throws Exception { * time as adding a new pending ack to the map. Test verifies we don't * get a {@link ConcurrentModificationException}. */ - @SuppressWarnings({ "rawtypes", "unchecked" }) + @SuppressWarnings({"rawtypes", "unchecked"}) @Test public void testConcurrentConfirms() throws Exception { ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);