diff --git a/docs/reference/modules/remote-clusters.asciidoc b/docs/reference/modules/remote-clusters.asciidoc index 81d882f5f0eb6..93f1bad06faff 100644 --- a/docs/reference/modules/remote-clusters.asciidoc +++ b/docs/reference/modules/remote-clusters.asciidoc @@ -129,6 +129,12 @@ PUT _cluster/settings The time to wait for remote connections to be established when the node starts. The default is `30s`. +`cluster.remote.ping_schedule`:: + + Schedule a regular application-level ping message to ensure that transport + connections to nodes belonging to remote clusters are kept alive. Defaults + to `5s`, it can be set to `-1` to disable pings. + `cluster.remote.node.attr`:: A node attribute to filter out nodes that are eligible as a gateway node in diff --git a/docs/reference/modules/transport.asciidoc b/docs/reference/modules/transport.asciidoc index 257181f70c507..757bc9eb10db6 100644 --- a/docs/reference/modules/transport.asciidoc +++ b/docs/reference/modules/transport.asciidoc @@ -46,10 +46,10 @@ between all nodes. Defaults to `false`. |`transport.ping_schedule` | Schedule a regular application-level ping message to ensure that transport connections between nodes are kept alive. Defaults to -`5s` in the transport client and `-1` (disabled) elsewhere. It is preferable to -correctly configure TCP keep-alives instead of using this feature, because TCP -keep-alives apply to all kinds of long-lived connection and not just to -transport connections. +`5s` in the transport client and connections to remote clusters, and `-1` +(disabled) elsewhere. It is preferable to correctly configure TCP keep-alives +instead of using this feature, because TCP keep-alives apply to all kinds of +long-lived connections and not just to transport connections. |======================================================================= diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 4b4ebb7414acb..4966d4497f27b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterService.SEARCH_REMOTE_NODE_ATTRIBUTE, RemoteClusterService.ENABLE_REMOTE_CLUSTERS, RemoteClusterService.SEARCH_ENABLE_REMOTE_CLUSTERS, + RemoteClusterService.REMOTE_PING_SCHEDULE, TransportService.TRACE_LOG_EXCLUDE_SETTING, TransportService.TRACE_LOG_INCLUDE_SETTING, TransportCloseIndexAction.CLUSTER_INDICES_CLOSE_ENABLE_SETTING, diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index dc6241d1da338..52bdd0e49177f 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -67,12 +67,15 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { + this(settings, transport, threadPool, TcpTransport.PING_SCHEDULE.get(settings)); + } + + public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, TimeValue pingSchedule) { this.transport = transport; this.threadPool = threadPool; - this.pingSchedule = TcpTransport.PING_SCHEDULE.get(settings); + this.pingSchedule = pingSchedule; this.defaultProfile = buildDefaultConnectionProfile(settings); this.lifecycle.moveToStarted(); - if (pingSchedule.millis() > 0) { threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, new ScheduledPing()); } @@ -287,6 +290,10 @@ public void onFailure(Exception e) { } } + public TimeValue getPingSchedule() { + return pingSchedule; + } + private static final class DelegatingNodeConnectionListener implements TransportConnectionListener { private final CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index dc3bd3a353604..639b3b6c5e3fc 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -71,6 +71,10 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl assert Version.CURRENT.major < 8; } + //the default here (5s) differs from the default in TcpTransport.PING_SCHEDULE (which is -1, hence disabled) + public static final Setting REMOTE_PING_SCHEDULE = + Setting.timeSetting("cluster.remote.ping_schedule", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); + public static final Setting SEARCH_REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope, Setting.Property.Deprecated); @@ -211,9 +215,10 @@ private synchronized void updateRemoteClusters(Map listener) { - RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterAlias); - if (remoteClusterConnection == null) { - throw new IllegalArgumentException("no such remote cluster: " + clusterAlias); - } - remoteClusterConnection.ensureConnected(listener); + void ensureConnected(String clusterAlias, ActionListener listener) { + getRemoteClusterConnection(clusterAlias).ensureConnected(listener); } public Transport.Connection getConnection(String cluster) { + return getRemoteClusterConnection(cluster).getConnection(); + } + + RemoteClusterConnection getRemoteClusterConnection(String cluster) { RemoteClusterConnection connection = remoteClusters.get(cluster); if (connection == null) { throw new IllegalArgumentException("no such remote cluster: " + cluster); } - return connection.getConnection(); + return connection; } @Override @@ -386,7 +387,6 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail } } - @Override protected void updateRemoteCluster(String clusterAlias, List addresses, String proxyAddress) { updateRemoteCluster(clusterAlias, addresses, proxyAddress, ActionListener.wrap((x) -> {}, (x) -> {})); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 94ac7e963c1da..31aac92b68093 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; @@ -97,6 +98,7 @@ public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE)); + assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_PING_SCHEDULE)); } public void testRemoteClusterSeedSetting() { @@ -337,6 +339,57 @@ public void testIncrementallyAddClusters() throws IOException { } } + public void testDefaultPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(TimeValue.timeValueSeconds(5), remoteClusterConnection.getConnectionManager().getPingSchedule()); + } + } + } + } + + public void testCustomPingSchedule() throws IOException { + List knownNodes = new CopyOnWriteArrayList<>(); + try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT)) { + DiscoveryNode seedNode = seedTransport.getLocalDiscoNode(); + knownNodes.add(seedTransport.getLocalDiscoNode()); + + try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, + null)) { + transportService.start(); + transportService.acceptIncomingRequests(); + Settings.Builder builder = Settings.builder(); + TimeValue pingSchedule = randomBoolean() ? TimeValue.MINUS_ONE : TimeValue.timeValueSeconds(randomIntBetween(1, 10)); + builder.put("cluster.remote.ping_schedule", pingSchedule); + try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { + assertFalse(service.isCrossClusterSearchEnabled()); + service.initializeRemoteClusters(); + assertFalse(service.isCrossClusterSearchEnabled()); + service.updateRemoteCluster("cluster_1", Collections.singletonList(seedNode.getAddress().toString()), null); + assertTrue(service.isCrossClusterSearchEnabled()); + assertTrue(service.isRemoteClusterRegistered("cluster_1")); + RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); + assertEquals(pingSchedule, remoteClusterConnection.getConnectionManager().getPingSchedule()); + } + } + } + } + public void testRemoteNodeAttribute() throws IOException, InterruptedException { final Settings settings = Settings.builder().put("cluster.remote.node.attr", "gateway").build();