@@ -0,0 +1,7 @@
package org.apache.helix.api.exceptions;

/**
 * Thrown when an instance's configuration does not match the cluster's topology configuration,
 * e.g. a required topology key (ClusterConfig#REQUIRED_INSTANCE_TOPOLOGY_KEYS) is missing from
 * the instance's domain.
 */
public class InstanceConfigMismatchException extends IllegalArgumentException {
public InstanceConfigMismatchException(String message) {
super(message);
}
}
@@ -30,10 +30,10 @@
import java.util.Set;

import org.apache.helix.HelixException;
import org.apache.helix.api.exceptions.InstanceConfigMismatchException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.ClusterTopologyConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.util.InstanceValidationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -204,13 +204,16 @@ private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLev
unnecessaryTopoKeys.forEach(instanceTopologyMap::remove);
}
addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances);
} catch (InstanceConfigMismatchException e) {
logger.warn("Topology setting {} for instance {} is unset or invalid due to mismatch with cluster topology "
+ "configuration. Instance will be ignored! Error: {}", insConfig.getDomainAsString(), instanceName,
e.getMessage());
} catch (IllegalArgumentException e) {
if (insConfig.getInstanceEnabled()) {
throw e;
}
logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!",
insConfig.getDomainAsString(), instanceName);
}
}
return root;
Expand Down Expand Up @@ -256,11 +259,15 @@ private static LinkedHashMap<String, String> computeInstanceTopologyMapHelper(
instanceName));
}
int numOfMatchedKeys = 0;
boolean shouldThrowExceptionDueToMissingConfigs = false;
for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) {
// If a key does not exist in the instance domain config, use the default domain value.
String value = domainAsMap.get(key);
if (value == null || value.isEmpty()) {
value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key);
if (clusterTopologyConfig.getRequiredMatchingTopologyKeys().contains(key)) {
shouldThrowExceptionDueToMissingConfigs = true;
}
} else {
numOfMatchedKeys++;
}
Expand All @@ -270,10 +277,13 @@ private static LinkedHashMap<String, String> computeInstanceTopologyMapHelper(
}
}
if (numOfMatchedKeys < clusterTopologyConfig.getTopologyKeyDefaultValue().size()) {
String errorMessage =
String.format("Instance %s does not have all the keys in ClusterConfig. Topology %s.", instanceName,
clusterTopologyConfig.getTopologyKeyDefaultValue().keySet());
logger.warn(errorMessage);
if (shouldThrowExceptionDueToMissingConfigs) {
throw new InstanceConfigMismatchException(errorMessage);
}
}
}
} else {
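To make the intent of the two hunks above concrete, here is a minimal, hypothetical sketch (the cluster name, instance name, and the /zone/instance topology are illustrative, mirroring the tests later in this PR): with "zone" declared as a required key, an instance whose domain omits it is skipped by the topology builder instead of aborting the rebalance or silently inheriting the default zone value.

ClusterConfig clusterConfig = new ClusterConfig("testCluster");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/zone/instance");
clusterConfig.setFaultZoneType("zone");
// New knob from this change: "zone" must appear in every instance's domain.
clusterConfig.setRequiredInstanceTopologyKeys(Collections.singletonList("zone"));

// Hypothetical instance whose domain omits the required "zone" key.
InstanceConfig incompleteInstance = new InstanceConfig("instance1");
incompleteInstance.setDomain("instance=instance1");
// When the topology tree is built, computeInstanceTopologyMapHelper detects the missing
// required key and throws InstanceConfigMismatchException; createClusterTree catches it,
// logs a warning, and leaves instance1 out of the tree regardless of its enabled flag.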
@@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

@@ -42,11 +43,14 @@
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This util class generates Cluster Model object based on the controller's data cache.
*/
public class ClusterModelProvider {
private static final Logger logger = LoggerFactory.getLogger(ClusterModelProvider.class);

private enum RebalanceScopeType {
// Set the rebalance scope to cover the difference between the current assignment and the
@@ -268,7 +272,7 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider
// Construct and initialize cluster context.
ClusterContext context = new ClusterContext(
replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment,
dataProvider.getClusterConfig(), dataProvider);

// Initial the cluster context with the allocated assignments.
@@ -607,10 +611,21 @@ private static Set<AssignableNode> getAllAssignableNodes(ClusterConfig clusterCo
ClusterTopologyConfig clusterTopologyConfig =
ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
return activeInstances.parallelStream()
.filter(instanceConfigMap::containsKey)
.map(instanceName -> {
try {
return new AssignableNode(clusterConfig, clusterTopologyConfig,
instanceConfigMap.get(instanceName), instanceName);
} catch (IllegalArgumentException e) {
// Log the filtering of invalid instance configuration
// This helps with debugging when instances are unexpectedly excluded
logger.warn("Instance {} has invalid configuration and will be excluded from the assignable nodes: {}",
instanceName, e.getMessage());
return null;
}
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
}

/**
30 changes: 27 additions & 3 deletions helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -170,7 +170,10 @@ public enum ClusterConfigProperty {
PARTICIPANT_DEREGISTRATION_TIMEOUT,

// Allow disabled partitions to remain OFFLINE instead of being reassigned in WAGED rebalancer
RELAXED_DISABLED_PARTITION_CONSTRAINT,

// Ignore instances which do not match the required topology keys of the cluster
REQUIRED_INSTANCE_TOPOLOGY_KEYS,
}

public enum GlobalRebalancePreferenceKey {
@@ -871,7 +874,7 @@ public void enableP2PMessage(boolean enabled) {

/**
* Whether the relaxed disabled partition constraint is enabled for this cluster.
* When enabled, WAGED rebalancer will allow disabled partitions to remain OFFLINE
* instead of being immediately reassigned, making behavior consistent with CrushEd.
* By default it is disabled if not set.
* @return true if relaxed disabled partition constraint is enabled, false otherwise
@@ -882,14 +885,35 @@ public boolean isRelaxedDisabledPartitionConstraintEnabled() {

/**
* Enable/disable relaxed disabled partition constraint for this cluster.
* When enabled, WAGED rebalancer will allow disabled partitions to remain OFFLINE
* instead of being immediately reassigned, making behavior consistent with CrushEd.
* @param enabled true to enable relaxed constraint, false for strict constraint (default)
*/
public void setRelaxedDisabledPartitionConstraint(boolean enabled) {
_record.setBooleanField(ClusterConfigProperty.RELAXED_DISABLED_PARTITION_CONSTRAINT.name(), enabled);
}

/**
* Get the required Instance Topology Keys. If not configured, return an empty list.
* @return a list of required topology keys
*/
public List<String> getRequiredInstanceTopologyKeys() {
List<String> topologyKeys = _record.getListField(ClusterConfigProperty.REQUIRED_INSTANCE_TOPOLOGY_KEYS.name());
if (topologyKeys == null) {
return Collections.emptyList();
}
return Collections.unmodifiableList(topologyKeys);
}

/**
* Set the required instance topology keys. Each key in this list that also appears in the
* cluster topology definition must be set in every instance's domain; instances missing a
* required key are ignored when the topology tree is built.
* @param topologyKeys the list of required topology keys
*/
public void setRequiredInstanceTopologyKeys(List<String> topologyKeys) {
_record.setListField(ClusterConfigProperty.REQUIRED_INSTANCE_TOPOLOGY_KEYS.name(), topologyKeys);
}

/**
* Set the required Instance Capacity Keys.
* @param capacityKeys - the capacity key list.
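A brief usage sketch for the getRequiredInstanceTopologyKeys/setRequiredInstanceTopologyKeys pair added above (a fragment, assuming the usual java.util.Arrays and java.util.List imports; the cluster name is arbitrary): the getter falls back to an empty list when the field has never been set, and otherwise returns an unmodifiable view of the stored list field.

ClusterConfig config = new ClusterConfig("myCluster");
// Never configured: returns Collections.emptyList().
List<String> keys = config.getRequiredInstanceTopologyKeys();   // []

config.setRequiredInstanceTopologyKeys(Arrays.asList("zone", "instance"));
// Configured: returns an unmodifiable view of the REQUIRED_INSTANCE_TOPOLOGY_KEYS list field.
keys = config.getRequiredInstanceTopologyKeys();                // [zone, instance]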
@@ -19,7 +19,10 @@
* under the License.
*/

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;

import org.apache.helix.HelixException;
import org.apache.helix.controller.rebalancer.topology.Topology;

@@ -31,13 +34,15 @@ public class ClusterTopologyConfig {
private final String _endNodeType;
private final String _faultZoneType;
private final LinkedHashMap<String, String> _topologyKeyDefaultValue;
private final List<String> _requiredMatchingTopologyKeys;

private ClusterTopologyConfig(boolean topologyAwareEnabled, String endNodeType, String faultZoneType,
LinkedHashMap<String, String> topologyKeyDefaultValue, List<String> requiredMatchingTopologyKeys) {
_topologyAwareEnabled = topologyAwareEnabled;
_endNodeType = endNodeType;
_faultZoneType = faultZoneType;
_topologyKeyDefaultValue = topologyKeyDefaultValue;
_requiredMatchingTopologyKeys = requiredMatchingTopologyKeys;
}

/**
@@ -52,12 +57,14 @@ public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig cluste
false,
Topology.Types.INSTANCE.name(),
Topology.Types.INSTANCE.name(),
new LinkedHashMap<>(),
new ArrayList<>());
}
// Assign default cluster topology definition, i.e. /root/zone/instance
String endNodeType = Topology.Types.INSTANCE.name();
String faultZoneType = Topology.Types.ZONE.name();
LinkedHashMap<String, String> topologyKeyDefaultValue = new LinkedHashMap<>();
List<String> requiredMatchingTopologyKeys = clusterConfig.getRequiredInstanceTopologyKeys();

String topologyDef = clusterConfig.getTopology();
if (topologyDef != null) {
@@ -79,7 +86,7 @@ public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig cluste
faultZoneType, clusterConfig.getTopology()));
}
}
return new ClusterTopologyConfig(true, endNodeType, faultZoneType, topologyKeyDefaultValue, requiredMatchingTopologyKeys);
}

public boolean isTopologyAwareEnabled() {
@@ -97,4 +104,8 @@ public String getFaultZoneType() {
public LinkedHashMap<String, String> getTopologyKeyDefaultValue() {
return _topologyKeyDefaultValue;
}

public List<String> getRequiredMatchingTopologyKeys() {
return _requiredMatchingTopologyKeys;
}
}
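A sketch of how the required keys flow from ClusterConfig into ClusterTopologyConfig (the values are illustrative and assume the java.util.Collections and java.util.List imports; the topology string mirrors the tests in this change): createFromClusterConfig copies clusterConfig.getRequiredInstanceTopologyKeys() into the new _requiredMatchingTopologyKeys field, which Topology.computeInstanceTopologyMapHelper later consults via getRequiredMatchingTopologyKeys().

ClusterConfig clusterConfig = new ClusterConfig("testCluster");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/zone/instance");
clusterConfig.setFaultZoneType("zone");
clusterConfig.setRequiredInstanceTopologyKeys(Collections.singletonList("zone"));

ClusterTopologyConfig topologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig);
// Contains "zone": a missing "zone" entry in an instance's domain now triggers
// InstanceConfigMismatchException instead of silently using the default zone value.
List<String> requiredKeys = topologyConfig.getRequiredMatchingTopologyKeys();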
@@ -22,6 +22,8 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -676,4 +678,90 @@ static class MockAssignableReplica extends AssignableReplica {
super(new ClusterConfig("testCluster"), resourceConfig, partition, replicaState, 1);
}
}

@Test
public void testGetAllAssignableNodes_success() {
ClusterConfig clusterConfig = new ClusterConfig("testCluster");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/zone/instance");
clusterConfig.setFaultZoneType("zone");

InstanceConfig instance1 = new InstanceConfig("instance1");
instance1.setDomain("zone=zone1,instance=instance1");
InstanceConfig instance2 = new InstanceConfig("instance2");
instance2.setDomain("zone=zone2,instance=instance2");

Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put("instance1", instance1);
instanceConfigMap.put("instance2", instance2);

Set<String> activeInstances = new HashSet<>(Arrays.asList("instance1", "instance2"));

Set<AssignableNode> nodes = invokeGetAllAssignableNodes(clusterConfig, instanceConfigMap, activeInstances);
Assert.assertEquals(nodes.size(), 2);
Set<String> nodeNames = new HashSet<>();
for (AssignableNode node : nodes) {
nodeNames.add(node.getInstanceName());
}
Assert.assertTrue(nodeNames.contains("instance1"));
Assert.assertTrue(nodeNames.contains("instance2"));
}

@Test
public void testGetAllAssignableNodes_missingConfig() {
ClusterConfig clusterConfig = new ClusterConfig("testCluster");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/zone/instance");
clusterConfig.setFaultZoneType("zone");

InstanceConfig instance1 = new InstanceConfig("instance1");
instance1.setDomain("zone=zone1,instance=instance1");
// instance2 config missing
Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put("instance1", instance1);

Set<String> activeInstances = new HashSet<>(Arrays.asList("instance1", "instance2"));

Set<AssignableNode> nodes = invokeGetAllAssignableNodes(clusterConfig, instanceConfigMap, activeInstances);
Assert.assertEquals(nodes.size(), 1);
AssignableNode node = nodes.iterator().next();
Assert.assertEquals(node.getInstanceName(), "instance1");
}

@Test
public void testGetAllAssignableNodes_illegalArgumentException() {
ClusterConfig clusterConfig = new ClusterConfig("testCluster");
clusterConfig.setTopologyAwareEnabled(true);
clusterConfig.setTopology("/zone/instance");
clusterConfig.setFaultZoneType("zone");

// Create an invalid InstanceConfig that will cause IllegalArgumentException
InstanceConfig invalidInstance = Mockito.mock(InstanceConfig.class);
Mockito.when(invalidInstance.getInstanceName()).thenReturn("invalidInstance");
// Simulate invalid domain or missing required config
Mockito.when(invalidInstance.getDomainAsMap()).thenThrow(new IllegalArgumentException("Invalid config"));

Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
instanceConfigMap.put("invalidInstance", invalidInstance);

Set<String> activeInstances = new HashSet<>(Collections.singletonList("invalidInstance"));

Set<AssignableNode> nodes = invokeGetAllAssignableNodes(clusterConfig, instanceConfigMap, activeInstances);
// Should be empty due to exception
Assert.assertTrue(nodes.isEmpty());
}

private Set<AssignableNode> invokeGetAllAssignableNodes(ClusterConfig clusterConfig,
Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances) {
// Use reflection to access private static method
try {
Method getAllAssignableNodesMethod = ClusterModelProvider.class.getDeclaredMethod(
"getAllAssignableNodes", ClusterConfig.class, Map.class, Set.class);
getAllAssignableNodesMethod.setAccessible(true);
Object result = getAllAssignableNodesMethod.invoke(null, clusterConfig, instanceConfigMap, activeInstances);
return (Set<AssignableNode>) result;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}