Allow shards of closed indices to be replicated as regular shards (elastic#38024)

This commit allows shards of indices in CLOSE state to be replicated as normal shards. 
It changes the MetaDataIndexStateService so that index routing tables of closed indices 
are kept in cluster state when the index is closed. Index routing tables are modified so 
that shard routings are reinitialized with the INDEX_CLOSED unassigned information. 
The IndicesClusterStateService is modified to remove IndexService instances of closed 
or reopened indices. In combination with the ShardRouting being in INITIALIZING state, 
the shards are recreated on the data nodes to reflect the new state. If the index state is 
closed, the IndexShard instances will be created using the NoOpEngine as the engine
implementation.
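
The engine wiring itself is not visible in the hunks below; as a rough sketch only (hypothetical 
helper name, assuming the engine factory is picked per index from its metadata state), the 
selection amounts to something like:

    // Sketch only, not the actual IndicesService wiring: closed indices get the no-op engine.
    static EngineFactory engineFactoryFor(final IndexMetaData indexMetaData) {
        if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
            return NoOpEngine::new;            // closed index: the no-op engine described above
        }
        return new InternalEngineFactory();    // open index: regular read-write engine
    }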

This commit also mutes two tests that rely on the fact that shard locks are released when 
an index is closed, which is not the case anymore with replicated closed indices (actually 
the locks are released but reacquired once the shard is reinitialized after being closed). 
These tests will be adapted in follow-up PRs.

Finally, many things will need to be adapted or improved in follow-up PRs (see elastic#33888), 
but this is the first big step towards replicated closed indices.

Relates to elastic#33888
tlrx committed Mar 1, 2019
1 parent a4ee55c commit 7c6f8e9
Showing 16 changed files with 132 additions and 69 deletions.

@@ -410,14 +410,16 @@ static ClusterState closeRoutingTable(final ClusterState currentState,
}

logger.debug("closing index {} succeeded", index);
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
routingTable.remove(index.getName());
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
routingTable.addAsFromOpenToClose(metadata.getSafe(index));
closedIndices.add(index.getName());
} catch (final IndexNotFoundException e) {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
}
}

logger.info("completed closing of indices {}", closedIndices);
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
}
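
The net effect, as exercised by the tests further down, is that a closed index keeps its entry in 
the routing table and every shard copy is unassigned with the new INDEX_CLOSED reason. A small 
sketch of inspecting that state (hypothetical index name "test", clusterState being the state 
produced once the close has been applied):

    final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test");
    assert indexRoutingTable != null; // previously removed on close, now retained
    for (IndexShardRoutingTable shardTable : indexRoutingTable) {
        for (ShardRouting shard : shardTable.shards()) {
            assert shard.unassigned();
            assert shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CLOSED;
        }
    }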

@@ -358,6 +358,13 @@ public Builder initializeAsFromCloseToOpen(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
}

/**
* Initializes a new empty index, as a result of closing an opened index.
*/
public Builder initializeAsFromOpenToClose(IndexMetaData indexMetaData) {
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, null));
}

/**
* Initializes a new empty index, to be restored from a snapshot
*/

@@ -525,6 +525,13 @@ public Builder addAsFromCloseToOpen(IndexMetaData indexMetaData) {
return this;
}

public Builder addAsFromOpenToClose(IndexMetaData indexMetaData) {
assert indexMetaData.getState() == IndexMetaData.State.CLOSE;
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsFromOpenToClose(indexMetaData);
return add(indexRoutingBuilder);
}

public Builder addAsRestore(IndexMetaData indexMetaData, SnapshotRecoverySource recoverySource) {
IndexRoutingTable.Builder indexRoutingBuilder = new IndexRoutingTable.Builder(indexMetaData.getIndex())
.initializeAsRestore(indexMetaData, recoverySource);

@@ -118,7 +118,11 @@ public enum Reason {
/**
* Forced manually to allocate
*/
MANUAL_ALLOCATION
MANUAL_ALLOCATION,
/**
* Unassigned as a result of closing an index.
*/
INDEX_CLOSED
}

/**
@@ -264,6 +268,8 @@ public UnassignedInfo(StreamInput in) throws IOException {
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().before(Version.V_6_0_0_beta2) && reason == Reason.MANUAL_ALLOCATION) {
out.writeByte((byte) Reason.ALLOCATION_FAILED.ordinal());
} else if (out.getVersion().before(Version.V_7_0_0) && reason == Reason.INDEX_CLOSED) {
out.writeByte((byte) Reason.REINITIALIZED.ordinal());
} else {
out.writeByte((byte) reason.ordinal());
}
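
Since the reason travels over the wire as its ordinal, INDEX_CLOSED has to be appended at the end 
of the enum (testReasonOrdinalOrder below guards that ordering) and translated for older nodes. A 
sketch of the assumed read side, which is not part of this diff:

    // Assumed reader: the byte written above is mapped straight back to an ordinal, so a
    // pre-7.0 node receives REINITIALIZED instead of the INDEX_CLOSED value it does not know.
    final Reason reason = Reason.values()[in.readByte()];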

@@ -44,8 +44,7 @@ public NoOpEngine(EngineConfig config) {
protected DirectoryReader open(final IndexCommit commit) throws IOException {
final Directory directory = commit.getDirectory();
final List<IndexCommit> indexCommits = DirectoryReader.listCommits(directory);
assert indexCommits.size() == 1 : "expected only one commit point";
IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
final IndexCommit indexCommit = indexCommits.get(indexCommits.size() - 1);
return new DirectoryReader(directory, new LeafReader[0]) {
@Override
protected DirectoryReader doOpenIfChanged() throws IOException {

@@ -101,6 +101,7 @@
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.FAILURE;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.NO_LONGER_ASSIGNED;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.REOPENED;

public class IndicesClusterStateService extends AbstractLifecycleComponent implements ClusterStateApplier {
private static final Logger logger = LogManager.getLogger(IndicesClusterStateService.class);
@@ -257,7 +258,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {

deleteIndices(event); // also deletes shards of deleted indices

removeUnallocatedIndices(event); // also removes shards of removed indices
removeIndices(event); // also removes shards of removed indices

failMissingShards(state);

@@ -369,17 +370,18 @@ protected void doRun() throws Exception {
}

/**
* Removes indices that have no shards allocated to this node. This does not delete the shard data as we wait for enough
* shard copies to exist in the cluster before deleting shard data (triggered by {@link org.elasticsearch.indices.store.IndicesStore}).
* Removes indices that have no shards allocated to this node or indices whose state has changed. This does not delete the shard data
* as we wait for enough shard copies to exist in the cluster before deleting shard data (triggered by
* {@link org.elasticsearch.indices.store.IndicesStore}).
*
* @param event the cluster changed event
*/
private void removeUnallocatedIndices(final ClusterChangedEvent event) {
private void removeIndices(final ClusterChangedEvent event) {
final ClusterState state = event.state();
final String localNodeId = state.nodes().getLocalNodeId();
assert localNodeId != null;

Set<Index> indicesWithShards = new HashSet<>();
final Set<Index> indicesWithShards = new HashSet<>();
RoutingNode localRoutingNode = state.getRoutingNodes().node(localNodeId);
if (localRoutingNode != null) { // null e.g. if we are not a data node
for (ShardRouting shardRouting : localRoutingNode) {
@@ -388,20 +390,27 @@ private void removeUnallocatedIndices(final ClusterChangedEvent event) {
}

for (AllocatedIndex<? extends Shard> indexService : indicesService) {
Index index = indexService.index();
if (indicesWithShards.contains(index) == false) {
final Index index = indexService.index();
final IndexMetaData indexMetaData = state.metaData().index(index);
final IndexMetaData existingMetaData = indexService.getIndexSettings().getIndexMetaData();

AllocatedIndices.IndexRemovalReason reason = null;
if (indexMetaData != null && indexMetaData.getState() != existingMetaData.getState()) {
reason = indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : REOPENED;
} else if (indicesWithShards.contains(index) == false) {
// if the cluster change indicates a brand new cluster, we only want
// to remove the in-memory structures for the index and not delete the
// contents on disk because the index will later be re-imported as a
// dangling index
final IndexMetaData indexMetaData = state.metaData().index(index);
assert indexMetaData != null || event.isNewCluster() :
"index " + index + " does not exist in the cluster state, it should either " +
"have been deleted or the cluster must be new";
final AllocatedIndices.IndexRemovalReason reason =
indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : NO_LONGER_ASSIGNED;
logger.debug("{} removing index, [{}]", index, reason);
indicesService.removeIndex(index, reason, "removing index (no shards allocated)");
reason = indexMetaData != null && indexMetaData.getState() == IndexMetaData.State.CLOSE ? CLOSED : NO_LONGER_ASSIGNED;
}

if (reason != null) {
logger.debug("{} removing index ({})", index, reason);
indicesService.removeIndex(index, reason, "removing index (" + reason + ")");
}
}
}
@@ -612,7 +621,7 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard
ClusterState clusterState) {
final ShardRouting currentRoutingEntry = shard.routingEntry();
assert currentRoutingEntry.isSameAllocation(shardRouting) :
"local shard has a different allocation id but wasn't cleaning by removeShards. "
"local shard has a different allocation id but wasn't cleaned by removeShards. "
+ "cluster state: " + shardRouting + " local: " + currentRoutingEntry;

final long primaryTerm;
@@ -747,7 +756,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, boolean sendShardFail
private void sendFailShard(ShardRouting shardRouting, String message, @Nullable Exception failure, ClusterState state) {
try {
logger.warn(() -> new ParameterizedMessage(
"[{}] marking and sending shard failed due to [{}]", shardRouting.shardId(), message), failure);
"{} marking and sending shard failed due to [{}]", shardRouting.shardId(), message), failure);
failedShardsCache.put(shardRouting.shardId(), shardRouting);
shardStateAction.localShardFailed(shardRouting, message, failure, SHARD_STATE_ACTION_LISTENER, state);
} catch (Exception inner) {
@@ -948,7 +957,7 @@ enum IndexRemovalReason {
DELETED,

/**
* The index have been closed. The index should be removed and all associated resources released. Persistent parts of the index
* The index has been closed. The index should be removed and all associated resources released. Persistent parts of the index
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
*/
CLOSED,
@@ -958,7 +967,13 @@
* Persistent parts of the index like the shards files, state and transaction logs are kept around in the
* case of a disaster recovery.
*/
FAILURE
FAILURE,

/**
* The index has been reopened. The index should be removed and all associated resources released. Persistent parts of the index
* like the shards files, state and transaction logs are kept around in the case of a disaster recovery.
*/
REOPENED,
}
}
}

@@ -268,7 +268,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
// it's fine to keep the contexts open if the index is still "alive"
// unfortunately we don't have a clear way to signal today why an index is closed.
// to release memory and let references to the filesystem go etc.
if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.CLOSED) {
if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.CLOSED || reason == IndexRemovalReason.REOPENED) {
freeAllContextForIndex(index);
}


@@ -35,6 +35,7 @@
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ValidationException;
@@ -210,7 +211,14 @@ public void testAddIndexClosedBlocks() {
for (Index index : indices) {
assertTrue(blockedIndices.containsKey(index));
if (mixedVersions) {
assertIsClosed(index.getName(), updatedState);
assertThat(updatedState.metaData().index(index).getState(), is(IndexMetaData.State.CLOSE));
assertTrue(updatedState.blocks().hasIndexBlock(index.getName(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK));
assertThat("Index " + index + " must have only 1 block with id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID,
updatedState.blocks().indices().getOrDefault(index.getName(), emptySet()).stream().filter(clusterBlock ->
clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));

final IndexRoutingTable indexRoutingTable = updatedState.routingTable().index(index);
assertThat(indexRoutingTable, nullValue());
} else {
assertHasBlock(index.getName(), updatedState, blockedIndices.get(index));
}
@@ -346,19 +354,18 @@ private static ClusterState addIndex(final ClusterState currentState,
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
clusterStateBuilder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));

if (state == IndexMetaData.State.OPEN) {
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
}
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
}
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
}
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());

if (block != null) {
clusterStateBuilder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).addIndexBlock(index, block));
}
@@ -372,11 +379,19 @@ private static void assertIsOpened(final String indexName, final ClusterState cl

private static void assertIsClosed(final String indexName, final ClusterState clusterState) {
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE));
assertThat(clusterState.routingTable().index(indexName), nullValue());
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream()
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));

final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
assertThat(indexRoutingTable, notNullValue());

for(IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
assertThat(shardRoutingTable.shards().stream().allMatch(ShardRouting::unassigned), is(true));
assertThat(shardRoutingTable.shards().stream().map(ShardRouting::unassignedInfo).map(UnassignedInfo::getReason)
.allMatch(info -> info == UnassignedInfo.Reason.INDEX_CLOSED), is(true));
}
}

private static void assertHasBlock(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) {

@@ -40,6 +40,7 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
@@ -54,6 +55,7 @@
import static org.hamcrest.Matchers.nullValue;

public class UnassignedInfoTests extends ESAllocationTestCase {

public void testReasonOrdinalOrder() {
UnassignedInfo.Reason[] order = new UnassignedInfo.Reason[]{
UnassignedInfo.Reason.INDEX_CREATED,
@@ -70,7 +72,8 @@ public void testReasonOrdinalOrder() {
UnassignedInfo.Reason.REALLOCATED_REPLICA,
UnassignedInfo.Reason.PRIMARY_FAILED,
UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY,
UnassignedInfo.Reason.MANUAL_ALLOCATION,};
UnassignedInfo.Reason.MANUAL_ALLOCATION,
UnassignedInfo.Reason.INDEX_CLOSED,};
for (int i = 0; i < order.length; i++) {
assertThat(order[i].ordinal(), equalTo(i));
}
@@ -95,6 +98,21 @@ public void testSerialization() throws Exception {
assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations()));
}

public void testBwcSerialization() throws Exception {
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, "message");
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
unassignedInfo.writeTo(out);
out.close();

UnassignedInfo read = new UnassignedInfo(out.bytes().streamInput());
assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.REINITIALIZED));
assertThat(read.getUnassignedTimeInMillis(), equalTo(unassignedInfo.getUnassignedTimeInMillis()));
assertThat(read.getMessage(), equalTo(unassignedInfo.getMessage()));
assertThat(read.getDetails(), equalTo(unassignedInfo.getDetails()));
assertThat(read.getNumFailedAllocations(), equalTo(unassignedInfo.getNumFailedAllocations()));
}

public void testIndexCreated() {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT))