Skip to content

Commit

Permalink
PartitionMovementSoftConstraint Implementation (apache#474)
Browse files Browse the repository at this point in the history
Add soft constraint: partition movement constraint

Evaluate the proposed assignment according to the potential partition movements cost.
The cost is evaluated based on the difference between the old assignment and the new assignment.
  • Loading branch information
Yi Wang authored and jiajunwang committed Jan 6, 2020
1 parent af972c8 commit b101515
Show file tree
Hide file tree
Showing 8 changed files with 278 additions and 58 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package org.apache.helix.controller.rebalancer.waged.constraints;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.Collections;
import java.util.Map;

import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;

/**
* Evaluate the proposed assignment according to the potential partition movements cost.
* The cost is evaluated based on the difference between the old assignment and the new assignment.
* In detail, we consider the following two previous assignments as the base.
* - Baseline assignment that is calculated regardless of the node state (online/offline).
* - Previous Best Possible assignment.
* Any change to these two assignments will increase the partition movements cost, so that the
* evaluated score will become lower.
*/
class PartitionMovementConstraint extends SoftConstraint {
  // Upper and lower bounds handed to the SoftConstraint base class.
  private static final float MAX_SCORE = 1f;
  private static final float MIN_SCORE = 0f;
  // TODO: these factors will be tuned based on user's preference
  // Scale returned when the instance already hosts the partition in a previous assignment
  // but with a state that differs from the state of the replica being evaluated.
  private static final float ALLOCATION_MATCH_FACTOR = 0.5f;
  // Share of the final score contributed by the Baseline comparison; the remaining share
  // comes from the previous Best Possible comparison.
  private static final float BASELINE_MATCH_FACTOR = 0.25f;

  PartitionMovementConstraint() {
    super(MAX_SCORE, MIN_SCORE);
  }

  /**
   * Scores how closely placing the replica on the node matches the previous assignments.
   * The result is a weighted sum: the Best Possible match scale weighted by
   * (1 - BASELINE_MATCH_FACTOR) plus the Baseline match scale weighted by
   * BASELINE_MATCH_FACTOR. The Baseline share is always blended in, even when the Baseline
   * holds no record for the replica (its scale is then 0).
   *
   * @param node the candidate instance
   * @param replica the replica that is being placed
   * @param clusterContext the source of the previous Baseline and Best Possible assignments
   * @return the blended match scale
   */
  @Override
  protected float getAssignmentScore(AssignableNode node, AssignableReplica replica,
      ClusterContext clusterContext) {
    Map<String, String> previousBestPossible =
        getStateMap(replica, clusterContext.getBestPossibleAssignment());
    Map<String, String> previousBaseline =
        getStateMap(replica, clusterContext.getBaselineAssignment());

    // The previous Best Possible assignment carries the larger weight.
    float bestPossibleScale = calculateAssignmentScale(node, replica, previousBestPossible);
    float baselineScale = calculateAssignmentScale(node, replica, previousBaseline);

    return bestPossibleScale * (1 - BASELINE_MATCH_FACTOR)
        + baselineScale * BASELINE_MATCH_FACTOR;
  }

  /**
   * @return a function that linearly maps a raw score onto the range between
   *         getMinScore() and getMaxScore()
   */
  @Override
  NormalizeFunction getNormalizeFunction() {
    return rawScore -> rawScore * (getMaxScore() - getMinScore()) + getMinScore();
  }

  /**
   * Looks up the instance-to-state map recorded for the replica's partition.
   *
   * @param replica the replica whose resource and partition names are used for the lookup
   * @param assignment the previous assignment keyed by resource name; may be null
   * @return the recorded map, or an empty map if the assignment is null or does not contain
   *         the replica's resource
   */
  private Map<String, String> getStateMap(AssignableReplica replica,
      Map<String, ResourceAssignment> assignment) {
    String resource = replica.getResourceName();
    String partition = replica.getPartitionName();
    if (assignment != null && assignment.containsKey(resource)) {
      return assignment.get(resource).getReplicaMap(new Partition(partition));
    }
    return Collections.emptyMap();
  }

  /**
   * Rates a single previous assignment against the proposed placement.
   *
   * @param node the candidate instance
   * @param replica the replica that is being placed
   * @param instanceToStateMap the previous instance-to-state map of the replica's partition
   * @return 0 if the instance is not in the map, 1 if it is there with the same state as the
   *         replica, ALLOCATION_MATCH_FACTOR if it is there with a different state
   */
  private float calculateAssignmentScale(AssignableNode node, AssignableReplica replica,
      Map<String, String> instanceToStateMap) {
    String candidate = node.getInstanceName();
    if (!instanceToStateMap.containsKey(candidate)) {
      return 0;
    }
    String previousState = instanceToStateMap.get(candidate);
    if (previousState.equals(replica.getReplicaState())) {
      return 1;
    }
    return ALLOCATION_MATCH_FACTOR;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.stream.Collectors;

import org.apache.helix.HelixException;
import org.apache.helix.model.ResourceAssignment;


/**
* This class tracks the rebalance-related global cluster status.
Expand All @@ -44,30 +46,47 @@ public class ClusterContext {

// map{zoneName : map{resourceName : set(partitionNames)}}
private Map<String, Map<String, Set<String>>> _assignmentForFaultZoneMap = new HashMap<>();
// Records about the previous assignment
// <ResourceName, ResourceAssignment contains the baseline assignment>
private final Map<String, ResourceAssignment> _baselineAssignment;
// <ResourceName, ResourceAssignment contains the best possible assignment>
private final Map<String, ResourceAssignment> _bestPossibleAssignment;

/**
* Construct the cluster context based on the current instance status.
* @param replicaSet All the partition replicas that are managed by the rebalancer
* @param instanceCount The count of all the active instances that can be used to host partitions.
*/
ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount) {
ClusterContext(Set<AssignableReplica> replicaSet, int instanceCount,
Map<String, ResourceAssignment> baselineAssignment, Map<String, ResourceAssignment> bestPossibleAssignment) {
int totalReplicas = 0;
int totalTopStateReplicas = 0;

for (Map.Entry<String, List<AssignableReplica>> entry : replicaSet.stream()
.collect(Collectors.groupingBy(AssignableReplica::getResourceName)).entrySet()) {
.collect(Collectors.groupingBy(AssignableReplica::getResourceName))
.entrySet()) {
int replicas = entry.getValue().size();
totalReplicas += replicas;

int replicaCnt = Math.max(1, estimateAvgReplicaCount(replicas, instanceCount));
_estimatedMaxPartitionByResource.put(entry.getKey(), replicaCnt);

totalTopStateReplicas +=
entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
totalTopStateReplicas += entry.getValue().stream().filter(AssignableReplica::isReplicaTopState).count();
}

_estimatedMaxPartitionCount = estimateAvgReplicaCount(totalReplicas, instanceCount);
_estimatedMaxTopStateCount = estimateAvgReplicaCount(totalTopStateReplicas, instanceCount);
_baselineAssignment = baselineAssignment;
_bestPossibleAssignment = bestPossibleAssignment;
}

/**
 * @return the baseline assignment passed to the constructor, or an empty map when that
 *         argument was null or had no entries
 */
public Map<String, ResourceAssignment> getBaselineAssignment() {
  if (_baselineAssignment != null && !_baselineAssignment.isEmpty()) {
    return _baselineAssignment;
  }
  return Collections.emptyMap();
}

/**
 * @return the best possible assignment passed to the constructor, or an empty map when
 *         that argument was null or had no entries
 */
public Map<String, ResourceAssignment> getBestPossibleAssignment() {
  if (_bestPossibleAssignment != null && !_bestPossibleAssignment.isEmpty()) {
    return _bestPossibleAssignment;
  }
  return Collections.emptyMap();
}

public Map<String, Map<String, Set<String>>> getAssignmentForFaultZoneMap() {
Expand All @@ -93,25 +112,25 @@ public Set<String> getPartitionsForResourceAndFaultZone(String resourceName, Str

void addPartitionToFaultZone(String faultZoneId, String resourceName, String partition) {
if (!_assignmentForFaultZoneMap.computeIfAbsent(faultZoneId, k -> new HashMap<>())
.computeIfAbsent(resourceName, k -> new HashSet<>()).add(partition)) {
.computeIfAbsent(resourceName, k -> new HashSet<>())
.add(partition)) {
throw new HelixException(
String.format("Resource %s already has a replica from partition %s in fault zone %s",
resourceName, partition, faultZoneId));
String.format("Resource %s already has a replica from partition %s in fault zone %s", resourceName, partition,
faultZoneId));
}
}

boolean removePartitionFromFaultZone(String faultZoneId, String resourceName, String partition) {
return _assignmentForFaultZoneMap.getOrDefault(faultZoneId, Collections.emptyMap())
.getOrDefault(resourceName, Collections.emptySet()).remove(partition);
.getOrDefault(resourceName, Collections.emptySet())
.remove(partition);
}

void setAssignmentForFaultZoneMap(
Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
void setAssignmentForFaultZoneMap(Map<String, Map<String, Set<String>>> assignmentForFaultZoneMap) {
_assignmentForFaultZoneMap = assignmentForFaultZoneMap;
}

private int estimateAvgReplicaCount(int replicaCount, int instanceCount) {
return (int) Math
.ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
return (int) Math.ceil((float) replicaCount / instanceCount * ERROR_MARGIN_FOR_ESTIMATED_MAX_COUNT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
* under the License.
*/

import org.apache.helix.HelixException;
import org.apache.helix.model.ResourceAssignment;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.helix.HelixException;

/**
* This class wraps the required input for the rebalance algorithm.
*/
Expand All @@ -39,23 +38,14 @@ public class ClusterModel {
private final Map<String, Map<String, AssignableReplica>> _assignableReplicaIndex;
private final Map<String, AssignableNode> _assignableNodeMap;

// Records about the previous assignment
// <ResourceName, ResourceAssignment contains the baseline assignment>
private final Map<String, ResourceAssignment> _baselineAssignment;
// <ResourceName, ResourceAssignment contains the best possible assignment>
private final Map<String, ResourceAssignment> _bestPossibleAssignment;

/**
* @param clusterContext The initialized cluster context.
* @param assignableReplicas The replicas to be assigned.
* Note that the replicas in this list shall not be included while initializing the context and assignable nodes.
* @param assignableNodes The active instances.
* @param baselineAssignment The recorded baseline assignment.
* @param bestPossibleAssignment The current best possible assignment.
*/
ClusterModel(ClusterContext clusterContext, Set<AssignableReplica> assignableReplicas,
Set<AssignableNode> assignableNodes, Map<String, ResourceAssignment> baselineAssignment,
Map<String, ResourceAssignment> bestPossibleAssignment) {
Set<AssignableNode> assignableNodes) {
_clusterContext = clusterContext;

// Save all the replicas that are to be assigned
Expand All @@ -70,9 +60,6 @@ public class ClusterModel {

_assignableNodeMap = assignableNodes.stream()
.collect(Collectors.toMap(AssignableNode::getInstanceName, node -> node));

_baselineAssignment = baselineAssignment;
_bestPossibleAssignment = bestPossibleAssignment;
}

public ClusterContext getContext() {
Expand All @@ -87,14 +74,6 @@ public Map<String, Set<AssignableReplica>> getAssignableReplicaMap() {
return _assignableReplicaMap;
}

public Map<String, ResourceAssignment> getBaseline() {
return _baselineAssignment;
}

public Map<String, ResourceAssignment> getBestPossibleAssignment() {
return _bestPossibleAssignment;
}

/**
* Assign the given replica to the specified instance and record the assignment in the cluster model.
* The cluster usage information will be updated accordingly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,11 @@ public static ClusterModel generateClusterModel(ResourceControllerDataProvider d
// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
activeInstances.size());
activeInstances.size(), baselineAssignment, bestPossibleAssignment);
// Initialize the cluster context with the allocated assignments.
context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));

return new ClusterModel(context, toBeAssignedReplicas, assignableNodes, baselineAssignment,
bestPossibleAssignment);
return new ClusterModel(context, toBeAssignedReplicas, assignableNodes);
}

/**
Expand Down
Loading

0 comments on commit b101515

Please sign in to comment.