Skip to content

Commit

Permalink
Core: Avoid null references that may be returned due to concurrent ch…
Browse files Browse the repository at this point in the history
…anges or inconsistent cluster state

Closes elastic#7181
  • Loading branch information
martijnvg committed Aug 11, 2014
1 parent 62fc565 commit 6d66af1
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 25 deletions.
Expand Up @@ -114,11 +114,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
NodeInfo nodeInfo = nodeService.info(false, true, false, true, false, false, true, false, true);
NodeStats nodeStats = nodeService.stats(CommonStatsFlags.NONE, false, true, true, false, false, true, false, false, false);
List<ShardStats> shardsStats = new ArrayList<>();
for (String index : indicesService.indices()) {
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}
for (IndexService indexService : indicesService.indices().values()) {
for (IndexShard indexShard : indexService) {
if (indexShard.routingEntry().active()) {
// only report on fully started shards
Expand Down
Expand Up @@ -105,7 +105,7 @@ protected void resolveRequest(ClusterState state, ExplainRequest request) {
}

protected ExplainResponse shardOperation(ExplainRequest request, int shardId) throws ElasticsearchException {
IndexService indexService = indicesService.indexService(request.index());
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(shardId);
Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
Engine.GetResult result = indexShard.get(new Engine.Get(false, uidTerm));
Expand Down
Expand Up @@ -537,6 +537,10 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
// do the actual merge here on the master, and update the mapping source
DocumentMapper newMapper = entry.getValue();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
continue;
}

CompressedString existingSource = null;
if (existingMappers.containsKey(entry.getKey())) {
existingSource = existingMappers.get(entry.getKey()).mappingSource();
Expand Down
Expand Up @@ -151,12 +151,14 @@ public ClusterState execute(ClusterState currentState) {

@Override
public void onFailure(String source, Throwable t) {
logger.error("unexpected failure during [{}]", t, source);
ClusterState state = clusterService.state();
logger.error("unexpected failure during [{}], current state:\n{}", t, source, state.prettyPrint());
}
});
routingTableDirty = false;
} catch (Exception e) {
logger.warn("Failed to reroute routing table", e);
ClusterState state = clusterService.state();
logger.warn("Failed to reroute routing table, current state:\n{}", e, state.prettyPrint());
}
}

Expand Down
Expand Up @@ -200,6 +200,10 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
Set<DiscoveryNode> noNodes = Sets.newHashSet();
for (DiscoveryNode discoNode : nodesWithHighestVersion) {
RoutingNode node = routingNodes.node(discoNode.id());
if (node == null) {
continue;
}

Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
throttledNodes.add(discoNode);
Expand Down
19 changes: 16 additions & 3 deletions src/main/java/org/elasticsearch/indices/IndicesService.java
Expand Up @@ -19,14 +19,13 @@

package org.elasticsearch.indices;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.service.IndexService;

import java.util.Set;

/**
*
*/
Expand All @@ -50,10 +49,24 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone

IndicesLifecycle indicesLifecycle();

Set<String> indices();
/**
* Returns a snapshot of the started indices and the associated {@link IndexService} instances.
*
* The map being returned is not a live view and subsequent calls can return a different view.
*/
ImmutableMap<String, IndexService> indices();

/**
* Returns an IndexService for the specified index if exists otherwise returns <code>null</code>.
*
* Even if the index name appeared in {@link #indices()} <code>null</code> can still be returned as an
* index maybe removed in the meantime, so preferable use the associated {@link IndexService} in order to prevent NPE.
*/
IndexService indexService(String index);

/**
* Returns an IndexService for the specified index if exists otherwise a {@link IndexMissingException} is thrown.
*/
IndexService indexServiceSafe(String index) throws IndexMissingException;

IndexService createIndex(String index, Settings settings, String localNodeId) throws ElasticsearchException;
Expand Down
Expand Up @@ -74,14 +74,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static com.google.common.collect.Maps.newHashMap;
import static com.google.common.collect.Sets.newHashSet;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
Expand Down Expand Up @@ -239,8 +237,8 @@ public boolean hasIndex(String index) {
return indices.containsKey(index);
}

public Set<String> indices() {
return newHashSet(indices.keySet());
public ImmutableMap<String, IndexService> indices() {
return indices;
}

public IndexService indexService(String index) {
Expand Down
Expand Up @@ -160,8 +160,9 @@ public void clusterChanged(final ClusterChangedEvent event) {
// are going to recover them again once state persistence is disabled (no master / not recovered)
// TODO: this feels a bit hacky here, a block disables state persistence, and then we clean the allocated shards, maybe another flag in blocks?
if (event.state().blocks().disableStatePersistence()) {
for (final String index : indicesService.indices()) {
IndexService indexService = indicesService.indexService(index);
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexService indexService = entry.getValue();
for (Integer shardId : indexService.shardIds()) {
logger.debug("[{}][{}] removing shard (disabled block persistence)", index, shardId);
try {
Expand Down Expand Up @@ -218,10 +219,11 @@ private void cleanMismatchedIndexUUIDs(final ClusterChangedEvent event) {
private void applyCleanedIndices(final ClusterChangedEvent event) {
// handle closed indices, since they are not allocated on a node once they are closed
// so applyDeletedIndices might not take them into account
for (final String index : indicesService.indices()) {
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexMetaData indexMetaData = event.state().metaData().index(index);
if (indexMetaData != null && indexMetaData.state() == IndexMetaData.State.CLOSE) {
IndexService indexService = indicesService.indexService(index);
IndexService indexService = entry.getValue();
for (Integer shardId : indexService.shardIds()) {
logger.debug("[{}][{}] removing shard (index is closed)", index, shardId);
try {
Expand All @@ -232,8 +234,10 @@ private void applyCleanedIndices(final ClusterChangedEvent event) {
}
}
}
for (final String index : indicesService.indices()) {
if (indicesService.indexService(index).shardIds().isEmpty()) {
for (Map.Entry<String, IndexService> entry : indicesService.indices().entrySet()) {
String index = entry.getKey();
IndexService indexService = entry.getValue();
if (indexService.shardIds().isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index (no shards allocated)", index);
}
Expand All @@ -244,7 +248,7 @@ private void applyCleanedIndices(final ClusterChangedEvent event) {
}

private void applyDeletedIndices(final ClusterChangedEvent event) {
for (final String index : indicesService.indices()) {
for (final String index : indicesService.indices().keySet()) {
if (!event.state().metaData().hasIndex(index)) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] cleaning index, no longer part of the metadata", index);
Expand Down Expand Up @@ -441,12 +445,12 @@ private void applyAliases(ClusterChangedEvent event) {
if (aliasesChanged(event)) {
// go over and update aliases
for (IndexMetaData indexMetaData : event.state().metaData()) {
if (!indicesService.hasIndex(indexMetaData.index())) {
String index = indexMetaData.index();
IndexService indexService = indicesService.indexService(index);
if (indexService == null) {
// we only create / update here
continue;
}
String index = indexMetaData.index();
IndexService indexService = indicesService.indexService(index);
IndexAliasesService indexAliasesService = indexService.aliasesService();
processAliases(index, indexMetaData.aliases().values(), indexAliasesService);
// go over and remove aliases
Expand Down
@@ -0,0 +1,81 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.indices.state;

import com.google.common.collect.ImmutableMap;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.local.LocalGatewayAllocator;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

/**
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes = 1, numClientNodes = 0, transportClientRatio = 0)
public class RareClusterStateTests extends ElasticsearchIntegrationTest {

@Override
protected int numberOfShards() {
return 1;
}

@Override
protected int numberOfReplicas() {
return 0;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("gateway.type", "local")
.build();
}

@Test
public void testUnassignedShardAndEmptyNodesInRoutingTable() throws Exception {
createIndex("a");
ensureSearchable("a");
ClusterState current = clusterService().state();
LocalGatewayAllocator allocator = internalCluster().getInstance(LocalGatewayAllocator.class);

AllocationDeciders allocationDeciders = new AllocationDeciders(ImmutableSettings.EMPTY, new AllocationDecider[0]);
RoutingNodes routingNodes = new RoutingNodes(
ClusterState.builder(current)
.routingTable(RoutingTable.builder(current.routingTable()).remove("a").addAsRecovery(current.metaData().index("a")))
.nodes(DiscoveryNodes.EMPTY_NODES)
.build()
);
ClusterInfo clusterInfo = new ClusterInfo(ImmutableMap.<String, DiskUsage>of(), ImmutableMap.<String, Long>of());

RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), clusterInfo);
allocator.allocateUnassigned(routingAllocation);
}

}

0 comments on commit 6d66af1

Please sign in to comment.