diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java index 99ff9d7a69c26..ab777524b3bcc 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/indexlifecycle/ShrinkAction.java @@ -5,38 +5,73 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.index.Index; import java.io.IOException; +import java.util.Objects; /** * A {@link LifecycleAction} which shrinks the index. */ public class ShrinkAction implements LifecycleAction { public static final String NAME = "shrink"; + public static final ParseField NUMBER_OF_SHARDS_FIELD = new ParseField("number_of_shards"); - private static final ObjectParser PARSER = new ObjectParser<>(NAME, ShrinkAction::new); + private static final Logger logger = ESLoggerFactory.getLogger(ShrinkAction.class); + private static final String SHRUNK_INDEX_NAME_PREFIX = "shrunk-"; + private static final ConstructingObjectParser PARSER = + new ConstructingObjectParser<>(NAME, a -> new ShrinkAction((Integer) a[0])); - public static ShrinkAction parse(XContentParser parser) { - return PARSER.apply(parser, null); + static { + PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_SHARDS_FIELD); } - public ShrinkAction() { + private int numberOfShards; + + public static ShrinkAction parse(XContentParser parser) throws IOException { + return PARSER.parse(parser, new CreateIndexRequest()); + } + + public ShrinkAction(int numberOfShards) { + if (numberOfShards <= 0) { + throw new IllegalArgumentException("[" + NUMBER_OF_SHARDS_FIELD.getPreferredName() + "] must be greater than 0"); + } + this.numberOfShards = numberOfShards; } public ShrinkAction(StreamInput in) throws IOException { + this.numberOfShards = in.readVInt(); + } + + int getNumberOfShards() { + return numberOfShards; } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(numberOfShards); } @Override @@ -47,35 +82,123 @@ public String getWriteableName() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + builder.field(NUMBER_OF_SHARDS_FIELD.getPreferredName(), numberOfShards); builder.endObject(); return builder; } + /** + * Executes the Shrink Action. + * + * This function first checks whether the target shrunk index exists already, if it does not, then + * it will set the index to read-only and issue a resize request. + * + * Since the shrink response is not returned after a successful shrunk operation, we must poll to see if + * all the shards of the newly shrunk index are initialized. If so, then we can return the index to read-write + * and tell the listener that we have completed the action. + * + * @param index + * the {@link Index} on which to perform the action. + * @param client + * the {@link Client} to use for making changes to the index. + * @param clusterService + * the {@link ClusterService} to retrieve the current cluster state from. + * @param listener + * the {@link LifecycleAction.Listener} to return completion or failure responses to. + */ @Override public void execute(Index index, Client client, ClusterService clusterService, Listener listener) { - // NORELEASE: stub - listener.onSuccess(true); + String targetIndexName = SHRUNK_INDEX_NAME_PREFIX + index.getName(); + ClusterState clusterState = clusterService.state(); + IndexMetaData indexMetaData = clusterState.metaData().index(index.getName()); + String sourceIndexName = IndexMetaData.INDEX_SHRINK_SOURCE_NAME.get(indexMetaData.getSettings()); + boolean isShrunkIndex = index.getName().equals(SHRUNK_INDEX_NAME_PREFIX + sourceIndexName); + IndexMetaData shrunkIndexMetaData = clusterState.metaData().index(targetIndexName); + if (isShrunkIndex) { + // We are currently managing the shrunken index. This means all previous operations were successful and + // the original index is deleted. It is important to add an alias from the original index name to the shrunken + // index so that previous actions will still succeed. + boolean aliasAlreadyExists = indexMetaData.getAliases().values().contains(AliasMetaData.builder(sourceIndexName).build()); + boolean sourceIndexDeleted = clusterState.metaData().hasIndex(sourceIndexName) == false; + if (sourceIndexDeleted && aliasAlreadyExists) { + listener.onSuccess(true); + } else { + IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest() + .addAliasAction(IndicesAliasesRequest.AliasActions.removeIndex().index(sourceIndexName)) + .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(index.getName()).alias(sourceIndexName)); + client.admin().indices().aliases(aliasesRequest, ActionListener.wrap(response -> { + listener.onSuccess(true); + }, listener::onFailure)); + } + } else if (shrunkIndexMetaData == null) { + // Shrunken index is not present yet, it is time to issue to shrink request + ResizeRequest resizeRequest = new ResizeRequest(targetIndexName, index.getName()); + resizeRequest.getTargetIndexRequest().settings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, indexMetaData.getNumberOfReplicas()) + .build()); + indexMetaData.getAliases().values().spliterator().forEachRemaining(aliasMetaDataObjectCursor -> { + resizeRequest.getTargetIndexRequest().alias(new Alias(aliasMetaDataObjectCursor.value.alias())); + }); + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder() + .put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build(), index.getName()); + client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(r -> { + client.admin().indices().resizeIndex(resizeRequest, ActionListener.wrap( + resizeResponse -> { + if (resizeResponse.isAcknowledged()) { + listener.onSuccess(false); + } else { + listener.onFailure(new IllegalStateException("Shrink request failed to be acknowledged")); + } + }, listener::onFailure)); + }, listener::onFailure)); + } else if (index.getName().equals(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.get(shrunkIndexMetaData.getSettings())) == false) { + // The target shrunken index exists, but it was not shrunk from our managed index. This means + // some external actions were done to create this index, and so we cannot progress with the shrink + // action until this is resolved. + listener.onFailure(new IllegalStateException("Cannot shrink index [" + index.getName() + "] because target " + + "index [" + targetIndexName + "] already exists.")); + } else if (ActiveShardCount.ALL.enoughShardsActive(clusterService.state(), targetIndexName)) { + if (indexMetaData.getSettings().get("index.lifecycle.name") + .equals(shrunkIndexMetaData.getSettings().get("index.lifecycle.name"))) { + // Since both the shrunken and original indices co-exist, do nothing and wait until + // the final step of the shrink action is completed and this original index is deleted. + listener.onSuccess(false); + } else { + // Since all shards of the shrunken index are active, it is safe to continue forward + // and begin swapping the indices by inheriting the lifecycle management to the new shrunken index. + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(Settings.builder() + .put("index.lifecycle.name", indexMetaData.getSettings().get("index.lifecycle.name")) + .put("index.lifecycle.phase", indexMetaData.getSettings().get("index.lifecycle.phase")) + .put("index.lifecycle.action", indexMetaData.getSettings().get("index.lifecycle.action")).build(), targetIndexName); + client.admin().indices().updateSettings(updateSettingsRequest, + ActionListener.wrap(r -> listener.onSuccess(false) , listener::onFailure)); + } + } else { + // We are here because both the shrunken and original indices exist, but the shrunken index is not + // fully active yet. This means that we wait for another poll iteration of execute to check the + // state again. + logger.debug("index [" + index.getName() + "] has been shrunk to shrunken-index [" + targetIndexName + "], but" + + "shrunken index is not fully active yet"); + listener.onSuccess(false); + } } @Override - public int hashCode() { - return 1; + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShrinkAction that = (ShrinkAction) o; + return Objects.equals(numberOfShards, that.numberOfShards); } @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (obj.getClass() != getClass()) { - return false; - } - return true; + public int hashCode() { + return Objects.hash(numberOfShards); } @Override public String toString() { return Strings.toString(this); } - } diff --git a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java index 152d3a84636d1..d17f333316477 100644 --- a/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java +++ b/x-pack/plugin/src/main/java/org/elasticsearch/xpack/indexlifecycle/IndexLifecycle.java @@ -149,6 +149,7 @@ public List getNa new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ForceMergeAction.NAME), ForceMergeAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ReplicasAction.NAME), ReplicasAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse), + new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(ShrinkAction.NAME), ShrinkAction::parse), new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)); } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java index 5f0e384ca2cf0..2551a0019e26e 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/ShrinkActionTests.java @@ -5,11 +5,49 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsTestHelper; +import org.elasticsearch.action.admin.indices.shrink.ResizeAction; +import org.elasticsearch.action.admin.indices.shrink.ResizeRequest; +import org.elasticsearch.action.admin.indices.shrink.ResizeResponse; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.AbstractSerializingTestCase; +import org.mockito.Mockito; import java.io.IOException; +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; public class ShrinkActionTests extends AbstractSerializingTestCase { @@ -20,11 +58,495 @@ protected ShrinkAction doParseInstance(XContentParser parser) throws IOException @Override protected ShrinkAction createTestInstance() { - return new ShrinkAction(); + return new ShrinkAction(randomIntBetween(1, 100)); + } + + @Override + protected ShrinkAction mutateInstance(ShrinkAction action) { + return new ShrinkAction(action.getNumberOfShards() + randomIntBetween(1, 2)); } @Override protected Reader instanceReader() { return ShrinkAction::new; } + + public void testNonPositiveShardNumber() { + Exception e = expectThrows(Exception.class, () -> new ShrinkAction(randomIntBetween(-100, 0))); + assertThat(e.getMessage(), equalTo("[number_of_shards] must be greater than 0")); + } + + public void testExecuteSuccessfullyCompleted() { + String originalIndexName = randomAlphaOfLengthBetween(1, 20); + Index index = new Index("shrunk-" + originalIndexName, randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData originalIndexMetaData = IndexMetaData.builder(originalIndexName) + .settings(settings(Version.CURRENT)).numberOfReplicas(0).numberOfShards(1).build(); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, originalIndexName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata).fPut(originalIndexName, originalIndexMetaData); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocationOnMock -> { + IndicesAliasesRequest request = (IndicesAliasesRequest) invocationOnMock.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + IndicesAliasesResponse response = IndicesAliasesAction.INSTANCE.newResponse(); + response.readFrom(StreamInput.wrap(new byte[] { 1 })); + + assertThat(request.getAliasActions().size(), equalTo(2)); + assertThat(request.getAliasActions().get(0).actionType(), equalTo(IndicesAliasesRequest.AliasActions.Type.REMOVE_INDEX)); + assertThat(request.getAliasActions().get(0).indices(), equalTo(new String[] { originalIndexName })); + assertThat(request.getAliasActions().get(1).actionType(), equalTo(IndicesAliasesRequest.AliasActions.Type.ADD)); + assertThat(request.getAliasActions().get(1).indices(), equalTo(new String[] { index.getName() })); + assertThat(request.getAliasActions().get(1).aliases(), equalTo(new String[] { originalIndexName })); + + listener.onResponse(response); + return null; + }).when(indicesClient).aliases(any(), any()); + + SetOnce actionCompleted = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertTrue(actionCompleted.get()); + Mockito.verify(client, Mockito.only()).admin(); + Mockito.verify(adminClient, Mockito.only()).indices(); + Mockito.verify(indicesClient, Mockito.only()).aliases(any(), any()); + } + + public void testExecuteAlreadyCompletedAndRunAgain() { + String originalIndexName = randomAlphaOfLengthBetween(1, 20); + Index index = new Index("shrunk-" + originalIndexName, randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .putAlias(AliasMetaData.builder(originalIndexName).build()) + .settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, originalIndexName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + SetOnce actionCompleted = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertTrue(actionCompleted.get()); + Mockito.verify(client, Mockito.never()).admin(); + } + + public void testExecuteOriginalIndexAliasFailure() { + String originalIndexName = randomAlphaOfLengthBetween(1, 20); + Index index = new Index("shrunk-" + originalIndexName, randomAlphaOfLengthBetween(1, 20)); + Index targetIndex = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, originalIndexName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(targetIndex).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, + ShardRoutingState.STARTED))) + .build()) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onFailure(new RuntimeException("failed")); + return null; + }).when(indicesClient).aliases(any(), any()); + + SetOnce onFailureException = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + throw new AssertionError("Unexpected method call"); + } + + @Override + public void onFailure(Exception e) { + onFailureException.set(e); + } + }); + + assertThat(onFailureException.get().getMessage(), equalTo("failed")); + + Mockito.verify(client, Mockito.only()).admin(); + Mockito.verify(adminClient, Mockito.only()).indices(); + Mockito.verify(indicesClient, Mockito.only()).aliases(any(), any()); + } + + public void testExecuteWithIssuedResizeRequest() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20)); + int numberOfShards = randomIntBetween(1, 5); + int numberOfReplicas = randomIntBetween(1, 5); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT)) + .putAlias(AliasMetaData.builder("my_alias")) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(numberOfReplicas).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(targetIndex).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, + ShardRoutingState.STARTED))) + .build()) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Settings expectedSettings = Settings.builder().put(IndexMetaData.SETTING_BLOCKS_WRITE, true).build(); + + Mockito.doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, index.getName()); + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).when(indicesClient).updateSettings(any(), any()); + + Mockito.doAnswer(invocationOnMock -> { + ResizeRequest request = (ResizeRequest) invocationOnMock.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + assertThat(request.getSourceIndex(), equalTo(index.getName())); + assertThat(request.getTargetIndexRequest().aliases(), equalTo(Collections.singleton(new Alias("my_alias")))); + assertThat(request.getTargetIndexRequest().settings(), equalTo(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas).build())); + assertThat(request.getTargetIndexRequest().index(), equalTo(targetIndex.getName())); + ResizeResponse resizeResponse = ResizeAction.INSTANCE.newResponse(); + resizeResponse.readFrom(StreamInput.wrap(new byte[] { 1, 1, 1, 1, 1 })); + listener.onResponse(resizeResponse); + return null; + }).when(indicesClient).resizeIndex(any(), any()); + + SetOnce actionCompleted = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(numberOfShards); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertFalse(actionCompleted.get()); + + Mockito.verify(client, Mockito.atLeast(1)).admin(); + Mockito.verify(adminClient, Mockito.atLeast(1)).indices(); + Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any()); + Mockito.verify(indicesClient, Mockito.atLeast(1)).resizeIndex(any(), any()); + } + + public void testExecuteWithIssuedResizeRequestFailure() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Index targetIndex = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(targetIndex).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, + ShardRoutingState.STARTED))) + .build()) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).when(indicesClient).updateSettings(any(), any()); + + Mockito.doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + ResizeResponse resizeResponse = ResizeAction.INSTANCE.newResponse(); + resizeResponse.readFrom(StreamInput.wrap(new byte[] { 0, 1, 1, 1, 1 })); + listener.onResponse(resizeResponse); + return null; + }).when(indicesClient).resizeIndex(any(), any()); + + SetOnce exceptionReturned = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + throw new AssertionError("Unexpected method call to onSuccess"); + } + + @Override + public void onFailure(Exception e) { + exceptionReturned.set(e); + } + }); + + assertThat(exceptionReturned.get().getMessage(), equalTo("Shrink request failed to be acknowledged")); + + Mockito.verify(client, Mockito.atLeast(1)).admin(); + Mockito.verify(adminClient, Mockito.atLeast(1)).indices(); + Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any()); + Mockito.verify(indicesClient, Mockito.atLeast(1)).resizeIndex(any(), any()); + } + + public void testExecuteWithAllShardsAllocatedAndShrunkenIndexSetting() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT).put("index.lifecycle.phase", "phase1") + .put("index.lifecycle.action", "action1").put("index.lifecycle.name", "test")) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + IndexMetaData targetIndexMetaData = IndexMetaData.builder(targetIndex.getName()) + .settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, index.getName())) + .numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(indexMetadata.getNumberOfReplicas()).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder() + .fPut(index.getName(), indexMetadata).fPut(targetIndex.getName(), targetIndexMetaData); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(targetIndex).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, + ShardRoutingState.STARTED))) + .build()) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + AdminClient adminClient = Mockito.mock(AdminClient.class); + IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class); + Mockito.when(client.admin()).thenReturn(adminClient); + Mockito.when(adminClient.indices()).thenReturn(indicesClient); + + Settings expectedSettings = Settings.builder().put("index.lifecycle.name", "test") + .put("index.lifecycle.phase", "phase1").put("index.lifecycle.action", "action1").build(); + + Mockito.doAnswer(invocationOnMock -> { + UpdateSettingsRequest request = (UpdateSettingsRequest) invocationOnMock.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + UpdateSettingsTestHelper.assertSettingsRequest(request, expectedSettings, targetIndex.getName()); + listener.onResponse(UpdateSettingsTestHelper.createMockResponse(true)); + return null; + }).when(indicesClient).updateSettings(any(), any()); + + Mockito.doAnswer(invocationOnMock -> { + DeleteIndexRequest request = (DeleteIndexRequest) invocationOnMock.getArguments()[0]; + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[1]; + assertNotNull(request); + assertEquals(1, request.indices().length); + assertEquals(index.getName(), request.indices()[0]); + listener.onResponse(null); + return null; + }).when(indicesClient).delete(any(), any()); + + SetOnce actionCompleted = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertFalse(actionCompleted.get()); + Mockito.verify(client, Mockito.atLeast(1)).admin(); + Mockito.verify(adminClient, Mockito.atLeast(1)).indices(); + Mockito.verify(indicesClient, Mockito.atLeast(1)).updateSettings(any(), any()); + } + + public void testExecuteWithAllShardsAllocatedAndShrunkenIndexConfigured() { + String lifecycleName = randomAlphaOfLengthBetween(5, 10); + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT).put("index.lifecycle.phase", "phase1") + .put("index.lifecycle.action", "action1").put("index.lifecycle.name", lifecycleName)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + IndexMetaData targetIndexMetaData = IndexMetaData.builder(targetIndex.getName()) + .settings(settings(Version.CURRENT) + .put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, index.getName()) + .put(IndexLifecycle.LIFECYCLE_NAME_SETTING.getKey(), lifecycleName)) + .numberOfShards(randomIntBetween(1, 3)).numberOfReplicas(indexMetadata.getNumberOfReplicas()).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder() + .fPut(index.getName(), indexMetadata).fPut(targetIndex.getName(), targetIndexMetaData); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(targetIndex).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, + ShardRoutingState.STARTED))) + .build()) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + + SetOnce actionCompleted = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertFalse(actionCompleted.get()); + Mockito.verifyZeroInteractions(client); + } + + public void testExecuteWaitingOnAllShardsActive() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + IndexMetaData targetIndexMetadata = IndexMetaData.builder(targetIndex.getName()) + .settings(settings(Version.CURRENT).put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME_KEY, index.getName())) + .numberOfShards(1).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata).fPut(targetIndex.getName(), targetIndexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE).metaData(MetaData.builder().indices(indices.build())) + .routingTable(RoutingTable.builder() + .add(IndexRoutingTable.builder(targetIndex).addShard( + TestShardRouting.newShardRouting(new ShardId(index, 0), "node1", true, + ShardRoutingState.INITIALIZING))) + .build()) + .build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + SetOnce actionCompleted = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + actionCompleted.set(completed); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError("Unexpected method call", e); + } + }); + + assertFalse(actionCompleted.get()); + } + + public void testExecuteIndexAlreadyExists() { + Index index = new Index(randomAlphaOfLengthBetween(1, 20), randomAlphaOfLengthBetween(1, 20)); + Index targetIndex = new Index("shrunk-" + index.getName(), randomAlphaOfLengthBetween(1, 20)); + ClusterService clusterService = Mockito.mock(ClusterService.class); + IndexMetaData indexMetadata = IndexMetaData.builder(index.getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build(); + IndexMetaData targetIndexMetadata = IndexMetaData.builder(targetIndex.getName()) + .settings(settings(Version.CURRENT)) + .numberOfShards(1).numberOfReplicas(randomIntBetween(0, 5)).build(); + ImmutableOpenMap.Builder indices = ImmutableOpenMap. builder().fPut(index.getName(), + indexMetadata).fPut(targetIndex.getName(), targetIndexMetadata); + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .metaData(MetaData.builder().indices(indices.build())).build(); + Mockito.when(clusterService.state()).thenReturn(clusterState); + Client client = Mockito.mock(Client.class); + + SetOnce actionFailed = new SetOnce<>(); + ShrinkAction action = new ShrinkAction(randomIntBetween(1, 10)); + + action.execute(index, client, clusterService, new LifecycleAction.Listener() { + @Override + public void onSuccess(boolean completed) { + throw new AssertionError("Unexpected method call to onSuccess"); + } + + @Override + public void onFailure(Exception e) { + actionFailed.set(e); + } + }); + + assertThat(actionFailed.get().getMessage(), equalTo("Cannot shrink index [" + index.getName() + "]" + + " because target index [" + targetIndex.getName() + "] already exists.")); + } } diff --git a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java index 31334793aec00..092840a71130a 100644 --- a/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java +++ b/x-pack/plugin/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeseriesLifecycleTypeTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.indexlifecycle; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; @@ -33,7 +34,7 @@ public class TimeseriesLifecycleTypeTests extends ESTestCase { private static final ForceMergeAction TEST_FORCE_MERGE_ACTION = new ForceMergeAction(1); private static final ReplicasAction TEST_REPLICAS_ACTION = new ReplicasAction(1); private static final RolloverAction TEST_ROLLOVER_ACTION = new RolloverAction("", new ByteSizeValue(1), null, null); - private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(); + private static final ShrinkAction TEST_SHRINK_ACTION = new ShrinkAction(1); public void testGetFirstPhase() { Map phases = new HashMap<>();