diff --git a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java index b208fbfca63..68ed5c66a59 100644 --- a/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java +++ b/spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.ip.tcp.connection; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -34,18 +35,70 @@ * Given a list of connection factories, serves up {@link TcpConnection}s * that can iterate over a connection from each factory until the write * succeeds or the list is exhausted. + * * @author Gary Russell * @since 2.2 * */ public class FailoverClientConnectionFactory extends AbstractClientConnectionFactory { + private static final long DEFAULT_REFRESH_SHARED_INTERVAL = 0L; + private final List factories; + private final boolean cachingDelegates; + + private long refreshSharedInterval = DEFAULT_REFRESH_SHARED_INTERVAL; + + private boolean closeOnRefresh; + + private volatile long creationTime; + + /** + * Construct an instance with the provided delegate factories. + * @param factories the delegates. + */ public FailoverClientConnectionFactory(List factories) { super("", 0); Assert.notEmpty(factories, "At least one factory is required"); - this.factories = factories; + this.factories = new ArrayList<>(factories); + this.cachingDelegates = factories.stream() + .anyMatch(factory -> factory instanceof CachingClientConnectionFactory); + } + + /** + * When using a shared connection {@link #setSingleUse(boolean) singleUse} is false, + * specify how long to wait before trying to fail back to start from the beginning of + * the factory list. Default is 0 for backwards compatibility to always try to get a + * connection to the primary server. If you don't want to fail back until the current + * connection is closed, set this to {@link Long#MAX_VALUE}. + * Cannot be changed when using {@link CachingClientConnectionFactory} delegates. + * @param refreshSharedInterval the interval in milliseconds. + * @since 4.3.22 + * @see #setSingleUse(boolean) + * @see #setCloseOnRefresh(boolean) + */ + public void setRefreshSharedInterval(long refreshSharedInterval) { + Assert.isTrue(!this.cachingDelegates, + "'refreshSharedInterval' cannot be changed when using 'CachingClientConnectionFactory` delegates"); + this.refreshSharedInterval = refreshSharedInterval; + } + + /** + * When using a shared connection {@link #setSingleUse(boolean) singleUse} is false, + * set this to true to close the old shared connection after a refresh. If this is + * false, the connection will remain open, but unused until its connection factory is + * again used to get a connection. Default is false for backwards compatibility. + * Cannot be changed when using {@link CachingClientConnectionFactory} delegates. + * @param closeOnRefresh true to close. + * @since 4.3.22 + * @see #setSingleUse(boolean) + * @see #setRefreshSharedInterval(long) + */ + public void setCloseOnRefresh(boolean closeOnRefresh) { + Assert.isTrue(!this.cachingDelegates, + "'closeOnRefresh' cannot be changed when using 'CachingClientConnectionFactory` delegates"); + this.closeOnRefresh = closeOnRefresh; } @Override @@ -93,27 +146,42 @@ public void registerSender(TcpSender sender) { @Override protected TcpConnectionSupport obtainConnection() throws Exception { - TcpConnectionSupport connection = this.getTheConnection(); - if (connection != null && connection.isOpen()) { - ((FailoverTcpConnection) connection).incrementEpoch(); - return connection; + FailoverTcpConnection sharedConnection = (FailoverTcpConnection) getTheConnection(); + boolean shared = !isSingleUse() && !this.cachingDelegates; + boolean refreshShared = shared + && sharedConnection != null + && System.currentTimeMillis() > this.creationTime + this.refreshSharedInterval; + if (sharedConnection != null && sharedConnection.isOpen() && !refreshShared) { + sharedConnection.incrementEpoch(); + return sharedConnection; } FailoverTcpConnection failoverTcpConnection = new FailoverTcpConnection(this.factories); if (getListener() != null) { failoverTcpConnection.registerListener(getListener()); } failoverTcpConnection.incrementEpoch(); + this.creationTime = System.currentTimeMillis(); + if (shared) { + /* + * We may have simply wrapped the same connection in a new wrapper; don't close. + */ + if (refreshShared && this.closeOnRefresh + && !sharedConnection.delegate.equals(failoverTcpConnection.delegate) + && sharedConnection.isOpen()) { + sharedConnection.close(); + } + setTheConnection(failoverTcpConnection); + } return failoverTcpConnection; } - @Override public void start() { for (AbstractClientConnectionFactory factory : this.factories) { factory.enableManualListenerRegistration(); factory.start(); } - this.setActive(true); + setActive(true); super.start(); } @@ -150,16 +218,16 @@ private final class FailoverTcpConnection extends TcpConnectionSupport implement private final String connectionId; + private final AtomicLong epoch = new AtomicLong(); + private volatile Iterator factoryIterator; private volatile AbstractClientConnectionFactory currentFactory; - private volatile TcpConnectionSupport delegate; + volatile TcpConnectionSupport delegate; // NOSONAR visibility private volatile boolean open = true; - private final AtomicLong epoch = new AtomicLong(); - private FailoverTcpConnection(List factories) throws Exception { this.factories = factories; this.factoryIterator = factories.iterator(); diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java index 612f0c4efc5..0802688b84f 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/FailoverClientConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +23,16 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.Socket; import java.nio.channels.SocketChannel; import java.util.ArrayList; @@ -42,6 +46,7 @@ import org.junit.Rule; import org.junit.Test; +import org.mockito.InOrder; import org.mockito.Mockito; import org.springframework.beans.factory.BeanFactory; @@ -113,6 +118,58 @@ public void testFailoverGood() throws Exception { Mockito.verify(conn2).send(message); } + @Test + public void testRefreshShared() throws Exception { + testRefreshShared(false); + } + + @Test + public void testRefreshSharedCloseOnRefresh() throws Exception { + testRefreshShared(true); + } + + private void testRefreshShared(boolean closeOnRefresh) throws Exception { + AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); + AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class); + List factories = new ArrayList(); + factories.add(factory1); + factories.add(factory2); + TcpConnectionSupport conn1 = makeMockConnection(); + doReturn("conn1").when(conn1).getConnectionId(); + TcpConnectionSupport conn2 = makeMockConnection(); + doReturn("conn2").when(conn2).getConnectionId(); + doThrow(new UncheckedIOException(new IOException("fail"))) + .when(factory1).getConnection(); + if (closeOnRefresh) { + when(factory2.getConnection()).thenReturn(conn1, conn2); + } + else { + when(factory2.getConnection()).thenReturn(conn1); + } + when(factory1.isActive()).thenReturn(true); + when(factory2.isActive()).thenReturn(true); + FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories); + failoverFactory.setCloseOnRefresh(closeOnRefresh); + failoverFactory.start(); + TcpConnectionSupport connection = failoverFactory.getConnection(); + assertNotNull(TestUtils.getPropertyValue(failoverFactory, "theConnection")); + failoverFactory.setRefreshSharedInterval(10_000); + assertSame(failoverFactory.getConnection(), connection); + failoverFactory.setRefreshSharedInterval(-1); + assertNotSame(failoverFactory.getConnection(), connection); + InOrder inOrder = inOrder(factory1, factory2, conn1); + inOrder.verify(factory1).getConnection(); + inOrder.verify(factory2).getConnection(); + inOrder.verify(factory1).getConnection(); + inOrder.verify(factory2).getConnection(); + if (closeOnRefresh) { + inOrder.verify(conn1).close(); + } + else { + inOrder.verify(conn1, never()).close(); + } + } + @Test(expected = IOException.class) public void testFailoverAllDead() throws Exception { AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class); diff --git a/src/reference/asciidoc/ip.adoc b/src/reference/asciidoc/ip.adoc index 49cc5189a66..0247e983971 100644 --- a/src/reference/asciidoc/ip.adoc +++ b/src/reference/asciidoc/ip.adoc @@ -530,6 +530,23 @@ The following example shows how to configure a failover client connection factor NOTE: When using the failover connection factory, the `singleUse` property must be consistent between the factory itself and the list of factories it is configured to use. +The connnection factory has two properties when used with a shared connection (`singleUse=false`): + +* `refreshSharedInterval` +* `closeOnRefresh` + +These are `0` and `false` to retain the same behavior that existed before the properties were added. + +Consider the following scenario based on the above configuration: +Let's say `clientFactory1` cannot establish a connection but `clientFactory2` can. +Each time the `failCF` `getConnection()` method is called, we will again attempt to connect using `clientFactory1`; if successful, the "old" connection will remain open and may be reused in future if the first factory fails once more. + +Set `refreshSharedInterval` to only attempt to reconnect with the first factory after that time has expired; set it to `Long.MAX_VALUE` if you only want to fail back to the first factory when the current connection fails. + +Set `closeOnRefresh` to close the "old" connection after a refresh actually creates a new connection. + +IMPORTANT: These properties do not apply if any of the delegate factories is a `CachingClientConnectionFactory` because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection. + [[tcp-affinity-cf]] ==== TCP Thread Affinity Connection Factory