Skip to content

Commit

Permalink
[CCR] Refactor AutoFollowCoordinator to track leader indices per remo…
Browse files Browse the repository at this point in the history
…te cluster

and replaced poll interval setting with a hardcoded poll interval.
The hard coded interval will be removed in a follow up change to make
use of cluster state API's wait_for_metatdata_version.

Originates from elastic#35895
Relates to elastic#33007
  • Loading branch information
martijnvg committed Nov 29, 2018
1 parent 1390f36 commit cf23302
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 137 deletions.
Expand Up @@ -6,9 +6,15 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;

import java.util.Map;

import static org.hamcrest.Matchers.equalTo;

public class ChainIT extends ESCCRRestTestCase {

public void testFollowIndex() throws Exception {
Expand Down Expand Up @@ -71,4 +77,53 @@ public void testFollowIndex() throws Exception {
}
}

public void testAutoFollowPatterns() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}
Request putPatternRequest = new Request("PUT", "/_ccr/auto_follow/leader_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"leader_cluster\"}");
assertOK(client().performRequest(putPatternRequest));
putPatternRequest = new Request("PUT", "/_ccr/auto_follow/middle_cluster_pattern");
putPatternRequest.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"], \"remote_cluster\": \"middle_cluster\"}");
assertOK(client().performRequest(putPatternRequest));
try (RestClient leaderClient = buildLeaderClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20190101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(leaderClient.performRequest(request));
for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
}
}
try (RestClient middleClient = buildMiddleClient()) {
Settings settings = Settings.builder()
.put("index.soft_deletes.enabled", true)
.build();
Request request = new Request("PUT", "/logs-20200101");
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
assertOK(middleClient.performRequest(request));
for (int i = 0; i < 5; i++) {
String id = Integer.toString(i);
index(middleClient, "logs-20200101", id, "field", i, "filtered_field", "true");
}
}
assertBusy(() -> {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Map<?, ?> autoFollowStats = (Map<?, ?>) response.get("auto_follow_stats");
assertThat(autoFollowStats.get("number_of_successful_follow_indices"), equalTo(2));

ensureYellow("logs-20190101");
ensureYellow("logs-20200101");
verifyDocuments("logs-20190101", 5, "filtered_field:true");
verifyDocuments("logs-20200101", 5, "filtered_field:true");
});
}

}
Expand Up @@ -144,7 +144,7 @@ public Collection<Object> createComponents(

return Arrays.asList(
ccrLicenseChecker,
new AutoFollowCoordinator(settings, client, threadPool, clusterService, ccrLicenseChecker)
new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker)
);
}

Expand Down
Expand Up @@ -7,7 +7,6 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
Expand All @@ -29,12 +28,6 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);

/**
* The settings defined by CCR.
*
Expand All @@ -43,8 +36,7 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_POLL_INTERVAL);
CCR_FOLLOWING_INDEX_SETTING);
}

}

0 comments on commit cf23302

Please sign in to comment.