diff --git a/helix-core/src/main/java/org/apache/helix/api/exceptions/InstanceConfigMismatchException.java b/helix-core/src/main/java/org/apache/helix/api/exceptions/InstanceConfigMismatchException.java new file mode 100644 index 0000000000..5dc81dafcf --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/api/exceptions/InstanceConfigMismatchException.java @@ -0,0 +1,7 @@ +package org.apache.helix.api.exceptions; + +public class InstanceConfigMismatchException extends IllegalArgumentException { + public InstanceConfigMismatchException(String message) { + super(message); + } +} \ No newline at end of file diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java index 2618275b13..170464f218 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/topology/Topology.java @@ -30,10 +30,10 @@ import java.util.Set; import org.apache.helix.HelixException; +import org.apache.helix.api.exceptions.InstanceConfigMismatchException; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.InstanceConfig; -import org.apache.helix.util.InstanceValidationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -204,13 +204,16 @@ private Node createClusterTree(ClusterConfig clusterConfig, boolean faultZoneLev unnecessaryTopoKeys.forEach(instanceTopologyMap::remove); } addEndNode(root, instanceName, instanceTopologyMap, weight, _liveInstances); + } catch (InstanceConfigMismatchException e) { + logger.warn("Topology setting {} for instance {} is unset or invalid due to mismatch with cluster topology " + + "configuration. Instance will be ignored! Error: {}", insConfig.getDomainAsString(), instanceName, + e.getMessage()); } catch (IllegalArgumentException e) { if (insConfig.getInstanceEnabled()) { throw e; - } else { - logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!", - insConfig.getDomainAsString(), instanceName); } + logger.warn("Topology setting {} for instance {} is unset or invalid, ignore the instance!", + insConfig.getDomainAsString(), instanceName); } } return root; @@ -256,11 +259,15 @@ private static LinkedHashMap computeInstanceTopologyMapHelper( instanceName)); } int numOfMatchedKeys = 0; + boolean shouldThrowExceptionDueToMissingConfigs = false; for (String key : clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()) { // if a key does not exist in the instance domain config, using the default domain value. String value = domainAsMap.get(key); - if (value == null || value.length() == 0) { + if (value == null || value.isEmpty()) { value = clusterTopologyConfig.getTopologyKeyDefaultValue().get(key); + if (clusterTopologyConfig.getRequiredMatchingTopologyKeys().contains(key)) { + shouldThrowExceptionDueToMissingConfigs = true; + } } else { numOfMatchedKeys++; } @@ -270,10 +277,13 @@ private static LinkedHashMap computeInstanceTopologyMapHelper( } } if (numOfMatchedKeys < clusterTopologyConfig.getTopologyKeyDefaultValue().size()) { - logger.warn( - "Key-value pairs in InstanceConfig.Domain {} do not align with keys in ClusterConfig.Topology " - + "{}, using default domain value instead", instanceConfig.getDomainAsString(), - clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()); + String errorMessage = + String.format("Instance %s does not have all the keys in ClusterConfig. Topology %s.", instanceName, + clusterTopologyConfig.getTopologyKeyDefaultValue().keySet()); + logger.warn(errorMessage); + if (shouldThrowExceptionDueToMissingConfigs) { + throw new InstanceConfigMismatchException(errorMessage); + } } } } else { diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java index 619ee23ca2..946e3c980c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -42,11 +43,14 @@ import org.apache.helix.model.ResourceAssignment; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This util class generates Cluster Model object based on the controller's data cache. */ public class ClusterModelProvider { + private static Logger logger = LoggerFactory.getLogger(ClusterModelProvider.class); private enum RebalanceScopeType { // Set the rebalance scope to cover the difference between the current assignment and the @@ -268,7 +272,7 @@ private static ClusterModel generateClusterModel(ResourceControllerDataProvider // Construct and initialize cluster context. ClusterContext context = new ClusterContext( replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()), - assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment, + assignableNodes, logicalIdIdealAssignment, logicalIdCurrentAssignment, dataProvider.getClusterConfig(), dataProvider); // Initial the cluster context with the allocated assignments. @@ -607,10 +611,21 @@ private static Set getAllAssignableNodes(ClusterConfig clusterCo ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(clusterConfig); return activeInstances.parallelStream() - .filter(instanceConfigMap::containsKey).map( - instanceName -> new AssignableNode(clusterConfig, clusterTopologyConfig, - instanceConfigMap.get(instanceName), - instanceName)).collect(Collectors.toSet()); + .filter(instanceConfigMap::containsKey) + .map(instanceName -> { + try { + return new AssignableNode(clusterConfig, clusterTopologyConfig, + instanceConfigMap.get(instanceName), instanceName); + } catch (IllegalArgumentException e) { + // Log the filtering of invalid instance configuration + // This helps with debugging when instances are unexpectedly excluded + logger.warn("Instance {} has invalid configuration and will be excluded from the assignable nodes: {}", + instanceName, e.getMessage()); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); } /** diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java index f5977ec890..1be24b1485 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java @@ -170,7 +170,10 @@ public enum ClusterConfigProperty { PARTICIPANT_DEREGISTRATION_TIMEOUT, // Allow disabled partitions to remain OFFLINE instead of being reassigned in WAGED rebalancer - RELAXED_DISABLED_PARTITION_CONSTRAINT + RELAXED_DISABLED_PARTITION_CONSTRAINT, + + // Ignore instances which do not match the required topology keys of the cluster + REQUIRED_INSTANCE_TOPOLOGY_KEYS, } public enum GlobalRebalancePreferenceKey { @@ -871,7 +874,7 @@ public void enableP2PMessage(boolean enabled) { /** * Whether the relaxed disabled partition constraint is enabled for this cluster. - * When enabled, WAGED rebalancer will allow disabled partitions to remain OFFLINE + * When enabled, WAGED rebalancer will allow disabled partitions to remain OFFLINE * instead of being immediately reassigned, making behavior consistent with CrushEd. * By default it is disabled if not set. * @return true if relaxed disabled partition constraint is enabled, false otherwise @@ -882,7 +885,7 @@ public boolean isRelaxedDisabledPartitionConstraintEnabled() { /** * Enable/disable relaxed disabled partition constraint for this cluster. - * When enabled, WAGED rebalancer will allow disabled partitions to remain OFFLINE + * When enabled, WAGED rebalancer will allow disabled partitions to remain OFFLINE * instead of being immediately reassigned, making behavior consistent with CrushEd. * @param enabled true to enable relaxed constraint, false for strict constraint (default) */ @@ -890,6 +893,27 @@ public void setRelaxedDisabledPartitionConstraint(boolean enabled) { _record.setBooleanField(ClusterConfigProperty.RELAXED_DISABLED_PARTITION_CONSTRAINT.name(), enabled); } + /** + * Get the required Instance Topology Keys. If not configured, return an empty list. + * @return a list of required topology keys + */ + public List getRequiredInstanceTopologyKeys() { + List topologyKeys = _record.getListField(ClusterConfigProperty.REQUIRED_INSTANCE_TOPOLOGY_KEYS.name()); + if (topologyKeys == null) { + return Collections.emptyList(); + } + return Collections.unmodifiableList(topologyKeys); + } + + /** + * Set the required Instance Topology Keys which must be present on all instances in the cluster + * if they are present in cluster config. + * @param topologyKeys + */ + public void setRequiredInstanceTopologyKeys(List topologyKeys) { + _record.setListField(ClusterConfigProperty.REQUIRED_INSTANCE_TOPOLOGY_KEYS.name(), topologyKeys); + } + /** * Set the required Instance Capacity Keys. * @param capacityKeys - the capacity key list. diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java index 8becbaa41d..d8e2e346b0 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java +++ b/helix-core/src/main/java/org/apache/helix/model/ClusterTopologyConfig.java @@ -19,7 +19,10 @@ * under the License. */ +import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.List; + import org.apache.helix.HelixException; import org.apache.helix.controller.rebalancer.topology.Topology; @@ -31,13 +34,15 @@ public class ClusterTopologyConfig { private final String _endNodeType; private final String _faultZoneType; private final LinkedHashMap _topologyKeyDefaultValue; + private final List _requiredMatchingTopologyKeys; private ClusterTopologyConfig(boolean topologyAwareEnabled, String endNodeType, String faultZoneType, - LinkedHashMap topologyKeyDefaultValue) { + LinkedHashMap topologyKeyDefaultValue, List requiredMatchingTopologyKeys) { _topologyAwareEnabled = topologyAwareEnabled; _endNodeType = endNodeType; _faultZoneType = faultZoneType; _topologyKeyDefaultValue = topologyKeyDefaultValue; + _requiredMatchingTopologyKeys = requiredMatchingTopologyKeys; } /** @@ -52,12 +57,14 @@ public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig cluste false, Topology.Types.INSTANCE.name(), Topology.Types.INSTANCE.name(), - new LinkedHashMap<>()); + new LinkedHashMap<>(), + new ArrayList<>()); } // Assign default cluster topology definition, i,e. /root/zone/instance String endNodeType = Topology.Types.INSTANCE.name(); String faultZoneType = Topology.Types.ZONE.name(); LinkedHashMap topologyKeyDefaultValue = new LinkedHashMap<>(); + List requiredMatchingTopologyKeys = clusterConfig.getRequiredInstanceTopologyKeys(); String topologyDef = clusterConfig.getTopology(); if (topologyDef != null) { @@ -79,7 +86,7 @@ public static ClusterTopologyConfig createFromClusterConfig(ClusterConfig cluste faultZoneType, clusterConfig.getTopology())); } } - return new ClusterTopologyConfig(true, endNodeType, faultZoneType, topologyKeyDefaultValue); + return new ClusterTopologyConfig(true, endNodeType, faultZoneType, topologyKeyDefaultValue, requiredMatchingTopologyKeys); } public boolean isTopologyAwareEnabled() { @@ -97,4 +104,8 @@ public String getFaultZoneType() { public LinkedHashMap getTopologyKeyDefaultValue() { return _topologyKeyDefaultValue; } + + public List getRequiredMatchingTopologyKeys() { + return _requiredMatchingTopologyKeys; + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java index 2e41b6dbea..fc807b08f1 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java +++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestClusterModelProvider.java @@ -22,6 +22,8 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -676,4 +678,90 @@ static class MockAssignableReplica extends AssignableReplica { super(new ClusterConfig("testCluster"), resourceConfig, partition, replicaState, 1); } } + + @Test + public void testGetAllAssignableNodes_success() { + ClusterConfig clusterConfig = new ClusterConfig("testCluster"); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setTopology("/zone/instance"); + clusterConfig.setFaultZoneType("zone"); + + InstanceConfig instance1 = new InstanceConfig("instance1"); + instance1.setDomain("zone=zone1,instance=instance1"); + InstanceConfig instance2 = new InstanceConfig("instance2"); + instance2.setDomain("zone=zone2,instance=instance2"); + + Map instanceConfigMap = new HashMap<>(); + instanceConfigMap.put("instance1", instance1); + instanceConfigMap.put("instance2", instance2); + + Set activeInstances = new HashSet<>(Arrays.asList("instance1", "instance2")); + + Set nodes = invokeGetAllAssignableNodes(clusterConfig, instanceConfigMap, activeInstances); + Assert.assertEquals(nodes.size(), 2); + Set nodeNames = new HashSet<>(); + for (AssignableNode node : nodes) { + nodeNames.add(node.getInstanceName()); + } + Assert.assertTrue(nodeNames.contains("instance1")); + Assert.assertTrue(nodeNames.contains("instance2")); + } + + @Test + public void testGetAllAssignableNodes_missingConfig() { + ClusterConfig clusterConfig = new ClusterConfig("testCluster"); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setTopology("/zone/instance"); + clusterConfig.setFaultZoneType("zone"); + + InstanceConfig instance1 = new InstanceConfig("instance1"); + instance1.setDomain("zone=zone1,instance=instance1"); + // instance2 config missing + Map instanceConfigMap = new HashMap<>(); + instanceConfigMap.put("instance1", instance1); + + Set activeInstances = new HashSet<>(Arrays.asList("instance1", "instance2")); + + Set nodes = invokeGetAllAssignableNodes(clusterConfig, instanceConfigMap, activeInstances); + Assert.assertEquals(nodes.size(), 1); + AssignableNode node = nodes.iterator().next(); + Assert.assertEquals(node.getInstanceName(), "instance1"); + } + + @Test + public void testGetAllAssignableNodes_illegalArgumentException() { + ClusterConfig clusterConfig = new ClusterConfig("testCluster"); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setTopology("/zone/instance"); + clusterConfig.setFaultZoneType("zone"); + + // Create an invalid InstanceConfig that will cause IllegalArgumentException + InstanceConfig invalidInstance = Mockito.mock(InstanceConfig.class); + Mockito.when(invalidInstance.getInstanceName()).thenReturn("invalidInstance"); + // Simulate invalid domain or missing required config + Mockito.when(invalidInstance.getDomainAsMap()).thenThrow(new IllegalArgumentException("Invalid config")); + + Map instanceConfigMap = new HashMap<>(); + instanceConfigMap.put("invalidInstance", invalidInstance); + + Set activeInstances = new HashSet<>(Collections.singletonList("invalidInstance")); + + Set nodes = invokeGetAllAssignableNodes(clusterConfig, instanceConfigMap, activeInstances); + // Should be empty due to exception + Assert.assertTrue(nodes.isEmpty()); + } + + private Set invokeGetAllAssignableNodes(ClusterConfig clusterConfig, + Map instanceConfigMap, Set activeInstances) { + // Use reflection to access private static method + try { + Method getAllAssignableNodesMethod = ClusterModelProvider.class.getDeclaredMethod( + "getAllAssignableNodes", ClusterConfig.class, Map.class, Set.class); + getAllAssignableNodesMethod.setAccessible(true); + Object result = getAllAssignableNodesMethod.invoke(null, clusterConfig, instanceConfigMap, activeInstances); + return (Set) result; + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java index 95d063a602..9eb1c441e1 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java +++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestTopology.java @@ -6,6 +6,7 @@ import java.util.Map; import org.apache.helix.controller.rebalancer.TestAutoRebalanceStrategy; +import org.apache.helix.controller.rebalancer.topology.InstanceNode; import org.apache.helix.controller.rebalancer.topology.Node; import org.apache.helix.controller.rebalancer.topology.Topology; import org.apache.helix.model.ClusterConfig; @@ -170,4 +171,64 @@ public void testCreateClusterTopologyWithDefaultTopology() { Assert.assertEquals(rack.getWeight(), (long) nodeToWeightMap.get(rack.getName())); } } + + @Test + public void testInstanceToClusterConfigMismatch() { + ClusterConfig clusterConfig = new ClusterConfig("Test_Cluster"); + + // Set up a specific topology with required keys + String topology = "/DataCenter/Rack/Host/Instance"; + clusterConfig.setTopology(topology); + clusterConfig.setFaultZoneType("Rack"); + clusterConfig.setTopologyAwareEnabled(true); + clusterConfig.setRequiredInstanceTopologyKeys(List.of("DataCenter", "Rack", "Host")); + + List allNodes = new ArrayList<>(); + List liveNodes = new ArrayList<>(); + Map instanceConfigMap = new HashMap<>(); + + // Add instances with all configurations other domain + for (int i = 0; i < 10; i++) { + String instance = "localhost_" + i; + InstanceConfig config = new InstanceConfig(instance); + config.setHostName(instance); + config.setPort("9000"); + allNodes.add(instance); + liveNodes.add(instance); + instanceConfigMap.put(instance, config); + } + // Add instances with correct domain configuration + for (int i = 0; i < 5; i++) { + String instance = "localhost_" + i; + instanceConfigMap.get(instance).setDomain(String.format("DataCenter=dc1, Rack=rack%d, Host=%s", i, instance)); + } + + // Add instances with mismatched domain configuration (missing DataCenter key) + for (int i = 5; i < 10; i++) { + String instance = "localhost_" + i; + instanceConfigMap.get(instance).setDomain(String.format("Rack=rack%d, Host=%s", i, instance)); + } + + // Create topology - instances with mismatched config should be ignored + Topology topo = new Topology(allNodes, liveNodes, instanceConfigMap, clusterConfig); + + // Verify that only instances with correct configuration are in the topology + Node root = topo.getRootNode(); + List allLeafNodes = Topology.getAllLeafNodes(root); + + // Should only have 5 instances (the ones with correct domain configuration) + Assert.assertEquals(allLeafNodes.size(), 5); + + // Verify that all included instances have the correct naming pattern + for (Node leafNode : allLeafNodes) { + String instanceName = ((InstanceNode) leafNode).getInstanceName(); + int instanceId = Integer.parseInt(instanceName.split("_")[1]); + Assert.assertTrue(instanceId < 5, "Only instances 0-4 should be included"); + } + + // Verify topology structure for included instances + Assert.assertEquals(root.getChildrenCount("DataCenter"), 1); + Assert.assertEquals(root.getChildrenCount("Rack"), 5); + Assert.assertEquals(root.getChildrenCount("Instance"), 5); + } } diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java index 8235312d86..70f0dbd5f8 100644 --- a/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java +++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterTopologyConfig.java @@ -19,6 +19,8 @@ * under the License. */ +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import org.apache.helix.HelixException; import org.apache.helix.controller.rebalancer.topology.Topology; @@ -36,6 +38,7 @@ public void testClusterNonTopologyAware() { Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), Topology.Types.INSTANCE.name()); Assert.assertEquals(clusterTopologyConfig.getFaultZoneType(), Topology.Types.INSTANCE.name()); Assert.assertTrue(clusterTopologyConfig.getTopologyKeyDefaultValue().isEmpty()); + Assert.assertTrue(clusterTopologyConfig.getRequiredMatchingTopologyKeys().isEmpty()); } @Test @@ -43,6 +46,7 @@ public void testClusterValidTopology() { ClusterConfig testConfig = new ClusterConfig("testId"); testConfig.setTopologyAwareEnabled(true); testConfig.setTopology("/zone/instance"); + testConfig.setRequiredInstanceTopologyKeys(Collections.singletonList("mz_virtualzone")); // no fault zone setup ClusterTopologyConfig clusterTopologyConfig = ClusterTopologyConfig.createFromClusterConfig(testConfig); Assert.assertEquals(clusterTopologyConfig.getEndNodeType(), "instance"); @@ -71,6 +75,8 @@ public void testClusterValidTopology() { for (String k : keys) { Assert.assertEquals(k, itr.next()); } + Assert.assertEquals(clusterTopologyConfig.getRequiredMatchingTopologyKeys().size(), 1); + Assert.assertEquals(clusterTopologyConfig.getRequiredMatchingTopologyKeys().get(0), "mz_virtualzone"); } @Test(expectedExceptions = HelixException.class) diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java index 1e752f3feb..5d502c22d3 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/ClusterAccessor.java @@ -57,6 +57,7 @@ import org.apache.helix.manager.zk.ZKUtil; import org.apache.helix.model.CloudConfig; import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.ClusterTopologyConfig; import org.apache.helix.model.ControllerHistory; import org.apache.helix.model.CustomizedStateConfig; import org.apache.helix.model.HelixConfigScope; @@ -1294,6 +1295,19 @@ private void validateClusterConfigChange(String clusterName, ConfigAccessor conf boolean isFaultZoneTypeChanged = oldConfig.getFaultZoneType() == null || (oldConfig.getFaultZoneType() != null && !oldConfig.getFaultZoneType().equals(updatedConfig.getFaultZoneType())); + boolean isRequiredInstanceTopologyKeysChanged = + !oldConfig.getRequiredInstanceTopologyKeys().equals(updatedConfig.getRequiredInstanceTopologyKeys()); + + if (isTopologyPathChanged || isRequiredInstanceTopologyKeysChanged) { + // All required instance topology keys must be present in the topology path. + ClusterTopologyConfig topologyConfig = ClusterTopologyConfig.createFromClusterConfig(updatedConfig); + for (String key : updatedConfig.getRequiredInstanceTopologyKeys()) { + if (!topologyConfig.getTopologyKeyDefaultValue().keySet().contains(key)) { + throw new IllegalArgumentException( + "Required instance topology key " + key + " is not present in the topology path."); + } + } + } if (isTopologyAwareChanged || isTopologyPathChanged || isFaultZoneTypeChanged) { HelixDataAccessor dataAccessor = getDataAccssor(clusterName); diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java index 79b181cebe..a02e9f68d0 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java @@ -82,6 +82,7 @@ public class TestClusterAccessor extends AbstractTestClass { private static final String VG_CLUSTER = "vgCluster"; + private static final String TEST_CLUSTER = "TestCluster_1"; @BeforeClass public void beforeClass() { @@ -94,8 +95,7 @@ public void beforeClass() { @Test public void testValidateClusterConfigChange() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName()); - String cluster = "TestCluster_1"; - ClusterConfig config = getClusterConfigFromRest(cluster); + ClusterConfig config = getClusterConfigFromRest(TEST_CLUSTER); // Enable the topology aware setting while the instance config does not have the DOMAIN info { @@ -104,13 +104,13 @@ public void testValidateClusterConfigChange() throws IOException { _auditLogger.clearupLogs(); Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(newConfig.getRecord()), MediaType.APPLICATION_JSON_TYPE); - post("clusters/" + cluster + "/configs", ImmutableMap.of("command", Command.update.name()), + post("clusters/" + TEST_CLUSTER + "/configs", ImmutableMap.of("command", Command.update.name()), entity, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); validateAuditLogSize(1); AuditLog auditLog = _auditLogger.getAuditLogs().get(0); Assert.assertEquals(auditLog.getHttpMethod(), HTTPMethods.POST.name()); - Assert.assertEquals(auditLog.getRequestPath(), "clusters/" + cluster + "/configs"); + Assert.assertEquals(auditLog.getRequestPath(), "clusters/" + TEST_CLUSTER + "/configs"); Assert.assertEquals(auditLog.getExceptions().size(), 1); } @@ -121,7 +121,7 @@ public void testValidateClusterConfigChange() throws IOException { newConfig.setFaultZoneType("TestZoneId"); newConfig.setTopology("/TestZoneId/instance"); newConfig.setTopologyAwareEnabled(false); - updateClusterConfigFromRest(cluster, newConfig, Command.update); + updateClusterConfigFromRest(TEST_CLUSTER, newConfig, Command.update); } // Update the topology path string to NULL. This request should go through since the @@ -131,7 +131,7 @@ public void testValidateClusterConfigChange() throws IOException { newConfig.setTopology(null); newConfig.setFaultZoneType(null); newConfig.setTopologyAwareEnabled(false); - updateClusterConfigFromRest(cluster, newConfig, Command.update); + updateClusterConfigFromRest(TEST_CLUSTER, newConfig, Command.update); } // Now update the config while keeping the topology path and fault zone unchanged (it's still NULL) @@ -141,23 +141,56 @@ public void testValidateClusterConfigChange() throws IOException { _auditLogger.clearupLogs(); Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(newConfig.getRecord()), MediaType.APPLICATION_JSON_TYPE); - post("clusters/" + cluster + "/configs", ImmutableMap.of("command", Command.update.name()), + post("clusters/" + TEST_CLUSTER + "/configs", ImmutableMap.of("command", Command.update.name()), entity, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); validateAuditLogSize(1); AuditLog auditLog = _auditLogger.getAuditLogs().get(0); Assert.assertEquals(auditLog.getHttpMethod(), HTTPMethods.POST.name()); - Assert.assertEquals(auditLog.getRequestPath(), "clusters/" + cluster + "/configs"); + Assert.assertEquals(auditLog.getRequestPath(), "clusters/" + TEST_CLUSTER + "/configs"); Assert.assertEquals(auditLog.getExceptions().size(), 1); } // Restore the cluster config - updateClusterConfigFromRest(cluster, config, Command.update); + updateClusterConfigFromRest(TEST_CLUSTER, config, Command.update); System.out.println("End test :" + TestHelper.getTestMethodName()); } @Test(dependsOnMethods = "testValidateClusterConfigChange") + public void testValidateClusterConfigChange_missingRequiredTopologyKey_throwsException() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + ClusterConfig config = getClusterConfigFromRest(TEST_CLUSTER); + + // Update config with mismatching required topology key while enabling topology aware + { + ClusterConfig newConfig = new ClusterConfig(config.getClusterName()); + newConfig.setTopologyAwareEnabled(true); + newConfig.setTopology("/zone/rack/host"); + newConfig.setFaultZoneType("zone"); + newConfig.setRequiredInstanceTopologyKeys(Arrays.asList("rack", "missingKey")); + _auditLogger.clearupLogs(); + Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(newConfig.getRecord()), + MediaType.APPLICATION_JSON_TYPE); + post("clusters/" + TEST_CLUSTER + "/configs", ImmutableMap.of("command", Command.update.name()), + entity, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()); + + validateAuditLogSize(1); + AuditLog auditLog = _auditLogger.getAuditLogs().get(0); + Assert.assertEquals(auditLog.getHttpMethod(), HTTPMethods.POST.name()); + Assert.assertEquals(auditLog.getRequestPath(), "clusters/" + TEST_CLUSTER + "/configs"); + Assert.assertEquals(auditLog.getExceptions().size(), 1); + Assert.assertEquals(auditLog.getExceptions().get(0).getMessage(), + "Required instance topology key missingKey is not present in the topology path."); + } + + // Restore the cluster config + updateClusterConfigFromRest(TEST_CLUSTER, config, Command.update); + + System.out.println("End test :" + TestHelper.getTestMethodName()); + } + + @Test(dependsOnMethods = "testValidateClusterConfigChange_missingRequiredTopologyKey_throwsException") public void testGetClusters() throws IOException { System.out.println("Start test :" + TestHelper.getTestMethodName());