Skip to content

Commit

Permalink
Add the remaining implementation of ConstraintBasedAlgorithmFactory (a…
Browse files Browse the repository at this point in the history
…pache#478)

Implementation of ConstraintBasedAlgorithmFactory and the soft constraint weight model.
Remove SoftConstraintWeightModel class.
Get the rebalance preference and adjust the corresponding weight.
Pass the preference keys instead of cluster config.
  • Loading branch information
i3wangyi authored and jiajunwang committed Nov 27, 2019
1 parent dac9b66 commit f769bb7
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -29,7 +28,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
Expand All @@ -42,13 +40,17 @@
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
Expand All @@ -62,10 +64,8 @@ public class WagedRebalancer {
// When any of the following change happens, the rebalancer needs to do a global rebalance which
// contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
Collections
.unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG)));
ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// The cluster change detector is a stateful object.
// Make it static to avoid unnecessary reinitialization.
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
Expand All @@ -79,13 +79,12 @@ public class WagedRebalancer {
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//

public WagedRebalancer(HelixManager helixManager) {
public WagedRebalancer(HelixManager helixManager,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
this(
// TODO init the metadata store according to their requirement when integrate,
// or change to final static method if possible.
new AssignmentMetadataStore(helixManager),
// TODO parse the cluster setting
ConstraintBasedAlgorithmFactory.getInstance(),
new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@
class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final List<SoftConstraint> _softConstraints;
private final SoftConstraintWeightModel _softConstraintsWeightModel;
private final Map<SoftConstraint, Float> _softConstraints;

ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints,
List<SoftConstraint> softConstraints, SoftConstraintWeightModel softConstraintWeightModel) {
Map<SoftConstraint, Float> softConstraints) {
_hardConstraints = hardConstraints;
_softConstraints = softConstraints;
_softConstraintsWeightModel = softConstraintWeightModel;
}

@Override
Expand Down Expand Up @@ -115,13 +113,22 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
}

Function<AssignableNode, Float> calculatePoints =
(candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream()
.collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint
.getAssignmentNormalizedScore(candidateNode, replica, clusterContext))));
(candidateNode) -> getAssignmentNormalizedScore(candidateNode, replica, clusterContext);

return candidateNodes.stream().max(Comparator.comparing(calculatePoints));
}

private float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
float sum = 0;
for (Map.Entry<SoftConstraint, Float> softConstraintEntry : _softConstraints.entrySet()) {
SoftConstraint softConstraint = softConstraintEntry.getKey();
float weight = softConstraintEntry.getValue();
sum += weight * softConstraint.getAssignmentNormalizedScore(node, replica, clusterContext);
}
return sum;
}

private List<String> convertFailureReasons(List<HardConstraint> hardConstraints) {
return hardConstraints.stream().map(HardConstraint::getDescription)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,41 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.model.ClusterConfig;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

/**
* The factory class to create an instance of {@link ConstraintBasedAlgorithm}
*/
public class ConstraintBasedAlgorithmFactory {

// TODO: the parameter comes from cluster config, will tune how these 2 integers will change the
// soft constraint weight model
public static RebalanceAlgorithm getInstance() {
// TODO initialize constraints, depending on constraints implementations PRs
List<HardConstraint> hardConstraints = new ArrayList<>();
List<SoftConstraint> softConstraints = new ArrayList<>();
SoftConstraintWeightModel softConstraintWeightModel = new SoftConstraintWeightModel();
public static RebalanceAlgorithm getInstance(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
List<HardConstraint> hardConstraints =
ImmutableList.of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(),
new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(),
new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint());

int evennessPreference =
preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
int movementPreference =
preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
float evennessRatio = (float) evennessPreference / (evennessPreference + movementPreference);
float movementRatio = (float) movementPreference / (evennessPreference + movementPreference);

Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float> builder()
.put(new PartitionMovementConstraint(), movementRatio)
.put(new InstancePartitionsCountConstraint(), 0.3f * evennessRatio)
.put(new ResourcePartitionAntiAffinityConstraint(), 0.1f * evennessRatio)
.put(new ResourceTopStateAntiAffinityConstraint(), 0.1f * evennessRatio)
.put(new MaxCapacityUsageInstanceConstraint(), 0.5f * evennessRatio).build();

return new ConstraintBasedAlgorithm(hardConstraints, softConstraints,
softConstraintWeightModel);
return new ConstraintBasedAlgorithm(hardConstraints, softConstraints);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.MaintenanceSignal;
Expand Down Expand Up @@ -121,7 +122,9 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map<String, Resource
// configured to use the WAGED rebalancer.
// For the other resources, the legacy rebalancers will be triggered in the next step.
Map<String, IdealState> newIdealStates = new HashMap<>();
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager);
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences = cache.getClusterConfig()
.getGlobalRebalancePreference();
WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences);
try {
newIdealStates
.putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class TestConstraintBasedAlgorithm {
private ConstraintBasedAlgorithm _algorithm;
Expand All @@ -42,12 +43,11 @@ public class TestConstraintBasedAlgorithm {
public void beforeMethod() {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);

_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
ImmutableMap.of(mockSoftConstraint, 1f));
}

@Test(expectedExceptions = HelixRebalanceException.class)
Expand All @@ -60,12 +60,10 @@ public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceE
public void testCalculateWithValidAssignment() throws IOException, HelixRebalanceException {
HardConstraint mockHardConstraint = mock(HardConstraint.class);
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f);
_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
ImmutableMap.of(mockSoftConstraint, 1f));
ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel();
OptimalAssignment optimalAssignment = _algorithm.calculate(clusterModel);

Expand Down

0 comments on commit f769bb7

Please sign in to comment.