diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index 22cac7e9cb1..551239d6ee7 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -20,7 +20,6 @@ */ import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -29,7 +28,6 @@ import java.util.Set; import java.util.stream.Collectors; -import com.google.common.annotations.VisibleForTesting; import org.apache.helix.HelixConstants; import org.apache.helix.HelixManager; import org.apache.helix.HelixRebalanceException; @@ -42,6 +40,7 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider; import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.Partition; import org.apache.helix.model.Resource; @@ -49,6 +48,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; + /** * Weight-Aware Globally-Even Distribute Rebalancer. * @see GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES = - Collections - .unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG, - HelixConstants.ChangeType.CLUSTER_CONFIG, - HelixConstants.ChangeType.INSTANCE_CONFIG))); + ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG, + HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG); // The cluster change detector is a stateful object. // Make it static to avoid unnecessary reinitialization. private static final ThreadLocal CHANGE_DETECTOR_THREAD_LOCAL = @@ -79,13 +79,12 @@ public class WagedRebalancer { private final RebalanceAlgorithm _rebalanceAlgorithm; // ------------------------------------------------------------------------------------// - public WagedRebalancer(HelixManager helixManager) { + public WagedRebalancer(HelixManager helixManager, + Map preferences) { this( // TODO init the metadata store according to their requirement when integrate, // or change to final static method if possible. - new AssignmentMetadataStore(helixManager), - // TODO parse the cluster setting - ConstraintBasedAlgorithmFactory.getInstance(), + new AssignmentMetadataStore(helixManager), ConstraintBasedAlgorithmFactory.getInstance(preferences), // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output. // Mapping calculator will translate the best possible assignment into the applicable state // mapping based on the current states. diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java index 479fb78d1ff..89a3f29662c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithm.java @@ -53,14 +53,12 @@ class ConstraintBasedAlgorithm implements RebalanceAlgorithm { private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class); private final List _hardConstraints; - private final List _softConstraints; - private final SoftConstraintWeightModel _softConstraintsWeightModel; + private final Map _softConstraints; ConstraintBasedAlgorithm(List hardConstraints, - List softConstraints, SoftConstraintWeightModel softConstraintWeightModel) { + Map softConstraints) { _hardConstraints = hardConstraints; _softConstraints = softConstraints; - _softConstraintsWeightModel = softConstraintWeightModel; } @Override @@ -115,13 +113,22 @@ private Optional getNodeWithHighestPoints(AssignableReplica repl } Function calculatePoints = - (candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream() - .collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint - .getAssignmentNormalizedScore(candidateNode, replica, clusterContext)))); + (candidateNode) -> getAssignmentNormalizedScore(candidateNode, replica, clusterContext); return candidateNodes.stream().max(Comparator.comparing(calculatePoints)); } + private float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + float sum = 0; + for (Map.Entry softConstraintEntry : _softConstraints.entrySet()) { + SoftConstraint softConstraint = softConstraintEntry.getKey(); + float weight = softConstraintEntry.getValue(); + sum += weight * softConstraint.getAssignmentNormalizedScore(node, replica, clusterContext); + } + return sum; + } + private List convertFailureReasons(List hardConstraints) { return hardConstraints.stream().map(HardConstraint::getDescription) .collect(Collectors.toList()); diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java index 895fa61c086..8568444aa84 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java @@ -19,23 +19,41 @@ * under the License. */ -import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; import org.apache.helix.model.ClusterConfig; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +/** + * The factory class to create an instance of {@link ConstraintBasedAlgorithm} + */ public class ConstraintBasedAlgorithmFactory { - // TODO: the parameter comes from cluster config, will tune how these 2 integers will change the - // soft constraint weight model - public static RebalanceAlgorithm getInstance() { - // TODO initialize constraints, depending on constraints implementations PRs - List hardConstraints = new ArrayList<>(); - List softConstraints = new ArrayList<>(); - SoftConstraintWeightModel softConstraintWeightModel = new SoftConstraintWeightModel(); + public static RebalanceAlgorithm getInstance( + Map preferences) { + List hardConstraints = + ImmutableList.of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(), + new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(), + new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint()); + + int evennessPreference = + preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1); + int movementPreference = + preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1); + float evennessRatio = (float) evennessPreference / (evennessPreference + movementPreference); + float movementRatio = (float) movementPreference / (evennessPreference + movementPreference); + + Map softConstraints = ImmutableMap. builder() + .put(new PartitionMovementConstraint(), movementRatio) + .put(new InstancePartitionsCountConstraint(), 0.3f * evennessRatio) + .put(new ResourcePartitionAntiAffinityConstraint(), 0.1f * evennessRatio) + .put(new ResourceTopStateAntiAffinityConstraint(), 0.1f * evennessRatio) + .put(new MaxCapacityUsageInstanceConstraint(), 0.5f * evennessRatio).build(); - return new ConstraintBasedAlgorithm(hardConstraints, softConstraints, - softConstraintWeightModel); + return new ConstraintBasedAlgorithm(hardConstraints, softConstraints); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java deleted file mode 100644 index 953005c2d8d..00000000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/SoftConstraintWeightModel.java +++ /dev/null @@ -1,58 +0,0 @@ -package org.apache.helix.controller.rebalancer.waged.constraints; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Map; - -import com.google.common.collect.ImmutableMap; - -/** - * The class retrieves the offline model that defines the relative importance of soft constraints. - */ -class SoftConstraintWeightModel { - private static Map MODEL; - - // TODO either define the weights in property files or zookeeper node or static human input - SoftConstraintWeightModel() { - - } - - static { - // TODO update the weight - MODEL = ImmutableMap. builder().put(InstancePartitionsCountConstraint.class, 1.0f) - .build(); - } - - /** - * Get the sum of normalized scores, given calculated scores map of soft constraints - * @param originScoresMap The origin scores by soft constraints - * @return The sum of soft constraints scores - */ - float getSumOfScores(Map originScoresMap) { - float sum = 0; - for (Map.Entry softConstraintScoreEntry : originScoresMap.entrySet()) { - SoftConstraint softConstraint = softConstraintScoreEntry.getKey(); - float weight = MODEL.get(softConstraint.getClass()); - sum += softConstraintScoreEntry.getValue() * weight; - } - - return sum; - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index ba4da88c4f1..806ef858918 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -40,6 +40,7 @@ import org.apache.helix.controller.rebalancer.SemiAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.MaintenanceSignal; @@ -121,7 +122,9 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map newIdealStates = new HashMap<>(); - WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager); + Map preferences = cache.getClusterConfig() + .getGlobalRebalancePreference(); + WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager, preferences); try { newIdealStates .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput)); diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java index 0e61eb3054b..b2deaefa525 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestConstraintBasedAlgorithm.java @@ -34,6 +34,7 @@ import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class TestConstraintBasedAlgorithm { private ConstraintBasedAlgorithm _algorithm; @@ -42,12 +43,11 @@ public class TestConstraintBasedAlgorithm { public void beforeMethod() { HardConstraint mockHardConstraint = mock(HardConstraint.class); SoftConstraint mockSoftConstraint = mock(SoftConstraint.class); - SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class); when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false); when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f); _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint), - ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel); + ImmutableMap.of(mockSoftConstraint, 1f)); } @Test(expectedExceptions = HelixRebalanceException.class) @@ -60,12 +60,10 @@ public void testCalculateNoValidAssignment() throws IOException, HelixRebalanceE public void testCalculateWithValidAssignment() throws IOException, HelixRebalanceException { HardConstraint mockHardConstraint = mock(HardConstraint.class); SoftConstraint mockSoftConstraint = mock(SoftConstraint.class); - SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class); when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true); when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f); - when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f); _algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint), - ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel); + ImmutableMap.of(mockSoftConstraint, 1f)); ClusterModel clusterModel = new ClusterModelTestHelper().getDefaultClusterModel(); OptimalAssignment optimalAssignment = _algorithm.calculate(clusterModel);