Skip to content

Commit

Permalink
Short-circuit rebalancing when disabled (elastic#40966)
Browse files Browse the repository at this point in the history
Today if `cluster.routing.rebalance.enable: none` then rebalancing is disabled,
but we still execute `balanceByWeights()` and perform some rather expensive
calculations before discovering that we cannot rebalance any shards. In a large
cluster this can make cluster state updates occur rather slowly. With this
change we check earlier whether rebalancing is globally disabled and, if so,
avoid the rebalancing process entirely.

Relates elastic#40942 which was reverted because of egregiously faulty tests.
  • Loading branch information
DaveCTurner committed Apr 9, 2019
1 parent 529eda3 commit 2e21a11
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,21 @@ public EnableAllocationDecider(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, this::setEnableRebalance);
}

public void setEnableRebalance(Rebalance enableRebalance) {
private void setEnableRebalance(Rebalance enableRebalance) {
this.enableRebalance = enableRebalance;
}

public void setEnableAllocation(Allocation enableAllocation) {
private void setEnableAllocation(Allocation enableAllocation) {
this.enableAllocation = enableAllocation;
}

/**
 * Node-specific allocation check. The {@code node} argument is not consulted here: the question is
 * forwarded as-is to {@link #canAllocate(ShardRouting, RoutingAllocation)}.
 *
 * @param shardRouting the shard being considered
 * @param node         the candidate node (unused by this decider)
 * @param allocation   the current allocation round
 * @return whatever the node-independent overload decides
 */
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    final Decision nodeIndependentDecision = canAllocate(shardRouting, allocation);
    return nodeIndependentDecision;
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return allocation.decision(Decision.YES, NAME,
"explicitly ignoring any disabling of allocation due to manual allocation commands via the reroute API");
Expand Down Expand Up @@ -136,10 +141,29 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
}
}

/**
 * Cluster-wide rebalancing check, evaluated without reference to any particular shard.
 * <p>
 * The answer is YES when the allocation ignores disabling, when the cluster-level rebalance setting is
 * anything other than {@code NONE}, or when at least one index explicitly carries an index-level rebalance
 * setting different from {@code NONE}. Only when none of these hold is the answer NO.
 *
 * @param allocation the current allocation round
 * @return the decision on whether any rebalancing may take place at all
 */
@Override
public Decision canRebalance(RoutingAllocation allocation) {
    if (allocation.ignoreDisable()) {
        return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of rebalancing");
    }

    if (enableRebalance != Rebalance.NONE) {
        return allocation.decision(Decision.YES, NAME, "rebalancing is not globally disabled");
    }

    // Globally disabled: an explicit index-level override may still re-enable rebalancing for that index.
    for (IndexMetaData indexMetaData : allocation.metaData()) {
        final Settings indexSettings = indexMetaData.getSettings();
        final boolean overriddenOnIndex = INDEX_ROUTING_REBALANCE_ENABLE_SETTING.exists(indexSettings)
            && INDEX_ROUTING_REBALANCE_ENABLE_SETTING.get(indexSettings) != Rebalance.NONE;
        if (overriddenOnIndex) {
            return allocation.decision(Decision.YES, NAME, "rebalancing is permitted on one or more indices");
        }
    }

    return allocation.decision(Decision.NO, NAME, "no rebalancing is allowed due to %s", setting(enableRebalance, false));
}

@Override
public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
if (allocation.ignoreDisable()) {
return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of relocation");
return allocation.decision(Decision.YES, NAME, "allocation is explicitly ignoring any disabling of rebalancing");
}

Settings indexSettings = allocation.metaData().getIndexSafe(shardRouting.index()).getSettings();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.test.gateway.TestGatewayAllocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class EnableAllocationShortCircuitTests extends ESAllocationTestCase {

/**
 * Builds a random cluster state (one to five nodes, a handful of single-shard indices) and then keeps
 * starting the initializing shards and rerouting until no shard is left unassigned or initializing.
 *
 * @return a cluster state with no unassigned and no initializing shards
 */
private static ClusterState createClusterStateWithAllShardsAssigned() {
    final AllocationService allocationService = createAllocationService(Settings.EMPTY);

    final int numberOfNodes = randomIntBetween(1, 5);
    final DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
    for (int nodeOrdinal = 0; nodeOrdinal < numberOfNodes; nodeOrdinal++) {
        nodes.add(newNode("node" + nodeOrdinal));
    }

    final MetaData.Builder metaData = MetaData.builder();
    final RoutingTable.Builder routingTable = RoutingTable.builder();
    // Suffixes count down from a random start to zero inclusive, so between two and eleven indices are created.
    final int highestIndexSuffix = randomIntBetween(1, 10);
    for (int suffix = highestIndexSuffix; suffix >= 0; suffix--) {
        // Replica count is capped at numberOfNodes - 1, presumably so that every copy can find a node of its own.
        final IndexMetaData index = IndexMetaData.builder("test" + suffix)
            .settings(settings(Version.CURRENT))
            .numberOfShards(1)
            .numberOfReplicas(randomIntBetween(0, numberOfNodes - 1))
            .build();
        metaData.put(index, true);
        routingTable.addAsNew(index);
    }

    ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(Settings.EMPTY))
        .nodes(nodes)
        .metaData(metaData)
        .routingTable(routingTable.build())
        .build();

    // Alternate between starting whatever is initializing and rerouting until the cluster settles.
    List<ShardRouting> initializing = clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
    while (clusterState.getRoutingNodes().hasUnassignedShards() || initializing.isEmpty() == false) {
        clusterState = allocationService.applyStartedShards(clusterState, initializing);
        clusterState = allocationService.reroute(clusterState, "reroute");
        initializing = clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
    }

    return clusterState;
}

