diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java index cd8e6c29003..c66d761c281 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java @@ -52,6 +52,8 @@ public class FailoverClientConnectionFactory extends AbstractClientConnectionFac private boolean closeOnRefresh; + private boolean failBack = true; + private volatile long creationTime; /** @@ -82,6 +84,7 @@ public void setRefreshSharedInterval(long refreshSharedInterval) { Assert.isTrue(!this.cachingDelegates, "'refreshSharedInterval' cannot be changed when using 'CachingClientConnectionFactory` delegates"); this.refreshSharedInterval = refreshSharedInterval; + this.failBack = refreshSharedInterval != Long.MAX_VALUE; } /** @@ -148,7 +151,7 @@ public void registerSender(TcpSender sender) { protected TcpConnectionSupport obtainConnection() throws Exception { FailoverTcpConnection sharedConnection = (FailoverTcpConnection) getTheConnection(); boolean shared = !isSingleUse() && !this.cachingDelegates; - boolean refreshShared = shared + boolean refreshShared = this.failBack && shared && sharedConnection != null && System.currentTimeMillis() > this.creationTime + this.refreshSharedInterval; if (sharedConnection != null && sharedConnection.isOpen() && !refreshShared) { diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java index 0802688b84f..4b8d3a7419a 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -44,7 +45,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import org.junit.Rule; import org.junit.Test; import org.mockito.InOrder; import org.mockito.Mockito; @@ -60,7 +60,6 @@ import org.springframework.integration.ip.tcp.TcpInboundGateway; import org.springframework.integration.ip.tcp.TcpOutboundGateway; import org.springframework.integration.ip.util.TestingUtilities; -import org.springframework.integration.test.rule.Log4j2LevelAdjuster; import org.springframework.integration.test.util.TestUtils; import org.springframework.integration.util.SimplePool; import org.springframework.messaging.Message; @@ -90,12 +89,6 @@ public void publishEvent(Object event) { }; - @Rule - public Log4j2LevelAdjuster adjuster = - Log4j2LevelAdjuster.trace() - .classes(SimplePool.class) - .categories("org.springframework.integration.ip.tcp"); - @Test public void testFailoverGood() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); @@ -120,15 +113,20 @@ public void testFailoverGood() throws Exception { @Test public void testRefreshShared() throws Exception { - testRefreshShared(false); + testRefreshShared(false, 10_000); } @Test public void testRefreshSharedCloseOnRefresh() throws Exception { - testRefreshShared(true); + testRefreshShared(true, 10_000); + } + + @Test + public void testRefreshSharedInfinite() throws Exception { + testRefreshShared(false, Long.MAX_VALUE); } - private void testRefreshShared(boolean closeOnRefresh) throws Exception { + private void testRefreshShared(boolean closeOnRefresh, long interval) throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class); List factories = new ArrayList(); @@ -153,21 +151,29 @@ private void testRefreshShared(boolean closeOnRefresh) throws Exception { failoverFactory.start(); TcpConnectionSupport connection = failoverFactory.getConnection(); assertNotNull(TestUtils.getPropertyValue(failoverFactory, "theConnection")); - failoverFactory.setRefreshSharedInterval(10_000); + failoverFactory.setRefreshSharedInterval(interval); + InOrder inOrder = inOrder(factory1, factory2, conn1, conn2); + inOrder.verify(factory1).getConnection(); + inOrder.verify(factory2).getConnection(); + inOrder.verify(conn1).registerListener(any()); + inOrder.verify(conn1).isOpen(); assertSame(failoverFactory.getConnection(), connection); + inOrder.verifyNoMoreInteractions(); failoverFactory.setRefreshSharedInterval(-1); assertNotSame(failoverFactory.getConnection(), connection); - InOrder inOrder = inOrder(factory1, factory2, conn1); - inOrder.verify(factory1).getConnection(); - inOrder.verify(factory2).getConnection(); inOrder.verify(factory1).getConnection(); inOrder.verify(factory2).getConnection(); if (closeOnRefresh) { + inOrder.verify(conn2).registerListener(any()); + inOrder.verify(conn2).isOpen(); inOrder.verify(conn1).close(); } else { + inOrder.verify(conn1).registerListener(any()); + inOrder.verify(conn1).isOpen(); inOrder.verify(conn1, never()).close(); } + inOrder.verifyNoMoreInteractions(); } @Test(expected = IOException.class)