From 08a2015c617ddd3c93525afc572081a7836f9476 Mon Sep 17 00:00:00 2001 From: Jiajun Wang <1803880+jiajunwang@users.noreply.github.com> Date: Wed, 28 Aug 2019 22:28:10 -0700 Subject: [PATCH] Refine the WAGED rebalancer related interfaces for integration (#431) * Refine the WAGED rebalancer related interfaces and initial integrate with the BestPossibleStateCalStage. - Modify the BestPossibleStateCalStage logic to plugin the WAGED rebalancer. - Refine ClusterModel to integrate with the ClusterDataDetector implementation. - Enabling getting the changed details for Cluster Config in the change detector. Which is required by the WAGED rebalancer. --- .../apache/helix/HelixRebalanceException.java | 43 +++++++ .../ResourceChangeDetector.java | 20 ++-- .../rebalancer/GlobalRebalancer.java | 67 ----------- .../rebalancer/waged/ClusterDataDetector.java | 73 ------------ .../rebalancer/waged/ClusterDataProvider.java | 54 --------- .../rebalancer/waged/WagedRebalancer.java | 65 ++++++++--- .../waged/model/ClusterModelProvider.java | 25 ++-- .../stages/BestPossibleStateCalcStage.java | 109 +++++++++++++----- .../waged/model/TestClusterModelProvider.java | 6 +- 9 files changed, 202 insertions(+), 260 deletions(-) create mode 100644 helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java delete mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java diff --git a/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java new file mode 100644 index 0000000000..c01b173e87 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/HelixRebalanceException.java @@ -0,0 +1,43 @@ +package org.apache.helix; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Exception thrown by Helix due to rebalance failures. + */ +public class HelixRebalanceException extends Exception { + enum RebalanceFailureType { + INVALID_CLUSTER_STATUS, + INVALID_REBALANCER_STATUS, + FAILED_TO_CALCULATE, + UNKNOWN_FAILURE + } + + private final RebalanceFailureType _type; + + public HelixRebalanceException(String message, RebalanceFailureType type, Throwable cause) { + super(String.format("%s. Failure Type: %s", message, type.name()), cause); + _type = type; + } + + public RebalanceFailureType getFailureType() { + return _type; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java index d65e609258..611f4b2bc9 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java +++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java @@ -20,15 +20,17 @@ */ import com.google.common.collect.Sets; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixProperty; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import org.apache.helix.HelixConstants; -import org.apache.helix.HelixException; -import org.apache.helix.HelixProperty; -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; /** * ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from @@ -37,6 +39,7 @@ * WARNING: the methods of this class are not thread-safe. */ public class ResourceChangeDetector implements ChangeDetector { + private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName()); private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run @@ -108,10 +111,13 @@ private void clearCachedComputation() { return snapshot.getResourceConfigMap(); case LIVE_INSTANCE: return snapshot.getLiveInstances(); + case CONFIG: + return Collections.emptyMap(); default: - throw new HelixException(String.format( - "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s", - changeType)); + LOG.warn( + "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: {}", + changeType); + return Collections.emptyMap(); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java deleted file mode 100644 index a3b9b3259e..0000000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/GlobalRebalancer.java +++ /dev/null @@ -1,67 +0,0 @@ -package org.apache.helix.controller.rebalancer; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.HelixManager; -import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; -import org.apache.helix.controller.stages.CurrentStateOutput; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.Resource; - -import java.util.Map; - -public interface GlobalRebalancer { - enum RebalanceFailureType { - INVALID_CLUSTER_STATUS, - INVALID_REBALANCER_STATUS, - FAILED_TO_CALCULATE, - UNKNOWN_FAILURE - } - - class RebalanceFailureReason { - private final static String DEFAULT_REASON_MESSAGE = "No detail"; - private final RebalanceFailureType _type; - private final String _reason; - - public RebalanceFailureReason(RebalanceFailureType type) { - this(type, DEFAULT_REASON_MESSAGE); - } - - public RebalanceFailureReason(RebalanceFailureType type, String reason) { - _type = type; - _reason = reason; - } - - public RebalanceFailureType get_type() { - return _type; - } - - public String get_reason() { - return _reason; - } - } - - void init(HelixManager manager); - - Map computeNewIdealState(final CurrentStateOutput currentStateOutput, - T clusterData, Map resourceMap); - - RebalanceFailureReason getFailureReason(); -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java deleted file mode 100644 index 0423edf4d3..0000000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataDetector.java +++ /dev/null @@ -1,73 +0,0 @@ -package org.apache.helix.controller.rebalancer.waged; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.controller.dataproviders.BaseControllerDataProvider; - -import java.util.Collections; -import java.util.Map; -import java.util.Set; - -/** - * A placeholder before we have the Cluster Data Detector implemented. - * - * @param The cache class that can be handled by the detector. - */ -public class ClusterDataDetector { - /** - * All the cluster change type that may trigger a WAGED rebalancer re-calculation. - */ - public enum ChangeType { - BaselineAssignmentChange, - InstanceConfigChange, - ClusterConfigChange, - ResourceConfigChange, - ResourceIdealStatesChange, - InstanceStateChange, - OtherChange - } - - private Map> _currentChanges = - Collections.singletonMap(ChangeType.ClusterConfigChange, Collections.emptySet()); - - public void updateClusterStatus(T cache) { - } - - /** - * Returns all change types detected during the ClusterDetection stage. - */ - public Set getChangeTypes() { - return _currentChanges.keySet(); - } - - /** - * Returns a set of the names of components that changed based on the given change type. - */ - public Set getChangesBasedOnType(ChangeType changeType) { - return _currentChanges.get(changeType); - } - - /** - * Return a map of the change details . - */ - public Map> getAllChanges() { - return _currentChanges; - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java deleted file mode 100644 index 387666c6db..0000000000 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ClusterDataProvider.java +++ /dev/null @@ -1,54 +0,0 @@ -package org.apache.helix.controller.rebalancer.waged; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; -import org.apache.helix.model.ResourceAssignment; - -import java.util.Map; -import java.util.Set; - -/** - * A placeholder before we have the implementation. - * - * The data provider generates the Cluster Model based on the controller's data cache. - */ -public class ClusterDataProvider { - - /** - * @param dataProvider The controller's data cache. - * @param activeInstances The logical active instances that will be used in the calculation. Note - * This list can be different from the real active node list according to - * the rebalancer logic. - * @param clusterChanges All the cluster changes that happened after the previous rebalance. - * @param baselineAssignment The persisted Baseline assignment. - * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the - * previous rebalance. - * @return The cluster model as the input for the upcoming rebalance. - */ - protected static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider, - Set activeInstances, Map> clusterChanges, - Map baselineAssignment, - Map bestPossibleAssignment) { - // TODO finish the implementation. - return null; - } -} diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java index aa3cfeeacc..fd740e6533 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java @@ -19,10 +19,13 @@ * under the License. */ -import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.HelixRebalanceException; +import org.apache.helix.controller.changedetector.ResourceChangeDetector; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.GlobalRebalancer; +import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer; +import org.apache.helix.controller.rebalancer.internal.MappingCalculator; +import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintsRebalanceAlgorithm; import org.apache.helix.controller.stages.CurrentStateOutput; import org.apache.helix.model.IdealState; import org.apache.helix.model.Resource; @@ -36,23 +39,57 @@ * A placeholder before we have the implementation. * Weight-Aware Globally-Even Distribute Rebalancer. * - * @see https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer + * @see + * Design Document + * */ -public class WagedRebalancer implements GlobalRebalancer { +public class WagedRebalancer { private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class); - @Override - public void init(HelixManager manager) { } + // --------- The following fields are placeholders and need replacement. -----------// + // TODO Shall we make the metadata store a static threadlocal object as well to avoid reinitialization? + private final AssignmentMetadataStore _assignmentMetadataStore; + private final RebalanceAlgorithm _rebalanceAlgorithm; + // ------------------------------------------------------------------------------------// - @Override - public Map computeNewIdealState(CurrentStateOutput currentStateOutput, - ResourceControllerDataProvider clusterData, Map resourceMap) - throws HelixException { - return new HashMap<>(); + // The cluster change detector is a stateful object. Make it static to avoid unnecessary + // reinitialization. + private static final ThreadLocal CHANGE_DETECTOR_THREAD_LOCAL = + new ThreadLocal<>(); + private final MappingCalculator _mappingCalculator; + + private ResourceChangeDetector getChangeDetector() { + if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) { + CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector()); + } + return CHANGE_DETECTOR_THREAD_LOCAL.get(); + } + + public WagedRebalancer(HelixManager helixManager) { + // TODO init the metadata store according to their requirement when integrate, or change to final static method if possible. + _assignmentMetadataStore = new AssignmentMetadataStore(); + // TODO init the algorithm according to the requirement when integrate. + _rebalanceAlgorithm = new ConstraintsRebalanceAlgorithm(); + + // Use the mapping calculator in DelayedAutoRebalancer for calculating the final assignment + // output. + // This calculator will translate the best possible assignment into an applicable state mapping + // based on the current states. + // TODO abstract and separate the mapping calculator logic from the DelayedAutoRebalancer + _mappingCalculator = new DelayedAutoRebalancer(); } - @Override - public RebalanceFailureReason getFailureReason() { - return new RebalanceFailureReason(RebalanceFailureType.UNKNOWN_FAILURE); + /** + * Compute the new IdealStates for all the resources input. The IdealStates include both the new + * partition assignment (in the listFiles) and the new replica state mapping (in the mapFields). + * @param clusterData The Cluster status data provider. + * @param resourceMap A map containing all the rebalancing resources. + * @param currentStateOutput The present Current State of the cluster. + * @return A map containing the computed new IdealStates. + */ + public Map computeNewIdealStates(ResourceControllerDataProvider clusterData, + Map resourceMap, final CurrentStateOutput currentStateOutput) + throws HelixRebalanceException { + return new HashMap<>(); } } diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index 9de023b0f7..c4f7d02e9c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -19,9 +19,9 @@ * under the License. */ +import org.apache.helix.HelixConstants; import org.apache.helix.HelixException; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; @@ -59,7 +59,7 @@ public class ClusterModelProvider { */ public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider, Map resourceMap, Set activeInstances, - Map> clusterChanges, + Map> clusterChanges, Map baselineAssignment, Map bestPossibleAssignment) { // Generate replica objects for all the resource partitions. @@ -108,14 +108,13 @@ public static ClusterModel generateClusterModel(ResourceControllerDataProvider d */ private static Set findToBeAssignedReplicas( Map> replicaMap, - Map> clusterChanges, Set activeInstances, + Map> clusterChanges, Set activeInstances, Map bestPossibleAssignment, Map> allocatedReplicas) { Set toBeAssignedReplicas = new HashSet<>(); - if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange) - || clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange) - || clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange)) { - // If the cluster topology or baseline assignment has been modified, need to reassign all replicas + if (clusterChanges.containsKey(HelixConstants.ChangeType.CONFIG) + || clusterChanges.containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) { + // If the cluster topology has been modified, need to reassign all replicas toBeAssignedReplicas .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet())); } else { @@ -124,11 +123,13 @@ private static Set findToBeAssignedReplicas( Set replicas = replicaMap.get(resourceName); // 1. if the resource config/idealstate is changed, need to reassign. // 2. if the resource does appear in the best possible assignment, need to reassign. - if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange, - Collections.emptySet()).contains(resourceName) || clusterChanges - .getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange, - Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment - .containsKey(resourceName)) { + if (clusterChanges + .getOrDefault(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.emptySet()) + .contains(resourceName) + || clusterChanges + .getOrDefault(HelixConstants.ChangeType.IDEAL_STATE, Collections.emptySet()) + .contains(resourceName) + || !bestPossibleAssignment.containsKey(resourceName)) { toBeAssignedReplicas.addAll(replicas); continue; // go to check next resource } else { diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java index 85a4add52d..e0b77d006d 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java @@ -19,13 +19,9 @@ * under the License. */ -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.HelixRebalanceException; import org.apache.helix.controller.LogUtil; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; import org.apache.helix.controller.pipeline.AbstractBaseStage; @@ -36,6 +32,7 @@ import org.apache.helix.controller.rebalancer.Rebalancer; import org.apache.helix.controller.rebalancer.SemiAutoRebalancer; import org.apache.helix.controller.rebalancer.internal.MappingCalculator; +import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.MaintenanceSignal; @@ -50,23 +47,31 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + /** * For partition compute best possible (instance,state) pair based on * IdealState,StateModel,LiveInstance */ public class BestPossibleStateCalcStage extends AbstractBaseStage { - private static final Logger logger = LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName()); + private static final Logger logger = + LoggerFactory.getLogger(BestPossibleStateCalcStage.class.getName()); @Override public void process(ClusterEvent event) throws Exception { _eventId = event.getEventId(); - CurrentStateOutput currentStateOutput = - event.getAttribute(AttributeName.CURRENT_STATE.name()); + CurrentStateOutput currentStateOutput = event.getAttribute(AttributeName.CURRENT_STATE.name()); final Map resourceMap = event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name()); final ClusterStatusMonitor clusterStatusMonitor = event.getAttribute(AttributeName.clusterStatusMonitor.name()); - ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); + ResourceControllerDataProvider cache = + event.getAttribute(AttributeName.ControllerDataProvider.name()); if (currentStateOutput == null || resourceMap == null || cache == null) { throw new StageException( @@ -89,8 +94,7 @@ public Object call() { resourceMap, stateModelDefMap); } } catch (Exception e) { - LogUtil - .logError(logger, _eventId, "Could not update cluster status metrics!", e); + LogUtil.logError(logger, _eventId, "Could not update cluster status metrics!", e); } return null; } @@ -99,7 +103,8 @@ public Object call() { private BestPossibleStateOutput compute(ClusterEvent event, Map resourceMap, CurrentStateOutput currentStateOutput) { - ResourceControllerDataProvider cache = event.getAttribute(AttributeName.ControllerDataProvider.name()); + ResourceControllerDataProvider cache = + event.getAttribute(AttributeName.ControllerDataProvider.name()); BestPossibleStateOutput output = new BestPossibleStateOutput(); HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name()); @@ -111,19 +116,50 @@ private BestPossibleStateOutput compute(ClusterEvent event, Map newIdealStates = new HashMap<>(); + WagedRebalancer wagedRebalancer = new WagedRebalancer(helixManager); + try { + newIdealStates + .putAll(wagedRebalancer.computeNewIdealStates(cache, resourceMap, currentStateOutput)); + } catch (HelixRebalanceException ex) { + // Note that unlike the legacy rebalancer, the WAGED rebalance won't return partial result. + // Since it calculates for all the eligible resources globally, a partial result is invalid. + // TODO propagate the rebalancer failure information to updateRebalanceStatus for monitoring. + LogUtil.logError(logger, _eventId, String + .format("Failed to calculate the new Ideal States using the rebalancer %s due to %s", + wagedRebalancer.getClass().getSimpleName(), ex.getFailureType()), ex); + } + final List failureResources = new ArrayList<>(); Iterator itr = resourceMap.values().iterator(); while (itr.hasNext()) { Resource resource = itr.next(); boolean result = false; - try { - result = - computeResourceBestPossibleState(event, cache, currentStateOutput, resource, output); - } catch (HelixException ex) { - LogUtil.logError(logger, _eventId, - "Exception when calculating best possible states for " + resource.getResourceName(), - ex); - + IdealState is = newIdealStates.get(resource.getResourceName()); + if (is != null) { + // 2. Check if the WAGED rebalancer has calculated for this resource or not. + result = checkBestPossibleStateCalculation(is); + if (result) { + // The WAGED rebalancer calculates a valid result, record in the output + updateBestPossibleStateOutput(output, resource, is); + } + } else { + // 3. The WAGED rebalancer skips calculating the resource assignment, fallback to use a + // legacy resource rebalancer if applicable. + // If this calculation fails, the resource will be reported in the failureResources list. + try { + result = + computeSingleResourceBestPossibleState(event, cache, currentStateOutput, resource, + output); + } catch (HelixException ex) { + LogUtil.logError(logger, _eventId, + "Exception when calculating best possible states for " + resource.getResourceName(), + ex); + } } if (!result) { failureResources.add(resource.getResourceName()); @@ -184,8 +220,9 @@ private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvid if (manager != null) { if (manager.getHelixDataAccessor() .getProperty(manager.getHelixDataAccessor().keyBuilder().maintenance()) == null) { - manager.getClusterManagmentTool().autoEnableMaintenanceMode(manager.getClusterName(), - true, errMsg, MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED); + manager.getClusterManagmentTool() + .autoEnableMaintenanceMode(manager.getClusterName(), true, errMsg, + MaintenanceSignal.AutoTriggerReason.MAX_OFFLINE_INSTANCES_EXCEEDED); LogUtil.logWarn(logger, _eventId, errMsg); } } else { @@ -198,8 +235,19 @@ private boolean validateOfflineInstancesLimit(final ResourceControllerDataProvid return true; } - private boolean computeResourceBestPossibleState(ClusterEvent event, ResourceControllerDataProvider cache, - CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput output) { + private void updateBestPossibleStateOutput(BestPossibleStateOutput output, Resource resource, + IdealState computedIdealState) { + output.setPreferenceLists(resource.getResourceName(), computedIdealState.getPreferenceLists()); + for (Partition partition : resource.getPartitions()) { + Map newStateMap = + computedIdealState.getInstanceStateMap(partition.getPartitionName()); + output.setState(resource.getResourceName(), partition, newStateMap); + } + } + + private boolean computeSingleResourceBestPossibleState(ClusterEvent event, + ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput, + Resource resource, BestPossibleStateOutput output) { // for each ideal state // read the state model def // for each resource @@ -228,12 +276,13 @@ private boolean computeResourceBestPossibleState(ClusterEvent event, ResourceCon Rebalancer rebalancer = getRebalancer(idealState, resourceName, cache.isMaintenanceModeEnabled()); - MappingCalculator mappingCalculator = getMappingCalculator(rebalancer, resourceName); + MappingCalculator mappingCalculator = + getMappingCalculator(rebalancer, resourceName); if (rebalancer == null || mappingCalculator == null) { - LogUtil.logError(logger, _eventId, - "Error computing assignment for resource " + resourceName + ". no rebalancer found. rebalancer: " + rebalancer - + " mappingCalculator: " + mappingCalculator); + LogUtil.logError(logger, _eventId, "Error computing assignment for resource " + resourceName + + ". no rebalancer found. rebalancer: " + rebalancer + " mappingCalculator: " + + mappingCalculator); } if (rebalancer != null && mappingCalculator != null) { @@ -298,8 +347,8 @@ private boolean checkBestPossibleStateCalculation(IdealState idealState) { } } - private Rebalancer getRebalancer(IdealState idealState, String resourceName, - boolean isMaintenanceModeEnabled) { + private Rebalancer getRebalancer(IdealState idealState, + String resourceName, boolean isMaintenanceModeEnabled) { Rebalancer customizedRebalancer = null; String rebalancerClassName = idealState.getRebalancerClassName(); if (rebalancerClassName != null) { diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java index f92a66ccaf..1221b6f28b 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java @@ -19,8 +19,8 @@ * under the License. */ +import org.apache.helix.HelixConstants; import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; -import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector; import org.apache.helix.controller.rebalancer.waged.WagedRebalancer; import org.apache.helix.model.CurrentState; import org.apache.helix.model.IdealState; @@ -177,7 +177,7 @@ public void testGenerateClusterModel() throws IOException { // 5. test with best possible assignment but cluster topology is changed clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream() .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))), - _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ClusterConfigChange, + _instances, Collections.singletonMap(HelixConstants.ChangeType.CONFIG, Collections.emptySet()), Collections.emptyMap(), bestPossibleAssignment); // There should be no existing assignment since the topology change invalidates all existing assignment Assert.assertTrue(clusterModel.getContext().getAssignmentForFaultZoneMap().values().stream() @@ -194,7 +194,7 @@ public void testGenerateClusterModel() throws IOException { String changedResourceName = _resourceNames.get(0); clusterModel = ClusterModelProvider.generateClusterModel(testCache, _resourceNames.stream() .collect(Collectors.toMap(resource -> resource, resource -> new Resource(resource))), - _instances, Collections.singletonMap(ClusterDataDetector.ChangeType.ResourceConfigChange, + _instances, Collections.singletonMap(HelixConstants.ChangeType.RESOURCE_CONFIG, Collections.singleton(changedResourceName)), Collections.emptyMap(), bestPossibleAssignment); // There should be no existing assignment for all the resource except for resource2.