Skip to content

Commit

Permalink
elastic#2483 Add Explain to AllocationDeciders Decision if explicitl…
Browse files Browse the repository at this point in the history
…y requested
  • Loading branch information
s1monw committed Dec 13, 2012
1 parent c65d5a7 commit 21966cf
Show file tree
Hide file tree
Showing 18 changed files with 105 additions and 86 deletions.
Expand Up @@ -228,7 +228,7 @@ private boolean moveShards(RoutingAllocation allocation) {
continue;
}
RoutingNode routingNode = allocation.routingNodes().node(shardRouting.currentNodeId());
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation, false);
if (decision.type() == Decision.Type.NO) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
boolean moved = shardsAllocators.move(shardRouting, routingNode, allocation);
Expand Down
Expand Up @@ -100,7 +100,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
lastNode = 0;
}

Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
Decision decision = allocation.deciders().canAllocate(shard, node, allocation, false);
if (decision.type() == Decision.Type.YES) {
int numberOfShardsToAllocate = routingNodes.requiredAverageNumberOfShardsPerNode() - node.shards().size();
if (numberOfShardsToAllocate <= 0) {
Expand All @@ -120,7 +120,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
MutableShardRouting shard = it.next();
// go over the nodes and try and allocate the remaining ones
for (RoutingNode routingNode : sortedNodesLeastToHigh(allocation)) {
Decision decision = allocation.deciders().canAllocate(shard, routingNode, allocation);
Decision decision = allocation.deciders().canAllocate(shard, routingNode, allocation, false);
if (decision.type() == Decision.Type.YES) {
changed = true;
routingNode.add(shard);
Expand Down Expand Up @@ -165,12 +165,12 @@ public boolean rebalance(RoutingAllocation allocation) {
boolean relocated = false;
List<MutableShardRouting> startedShards = highRoutingNode.shardsWithState(STARTED);
for (MutableShardRouting startedShard : startedShards) {
Decision rebalanceDecision = allocation.deciders().canRebalance(startedShard, allocation);
Decision rebalanceDecision = allocation.deciders().canRebalance(startedShard, allocation, false);
if (rebalanceDecision.type() == Decision.Type.NO) {
continue;
}

Decision allocateDecision = allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation);
Decision allocateDecision = allocation.deciders().canAllocate(startedShard, lowRoutingNode, allocation, false);
if (allocateDecision.type() == Decision.Type.YES) {
changed = true;
lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(),
Expand Down Expand Up @@ -208,7 +208,7 @@ public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingA
if (nodeToCheck.nodeId().equals(node.nodeId())) {
continue;
}
Decision decision = allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation);
Decision decision = allocation.deciders().canAllocate(shardRouting, nodeToCheck, allocation, false);
if (decision.type() == Decision.Type.YES) {
nodeToCheck.add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
nodeToCheck.nodeId(), shardRouting.currentNodeId(),
Expand Down
Expand Up @@ -180,7 +180,7 @@ public void execute(RoutingAllocation allocation) throws ElasticSearchException
}

RoutingNode routingNode = allocation.routingNodes().node(discoNode.id());
Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation, true);
if (decision.type() == Decision.Type.NO) {
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
}
Expand Down
Expand Up @@ -159,7 +159,7 @@ public void execute(RoutingAllocation allocation) throws ElasticSearchException
}

RoutingNode toRoutingNode = allocation.routingNodes().node(toDiscoNode.id());
Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation);
Decision decision = allocation.deciders().canAllocate(shardRouting, toRoutingNode, allocation, true);
if (decision.type() == Decision.Type.NO) {
throw new ElasticSearchIllegalArgumentException("[move_allocation] can't move " + shardId + ", from " + fromDiscoNode + ", to " + toDiscoNode + ", since its not allowed, reason: " + decision);
}
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
Expand All @@ -45,23 +46,36 @@ protected AllocationDecider(Settings settings) {
* re-balanced to the given allocation. The default is
* {@link Decision#ALWAYS}.
*/
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation, boolean explain) {
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} whether the given shard routing can be
* allocated on the given node. The default is {@link Decision#ALWAYS}.
*/
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean explain) {
return Decision.ALWAYS;
}

/**
* Returns a {@link Decision} whether the given shard routing can be remain
* on the given node. The default is {@link Decision#ALWAYS}.
*/
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean explain) {
return Decision.ALWAYS;
}

protected static Decision decision(Decision.Type type, String explainMsg, boolean explain) {
switch (type) {
case NO:
return explain ? Decision.single(type, explainMsg) : Decision.NO;
case THROTTLE:
return explain ? Decision.single(type, explainMsg) : Decision.THROTTLE;
case YES:
return explain ? Decision.single(type, explainMsg) : Decision.YES;
default:
throw new ElasticSearchIllegalArgumentException("unknown decision type: " + type);
}
}
}
Expand Up @@ -23,10 +23,12 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Multi;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.settings.NodeSettingsService;

import java.util.List;
import java.util.Set;

/**
Expand Down Expand Up @@ -65,10 +67,10 @@ public AllocationDeciders(Settings settings, Set<AllocationDecider> allocations)
}

@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation, boolean explain) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canRebalance(shardRouting, allocation);
Decision decision = allocationDecider.canRebalance(shardRouting, allocation, explain);
if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand All @@ -77,13 +79,13 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean explain) {
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO;
}
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation);
Decision decision = allocationDecider.canAllocate(shardRouting, node, allocation, explain);
// the assumption is that a decider that returns the static instance Decision#ALWAYS
// does not really implements canAllocate
if (decision != Decision.ALWAYS) {
Expand All @@ -94,13 +96,13 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean explain) {
if (allocation.shouldIgnoreShardForNode(shardRouting.shardId(), node.nodeId())) {
return Decision.NO;
}
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider allocationDecider : allocations) {
Decision decision = allocationDecider.canRemain(shardRouting, node, allocation);
Decision decision = allocationDecider.canRemain(shardRouting, node, allocation, explain);
if (decision != Decision.ALWAYS) {
ret.add(decision);
}
Expand Down
Expand Up @@ -156,26 +156,26 @@ public String[] awarenessAttributes() {
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, true) ? Decision.YES : Decision.NO;
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean explain) {
return underCapacity(shardRouting, node, allocation, true, explain);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return underCapacity(shardRouting, node, allocation, false) ? Decision.YES : Decision.NO;
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean explain) {
return underCapacity(shardRouting, node, allocation, false, explain);
}

private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode) {
private Decision underCapacity(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean moveToNode, boolean explain) {
if (awarenessAttributes.length == 0) {
return true;
return decision(Decision.Type.YES, "[Awareness] No Awareness settings present", explain);
}

IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.index());
int shardCount = indexMetaData.numberOfReplicas() + 1; // 1 for primary
for (String awarenessAttribute : awarenessAttributes) {
// the node the shard exists on must be associated with an awareness attribute
if (!node.node().attributes().containsKey(awarenessAttribute)) {
return false;
return decision(Decision.Type.NO, "[Awareness] Awareness attribute don't exists on node", explain);
}

// build attr_value -> nodes map
Expand Down Expand Up @@ -236,14 +236,14 @@ private boolean underCapacity(ShardRouting shardRouting, RoutingNode node, Routi
int currentNodeCount = shardPerAttribute.get(node.node().attributes().get(awarenessAttribute));
// if we are above with leftover, then we know we are not good, even with mod
if (currentNodeCount > (requiredCountPerAttribute + leftoverPerAttribute)) {
return false;
return decision(Decision.Type.NO, "[Awareness] Allocation would exceed awareness threshold", explain);
}
// all is well, we are below or same as average
if (currentNodeCount <= requiredCountPerAttribute) {
continue;
}
}

return true;
return decision(Decision.Type.YES, "[Awareness] Allocation awareness satisfied", explain);
}
}
Expand Up @@ -85,39 +85,38 @@ public ClusterRebalanceAllocationDecider(Settings settings) {
}

@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation, boolean explain) {
if (type == ClusterRebalanceType.INDICES_PRIMARIES_ACTIVE) {
for (MutableShardRouting shard : allocation.routingNodes().unassigned()) {
if (shard.primary()) {
return Decision.NO;
return decision(Decision.Type.NO, "[ClusterRebalance] All primaries must be active for rebalance", explain);
}
}
for (RoutingNode node : allocation.routingNodes()) {
List<MutableShardRouting> shards = node.shards();
for (int i = 0; i < shards.size(); i++) {
MutableShardRouting shard = shards.get(i);
if (shard.primary() && !shard.active() && shard.relocatingNodeId() == null) {
return Decision.NO;
return decision(Decision.Type.NO, "[ClusterRebalance] All primaries must be active for rebalance", explain);
}
}
}
return Decision.YES;
return decision(Decision.Type.YES, "[ClusterRebalance] All primaries are active", explain);
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
if (!allocation.routingNodes().unassigned().isEmpty()) {
return Decision.NO;
return decision(Decision.Type.NO, "[ClusterRebalance] All replicas must be active for rebalance", explain);
}
for (RoutingNode node : allocation.routingNodes()) {
List<MutableShardRouting> shards = node.shards();
for (int i = 0; i < shards.size(); i++) {
MutableShardRouting shard = shards.get(i);
if (!shard.active() && shard.relocatingNodeId() == null) {
return Decision.NO;
return decision(Decision.Type.NO, "[ClusterRebalance] All replicas must be active for rebalance", explain);
}
}
}
}
// type == Type.ALWAYS
return Decision.YES;
return decision(Decision.Type.YES, "[ClusterRebalance] All replicas are active", explain);
}
}
Expand Up @@ -74,9 +74,9 @@ public ConcurrentRebalanceAllocationDecider(Settings settings, NodeSettingsServi
}

@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation, boolean explain) {
if (clusterConcurrentRebalance == -1) {
return Decision.YES;
return decision(Decision.Type.YES, "[ConcurrentRebalance] Concurrent rebalance is unlimited", explain);
}
int rebalance = 0;
for (RoutingNode node : allocation.routingNodes()) {
Expand All @@ -88,8 +88,9 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
}
}
if (rebalance >= clusterConcurrentRebalance) {
return Decision.NO;
return decision(Decision.Type.NO, "[ConcurrentRebalance] Too many concurrent rebalance operations", explain);
}
return Decision.YES;
return decision(Decision.Type.YES, "[ConcurrentRebalance] Concurrent rebalance operations is below threshold", explain);

}
}
Expand Up @@ -100,17 +100,20 @@ public DisableAllocationDecider(Settings settings, NodeSettingsService nodeSetti
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation, boolean explain) {
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
// if its primary, and it hasn't been allocated post API (meaning its a "fresh newly created shard"), only disable allocation
// on a special disable allocation flag
return allocation.ignoreDisable() ? Decision.YES : disableNewAllocation ? Decision.NO : Decision.YES;
return allocation.ignoreDisable() ? decision(Decision.Type.YES, "[DisableAllocation] Disabled allocations are ignored", explain) :
disableNewAllocation ? decision(Decision.Type.NO, "[DisableAllocation] New allocations are disabled", explain) :
decision(Decision.Type.YES, "[DisableAllocation] PreAPI allocations are allowed ", explain);
}
if (disableAllocation) {
return allocation.ignoreDisable() ? Decision.YES : Decision.NO;
return allocation.ignoreDisable() ? decision(Decision.Type.YES, "[DisableAllocation] Disabled allocations are ignored", explain) : decision(Decision.Type.NO, "Shard allocation is disabled", explain);
}
if (disableReplicaAllocation) {
return shardRouting.primary() ? Decision.YES : allocation.ignoreDisable() ? Decision.YES : Decision.NO;
return shardRouting.primary() ? decision(Decision.Type.YES, "[DisableAllocation] Only replica allocations are disabled", explain) :
(allocation.ignoreDisable() ? decision(Decision.Type.YES, "[DisableAllocation] Disabled allocations are ignored", explain) : decision(Decision.Type.NO, "Replica allocation is disabled", explain));
}
return Decision.YES;
}
Expand Down

0 comments on commit 21966cf

Please sign in to comment.