diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourcePartitionAntiAffinityConstraint.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourcePartitionAntiAffinityConstraint.java new file mode 100644 index 00000000000..a2f9099e235 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ResourcePartitionAntiAffinityConstraint.java @@ -0,0 +1,52 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; + +/** + * This constraint exists to make partitions belonging to the same resource be assigned as far from + * each other as possible. This is because it is undesirable to have many partitions belonging to + * the same resource be assigned to the same node to minimize the impact of node failure scenarios. + * The score is higher the fewer the partitions are on the node belonging to the same resource. + */ +class ResourcePartitionAntiAffinityConstraint extends SoftConstraint { + private static final float MAX_SCORE = 1f; + private static final float MIN_SCORE = 0f; + + ResourcePartitionAntiAffinityConstraint() { + super(MAX_SCORE, MIN_SCORE); + } + + @Override + protected float getAssignmentScore(AssignableNode node, AssignableReplica replica, + ClusterContext clusterContext) { + String resource = replica.getResourceName(); + int curPartitionCountForResource = node.getAssignedPartitionsByResource(resource).size(); + int doubleMaxPartitionCountForResource = + 2 * clusterContext.getEstimatedMaxPartitionByResource(resource); + // The score measures the twice the max allowed count versus current counts + // The returned value is a measurement of remaining quota ratio, in the case of exceeding allowed counts, return 0 + return Math.max(((float) doubleMaxPartitionCountForResource - curPartitionCountForResource) + / doubleMaxPartitionCountForResource, 0); + } +} diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestResourcePartitionAntiAffinityConstraint.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestResourcePartitionAntiAffinityConstraint.java new file mode 100644 index 00000000000..c6830cf9a13 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/constraints/TestResourcePartitionAntiAffinityConstraint.java @@ -0,0 +1,67 @@ +package org.apache.helix.controller.rebalancer.waged.constraints; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; +import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; +import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableSet; + +public class TestResourcePartitionAntiAffinityConstraint { + private static final String TEST_PARTITION = "TestPartition"; + private static final String TEST_RESOURCE = "TestResource"; + private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class); + private final AssignableNode _testNode = Mockito.mock(AssignableNode.class); + private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class); + private final SoftConstraint _constraint = new ResourcePartitionAntiAffinityConstraint(); + + @Test + public void testGetAssignmentScore() { + when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE); + when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE)).thenReturn( + ImmutableSet.of(TEST_PARTITION + "1", TEST_PARTITION + "2", TEST_PARTITION + "3")); + when(_clusterContext.getEstimatedMaxPartitionByResource(TEST_RESOURCE)).thenReturn(10); + + float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); + float normalizedScore = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); + Assert.assertEquals(score, 0.85f); + Assert.assertEquals(normalizedScore, 0.85f); + } + + @Test + public void testGetAssignmentScoreMaxScore() { + when(_testReplica.getResourceName()).thenReturn(TEST_RESOURCE); + when(_testNode.getAssignedPartitionsByResource(TEST_RESOURCE)).thenReturn(Collections.emptySet()); + when(_clusterContext.getEstimatedMaxPartitionByResource(TEST_RESOURCE)).thenReturn(10); + + float score = _constraint.getAssignmentScore(_testNode, _testReplica, _clusterContext); + float normalizedScore = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext); + Assert.assertEquals(score, 1f); + Assert.assertEquals(normalizedScore, 1f); + } +}