Skip to content

Commit

Permalink
Implement one of the soft constraints (apache#450)
Browse files Browse the repository at this point in the history
Implement Instance Partitions Count soft constraint.
Evaluate by instance's current partition count versus estimated max partition count.
Intuitively, encourage the assignment if the instance's occupancy rate is below average;
discourage the assignment if the instance's occupancy rate is above average.

The final normalized score will be within [0, 1].
The implementation of the class will depend on the cluster current total partitions count as the max score.
  • Loading branch information
i3wangyi authored and jiajunwang committed Feb 7, 2020
1 parent bbae20d commit 03d0d80
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica repl
Function<AssignableNode, Float> calculatePoints =
(candidateNode) -> _softConstraintsWeightModel.getSumOfScores(_softConstraints.stream()
.collect(Collectors.toMap(Function.identity(), softConstraint -> softConstraint
.getAssignmentOriginScore(candidateNode, replica, clusterContext))));
.getAssignmentNormalizedScore(candidateNode, replica, clusterContext))));

return candidateNodes.stream().max(Comparator.comparing(calculatePoints));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.apache.helix.controller.rebalancer.waged.constraints;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;

/**
 * Soft constraint that scores a candidate instance by the number of replicas it already
 * hosts, compared against twice the cluster's estimated max partition count.
 * Intuitively, the assignment is encouraged when the instance holds few replicas and
 * discouraged as the instance fills up.
 * The raw score is always within [MIN_SCORE, MAX_SCORE], i.e. [0, 1].
 */
class InstancePartitionsCountConstraint extends SoftConstraint {
  private static final float MAX_SCORE = 1f;
  private static final float MIN_SCORE = 0f;

  InstancePartitionsCountConstraint() {
    super(MAX_SCORE, MIN_SCORE);
  }

  /**
   * Compute {@code (2 * estimatedMax - current) / (2 * estimatedMax)}, floored at MIN_SCORE.
   * An instance with no replicas scores 1, an instance sitting exactly at the estimated max
   * scores 0.5, and an instance at or beyond twice the estimated max scores 0.
   * @param node the candidate instance; only its assigned replica count is read
   * @param replica the replica being placed; not used by this constraint
   * @param clusterContext supplies the estimated max partition count
   * @return the raw score in [MIN_SCORE, MAX_SCORE]; never NaN
   */
  @Override
  protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
      ClusterContext clusterContext) {
    // Double in float arithmetic so a large estimate cannot overflow int multiplication.
    float doubleEstimatedMaxPartitionCount = 2f * clusterContext.getEstimatedMaxPartitionCount();
    float currentPartitionCount = node.getAssignedReplicaCount();

    // A non-positive estimate would make the division below produce NaN (0 / 0), which
    // Math.max does not filter out and which would poison every downstream sum. Treat it
    // as "no capacity information": an empty instance is still the best possible choice,
    // a non-empty one the worst.
    if (doubleEstimatedMaxPartitionCount <= 0) {
      return currentPartitionCount <= 0 ? MAX_SCORE : MIN_SCORE;
    }

    return Math.max((doubleEstimatedMaxPartitionCount - currentPartitionCount)
        / doubleEstimatedMaxPartitionCount, MIN_SCORE);
  }
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;

