Skip to content

Commit

Permalink
GH-3199: FailoverClientCF - Fail Back Option
Browse files Browse the repository at this point in the history
Resolves: #3199

Previously, the FCCF did not cache a shared connection; if server 1 is down
and server 2 is up, this caused an attempt to connect to server 1 every time
we got the connection.

Add 2 options: `refreshSharedInterval` and `closeOnRefresh`, defaulting to
0 and false respectively, to maintain the same behavior as before the options
existed.

Disallow caching of the single shared connection if the delegate factories are
`CachingClientConnectionFactory` instances.

**cherry-pick to 5.2.x**

I will backport to 5.1.x, 4.3.x after review/merge.

* Polish javadocs and fix typo in docs
  • Loading branch information
garyrussell committed Feb 28, 2020
1 parent a8bb444 commit dc266fa
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 12 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.integration.ip.tcp.connection;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
Expand All @@ -34,18 +35,70 @@
* Given a list of connection factories, serves up {@link TcpConnection}s
* that can iterate over a connection from each factory until the write
* succeeds or the list is exhausted.
*
* @author Gary Russell
* @since 2.2
*
*/
public class FailoverClientConnectionFactory extends AbstractClientConnectionFactory {

private static final long DEFAULT_REFRESH_SHARED_INTERVAL = 0L;

private final List<AbstractClientConnectionFactory> factories;

private final boolean cachingDelegates;

private long refreshSharedInterval = DEFAULT_REFRESH_SHARED_INTERVAL;

private boolean closeOnRefresh;

private volatile long creationTime;

/**
* Construct an instance with the provided delegate factories.
* @param factories the delegates.
*/
public FailoverClientConnectionFactory(List<AbstractClientConnectionFactory> factories) {
super("", 0);
Assert.notEmpty(factories, "At least one factory is required");
this.factories = factories;
this.factories = new ArrayList<>(factories);
this.cachingDelegates = factories.stream()
.anyMatch(factory -> factory instanceof CachingClientConnectionFactory);
}

/**
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
* specify how long to wait before trying to fail back to start from the beginning of
* the factory list. Default is 0 for backwards compatibility to always try to get a
* connection to the primary server. If you don't want to fail back until the current
* connection is closed, set this to {@link Long#MAX_VALUE}.
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
* @param refreshSharedInterval the interval in milliseconds.
* @since 4.3.22
* @see #setSingleUse(boolean)
* @see #setCloseOnRefresh(boolean)
*/
public void setRefreshSharedInterval(long refreshSharedInterval) {
Assert.isTrue(!this.cachingDelegates,
"'refreshSharedInterval' cannot be changed when using 'CachingClientConnectionFactory` delegates");
this.refreshSharedInterval = refreshSharedInterval;
}

/**
* When using a shared connection {@link #setSingleUse(boolean) singleUse} is false,
* set this to true to close the old shared connection after a refresh. If this is
* false, the connection will remain open, but unused until its connection factory is
* again used to get a connection. Default is false for backwards compatibility.
* Cannot be changed when using {@link CachingClientConnectionFactory} delegates.
* @param closeOnRefresh true to close.
* @since 4.3.22
* @see #setSingleUse(boolean)
* @see #setRefreshSharedInterval(long)
*/
public void setCloseOnRefresh(boolean closeOnRefresh) {
Assert.isTrue(!this.cachingDelegates,
"'closeOnRefresh' cannot be changed when using 'CachingClientConnectionFactory` delegates");
this.closeOnRefresh = closeOnRefresh;
}

@Override
Expand Down Expand Up @@ -93,27 +146,42 @@ public void registerSender(TcpSender sender) {

@Override
protected TcpConnectionSupport obtainConnection() throws Exception {
TcpConnectionSupport connection = this.getTheConnection();
if (connection != null && connection.isOpen()) {
((FailoverTcpConnection) connection).incrementEpoch();
return connection;
FailoverTcpConnection sharedConnection = (FailoverTcpConnection) getTheConnection();
boolean shared = !isSingleUse() && !this.cachingDelegates;
boolean refreshShared = shared
&& sharedConnection != null
&& System.currentTimeMillis() > this.creationTime + this.refreshSharedInterval;
if (sharedConnection != null && sharedConnection.isOpen() && !refreshShared) {
sharedConnection.incrementEpoch();
return sharedConnection;
}
FailoverTcpConnection failoverTcpConnection = new FailoverTcpConnection(this.factories);
if (getListener() != null) {
failoverTcpConnection.registerListener(getListener());
}
failoverTcpConnection.incrementEpoch();
this.creationTime = System.currentTimeMillis();
if (shared) {
/*
* We may have simply wrapped the same connection in a new wrapper; don't close.
*/
if (refreshShared && this.closeOnRefresh
&& !sharedConnection.delegate.equals(failoverTcpConnection.delegate)
&& sharedConnection.isOpen()) {
sharedConnection.close();
}
setTheConnection(failoverTcpConnection);
}
return failoverTcpConnection;
}


@Override
public void start() {
for (AbstractClientConnectionFactory factory : this.factories) {
factory.enableManualListenerRegistration();
factory.start();
}
this.setActive(true);
setActive(true);
super.start();
}

Expand Down Expand Up @@ -150,16 +218,16 @@ private final class FailoverTcpConnection extends TcpConnectionSupport implement

private final String connectionId;

private final AtomicLong epoch = new AtomicLong();

private volatile Iterator<AbstractClientConnectionFactory> factoryIterator;

private volatile AbstractClientConnectionFactory currentFactory;

private volatile TcpConnectionSupport delegate;
volatile TcpConnectionSupport delegate; // NOSONAR visibility

private volatile boolean open = true;

private final AtomicLong epoch = new AtomicLong();

private FailoverTcpConnection(List<AbstractClientConnectionFactory> factories) throws Exception {
this.factories = factories;
this.factoryIterator = factories.iterator();
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,12 +23,16 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
Expand All @@ -42,6 +46,7 @@

import org.junit.Rule;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

import org.springframework.beans.factory.BeanFactory;
Expand Down Expand Up @@ -113,6 +118,58 @@ public void testFailoverGood() throws Exception {
Mockito.verify(conn2).send(message);
}

@Test
public void testRefreshShared() throws Exception {
testRefreshShared(false);
}

@Test
public void testRefreshSharedCloseOnRefresh() throws Exception {
testRefreshShared(true);
}

private void testRefreshShared(boolean closeOnRefresh) throws Exception {
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
AbstractClientConnectionFactory factory2 = mock(AbstractClientConnectionFactory.class);
List<AbstractClientConnectionFactory> factories = new ArrayList<AbstractClientConnectionFactory>();
factories.add(factory1);
factories.add(factory2);
TcpConnectionSupport conn1 = makeMockConnection();
doReturn("conn1").when(conn1).getConnectionId();
TcpConnectionSupport conn2 = makeMockConnection();
doReturn("conn2").when(conn2).getConnectionId();
doThrow(new UncheckedIOException(new IOException("fail")))
.when(factory1).getConnection();
if (closeOnRefresh) {
when(factory2.getConnection()).thenReturn(conn1, conn2);
}
else {
when(factory2.getConnection()).thenReturn(conn1);
}
when(factory1.isActive()).thenReturn(true);
when(factory2.isActive()).thenReturn(true);
FailoverClientConnectionFactory failoverFactory = new FailoverClientConnectionFactory(factories);
failoverFactory.setCloseOnRefresh(closeOnRefresh);
failoverFactory.start();
TcpConnectionSupport connection = failoverFactory.getConnection();
assertNotNull(TestUtils.getPropertyValue(failoverFactory, "theConnection"));
failoverFactory.setRefreshSharedInterval(10_000);
assertSame(failoverFactory.getConnection(), connection);
failoverFactory.setRefreshSharedInterval(-1);
assertNotSame(failoverFactory.getConnection(), connection);
InOrder inOrder = inOrder(factory1, factory2, conn1);
inOrder.verify(factory1).getConnection();
inOrder.verify(factory2).getConnection();
inOrder.verify(factory1).getConnection();
inOrder.verify(factory2).getConnection();
if (closeOnRefresh) {
inOrder.verify(conn1).close();
}
else {
inOrder.verify(conn1, never()).close();
}
}

@Test(expected = IOException.class)
public void testFailoverAllDead() throws Exception {
AbstractClientConnectionFactory factory1 = mock(AbstractClientConnectionFactory.class);
Expand Down
17 changes: 17 additions & 0 deletions src/reference/asciidoc/ip.adoc
Expand Up @@ -530,6 +530,23 @@ The following example shows how to configure a failover client connection factor

NOTE: When using the failover connection factory, the `singleUse` property must be consistent between the factory itself and the list of factories it is configured to use.

The connnection factory has two properties when used with a shared connection (`singleUse=false`):

* `refreshSharedInterval`
* `closeOnRefresh`

These are `0` and `false` to retain the same behavior that existed before the properties were added.

Consider the following scenario based on the above configuration:
Let's say `clientFactory1` cannot establish a connection but `clientFactory2` can.
Each time the `failCF` `getConnection()` method is called, we will again attempt to connect using `clientFactory1`; if successful, the "old" connection will remain open and may be reused in future if the first factory fails once more.

Set `refreshSharedInterval` to only attempt to reconnect with the first factory after that time has expired; set it to `Long.MAX_VALUE` if you only want to fail back to the first factory when the current connection fails.

Set `closeOnRefresh` to close the "old" connection after a refresh actually creates a new connection.

IMPORTANT: These properties do not apply if any of the delegate factories is a `CachingClientConnectionFactory` because the connection caching is handled there; in that case the list of connection factories will always be consulted to get a connection.

[[tcp-affinity-cf]]
==== TCP Thread Affinity Connection Factory

Expand Down

0 comments on commit dc266fa

Please sign in to comment.