/**
 * When the cluster-level rebalance setting is one of ALL, PRIMARIES or REPLICAS, a reroute must reach
 * the plugin's counting decider at least once.
 */
public void testRebalancingAttemptedIfPermitted() {
    final ClusterState clusterState = createClusterStateWithAllShardsAssigned();

    final RebalanceShortCircuitPlugin plugin = new RebalanceShortCircuitPlugin();
    final EnableAllocationDecider.Rebalance permittedMode = randomFrom(
        EnableAllocationDecider.Rebalance.ALL,
        EnableAllocationDecider.Rebalance.PRIMARIES,
        EnableAllocationDecider.Rebalance.REPLICAS);
    final Settings.Builder serviceSettings = Settings.builder()
        .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), permittedMode.name());
    final AllocationService allocationService = createAllocationService(serviceSettings, plugin);

    allocationService.reroute(clusterState, "reroute").routingTable();

    assertThat(plugin.rebalanceAttempts, greaterThan(0));
}

/**
 * With {@code cluster.routing.rebalance.enable} set to {@code NONE} and no index-level override, a reroute
 * must not consult the plugin's counting decider about rebalancing at all.
 */
public void testRebalancingSkippedIfDisabled() {
    ClusterState clusterState = createClusterStateWithAllShardsAssigned();

    final RebalanceShortCircuitPlugin plugin = new RebalanceShortCircuitPlugin();
    AllocationService allocationService = createAllocationService(Settings.builder()
        // This is the rebalance setting, so its value comes from the Rebalance enum rather than the Allocation enum.
        .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name()),
        plugin);
    allocationService.reroute(clusterState, "reroute").routingTable();
    assertThat(plugin.rebalanceAttempts, equalTo(0));
}

/**
 * Rebalancing is disabled cluster-wide and one randomly chosen index additionally sets its own rebalance
 * setting to {@code NONE}; the counting decider must still never be asked about rebalancing.
 */
public void testRebalancingSkippedIfDisabledIncludingOnSpecificIndices() {
    final ClusterState baseState = createClusterStateWithAllShardsAssigned();
    final IndexMetaData chosenIndex = randomFrom(baseState.metaData().indices().values().toArray(IndexMetaData.class));

    // Re-register the chosen index with an explicit index-level "NONE" on top of its existing settings.
    final Settings.Builder indexSettings = Settings.builder()
        .put(chosenIndex.getSettings())
        .put(INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name());
    final MetaData updatedMetaData = MetaData.builder(baseState.metaData())
        .put(IndexMetaData.builder(chosenIndex).settings(indexSettings))
        .build();
    final ClusterState clusterState = ClusterState.builder(baseState).metaData(updatedMetaData).build();

    final RebalanceShortCircuitPlugin plugin = new RebalanceShortCircuitPlugin();
    final Settings.Builder serviceSettings = Settings.builder()
        .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name());
    final AllocationService allocationService = createAllocationService(serviceSettings, plugin);

    allocationService.reroute(clusterState, "reroute").routingTable();

    assertThat(plugin.rebalanceAttempts, equalTo(0));
}

/**
 * Rebalancing is disabled cluster-wide, but one randomly chosen index sets its own rebalance setting to
 * ALL, PRIMARIES or REPLICAS; the counting decider must then be consulted at least once.
 */
public void testRebalancingAttemptedIfDisabledButOverridenOnSpecificIndices() {
    final ClusterState baseState = createClusterStateWithAllShardsAssigned();
    final IndexMetaData chosenIndex = randomFrom(baseState.metaData().indices().values().toArray(IndexMetaData.class));

    // Re-register the chosen index with an index-level override that permits some form of rebalancing.
    final EnableAllocationDecider.Rebalance overrideMode = randomFrom(
        EnableAllocationDecider.Rebalance.ALL,
        EnableAllocationDecider.Rebalance.PRIMARIES,
        EnableAllocationDecider.Rebalance.REPLICAS);
    final Settings.Builder indexSettings = Settings.builder()
        .put(chosenIndex.getSettings())
        .put(INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), overrideMode.name());
    final MetaData updatedMetaData = MetaData.builder(baseState.metaData())
        .put(IndexMetaData.builder(chosenIndex).settings(indexSettings))
        .build();
    final ClusterState clusterState = ClusterState.builder(baseState).metaData(updatedMetaData).build();

    final RebalanceShortCircuitPlugin plugin = new RebalanceShortCircuitPlugin();
    final Settings.Builder serviceSettings = Settings.builder()
        .put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.name());
    final AllocationService allocationService = createAllocationService(serviceSettings, plugin);

    allocationService.reroute(clusterState, "reroute").routingTable();

    assertThat(plugin.rebalanceAttempts, greaterThan(0));
}

