Skip to content

Commit

Permalink
Adapt more tests suites to closed indices (elastic#39186)
Browse files Browse the repository at this point in the history
* Adapt more tests suites to closed indices

Similarly to elastic#38631, this pull request modifies multiple test suites so
that they runs the tests with opened or closed indices.

The suites are testing:
- shard allocation filtering
- shard allocation awereness
- Reroute API

Relates to elastic#33888
  • Loading branch information
tlrx committed Mar 1, 2019
1 parent 4d080e9 commit 855d83e
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData.State;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -33,9 +35,11 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -54,7 +58,6 @@ public void testSimpleAwareness() throws Exception {
.put("cluster.routing.allocation.awareness.attributes", "rack_id")
.build();


logger.info("--> starting 2 nodes on the same rack");
internalCluster().startNodes(2, Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_1").build());

Expand All @@ -68,21 +71,33 @@ public void testSimpleAwareness() throws Exception {

ensureGreen();

final List<String> indicesToClose = randomSubsetOf(Arrays.asList("test1", "test2"));
indicesToClose.forEach(indexToClose -> assertAcked(client().admin().indices().prepareClose(indexToClose).get()));

logger.info("--> starting 1 node on a different rack");
final String node3 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.rack_id", "rack_2").build());

// On slow machines the initial relocation might be delayed
assertThat(awaitBusy(
() -> {
logger.info("--> waiting for no relocation");
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus().setWaitForNodes("3").setWaitForNoRelocatingShards(true).get();
ClusterHealthResponse clusterHealth = client().admin().cluster().prepareHealth()
.setIndices("test1", "test2")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForNoRelocatingShards(true)
.get();
if (clusterHealth.isTimedOut()) {
return false;
}

logger.info("--> checking current state");
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
// check that closed indices are effectively closed
if (indicesToClose.stream().anyMatch(index -> clusterState.metaData().index(index).getState() != State.CLOSE)) {
return false;
}
// verify that we have all the primaries on node3
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
Expand All @@ -99,7 +114,7 @@ public void testSimpleAwareness() throws Exception {
), equalTo(true));
}

public void testAwarenessZones() throws Exception {
public void testAwarenessZones() {
Settings commonSettings = Settings.builder()
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone.values", "a,b")
.put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone")
Expand All @@ -121,12 +136,20 @@ public void testAwarenessZones() throws Exception {
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1)).execute().actionGet();
createIndex("test", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test"));
}

logger.info("--> waiting for shards to be allocated");
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNoRelocatingShards(true).execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

Expand All @@ -146,7 +169,7 @@ public void testAwarenessZones() throws Exception {
assertThat(counts.get(B_0), anyOf(equalTo(2),equalTo(3)));
}

public void testAwarenessZonesIncrementalNodes() throws Exception {
public void testAwarenessZonesIncrementalNodes() {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b")
.put("cluster.routing.allocation.awareness.attributes", "zone")
Expand All @@ -159,11 +182,23 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
);
String A_0 = nodes.get(0);
String B_0 = nodes.get(1);
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 5)
.put("index.number_of_replicas", 1)).execute().actionGet();
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus().setWaitForNodes("2").setWaitForNoRelocatingShards(true).execute().actionGet();

createIndex("test", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test"));
}

ClusterHealthResponse health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("2")
.setWaitForNoRelocatingShards(true)
.execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
ClusterState clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
ObjectIntHashMap<String> counts = new ObjectIntHashMap<>();
Expand All @@ -180,12 +215,22 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
logger.info("--> starting another node in zone 'b'");

String B_1 = internalCluster().startNode(Settings.builder().put(commonSettings).put("node.attr.zone", "b").build());
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("3").execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
client().admin().cluster().prepareReroute().get();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("3").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("3")
.setWaitForActiveShards(10)
.setWaitForNoRelocatingShards(true)
.execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand All @@ -204,12 +249,22 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
assertThat(counts.get(B_1), equalTo(2));

String noZoneNode = internalCluster().startNode();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("4").execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("4")
.execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));
client().admin().cluster().prepareReroute().get();
health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("4")
.setWaitForActiveShards(10)
.setWaitForNoRelocatingShards(true)
.execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand All @@ -231,8 +286,14 @@ public void testAwarenessZonesIncrementalNodes() throws Exception {
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "").build()).get();

health = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus()
.setWaitForNodes("4").setWaitForActiveShards(10).setWaitForNoRelocatingShards(true).execute().actionGet();
health = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForGreenStatus()
.setWaitForNodes("4")
.setWaitForActiveShards(10)
.setWaitForNoRelocatingShards(true)
.execute().actionGet();

assertThat(health.isTimedOut(), equalTo(false));
clusterState = client().admin().cluster().prepareState().execute().actionGet().getState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
Expand All @@ -34,6 +33,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
Expand All @@ -48,6 +48,7 @@
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -102,6 +103,10 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception {
.setSettings(Settings.builder().put("index.number_of_shards", 1))
.execute().actionGet();

if (randomBoolean()) {
client().admin().indices().prepareClose("test").get();
}

ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.getRoutingNodes().unassigned().size(), equalTo(2));

Expand All @@ -128,8 +133,11 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception {
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).getId()).iterator().next().state(),
equalTo(ShardRoutingState.INITIALIZING));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
.setWaitForYellowStatus().execute().actionGet();
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForYellowStatus()
.execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("--> get the state, verify shard 1 primary allocated");
Expand All @@ -149,9 +157,12 @@ private void rerouteWithCommands(Settings commonSettings) throws Exception {
assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_2).getId()).iterator().next().state(),
equalTo(ShardRoutingState.INITIALIZING));


healthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true).execute().actionGet();
healthResponse = client().admin().cluster().prepareHealth()
.setIndices("test")
.setWaitForEvents(Priority.LANGUID)
.setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true)
.execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
Expand Down Expand Up @@ -193,11 +204,15 @@ public void testDelayWithALargeAmountOfShards() throws Exception {

logger.info("--> create indices");
for (int i = 0; i < 25; i++) {
client().admin().indices().prepareCreate("test" + i)
.setSettings(Settings.builder()
.put("index.number_of_shards", 5).put("index.number_of_replicas", 1)
.put("index.unassigned.node_left.delayed_timeout", randomIntBetween(250, 1000) + "ms"))
.execute().actionGet();
final String indexName = "test" + i;
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 5)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), randomIntBetween(250, 1000) + "ms")
.build());
if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose(indexName));
}
}

ensureGreen(TimeValue.timeValueMinutes(1));
Expand Down Expand Up @@ -294,10 +309,14 @@ public void testRerouteExplain() {
assertThat(healthResponse.isTimedOut(), equalTo(false));

logger.info("--> create an index with 1 shard");
client().admin().indices().prepareCreate("test")
.setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0))
.execute().actionGet();
createIndex("test", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test"));
}
ensureGreen("test");

logger.info("--> disable allocation");
Expand Down Expand Up @@ -403,12 +422,18 @@ public void testMessageLogging() throws Exception{
Loggers.removeAppender(actionLogger, allocateMockLog);
}

public void testClusterRerouteWithBlocks() throws Exception {
public void testClusterRerouteWithBlocks() {
List<String> nodesIds = internalCluster().startNodes(2);

logger.info("--> create an index with 1 shard and 0 replicas");
assertAcked(prepareCreate("test-blocks").setSettings(Settings.builder().put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)));
createIndex("test-blocks", Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());

if (randomBoolean()) {
assertAcked(client().admin().indices().prepareClose("test-blocks"));
}
ensureGreen("test-blocks");

logger.info("--> check that the index has 1 shard");
Expand All @@ -432,11 +457,14 @@ public void testClusterRerouteWithBlocks() throws Exception {
SETTING_READ_ONLY_ALLOW_DELETE)) {
try {
enableIndexBlock("test-blocks", blockSetting);
assertAcked(client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test-blocks", 0,
nodesIds.get(toggle % 2), nodesIds.get(++toggle % 2))));
assertAcked(client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand("test-blocks", 0, nodesIds.get(toggle % 2), nodesIds.get(++toggle % 2))));

ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true).execute().actionGet();
ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth()
.setIndices("test-blocks")
.setWaitForYellowStatus()
.setWaitForNoRelocatingShards(true)
.execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
} finally {
disableIndexBlock("test-blocks", blockSetting);
Expand Down
Loading

0 comments on commit 855d83e

Please sign in to comment.