diff --git a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java index e5a37aa829bbf..fcd1de325c20e 100644 --- a/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java +++ b/x-pack/plugin/ccr/qa/chain/src/test/java/org/elasticsearch/xpack/ccr/ChainIT.java @@ -6,9 +6,15 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.client.Request; import org.elasticsearch.client.RestClient; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + public class ChainIT extends ESCCRRestTestCase { public void testFollowIndex() throws Exception { @@ -71,4 +77,53 @@ public void testFollowIndex() throws Exception { } } + public void testAutoFollowPatterns() throws Exception { + if ("follow".equals(targetCluster) == false) { + return; + } + Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern"); + putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}"); + assertOK(client().performRequest(putPatternRequest)); + try (RestClient leaderClient = buildLeaderClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20190101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(leaderClient.performRequest(request)); + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true"); + } + } + try (RestClient middleClient = buildMiddleClient()) { + Settings settings = Settings.builder() + .put("index.soft_deletes.enabled", true) + .build(); + Request request = new Request("PUT", "/logs-20200101"); + request.setJsonEntity("{\"settings\": " + Strings.toString(settings) + + ", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }"); + assertOK(middleClient.performRequest(request)); + for (int i = 0; i < 5; i++) { + String id = Integer.toString(i); + index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true"); + } + } + assertBusy(() -> { + Request statsRequest = new Request("GET", "/_ccr/stats"); + Map response = toMap(client().performRequest(statsRequest)); + Map autoFollowStats = (Map) response.get("auto_follow_stats"); + assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2)); + + ensureYellow("logs-20190101"); + ensureYellow("logs-20200101"); + verifyDocuments("logs-20190101", 5, "filtered_field:true"); + verifyDocuments("logs-20200101", 5, "filtered_field:true"); + }); + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index ec196d637e1a9..67c30e6a7c252 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -144,7 +144,7 @@ public Collection createComponents( return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker) + new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d3f5c85b4f83e..544a45792e070 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -7,7 +7,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -29,12 +28,6 @@ private CcrSettings() { public static final Setting CCR_FOLLOWING_INDEX_SETTING = Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); - /** - * Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow - */ - public static final Setting CCR_AUTO_FOLLOW_POLL_INTERVAL = - Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope); - /** * The settings defined by CCR. * @@ -43,8 +36,7 @@ private CcrSettings() { static List> getSettings() { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, - CCR_FOLLOWING_INDEX_SETTING, - CCR_AUTO_FOLLOW_POLL_INTERVAL); + CCR_FOLLOWING_INDEX_SETTING); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 0e86aa157adfc..0f49adfab6ddb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -15,14 +15,14 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; @@ -31,7 +31,6 @@ import org.elasticsearch.license.LicenseUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; -import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; @@ -45,28 +44,29 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.TreeMap; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; /** * A component that runs only on the elected master node and follows leader indices automatically * if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}. */ -public class AutoFollowCoordinator implements ClusterStateApplier { +public class AutoFollowCoordinator implements ClusterStateListener { private static final Logger LOGGER = LogManager.getLogger(AutoFollowCoordinator.class); private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; - private final TimeValue pollInterval; private final ThreadPool threadPool; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; - private volatile boolean localNodeMaster = false; + private volatile Map autoFollowers = Collections.emptyMap(); // The following fields are read and updated under a lock: private long numberOfSuccessfulIndicesAutoFollowed = 0; @@ -75,7 +75,6 @@ public class AutoFollowCoordinator implements ClusterStateApplier { private final LinkedHashMap recentAutoFollowErrors; public AutoFollowCoordinator( - Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService, @@ -84,10 +83,7 @@ public AutoFollowCoordinator( this.threadPool = threadPool; this.clusterService = clusterService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); - - this.pollInterval = CcrSettings.CCR_AUTO_FOLLOW_POLL_INTERVAL.get(settings); - clusterService.addStateApplier(this); - + clusterService.addListener(this); this.recentAutoFollowErrors = new LinkedHashMap() { @Override protected boolean removeEldestEntry(final Map.Entry eldest) { @@ -130,151 +126,179 @@ synchronized void updateStats(List results) { } } - private void doAutoFollow() { - if (localNodeMaster == false) { - return; - } - ClusterState followerClusterState = clusterService.state(); + void updateAutoFollowers(ClusterState followerClusterState) { AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - return; - } - - if (autoFollowMetadata.getPatterns().isEmpty()) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } if (ccrLicenseChecker.isCcrAllowed() == false) { // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API LOGGER.warn("skipping auto-follower coordination", LicenseUtils.newComplianceException("ccr")); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); return; } - Consumer> handler = results -> { - updateStats(results); - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); - }; - AutoFollower operation = new AutoFollower(handler, followerClusterState) { + final CopyOnWriteHashMap autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers); + Set newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream() + .filter(entry -> autoFollowers.containsKey(entry.getValue().getRemoteCluster()) == false) + .map(entry -> entry.getValue().getRemoteCluster()) + .collect(Collectors.toSet()); + + Map newAutoFollowers = new HashMap<>(newRemoteClusters.size()); + for (String remoteCluster : newRemoteClusters) { + AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) { + + @Override + void getLeaderClusterState(final String remoteCluster, + final BiConsumer handler) { + final ClusterStateRequest request = new ClusterStateRequest(); + request.clear(); + request.metaData(true); + request.routingTable(true);// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( + client, + remoteCluster, + request, + e -> handler.accept(null, e), + leaderClusterState -> handler.accept(leaderClusterState, null)); + } - @Override - void getLeaderClusterState(final String remoteCluster, - final BiConsumer handler) { - final ClusterStateRequest request = new ClusterStateRequest(); - request.clear(); - request.metaData(true); - request.routingTable(true); - // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( - client, - remoteCluster, - request, - e -> handler.accept(null, e), - leaderClusterState -> handler.accept(leaderClusterState, null)); - } + @Override + void createAndFollow(Map headers, + PutFollowAction.Request request, + Runnable successHandler, + Consumer failureHandler) { + Client followerClient = CcrLicenseChecker.wrapClient(client, headers); + followerClient.execute( + PutFollowAction.INSTANCE, + request, + ActionListener.wrap(r -> successHandler.run(), failureHandler) + ); + } - @Override - void createAndFollow(Map headers, - PutFollowAction.Request request, - Runnable successHandler, - Consumer failureHandler) { - Client followerClient = CcrLicenseChecker.wrapClient(client, headers); - followerClient.execute( - PutFollowAction.INSTANCE, - request, - ActionListener.wrap(r -> successHandler.run(), failureHandler) - ); - } + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { - @Override - void updateAutoFollowMetadata(Function updateFunction, - Consumer handler) { - clusterService.submitStateUpdateTask("update_auto_follow_metadata", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return updateFunction.apply(currentState); + } - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - return updateFunction.apply(currentState); - } + @Override + public void onFailure(String source, Exception e) { + handler.accept(e); + } - @Override - public void onFailure(String source, Exception e) { - handler.accept(e); - } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + handler.accept(null); + } + }); + } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - handler.accept(null); - } - }); - } + }; + newAutoFollowers.put(remoteCluster, autoFollower); + autoFollower.autoFollowIndices(); + } - }; - operation.autoFollowIndices(); + List removedRemoteClusters = new ArrayList<>(); + for (String remoteCluster : autoFollowers.keySet()) { + boolean exist = autoFollowMetadata.getPatterns().values().stream() + .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); + if (exist == false) { + removedRemoteClusters.add(remoteCluster); + } + } + this.autoFollowers = autoFollowers + .copyAndPutAll(newAutoFollowers) + .copyAndRemoveAll(removedRemoteClusters); } @Override - public void applyClusterState(ClusterChangedEvent event) { - final boolean beforeLocalMasterNode = localNodeMaster; - localNodeMaster = event.localNodeMaster(); - if (beforeLocalMasterNode == false && localNodeMaster) { - threadPool.schedule(pollInterval, ThreadPool.Names.SAME, this::doAutoFollow); + public void clusterChanged(ClusterChangedEvent event) { + if (event.localNodeMaster()) { + updateAutoFollowers(event.state()); } } abstract static class AutoFollower { - private final Consumer> handler; - private final ClusterState followerClusterState; - private final AutoFollowMetadata autoFollowMetadata; - - private final CountDown autoFollowPatternsCountDown; - private final AtomicArray autoFollowResults; - - AutoFollower(final Consumer> handler, final ClusterState followerClusterState) { - this.handler = handler; - this.followerClusterState = followerClusterState; - this.autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); - this.autoFollowPatternsCountDown = new CountDown(autoFollowMetadata.getPatterns().size()); - this.autoFollowResults = new AtomicArray<>(autoFollowMetadata.getPatterns().size()); + private final String remoteCluster; + private final ThreadPool threadPool; + private final Consumer> statsUpdater; + private final Supplier followerClusterStateSupplier; + + private volatile CountDown autoFollowPatternsCountDown; + private volatile AtomicArray autoFollowResults; + + AutoFollower(final String remoteCluster, + ThreadPool threadPool, final Consumer> statsUpdater, + final Supplier followerClusterStateSupplier) { + this.remoteCluster = remoteCluster; + this.threadPool = threadPool; + this.statsUpdater = statsUpdater; + this.followerClusterStateSupplier = followerClusterStateSupplier; } void autoFollowIndices() { - int i = 0; - for (Map.Entry entry : autoFollowMetadata.getPatterns().entrySet()) { - final int slot = i; - final String autoFollowPattenName = entry.getKey(); - final AutoFollowPattern autoFollowPattern = entry.getValue(); - final String remoteCluster = autoFollowPattern.getRemoteCluster(); - - Map headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName); - getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { - if (leaderClusterState != null) { - assert e == null; - final List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName); - final List leaderIndicesToFollow = getLeaderIndicesToFollow(remoteCluster, autoFollowPattern, - leaderClusterState, followerClusterState, followedIndices); + final ClusterState followerClusterState = followerClusterStateSupplier.get(); + final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster); + return; + } + + final List patterns = autoFollowMetadata.getPatterns().entrySet().stream() + .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + if (patterns.isEmpty()) { + LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster); + return; + } + + this.autoFollowPatternsCountDown = new CountDown(patterns.size()); + this.autoFollowResults = new AtomicArray<>(patterns.size()); + + getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { + if (leaderClusterState != null) { + assert e == null; + + int i = 0; + for (String autoFollowPatternName : patterns) { + final int slot = i; + AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName); + Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); + List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); + + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, + followerClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { - finalise(slot, new AutoFollowResult(autoFollowPattenName)); + finalise(slot, new AutoFollowResult(autoFollowPatternName)); } else { List> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns() .entrySet().stream() - .filter(item -> autoFollowPattenName.equals(item.getKey()) == false) + .filter(item -> autoFollowPatternName.equals(item.getKey()) == false) .filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster())) .map(item -> new Tuple<>(item.getKey(), item.getValue())) .collect(Collectors.toList()); Consumer resultHandler = result -> finalise(slot, result); - checkAutoFollowPattern(autoFollowPattenName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, + checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, patternsForTheSameLeaderCluster, resultHandler); } - } else { - finalise(slot, new AutoFollowResult(autoFollowPattenName, e)); + i++; } - }); - i++; - } + } else { + List results = new ArrayList<>(patterns.size()); + for (String autoFollowPatternName : patterns) { + results.add(new AutoFollowResult(autoFollowPatternName, e)); + } + statsUpdater.accept(results); + } + }); } private void checkAutoFollowPattern(String autoFollowPattenName, @@ -357,12 +381,13 @@ private void finalise(int slot, AutoFollowResult result) { assert autoFollowResults.get(slot) == null; autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { - handler.accept(autoFollowResults.asList()); + statsUpdater.accept(autoFollowResults.asList()); + // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion: + threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices); } } - static List getLeaderIndicesToFollow(String remoteCluster, - AutoFollowPattern autoFollowPattern, + static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, ClusterState leaderClusterState, ClusterState followerClusterState, List followedIndexUUIDs) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 4624a3622b992..1f566b0f0e4ce 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -34,17 +35,21 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -52,6 +57,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollower() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -81,7 +87,7 @@ public void testAutoFollower() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower(handler, currentState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(currentState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -117,6 +123,7 @@ void updateAutoFollowMetadata(Function updateFunctio public void testAutoFollowerClusterStateApiFailure() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -140,7 +147,7 @@ public void testAutoFollowerClusterStateApiFailure() { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -167,6 +174,7 @@ void updateAutoFollowMetadata(Function updateFunctio public void testAutoFollowerUpdateClusterStateFailure() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -194,7 +202,7 @@ public void testAutoFollowerUpdateClusterStateFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -223,6 +231,7 @@ void updateAutoFollowMetadata(Function updateFunctio public void testAutoFollowerCreateAndFollowApiCallFailure() { Client client = mock(Client.class); + ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState leaderState = createRemoteClusterState("logs-20190101"); @@ -250,7 +259,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower(handler, followerState) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { @Override void getLeaderClusterState(String remoteCluster, BiConsumer handler) { @@ -322,7 +331,7 @@ public void testGetLeaderIndicesToFollow() { .routingTable(routingTableBuilder.build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(3)); @@ -331,7 +340,7 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(2).getName(), equalTo("metrics-4")); List followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); - result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, followedIndexUUIDs); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(2)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -363,7 +372,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); assertThat(result.size(), equalTo(1)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -377,7 +386,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) .build(); - result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, Collections.emptyList()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); assertThat(result.size(), equalTo(2)); result.sort(Comparator.comparing(Index::getName)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -400,7 +409,6 @@ public void testGetFollowerIndexName() { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( - Settings.EMPTY, null, null, mock(ClusterService.class), @@ -474,4 +482,32 @@ private static ClusterState createRemoteClusterState(String indexName) { return csBuilder.build(); } + private static Supplier followerClusterStateSupplier(ClusterState... states) { + final AutoFollowMetadata emptyAutoFollowMetadata = + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); + final ClusterState lastState = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, emptyAutoFollowMetadata)) + .build(); + final LinkedList queue = new LinkedList<>(Arrays.asList(states)); + return () -> { + final ClusterState current = queue.poll(); + if (current != null) { + return current; + } else { + return lastState; + } + }; + } + + private static ThreadPool mockThreadPool() { + ThreadPool threadPool = mock(ThreadPool.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + Runnable task = (Runnable) args[2]; + task.run(); + return null; + }).when(threadPool).schedule(any(), anyString(), any()); + return threadPool; + } + }