/**
 * With the cluster-level allocation setting at {@code NONE}, rerouting a cluster that holds one new,
 * unassigned single-shard index must not invoke any {@code canAllocate} overload on the counting decider.
 */
public void testAllocationSkippedIfDisabled() {
    final AllocateShortCircuitPlugin plugin = new AllocateShortCircuitPlugin();
    final Settings.Builder serviceSettings = Settings.builder()
        .put(CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE.name());
    final AllocationService allocationService = createAllocationService(serviceSettings, plugin);

    // One index, one primary, no replicas, registered as brand new so its shard starts out unassigned.
    final IndexMetaData.Builder testIndex = IndexMetaData.builder("test")
        .settings(settings(Version.CURRENT))
        .numberOfShards(1)
        .numberOfReplicas(0);
    final MetaData metaData = MetaData.builder().put(testIndex).build();
    final RoutingTable routingTable = RoutingTable.builder().addAsNew(metaData.index("test")).build();
    final DiscoveryNodes.Builder singleNode = DiscoveryNodes.builder().add(newNode("node1"));

    final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
        .metaData(metaData)
        .routingTable(routingTable)
        .nodes(singleNode)
        .build();

    allocationService.reroute(clusterState, "reroute").routingTable();

    assertThat(plugin.canAllocateAttempts, equalTo(0));
}

/**
 * Creates a {@link MockAllocationService} whose deciders are those produced by
 * {@link ClusterModule#createAllocationDeciders} for the given settings and the single given plugin.
 *
 * @param settings node settings (still in builder form) handed to the decider factory
 * @param plugin   the only cluster plugin handed to the decider factory
 * @return an allocation service using a test gateway allocator, a balanced shards allocator with empty
 *         settings and the empty cluster info service
 */
private static AllocationService createAllocationService(Settings.Builder settings, ClusterPlugin plugin) {
    final ClusterSettings builtInClusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
    final Settings builtSettings = settings.build();
    final List<ClusterPlugin> plugins = Collections.singletonList(plugin);

    final List<AllocationDecider> deciders =
        new ArrayList<>(ClusterModule.createAllocationDeciders(builtSettings, builtInClusterSettings, plugins));
    final AllocationDeciders combinedDeciders = new AllocationDeciders(deciders);

    return new MockAllocationService(
        combinedDeciders,
        new TestGatewayAllocator(),
        new BalancedShardsAllocator(Settings.EMPTY),
        EmptyClusterInfoService.INSTANCE);
}

/**
 * Test plugin that contributes a single allocation decider whose only job is to count how often either
 * {@code canRebalance} overload is invoked.
 */
private static class RebalanceShortCircuitPlugin implements ClusterPlugin {
    /** Number of {@code canRebalance} calls (both overloads combined) seen by this plugin's decider. */
    int rebalanceAttempts;

    @Override
    public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
        final AllocationDecider countingDecider = new RebalanceShortCircuitAllocationDecider();
        return Collections.singletonList(countingDecider);
    }

    /**
     * Counting decider. Deliberately a non-static inner class so that it can bump the counter held by the
     * enclosing plugin instance; the decision itself is left to the base class.
     */
    private class RebalanceShortCircuitAllocationDecider extends AllocationDecider {

        @Override
        public Decision canRebalance(RoutingAllocation allocation) {
            rebalanceAttempts += 1;
            return super.canRebalance(allocation);
        }

        @Override
        public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
            rebalanceAttempts += 1;
            return super.canRebalance(shardRouting, allocation);
        }
    }
}

/**
 * Test plugin that contributes a single allocation decider whose only job is to count how often any of
 * the four {@code canAllocate} overloads is invoked.
 */
private static class AllocateShortCircuitPlugin implements ClusterPlugin {
    /** Number of {@code canAllocate} calls (all overloads combined) seen by this plugin's decider. */
    int canAllocateAttempts;

    @Override
    public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
        final AllocationDecider countingDecider = new AllocateShortCircuitAllocationDecider();
        return Collections.singletonList(countingDecider);
    }

    /**
     * Counting decider. Deliberately a non-static inner class so that it can bump the counter held by the
     * enclosing plugin instance; every decision itself is left to the base class.
     */
    private class AllocateShortCircuitAllocationDecider extends AllocationDecider {

        @Override
        public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
            canAllocateAttempts += 1;
            return super.canAllocate(node, allocation);
        }

        @Override
        public Decision canAllocate(IndexMetaData indexMetaData, RoutingNode node, RoutingAllocation allocation) {
            canAllocateAttempts += 1;
            return super.canAllocate(indexMetaData, node, allocation);
        }

        @Override
        public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) {
            canAllocateAttempts += 1;
            return super.canAllocate(shardRouting, allocation);
        }

        @Override
        public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
            canAllocateAttempts += 1;
            return super.canAllocate(shardRouting, node, allocation);
        }
    }
}
}

0 comments on commit 2e21a11

Please sign in to comment.