From dabd1d50aec5b65c68eab288ef688f0b43f655db Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 23 Jan 2019 10:40:10 +0100 Subject: [PATCH] Fix follow index filtering in follow info api. The filtering by follower index was completely broken. Also the wrong persistent tasks were selected, causing the wrong status to be reported. Closes #37738 --- .../ccr/action/TransportFollowInfoAction.java | 22 +-- .../elasticsearch/xpack/ccr/FollowInfoIT.java | 139 ++++++++++++++++++ .../TransportFollowInfoActionTests.java | 73 +++++++++ .../TransportFollowStatsActionTests.java | 2 +- 4 files changed, 226 insertions(+), 10 deletions(-) create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java index 3e9c0ecbef881..6bf6e5a43753d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java @@ -65,19 +65,28 @@ protected void masterOperation(FollowInfoAction.Request request, List concreteFollowerIndices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.STRICT_EXPAND_OPEN_CLOSED, request.getFollowerIndices())); + List followerInfos = getFollowInfos(concreteFollowerIndices, state); + listener.onResponse(new FollowInfoAction.Response(followerInfos)); + } + + @Override + protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + static List getFollowInfos(List concreteFollowerIndices, ClusterState state) { List followerInfos = new ArrayList<>(); PersistentTasksCustomMetaData persistentTasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); - for (IndexMetaData indexMetaData : state.metaData()) { + for (String index : concreteFollowerIndices) { + IndexMetaData indexMetaData = state.metaData().index(index); Map ccrCustomData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); if (ccrCustomData != null) { Optional result; if (persistentTasks != null) { result = persistentTasks.taskMap().values().stream() .map(persistentTask -> (ShardFollowTask) persistentTask.getParams()) - .filter(shardFollowTask -> concreteFollowerIndices.isEmpty() || - concreteFollowerIndices.contains(shardFollowTask.getFollowShardId().getIndexName())) + .filter(shardFollowTask -> index.equals(shardFollowTask.getFollowShardId().getIndexName())) .findAny(); } else { result = Optional.empty(); @@ -107,11 +116,6 @@ protected void masterOperation(FollowInfoAction.Request request, } } - listener.onResponse(new FollowInfoAction.Response(followerInfos)); - } - - @Override - protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + return followerInfos; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java new file mode 100644 index 0000000000000..478043a862b38 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowInfoIT.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr; + +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.xpack.CcrSingleNodeTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; +import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; + +import java.util.Comparator; + +import static java.util.Collections.singletonMap; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings; +import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class FollowInfoIT extends CcrSingleNodeTestCase { + + public void testFollowInfoApiFollowerIndexFiltering() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader2"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + followRequest = getPutFollowRequest("leader2", "follower2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + FollowInfoAction.Request request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower1"); + FollowInfoAction.Response response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower2"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("_all"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex)); + assertThat(response.getFollowInfos().size(), equalTo(2)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue()); + + // Pause follower1 index and check the follower info api: + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower1"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED)); + assertThat(response.getFollowInfos().get(0).getParameters(), nullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("follower2"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + assertThat(response.getFollowInfos().size(), equalTo(1)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue()); + + request = new FollowInfoAction.Request(); + request.setFollowerIndices("_all"); + response = client().execute(FollowInfoAction.INSTANCE, request).actionGet(); + response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex)); + assertThat(response.getFollowInfos().size(), equalTo(2)); + assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1")); + assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED)); + assertThat(response.getFollowInfos().get(0).getParameters(), nullValue()); + assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2")); + assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2")); + assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE)); + assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue()); + + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet()); + } + + public void testFollowInfoApiIndexMissing() throws Exception { + final String leaderIndexSettings = getIndexSettings(1, 0, + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader1"); + assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON)); + ensureGreen("leader2"); + + PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + followRequest = getPutFollowRequest("leader2", "follower2"); + client().execute(PutFollowAction.INSTANCE, followRequest).get(); + + FollowInfoAction.Request request1 = new FollowInfoAction.Request(); + request1.setFollowerIndices("follower3"); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request1).actionGet()); + + FollowInfoAction.Request request2 = new FollowInfoAction.Request(); + request2.setFollowerIndices("follower2", "follower3"); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request2).actionGet()); + + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet()); + assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet()); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java new file mode 100644 index 0000000000000..4a023e6ee3ed2 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import static org.elasticsearch.xpack.ccr.action.TransportFollowStatsActionTests.createShardFollowTask; +import static org.hamcrest.Matchers.equalTo; + +public class TransportFollowInfoActionTests extends ESTestCase { + + public void testGetFollowInfos() { + ClusterState state = createCS( + new String[] {"follower1", "follower2", "follower3", "index4"}, + new boolean[]{true, true, true, false}, + new boolean[]{true, true, false, false} + ); + List concreteIndices = Arrays.asList("follower1", "follower3"); + + List result = TransportFollowInfoAction.getFollowInfos(concreteIndices, state); + assertThat(result.size(), equalTo(2)); + assertThat(result.get(0).getFollowerIndex(), equalTo("follower1")); + assertThat(result.get(0).getStatus(), equalTo(Response.Status.ACTIVE)); + assertThat(result.get(1).getFollowerIndex(), equalTo("follower3")); + assertThat(result.get(1).getStatus(), equalTo(Response.Status.PAUSED)); + } + + private static ClusterState createCS(String[] indices, boolean[] followerIndices, boolean[] statuses) { + PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder(); + MetaData.Builder mdBuilder = MetaData.builder(); + for (int i = 0; i < indices.length; i++) { + String index = indices[i]; + boolean isFollowIndex = followerIndices[i]; + boolean active = statuses[i]; + + IndexMetaData.Builder imdBuilder = IndexMetaData.builder(index) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(0); + + if (isFollowIndex) { + imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + if (active) { + persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null); + } + } + mdBuilder.put(imdBuilder); + } + + mdBuilder.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()); + return ClusterState.builder(new ClusterName("_cluster")) + .metaData(mdBuilder.build()) + .build(); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java index bc8c58f1de7de..b8f570e4ef4f6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java @@ -44,7 +44,7 @@ public void testFindFollowerIndicesFromShardFollowTasks() { assertThat(result.size(), equalTo(0)); } - private static ShardFollowTask createShardFollowTask(String followerIndex) { + static ShardFollowTask createShardFollowTask(String followerIndex) { return new ShardFollowTask( null, new ShardId(followerIndex, "", 0),