Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
Expand Down Expand Up @@ -497,6 +498,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new IndexBalanceAllocationDecider(settings, clusterSettings));

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ private boolean isSingleNodeFilterInternal() {
|| (filters.size() > 1 && opType == OpType.AND && NON_ATTRIBUTE_NAMES.containsAll(filters.keySet()));
}

public boolean hasFilters() {
return filters.isEmpty() == false;
}

/**
* Generates a human-readable string for the DiscoverNodeFilters.
* Example: {@code _id:"id1 OR blah",name:"blah OR name2"}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation;

import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;

/**
* Settings definitions for the index shard count allocation decider and associated infrastructure
*/
public class IndexBalanceConstraintSettings {

private static final String SETTING_PREFIX = "cluster.routing.allocation.index_balance_decider.";

public static final Setting<Boolean> INDEX_BALANCE_DECIDER_ENABLED_SETTING = Setting.boolSetting(
SETTING_PREFIX + "enabled",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* This setting permits nodes to host more than ideally balanced number of index shards.
* Maximum tolerated index shard count = ideal + skew_tolerance
* i.e. ideal = 4 shards, skew_tolerance = 1
* maximum tolerated index shards = 4 + 1 = 5.
*/
public static final Setting<Integer> INDEX_BALANCE_DECIDER_EXCESS_SHARDS = Setting.intSetting(
SETTING_PREFIX + "excess_shards",
0,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile boolean deciderEnabled;
private volatile int excessShards;

public IndexBalanceConstraintSettings(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_ENABLED_SETTING, enabled -> this.deciderEnabled = enabled);
clusterSettings.initializeAndWatch(INDEX_BALANCE_DECIDER_EXCESS_SHARDS, value -> this.excessShards = value);
}

public boolean isDeciderEnabled() {
return this.deciderEnabled;
}

public int getExcessShards() {
return this.excessShards;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,11 @@ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, ProjectIn
logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
return false;
}

// Visible for testing.
public RoutingAllocation getAllocation() {
return this.allocation;
}
}

public static class ModelNode implements Iterable<ModelIndex> {
Expand Down Expand Up @@ -1824,7 +1829,8 @@ public WeightFunction getWeightFunction() {
}
}

record ProjectIndex(ProjectId project, String indexName) {
// Visible for testing.
public record ProjectIndex(ProjectId project, String indexName) {
ProjectIndex(RoutingAllocation allocation, ShardRouting shard) {
this(allocation.metadata().projectFor(shard.index()).id(), shard.getIndexName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public WeightFunction(float shardBalance, float indexBalance, float writeLoadBal
theta3 = diskUsageBalance / sum;
}

float calculateNodeWeightWithIndex(
// Visible for testing
public float calculateNodeWeightWithIndex(
BalancedShardsAllocator.Balancer balancer,
BalancedShardsAllocator.ModelNode node,
ProjectIndex index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ public class FilterAllocationDecider extends AllocationDecider {

public static final String NAME = "filter";

private static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
private static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
private static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";
public static final String CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX = "cluster.routing.allocation.require";
public static final String CLUSTER_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.include";
public static final String CLUSTER_ROUTING_EXCLUDE_GROUP_PREFIX = "cluster.routing.allocation.exclude";

public static final Setting.AffixSetting<List<String>> CLUSTER_ROUTING_REQUIRE_GROUP_SETTING = Setting.prefixKeySetting(
CLUSTER_ROUTING_REQUIRE_GROUP_PREFIX + ".",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.Index;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.AND;
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.OR;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.INDEX_ROLE;
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.SEARCH_ROLE;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP_SETTING;

/**
* For an index of n shards hosted by a cluster of m nodes, a node should not host
* significantly more than n / m shards. This allocation decider enforces this principle.
* This allocation decider excludes any nodes flagged for shutdown from consideration
* when computing optimal shard distributions.
*/
public class IndexBalanceAllocationDecider extends AllocationDecider {

private static final Logger logger = LogManager.getLogger(IndexBalanceAllocationDecider.class);
private static final String EMPTY = "";

public static final String NAME = "index_balance";

private final IndexBalanceConstraintSettings indexBalanceConstraintSettings;
private final boolean isStateless;

private volatile DiscoveryNodeFilters clusterRequireFilters;
private volatile DiscoveryNodeFilters clusterIncludeFilters;
private volatile DiscoveryNodeFilters clusterExcludeFilters;

public IndexBalanceAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
this.indexBalanceConstraintSettings = new IndexBalanceConstraintSettings(clusterSettings);
setClusterRequireFilters(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING.getAsMap(settings));
setClusterExcludeFilters(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING.getAsMap(settings));
setClusterIncludeFilters(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings));
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_REQUIRE_GROUP_SETTING, this::setClusterRequireFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_GROUP_SETTING, this::setClusterExcludeFilters, (a, b) -> {});
clusterSettings.addAffixMapUpdateConsumer(CLUSTER_ROUTING_INCLUDE_GROUP_SETTING, this::setClusterIncludeFilters, (a, b) -> {});
isStateless = DiscoveryNode.isStateless(settings);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (indexBalanceConstraintSettings.isDeciderEnabled() == false || isStateless == false || hasFilters()) {
return allocation.decision(Decision.YES, NAME, "Decider is disabled.");
}

Index index = shardRouting.index();
if (node.hasIndex(index) == false) {
return allocation.decision(Decision.YES, NAME, "Node does not currently host this index.");
}

assert node.node() != null;
assert node.node().getRoles().contains(INDEX_ROLE) || node.node().getRoles().contains(SEARCH_ROLE);

if (node.node().getRoles().contains(INDEX_ROLE) && shardRouting.primary() == false) {
return allocation.decision(Decision.YES, NAME, "An index node cannot own search shards. Decider inactive.");
}

if (node.node().getRoles().contains(SEARCH_ROLE) && shardRouting.primary()) {
return allocation.decision(Decision.YES, NAME, "A search node cannot own primary shards. Decider inactive.");
}

final ProjectId projectId = allocation.getClusterState().metadata().projectFor(index).id();
final Set<DiscoveryNode> eligibleNodes = new HashSet<>();
int totalShards = 0;
String nomenclature = EMPTY;

if (node.node().getRoles().contains(INDEX_ROLE)) {
collectEligibleNodes(allocation, eligibleNodes, INDEX_ROLE);
// Primary shards only.
totalShards = allocation.getClusterState().routingTable(projectId).index(index).size();
nomenclature = "index";
} else if (node.node().getRoles().contains(SEARCH_ROLE)) {
collectEligibleNodes(allocation, eligibleNodes, SEARCH_ROLE);
// Replicas only.
final IndexMetadata indexMetadata = allocation.getClusterState().metadata().getProject(projectId).index(index);
totalShards = indexMetadata.getNumberOfShards() * indexMetadata.getNumberOfReplicas();
nomenclature = "search";
}

assert eligibleNodes.isEmpty() == false;
if (eligibleNodes.isEmpty()) {
return allocation.decision(Decision.YES, NAME, "There are no eligible nodes available.");
}
assert totalShards > 0;
final double idealAllocation = Math.ceil((double) totalShards / eligibleNodes.size());

// Adding the excess shards before division ensures that with tolerance 1 we get:
// 2 shards, 2 nodes, allow 2 on each
// 3 shards, 2 nodes, allow 2 on each etc.
final int threshold = Math.ceilDiv(totalShards + indexBalanceConstraintSettings.getExcessShards(), eligibleNodes.size());
final int currentAllocation = node.numberOfOwningShardsForIndex(index);

if (currentAllocation >= threshold) {
String explanation = Strings.format(
"There are [%d] eligible nodes in the [%s] tier for assignment of [%d] shards in index [%s]. Ideally no more than [%.0f] "
+ "shard would be assigned per node (the index balance excess shards setting is [%d]). This node is already assigned"
+ " [%d] shards of the index.",
eligibleNodes.size(),
nomenclature,
totalShards,
index,
idealAllocation,
indexBalanceConstraintSettings.getExcessShards(),
currentAllocation
);

logger.trace(explanation);

return allocation.decision(Decision.NOT_PREFERRED, NAME, explanation);
}

return allocation.decision(Decision.YES, NAME, "Node index shard allocation is under the threshold.");
}

private void collectEligibleNodes(RoutingAllocation allocation, Set<DiscoveryNode> eligibleNodes, DiscoveryNodeRole role) {
for (DiscoveryNode discoveryNode : allocation.nodes()) {
if (discoveryNode.getRoles().contains(role) && allocation.metadata().nodeShutdowns().contains(discoveryNode.getId()) == false) {
eligibleNodes.add(discoveryNode);
}
}
}

private void setClusterRequireFilters(Map<String, List<String>> filters) {
clusterRequireFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(AND, filters));
}

private void setClusterIncludeFilters(Map<String, List<String>> filters) {
clusterIncludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters));
}

private void setClusterExcludeFilters(Map<String, List<String>> filters) {
clusterExcludeFilters = DiscoveryNodeFilters.trimTier(DiscoveryNodeFilters.buildFromKeyValues(OR, filters));
}

private boolean hasFilters() {
return (clusterExcludeFilters != null && clusterExcludeFilters.hasFilters())
|| (clusterIncludeFilters != null && clusterIncludeFilters.hasFilters())
|| (clusterRequireFilters != null && clusterRequireFilters.hasFilters());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.IndexBalanceConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundSummaryService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
Expand Down Expand Up @@ -659,6 +660,8 @@ public void apply(Settings value, Settings current, Settings previous) {
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_ENABLED_SETTING,
IndexBalanceConstraintSettings.INDEX_BALANCE_DECIDER_EXCESS_SHARDS,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_MINIMUM_LOGGING_INTERVAL,
SamplingService.TTL_POLL_INTERVAL_SETTING,
BlobStoreRepository.MAX_HEAP_SIZE_FOR_SNAPSHOT_DELETION_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.IndexBalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.IndexVersionAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.NodeReplacementAllocationDecider;
Expand Down Expand Up @@ -286,7 +287,8 @@ public void testAllocationDeciderOrder() {
DiskThresholdDecider.class,
ThrottlingAllocationDecider.class,
ShardsLimitAllocationDecider.class,
AwarenessAllocationDecider.class
AwarenessAllocationDecider.class,
IndexBalanceAllocationDecider.class
);
Collection<AllocationDecider> deciders = ClusterModule.createAllocationDeciders(
Settings.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,7 @@ private static class NodeNameDrivenWeightFunction extends WeightFunction {
}

@Override
float calculateNodeWeightWithIndex(
public float calculateNodeWeightWithIndex(
BalancedShardsAllocator.Balancer balancer,
BalancedShardsAllocator.ModelNode node,
BalancedShardsAllocator.ProjectIndex index
Expand Down
Loading