/**
* Evaluate a partition allocation proposal and return a score within the normalized range.
* A higher score means the proposal is more preferred.
* The "soft" constraint evaluates the optimality of an assignment by giving it a score on a scale of [minScore, maxScore].
* The higher the score, the better the assignment; intuitively, the assignment is encouraged.
* The lower the score, the worse the assignment; intuitively, the assignment is penalized.
*/
abstract class SoftConstraint {
private float _maxScore = 1000f;
private float _minScore = -1000f;

interface ScalerFunction {
interface NormalizeFunction {
/**
* Scale the origin score to a normalized range (0, 1).
* The purpose is to compare scores between different soft constraints.
Expand All @@ -57,23 +58,38 @@ interface ScalerFunction {
_minScore = minScore;
}

/**
 * @return the upper bound of the raw score range configured for this constraint
 */
float getMaxScore() {
  return this._maxScore;
}

/**
 * @return the lower bound of the raw score range configured for this constraint
 */
float getMinScore() {
  return this._minScore;
}

/**
* The scoring function returns a score between MINIMAL_SCORE and MAXIMUM_SCORE, which is then
* weighted by the
* individual normalized constraint weights.
* Each individual constraint will define the meaning of MINIMAL_SCORE to MAXIMUM_SCORE
* differently.
* @return float value representing the score
* Evaluate and give a score for a potential assignment partition -> instance
* Child class only needs to care about how the score is implemented
* @return The score of the assignment in float value
*/
abstract float getAssignmentOriginScore(AssignableNode node, AssignableReplica replica,
protected abstract float getAssignmentScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext);

/**
 * Score a potential assignment (partition -> instance) on the normalized scale.
 * This is the method callers are expected to use: it takes the raw score produced by
 * getAssignmentScore and passes it through the function returned by getNormalizeFunction.
 * @return The normalized score of the assignment
 */
float getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
    ClusterContext clusterContext) {
  // Resolve the normalizer before computing the raw score, matching the evaluation order
  // of a single chained expression.
  NormalizeFunction normalizer = getNormalizeFunction();
  float rawScore = getAssignmentScore(node, replica, clusterContext);
  return normalizer.scale(rawScore);
}

/**
* The default scaler function that squashes any score within (min_score, max_score) to (0, 1);
* Child class could override the method and customize the method on its own
* @return The MinMaxScaler instance by default
*/
ScalerFunction getScalerFunction() {
return (score) -> (score - _minScore) / (_maxScore - _minScore);
NormalizeFunction getNormalizeFunction() {
return (score) -> (score - getMinScore()) / (getMaxScore() - getMinScore());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@
* The class retrieves the offline model that defines the relative importance of soft constraints.
*/
class SoftConstraintWeightModel {
private static Map<? extends SoftConstraint, Float> MODEL;
private static Map<Class, Float> MODEL;

// TODO either define the weights in property files or zookeeper node or static human input
SoftConstraintWeightModel() {

}

static {
MODEL = ImmutableMap.<SoftConstraint, Float> builder()
.put(LeastPartitionCountConstraint.INSTANCE, 1.0f).build();
//TODO update the weight
MODEL = ImmutableMap.<Class, Float> builder().put(InstancePartitionsCountConstraint.class, 1.0f)
.build();
}

/**
Expand All @@ -48,9 +49,8 @@ float getSumOfScores(Map<SoftConstraint, Float> originScoresMap) {
float sum = 0;
for (Map.Entry<SoftConstraint, Float> softConstraintScoreEntry : originScoresMap.entrySet()) {
SoftConstraint softConstraint = softConstraintScoreEntry.getKey();
float score = softConstraint.getScalerFunction().scale(softConstraintScoreEntry.getValue());
float weight = MODEL.get(softConstraint);
sum += score * weight;
float weight = MODEL.get(softConstraint.getClass());
sum += softConstraintScoreEntry.getValue() * weight;
}

return sum;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
* under the License.
*/

import org.apache.helix.HelixException;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -29,6 +27,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.HelixException;

/**
* This class tracks the rebalance-related global cluster status.
*/
Expand All @@ -47,8 +47,7 @@ public class ClusterContext {

/**
* Construct the cluster context based on the current instance status.
*
* @param replicaSet All the partition replicas that are managed by the rebalancer
* @param replicaSet All the partition replicas that are managed by the rebalancer
* @param instanceCount The count of all the active instances that can be used to host partitions.
*/
ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
Expand Down Expand Up @@ -95,8 +94,8 @@ public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, Str
void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
.computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
throw new HelixException(String
.format("Resource %s already has a replica from partition %s in fault zone %s",
throw new HelixException(
String.format("Resource %s already has a replica from partition %s in fault zone %s",
resourceName, partition, faultZoneId));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void beforeMethod() {
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(false);
when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);

_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
Expand All @@ -62,7 +62,7 @@ public void testCalculateWithValidAssignment() throws IOException, HelixRebalanc
SoftConstraint mockSoftConstraint = mock(SoftConstraint.class);
SoftConstraintWeightModel mockSoftConstraintWeightModel = mock(SoftConstraintWeightModel.class);
when(mockHardConstraint.isAssignmentValid(any(), any(), any())).thenReturn(true);
when(mockSoftConstraint.getAssignmentOriginScore(any(), any(), any())).thenReturn(1.0f);
when(mockSoftConstraint.getAssignmentNormalizedScore(any(), any(), any())).thenReturn(1.0f);
when(mockSoftConstraintWeightModel.getSumOfScores(any())).thenReturn(1.0f);
_algorithm = new ConstraintBasedAlgorithm(ImmutableList.of(mockHardConstraint),
ImmutableList.of(mockSoftConstraint), mockSoftConstraintWeightModel);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.apache.helix.controller.rebalancer.waged.constraints;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import static org.mockito.Mockito.when;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Unit tests for InstancePartitionsCountConstraint, driven through the normalized score.
 * Every test stubs both inputs the constraint reads (the node's assigned replica count and
 * the cluster's estimated max partition count), so no test depends on stubbing left on the
 * shared mocks by another test.
 */
public class TestInstancePartitionsCountConstraint {
  private final AssignableReplica _testReplica = Mockito.mock(AssignableReplica.class);
  private final AssignableNode _testNode = Mockito.mock(AssignableNode.class);
  private final ClusterContext _clusterContext = Mockito.mock(ClusterContext.class);

  private final SoftConstraint _constraint = new InstancePartitionsCountConstraint();

  /** An instance hosting nothing gets the best possible score. */
  @Test
  public void testWhenInstanceIsIdle() {
    when(_testNode.getAssignedReplicaCount()).thenReturn(0);
    // Stub the estimate explicitly: an unstubbed mock returns 0, which would turn the
    // score into a 0 / 0 division unless another test had already stubbed the shared mock.
    when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(10);
    float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
    Assert.assertEquals(score, 1.0f);
  }

  /** An instance sitting exactly at the estimated max scores in the middle of the range. */
  @Test
  public void testWhenInstanceIsFull() {
    when(_testNode.getAssignedReplicaCount()).thenReturn(10);
    when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(10);
    float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
    Assert.assertEquals(score, 0.5f);
  }

  /** An instance at half of the estimated max scores (2 * 20 - 10) / (2 * 20) = 0.75. */
  @Test
  public void testWhenInstanceHalfOccupied() {
    when(_testNode.getAssignedReplicaCount()).thenReturn(10);
    when(_clusterContext.getEstimatedMaxPartitionCount()).thenReturn(20);
    float score = _constraint.getAssignmentNormalizedScore(_testNode, _testReplica, _clusterContext);
    Assert.assertEquals(score, 0.75f);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.apache.helix.controller.rebalancer.waged.constraints;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Checks that the default normalize function of SoftConstraint keeps every raw score inside
 * the configured [min, max] range within [0, 1] after scaling.
 */
public class TestSoftConstraintNormalizeFunction {
  @Test
  public void testDefaultNormalizeFunction() {
    final int upperBound = 100;
    final int lowerBound = 0;

    // The raw score is irrelevant here; only the normalize function is exercised.
    SoftConstraint constraintUnderTest = new SoftConstraint(upperBound, lowerBound) {
      @Override
      protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
          ClusterContext clusterContext) {
        return 0;
      }
    };

    int rawScore = lowerBound;
    while (rawScore <= upperBound) {
      float scaled = constraintUnderTest.getNormalizeFunction().scale(rawScore);
      boolean withinUnitRange = scaled <= 1 && scaled >= 0;
      Assert.assertTrue(withinUnitRange,
          String.format("input: %s, output: %s", rawScore, scaled));
      rawScore++;
    }
  }
}

0 comments on commit 03d0d80

Please sign in to comment.