Skip to content

Commit

Permalink
Schedule ping by default for remote clusters
Browse files Browse the repository at this point in the history
When we connect to remote clusters, there may be a few more routers/firewalls in-between compared to when we connect to nodes in the same cluster. We've experienced cases where firewalls drop connections completely and keep-alives seem not to be enough, or they are not properly configured. With this commit we enable application-level pings by default every 5 seconds from CCS nodes to the selected remote nodes. We also add a setting called `cluster.remote.ping_schedule` that allows to change the interval and potentially disable application-level pings, similar to `transport.ping_schedule` but the new setting only affects connections made to remote clusters.

Relates to elastic#34405
  • Loading branch information
javanna committed Oct 23, 2018
1 parent edfdac5 commit 95b55f3
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 22 deletions.
6 changes: 6 additions & 0 deletions docs/reference/modules/remote-clusters.asciidoc
Expand Up @@ -129,6 +129,12 @@ PUT _cluster/settings
The time to wait for remote connections to be established when the node
starts. The default is `30s`.

`cluster.remote.ping_schedule`::

Schedule a regular application-level ping message to ensure that transport
connections to nodes belonging to remote clusters are kept alive. Defaults
to `5s`, it can be set to `-1` to disable pings.

`cluster.remote.node.attr`::

A node attribute to filter out nodes that are eligible as a gateway node in
Expand Down
8 changes: 4 additions & 4 deletions docs/reference/modules/transport.asciidoc
Expand Up @@ -46,10 +46,10 @@ between all nodes. Defaults to `false`.

|`transport.ping_schedule` | Schedule a regular application-level ping message
to ensure that transport connections between nodes are kept alive. Defaults to
`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to
correctly configure TCP keep-alives instead of using this feature, because TCP
keep-alives apply to all kinds of long-lived connection and not just to
transport connections.
`5s` in the transport client and connections to remote clusters, and `-1`
(disabled) elsewhere. It is preferable to correctly configure TCP keep-alives
instead of using this feature, because TCP keep-alives apply to all kinds of
long-lived connections and not just to transport connections.

|=======================================================================

Expand Down
Expand Up @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE,
RemoteClusterService.ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS,
RemoteClusterService.REMOTE_PING_SCHEDULE,
TransportService.TRACE_LOG_EXCLUDE_SETTING,
TransportService.TRACE_LOG_INCLUDE_SETTING,
TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING,
Expand Down
Expand Up @@ -67,12 +67,15 @@ public class ConnectionManager implements Closeable {
private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener();

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) {
this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings));
}

public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) {
this.transport = transport;
this.threadPool = threadPool;
this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings);
this.pingSchedule = pingSchedule;
this.defaultProfile = buildDefaultConnectionProfile(settings);
this.lifecycle.moveToStarted();

if (pingSchedule.millis() > 0) {
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing());
}
Expand Down Expand Up @@ -287,6 +290,10 @@ public void onFailure(Exception e) {
}
}

public TimeValue getPingSchedule() {
return pingSchedule;
}

private static final class DelegatingNodeConnectionListener implements TransportConnectionListener {

private final CopyOnWriteArrayList<TransportConnectionListener> listeners = new CopyOnWriteArrayList<>();
Expand Down
Expand Up @@ -71,6 +71,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
assert Version.CURRENT.major < 8;
}

//the default here (5s) differs from the default in TcpTransport.PING_SCHEDULE (which is -1, hence disabled)
public static final Setting<TimeValue> REMOTE_PING_SCHEDULE =
Setting.timeSetting("cluster.remote.ping_schedule", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope);

public static final Setting<Integer> SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER =
Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope, Setting.Property.Deprecated);

Expand Down Expand Up @@ -211,9 +215,10 @@ private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Su
}

if (remote == null) { // this is a new cluster we have to add a new representation
remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService,
new ConnectionManager(settings, transportService.transport, transportService.threadPool), numRemoteConnections,
getNodePredicate(settings), proxyAddress);
ConnectionManager connectionManager = new ConnectionManager(settings, transportService.transport,
transportService.threadPool, REMOTE_PING_SCHEDULE.get(settings));
remote = new RemoteClusterConnection(settings, entry.getKey(), seedList, transportService, connectionManager,
numRemoteConnections, getNodePredicate(settings), proxyAddress);
remoteClusters.put(entry.getKey(), remote);
}

Expand Down Expand Up @@ -340,31 +345,27 @@ public void onFailure(Exception e) {
* @throws IllegalArgumentException if the remote cluster is unknown
*/
public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection(node);
return getRemoteClusterConnection(cluster).getConnection(node);
}

/**
* Ensures that the given cluster alias is connected. If the cluster is connected this operation
* will invoke the listener immediately.
*/
public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias);
if (remoteClusterConnection == null) {
throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
}
remoteClusterConnection.ensureConnected(listener);
void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
getRemoteClusterConnection(clusterAlias).ensureConnected(listener);
}

public Transport.Connection getConnection(String cluster) {
return getRemoteClusterConnection(cluster).getConnection();
}

RemoteClusterConnection getRemoteClusterConnection(String cluster) {
RemoteClusterConnection connection = remoteClusters.get(cluster);
if (connection == null) {
throw new IllegalArgumentException("no such remote cluster: " + cluster);
}
return connection.getConnection();
return connection;
}

@Override
Expand All @@ -386,7 +387,6 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail
}
}


@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {}));
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -97,6 +98,7 @@ public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_PING_SCHEDULE));
}

public void testRemoteClusterSeedSetting() {
Expand Down Expand Up @@ -337,6 +339,57 @@ public void testIncrementallyAddClusters() throws IOException {
}
}

public void testDefaultPingSchedule() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());

try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
assertEquals(TimeValue.timeValueSeconds(5), remoteClusterConnection.getConnectionManager().getPingSchedule());
}
}
}
}

public void testCustomPingSchedule() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());

try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
TimeValue pingSchedule = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10));
builder.put("cluster.remote.ping_schedule", pingSchedule);
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertFalse(service.isCrossClusterSearchEnabled());
service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule());
}
}
}
}

public void testRemoteNodeAttribute() throws IOException, InterruptedException {
final Settings settings =
Settings.builder().put("cluster.remote.node.attr", "gateway").build();
Expand Down

0 comments on commit 95b55f3

Please sign in to comment.