diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 26d90be11dff..f2f05dfc385d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -46,6 +46,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; +import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier"; public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the org.apache.kafka.streams.KafkaClientSupplier interface."; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC"; + + /** {@code } rack.aware.assignment.strategy */ + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy"; + public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take client.rack and racks of TopicPartition into account when assigning" + + " tasks to minimize cross rack traffic. 
Valid settings are : " + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + " (default), which will disable rack aware assignment; " + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC + + ", which will compute minimum cross rack traffic assignment."; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost"; + public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and rack.aware.assignment.non_overlap_cost controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " + + "optimize for minimizing cross rack traffic. The default value is null which means it will use default traffic cost values in different assignors."; + + @SuppressWarnings("WeakerAccess") + public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost"; + public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and " + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + " controls whether the " + + "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value " + RackAwareTaskAssignor.class.getName() + " will " + + "optimize to maintain the existing assignment. 
The default value is null which means it will use default non_overlap cost values in different assignors."; + + /** * {@code topology.optimization} * @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig { in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2), Importance.MEDIUM, PROCESSING_GUARANTEE_DOC) + .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, + Type.INT, + null, + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC) + .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, + Type.STRING, + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC), + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, Type.LIST, Collections.emptyList(), atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), Importance.MEDIUM, RACK_AWARE_ASSIGNMENT_TAGS_DOC) + .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, + Type.INT, + null, + Importance.MEDIUM, + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC) .define(REPLICATION_FACTOR_CONFIG, Type.INT, -1, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index d40489eab29f..b45044fe5f2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -267,6 +267,9 @@ public static class AssignmentConfigs { public final int numStandbyReplicas; public final long probingRebalanceIntervalMs; public final List rackAwareAssignmentTags; + public final Integer rackAwareAssignmentTrafficCost; + public final Integer rackAwareAssignmentNonOverlapCost; + public final String 
rackAwareAssignmentStrategy; private AssignmentConfigs(final StreamsConfig configs) { acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG); @@ -274,6 +277,9 @@ private AssignmentConfigs(final StreamsConfig configs) { numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG); rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG); + rackAwareAssignmentTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); + rackAwareAssignmentNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); + rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); } AssignmentConfigs(final Long acceptableRecoveryLag, @@ -281,11 +287,26 @@ private AssignmentConfigs(final StreamsConfig configs) { final Integer numStandbyReplicas, final Long probingRebalanceIntervalMs, final List rackAwareAssignmentTags) { + this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags, + null, null, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE); + } + + AssignmentConfigs(final Long acceptableRecoveryLag, + final Integer maxWarmupReplicas, + final Integer numStandbyReplicas, + final Long probingRebalanceIntervalMs, + final List rackAwareAssignmentTags, + final Integer rackAwareAssignmentTrafficCost, + final Integer rackAwareAssignmentNonOverlapCost, + final String rackAwareAssignmentStrategy) { this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag); this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas); this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas); this.probingRebalanceIntervalMs = 
validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs); this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags); + this.rackAwareAssignmentTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, rackAwareAssignmentTrafficCost); + this.rackAwareAssignmentNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, rackAwareAssignmentNonOverlapCost); + this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareAssignmentStrategy); } private static T validated(final String configKey, final T value) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index cd6e2a49b388..00b5306ae4b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; @@ -62,6 +63,9 @@ boolean canMove(final ClientState source, private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class); private static final int SOURCE_ID = -1; + // This is number is picked based on testing. 
Usually the optimization for standby assignment + // stops after 3 rounds + private static final int STANDBY_OPTIMIZER_MAX_ITERATION = 4; private final Cluster fullMetadata; private final Map> partitionsForTask; @@ -95,13 +99,9 @@ public boolean validClientRack() { } public synchronized boolean canEnableRackAwareAssignor() { - /* - TODO: enable this after we add the config - if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) { - canEnable = false; + if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) { return false; } - */ if (canEnable != null) { return canEnable; } @@ -340,6 +340,9 @@ public long optimizeActiveTasks(final SortedSet activeTasks, return 0; } + log.info("Assignment before active task optimization is {}\n with cost {}", clientStates, + activeTasksCost(activeTasks, clientStates, trafficCost, nonOverlapCost)); + final List clientList = new ArrayList<>(clientStates.keySet()); final List taskIdList = new ArrayList<>(activeTasks); final Map taskClientMap = new HashMap<>(); @@ -353,6 +356,7 @@ public long optimizeActiveTasks(final SortedSet activeTasks, assignTaskFromMinCostFlow(graph, clientList, taskIdList, clientStates, originalAssignedTaskNumber, taskClientMap, ClientState::assignActive, ClientState::unassignActive, ClientState::hasActiveTask); + log.info("Assignment after active task optimization is {}\n with cost {}", clientStates, cost); return cost; } @@ -368,46 +372,63 @@ public long optimizeStandbyTasks(final SortedMap clientStates final List clientList = new ArrayList<>(clientStates.keySet()); final SortedSet standbyTasks = new TreeSet<>(); - for (int i = 0; i < clientList.size(); i++) { - final ClientState clientState1 = clientStates.get(clientList.get(i)); - standbyTasks.addAll(clientState1.standbyTasks()); - for (int j = i + 1; j < clientList.size(); j++) { - final ClientState clientState2 = clientStates.get(clientList.get(j)); 
- - final String rack1 = racksForProcess.get(clientState1.processId()); - final String rack2 = racksForProcess.get(clientState2.processId()); - // Cross rack traffic can not be reduced if racks are the same - if (rack1.equals(rack2)) { - continue; - } - - final List movable1 = getMovableTasks.apply(clientState1, clientState2); - final List movable2 = getMovableTasks.apply(clientState2, clientState1); - - // There's no needed to optimize if one is empty because the optimization - // can only swap tasks to keep the client's load balanced - if (movable1.isEmpty() || movable2.isEmpty()) { - continue; - } - - final List taskIdList = Stream.concat(movable1.stream(), movable2.stream()) - .sorted() - .collect(Collectors.toList()); + clientStates.values().forEach(clientState -> standbyTasks.addAll(clientState.standbyTasks())); + + log.info("Assignment before standby task optimization is {}\n with cost {}", clientStates, + standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost)); + + boolean taskMoved = true; + int round = 0; + while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) { + taskMoved = false; + round++; + for (int i = 0; i < clientList.size(); i++) { + final ClientState clientState1 = clientStates.get(clientList.get(i)); + for (int j = i + 1; j < clientList.size(); j++) { + final ClientState clientState2 = clientStates.get(clientList.get(j)); + + final String rack1 = racksForProcess.get(clientState1.processId()); + final String rack2 = racksForProcess.get(clientState2.processId()); + // Cross rack traffic can not be reduced if racks are the same + if (rack1.equals(rack2)) { + continue; + } - final Map taskClientMap = new HashMap<>(); - final List clients = Stream.of(clientList.get(i), clientList.get(j)).sorted().collect( - Collectors.toList()); - final Map originalAssignedTaskNumber = new HashMap<>(); + final List movable1 = getMovableTasks.apply(clientState1, clientState2); + final List movable2 = getMovableTasks.apply(clientState2, 
clientState1); - final Graph graph = constructTaskGraph(clients, taskIdList, clientStates, taskClientMap, originalAssignedTaskNumber, - ClientState::hasStandbyTask, trafficCost, nonOverlapCost, true, true); - graph.solveMinCostFlow(); + // There's no needed to optimize if one is empty because the optimization + // can only swap tasks to keep the client's load balanced + if (movable1.isEmpty() || movable2.isEmpty()) { + continue; + } - assignTaskFromMinCostFlow(graph, clients, taskIdList, clientStates, originalAssignedTaskNumber, - taskClientMap, ClientState::assignStandby, ClientState::unassignStandby, ClientState::hasStandbyTask); + final List taskIdList = Stream.concat(movable1.stream(), + movable2.stream()) + .sorted() + .collect(Collectors.toList()); + + final Map taskClientMap = new HashMap<>(); + final List clients = Stream.of(clientList.get(i), clientList.get(j)) + .sorted().collect( + Collectors.toList()); + final Map originalAssignedTaskNumber = new HashMap<>(); + + final Graph graph = constructTaskGraph(clients, taskIdList, + clientStates, taskClientMap, originalAssignedTaskNumber, + ClientState::hasStandbyTask, trafficCost, nonOverlapCost, true, true); + graph.solveMinCostFlow(); + + taskMoved |= assignTaskFromMinCostFlow(graph, clients, taskIdList, clientStates, + originalAssignedTaskNumber, + taskClientMap, ClientState::assignStandby, ClientState::unassignStandby, + ClientState::hasStandbyTask); + } } } - return standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost); + final long cost = standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost); + log.info("Assignment after {} rounds of standby task optimization is {}\n with cost {}", round, clientStates, cost); + return cost; } private Graph constructTaskGraph(final List clientList, @@ -473,16 +494,17 @@ private Graph constructTaskGraph(final List clientList, return graph; } - private void assignTaskFromMinCostFlow(final Graph graph, - final List clientList, - final 
List taskIdList, - final Map clientStates, - final Map originalAssignedTaskNumber, - final Map taskClientMap, - final BiConsumer assignTask, - final BiConsumer unassignTask, - final BiPredicate hasAssignedTask) { + private boolean assignTaskFromMinCostFlow(final Graph graph, + final List clientList, + final List taskIdList, + final Map clientStates, + final Map originalAssignedTaskNumber, + final Map taskClientMap, + final BiConsumer assignTask, + final BiConsumer unAssignTask, + final BiPredicate hasAssignedTask) { int tasksAssigned = 0; + boolean taskMoved = false; for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) { final TaskId taskId = taskIdList.get(taskNodeId); final Map.Edge> edges = graph.edges(taskNodeId); @@ -498,8 +520,9 @@ private void assignTaskFromMinCostFlow(final Graph graph, break; } - unassignTask.accept(clientStates.get(originalProcessId), taskId); + unAssignTask.accept(clientStates.get(originalProcessId), taskId); assignTask.accept(clientStates.get(processId), taskId); + taskMoved = true; } } } @@ -534,5 +557,7 @@ private void assignTaskFromMinCostFlow(final Graph graph, + " are assigned to it after assignment"); } } + + return taskMoved; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 07e62ce96ed5..d19687773b77 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -63,6 +63,8 @@ import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE_V2; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH; import static org.apache.kafka.streams.StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH; +import static org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG; +import static 
org.apache.kafka.streams.StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG; import static org.apache.kafka.streams.StreamsConfig.STATE_DIR_CONFIG; import static org.apache.kafka.streams.StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG; import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; @@ -1378,6 +1380,48 @@ public void shouldNotEnableAnyOptimizationsWithNoOptimizationConfig() { assertEquals(0, configs.size()); } + @Test + public void shouldReturnDefaultRackAwareAssignmentConfig() { + final String strategy = streamsConfig.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG); + assertEquals("NONE", strategy); + } + + @Test + public void shouldtSetMinTrafficRackAwareAssignmentConfig() { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); + assertEquals("MIN_TRAFFIC", new StreamsConfig(props).getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); + } + + @Test + public void shouldThrowIfNotSetCorrectRackAwareAssignmentConfig() { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, "invalid"); + assertThrows(ConfigException.class, () -> new StreamsConfig(props)); + } + + @Test + public void shouldReturnDefaultRackAwareAssignmentTrafficCost() { + final Integer cost = streamsConfig.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG); + assertNull(cost); + } + + @Test + public void shouldReturnRackAwareAssignmentTrafficCost() { + props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, "10"); + assertEquals(Integer.valueOf(10), new StreamsConfig(props).getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); + } + + @Test + public void shouldReturnDefaultRackAwareAssignmentNonOverlapCost() { + final Integer cost = streamsConfig.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG); + assertNull(cost); + } + + @Test + public void shouldReturnRackAwareAssignmentNonOverlapCost() { + 
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, "10"); + assertEquals(Integer.valueOf(10), new StreamsConfig(props).getInt(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); + } + @Test public void shouldReturnDefaultClientSupplier() { final KafkaClientSupplier supplier = streamsConfig.getKafkaClientSupplier(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 5ab04f4d400d..3fac81d43524 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -16,20 +16,35 @@ */ package org.apache.kafka.streams.processor.internals.assignment; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Random; +import java.util.SortedMap; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.InternalTopicManager; import org.apache.kafka.streams.processor.internals.Task; import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; +import org.apache.kafka.test.MockClientSupplier; +import org.apache.kafka.test.MockInternalTopicManager; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -50,13 +65,22 @@ import static java.util.Collections.emptySet; import static org.apache.kafka.common.utils.Utils.entriesToMap; import static org.apache.kafka.common.utils.Utils.intersection; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public final class AssignmentTestUtils { @@ -80,40 +104,59 @@ public final class AssignmentTestUtils { public static final Node NODE_0 = new Node(0, "node0", 1, RACK_0); public static final Node NODE_1 = new Node(1, "node1", 1, RACK_1); public static final Node NODE_2 = new Node(2, "node2", 1, RACK_2); - public static final Node NODE_3 = new Node(2, "node2", 1, RACK_3); + public static final Node NODE_3 = new Node(3, "node3", 1, RACK_3); + public static final Node NODE_4 = new Node(4, "node4", 1, RACK_4); public static final Node NO_RACK_NODE = new Node(3, "node3", 1); public static final Node[] REPLICA_0 = new Node[] {NODE_0, NODE_1}; public static 
final Node[] REPLICA_1 = new Node[] {NODE_1, NODE_2}; public static final Node[] REPLICA_2 = new Node[] {NODE_0, NODE_2}; public static final Node[] REPLICA_3 = new Node[] {NODE_1, NODE_3}; + public static final Node[] REPLICA_4 = new Node[] {NODE_3, NODE_4}; public static final String TP_0_NAME = "topic0"; public static final String TP_1_NAME = "topic1"; + public static final String TP_2_NAME = "topic2"; public static final String CHANGELOG_TP_0_NAME = "store-0-changelog"; public static final String CHANGELOG_TP_1_NAME = "store-1-changelog"; + public static final String CHANGELOG_TP_2_NAME = "store-2-changelog"; public static final TopicPartition CHANGELOG_TP_0_0 = new TopicPartition(CHANGELOG_TP_0_NAME, 0); public static final TopicPartition CHANGELOG_TP_0_1 = new TopicPartition(CHANGELOG_TP_0_NAME, 1); public static final TopicPartition CHANGELOG_TP_0_2 = new TopicPartition(CHANGELOG_TP_0_NAME, 2); + public static final TopicPartition CHANGELOG_TP_0_3 = new TopicPartition(CHANGELOG_TP_0_NAME, 3); public static final TopicPartition CHANGELOG_TP_1_0 = new TopicPartition(CHANGELOG_TP_1_NAME, 0); public static final TopicPartition CHANGELOG_TP_1_1 = new TopicPartition(CHANGELOG_TP_1_NAME, 1); public static final TopicPartition CHANGELOG_TP_1_2 = new TopicPartition(CHANGELOG_TP_1_NAME, 2); + public static final TopicPartition CHANGELOG_TP_1_3 = new TopicPartition(CHANGELOG_TP_1_NAME, 3); + public static final TopicPartition CHANGELOG_TP_2_0 = new TopicPartition(CHANGELOG_TP_2_NAME, 0); + public static final TopicPartition CHANGELOG_TP_2_1 = new TopicPartition(CHANGELOG_TP_2_NAME, 1); + public static final TopicPartition CHANGELOG_TP_2_2 = new TopicPartition(CHANGELOG_TP_2_NAME, 2); public static final TopicPartition TP_0_0 = new TopicPartition(TP_0_NAME, 0); public static final TopicPartition TP_0_1 = new TopicPartition(TP_0_NAME, 1); public static final TopicPartition TP_0_2 = new TopicPartition(TP_0_NAME, 2); + public static final TopicPartition TP_0_3 = new 
TopicPartition(TP_0_NAME, 3); public static final TopicPartition TP_1_0 = new TopicPartition(TP_1_NAME, 0); public static final TopicPartition TP_1_1 = new TopicPartition(TP_1_NAME, 1); public static final TopicPartition TP_1_2 = new TopicPartition(TP_1_NAME, 2); + public static final TopicPartition TP_1_3 = new TopicPartition(TP_1_NAME, 3); + public static final TopicPartition TP_2_0 = new TopicPartition(TP_2_NAME, 0); + public static final TopicPartition TP_2_1 = new TopicPartition(TP_2_NAME, 1); + public static final TopicPartition TP_2_2 = new TopicPartition(TP_2_NAME, 2); public static final PartitionInfo PI_0_0 = new PartitionInfo(TP_0_NAME, 0, NODE_0, REPLICA_0, REPLICA_0); public static final PartitionInfo PI_0_1 = new PartitionInfo(TP_0_NAME, 1, NODE_1, REPLICA_1, REPLICA_1); public static final PartitionInfo PI_0_2 = new PartitionInfo(TP_0_NAME, 2, NODE_1, REPLICA_1, REPLICA_1); + public static final PartitionInfo PI_0_3 = new PartitionInfo(TP_0_NAME, 3, NODE_2, REPLICA_2, REPLICA_2); public static final PartitionInfo PI_1_0 = new PartitionInfo(TP_1_NAME, 0, NODE_2, REPLICA_2, REPLICA_2); public static final PartitionInfo PI_1_1 = new PartitionInfo(TP_1_NAME, 1, NODE_3, REPLICA_3, REPLICA_3); public static final PartitionInfo PI_1_2 = new PartitionInfo(TP_1_NAME, 2, NODE_0, REPLICA_0, REPLICA_0); + public static final PartitionInfo PI_1_3 = new PartitionInfo(TP_1_NAME, 3, NODE_1, REPLICA_1, REPLICA_1); + public static final PartitionInfo PI_2_0 = new PartitionInfo(TP_2_NAME, 0, NODE_4, REPLICA_4, REPLICA_4); + public static final PartitionInfo PI_2_1 = new PartitionInfo(TP_2_NAME, 1, NODE_3, REPLICA_3, REPLICA_3); + public static final PartitionInfo PI_2_2 = new PartitionInfo(TP_2_NAME, 2, NODE_1, REPLICA_4, REPLICA_4); public static final TaskId TASK_0_0 = new TaskId(0, 0); public static final TaskId TASK_0_1 = new TaskId(0, 1); @@ -149,6 +192,12 @@ public final class AssignmentTestUtils { public static final List EMPTY_RACK_AWARE_ASSIGNMENT_TAGS = 
Collections.emptyList(); public static final Map EMPTY_CLIENT_TAGS = Collections.emptyMap(); + private static final String USER_END_POINT = "localhost:8080"; + private static final String APPLICATION_ID = "stream-partition-assignor-test"; + private static final String TOPIC_PREFIX = "topic"; + private static final String CHANGELOG_TOPIC_PREFIX = "changelog-topic"; + private static final String RACK_PREFIX = "rack"; + private AssignmentTestUtils() {} static Map getClientStatesMap(final ClientState... states) { @@ -528,4 +577,297 @@ public String toString() { '}'; } } + + static List getRandomNodes(final int nodeSize) { + final List nodeList = new ArrayList<>(nodeSize); + for (int i = 0; i < nodeSize; i++) { + nodeList.add(new Node(i, "node" + i, 1, RACK_PREFIX + i)); + } + Collections.shuffle(nodeList); + return nodeList; + } + + static Cluster getRandomCluster(final int nodeSize, final int tpSize) { + final List nodeList = getRandomNodes(nodeSize); + final Set partitionInfoSet = new HashSet<>(); + for (int i = 0; i < tpSize; i++) { + final Node firstNode = nodeList.get(i % nodeSize); + final Node secondNode = nodeList.get((i + 1) % nodeSize); + final Node[] replica = new Node[] {firstNode, secondNode}; + partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + i, 0, firstNode, replica, replica)); + } + + return new Cluster( + "cluster", + new HashSet<>(nodeList), + partitionInfoSet, + Collections.emptySet(), + Collections.emptySet() + ); + } + + static Map>> getRandomProcessRacks(final int clientSize, final int nodeSize) { + final List racks = new ArrayList<>(nodeSize); + for (int i = 0; i < nodeSize; i++) { + racks.add(RACK_PREFIX + i); + } + Collections.shuffle(racks); + final Map>> processRacks = new HashMap<>(); + for (int i = 0; i < clientSize; i++) { + final String rack = racks.get(i % nodeSize); + processRacks.put(uuidForInt(i), mkMap(mkEntry("1", Optional.of(rack)))); + } + return processRacks; + } + + static SortedMap> getTaskTopicPartitionMap(final int 
tpSize, final boolean changelog) { + final SortedMap> taskTopicPartitionMap = new TreeMap<>(); + final String topicName = changelog ? CHANGELOG_TOPIC_PREFIX : TOPIC_PREFIX; + for (int i = 0; i < tpSize; i++) { + taskTopicPartitionMap.put(new TaskId(i, 0), mkSet( + new TopicPartition(topicName + i, 0), + new TopicPartition(topicName + (i + 1) % tpSize, 0) + )); + } + return taskTopicPartitionMap; + } + + static Map configProps(final boolean enableRackAwareAssignor) { + return configProps(enableRackAwareAssignor, 0); + } + + static Map configProps(final boolean enableRackAwareAssignor, final int replicaNum) { + final Map configurationMap = new HashMap<>(); + configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); + configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT); + configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, replicaNum); + if (enableRackAwareAssignor) { + configurationMap.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC); + } + + final ReferenceContainer referenceContainer = new ReferenceContainer(); + configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); + return configurationMap; + } + + static InternalTopicManager mockInternalTopicManagerForRandomChangelog(final int nodeSize, final int tpSize) { + final MockTime time = new MockTime(); + final StreamsConfig streamsConfig = new StreamsConfig(configProps(true)); + final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + time, + streamsConfig, + mockClientSupplier.restoreConsumer, + false + ); + + final Set changelogNames = new HashSet<>(); + final List nodeList = getRandomNodes(nodeSize); + final Map> topicPartitionInfo = new HashMap<>(); + for (int i = 0; i < tpSize; i++) { + final String topicName = CHANGELOG_TOPIC_PREFIX + i; + 
changelogNames.add(topicName); + + final Node firstNode = nodeList.get(i % nodeSize); + final Node secondNode = nodeList.get((i + 1) % nodeSize); + final TopicPartitionInfo info = new TopicPartitionInfo(0, firstNode, Arrays.asList(firstNode, secondNode), Collections.emptyList()); + + topicPartitionInfo.computeIfAbsent(topicName, tp -> new ArrayList<>()).add(info); + } + + final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); + doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames); + return spyTopicManager; + } + + static SortedMap getRandomClientState(final int clientSize, final int tpSize, final int maxCapacity) { + return getRandomClientState(clientSize, tpSize, maxCapacity, true); + } + + static SortedMap getRandomClientState(final int clientSize, final int tpSize, final int maxCapacity, final boolean initialAssignment) { + final SortedMap clientStates = new TreeMap<>(); + final Map taskLags = new HashMap<>(); + for (int i = 0; i < tpSize; i++) { + taskLags.put(new TaskId(i, 0), 0L); + } + + final long seed = System.currentTimeMillis(); + System.out.println("seed: " + seed); + final Random random = new Random(seed); + for (int i = 0; i < clientSize; i++) { + final int capacity = random.nextInt(maxCapacity) + 1; + final ClientState clientState = new ClientState(emptySet(), emptySet(), taskLags, EMPTY_CLIENT_TAGS, capacity, uuidForInt(i)); + clientStates.put(uuidForInt(i), clientState); + } + + if (initialAssignment) { + Iterator> iterator = clientStates.entrySet().iterator(); + final List taskIds = new ArrayList<>(tpSize); + for (int i = 0; i < tpSize; i++) { + taskIds.add(new TaskId(i, 0)); + } + Collections.shuffle(taskIds); + for (final TaskId taskId : taskIds) { + if (!iterator.hasNext()) { + iterator = clientStates.entrySet().iterator(); + } + iterator.next().getValue().assignActive(taskId); + } + } + return clientStates; + } + + static Cluster getClusterForAllTopics() { + return new Cluster( + 
"cluster", + mkSet(NODE_0, NODE_1, NODE_2, NODE_3, NODE_4), + mkSet(PI_0_0, PI_0_1, PI_0_2, PI_0_3, PI_1_0, PI_1_1, PI_1_2, PI_1_3, PI_2_0, PI_2_1, PI_2_2), + Collections.emptySet(), + Collections.emptySet() + ); + } + + static Map> getTaskTopicPartitionMapForAllTasks() { + return mkMap( + mkEntry(TASK_0_0, mkSet(TP_0_0)), + mkEntry(TASK_0_1, mkSet(TP_0_1)), + mkEntry(TASK_0_2, mkSet(TP_0_2)), + mkEntry(TASK_0_3, mkSet(TP_0_3)), + mkEntry(TASK_1_0, mkSet(TP_1_0)), + mkEntry(TASK_1_1, mkSet(TP_1_1)), + mkEntry(TASK_1_2, mkSet(TP_1_2)), + mkEntry(TASK_1_3, mkSet(TP_1_3)), + mkEntry(TASK_2_0, mkSet(TP_2_0)), + mkEntry(TASK_2_1, mkSet(TP_2_1)), + mkEntry(TASK_2_2, mkSet(TP_2_2)) + ); + } + + static Map> getTaskChangelogMapForAllTasks() { + return mkMap( + mkEntry(TASK_0_0, mkSet(CHANGELOG_TP_0_0)), + mkEntry(TASK_0_1, mkSet(CHANGELOG_TP_0_1)), + mkEntry(TASK_0_2, mkSet(CHANGELOG_TP_0_2)), + mkEntry(TASK_0_3, mkSet(CHANGELOG_TP_0_3)), + mkEntry(TASK_1_0, mkSet(CHANGELOG_TP_1_0)), + mkEntry(TASK_1_1, mkSet(CHANGELOG_TP_1_1)), + mkEntry(TASK_1_2, mkSet(CHANGELOG_TP_1_2)), + mkEntry(TASK_1_3, mkSet(CHANGELOG_TP_1_3)), + mkEntry(TASK_2_0, mkSet(CHANGELOG_TP_2_0)), + mkEntry(TASK_2_1, mkSet(CHANGELOG_TP_2_1)), + mkEntry(TASK_2_2, mkSet(CHANGELOG_TP_2_2)) + ); + } + + static InternalTopicManager mockInternalTopicManagerForChangelog() { + final MockTime time = new MockTime(); + final StreamsConfig streamsConfig = new StreamsConfig(configProps(true)); + final MockClientSupplier mockClientSupplier = new MockClientSupplier(); + final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + time, + streamsConfig, + mockClientSupplier.restoreConsumer, + false + ); + + final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); + doReturn( + mkMap( + mkEntry( + CHANGELOG_TP_0_NAME, Arrays.asList( + new TopicPartitionInfo(0, NODE_0, Arrays.asList(REPLICA_0), Collections.emptyList()), + new TopicPartitionInfo(1, NODE_1, 
Arrays.asList(REPLICA_1), Collections.emptyList()), + new TopicPartitionInfo(2, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList()), + new TopicPartitionInfo(3, NODE_2, Arrays.asList(REPLICA_2), Collections.emptyList()) + ) + ), + mkEntry( + CHANGELOG_TP_1_NAME, Arrays.asList( + new TopicPartitionInfo(0, NODE_2, Arrays.asList(REPLICA_2), Collections.emptyList()), + new TopicPartitionInfo(1, NODE_3, Arrays.asList(REPLICA_3), Collections.emptyList()), + new TopicPartitionInfo(2, NODE_0, Arrays.asList(REPLICA_0), Collections.emptyList()), + new TopicPartitionInfo(3, NODE_4, Arrays.asList(REPLICA_4), Collections.emptyList()) + ) + ), + mkEntry( + CHANGELOG_TP_2_NAME, Arrays.asList( + new TopicPartitionInfo(0, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList()), + new TopicPartitionInfo(1, NODE_2, Arrays.asList(REPLICA_2), Collections.emptyList()), + new TopicPartitionInfo(2, NODE_4, Arrays.asList(REPLICA_4), Collections.emptyList()) + ) + ) + ) + ).when(spyTopicManager).getTopicPartitionInfo(mkSet(CHANGELOG_TP_0_NAME, CHANGELOG_TP_1_NAME, CHANGELOG_TP_2_NAME)); + return spyTopicManager; + } + + static Map<Subtopology, Set<TaskId>> getTopologyGroupTaskMap() { + return Collections.singletonMap(SUBTOPOLOGY_0, Collections.singleton(new TaskId(1, 1))); + } + + static void verifyStandbySatisfyRackReplica(final Set<TaskId> taskIds, + final Map<UUID, String> racksForProcess, + final Map<UUID, ClientState> clientStateMap, + final Integer replica, + final boolean relaxRackCheck, + final Map<UUID, Integer> standbyTaskCount) { + if (standbyTaskCount != null) { + for (final Entry<UUID, ClientState> entry : clientStateMap.entrySet()) { + final int expected = standbyTaskCount.get(entry.getKey()); + final int actual = entry.getValue().standbyTaskCount(); + assertEquals("StandbyTaskCount for " + entry.getKey() + " doesn't match", expected, actual); + } + } + for (final TaskId taskId : taskIds) { + int activeCount = 0; + int standbyCount = 0; + final Map<String, UUID> racks = new HashMap<>(); + for (final Map.Entry<UUID, ClientState> entry : clientStateMap.entrySet()) { + final UUID processId = 
entry.getKey(); + final ClientState clientState = entry.getValue(); + + if (!relaxRackCheck && clientState.hasAssignedTask(taskId)) { + final String rack = racksForProcess.get(processId); + assertThat("Task " + taskId + " appears in both " + processId + " and " + racks.get(rack), racks.keySet(), not(hasItems(rack))); + racks.put(rack, processId); + } + + boolean hasActive = false; + if (clientState.hasActiveTask(taskId)) { + activeCount++; + hasActive = true; + } + + boolean hasStandby = false; + if (clientState.hasStandbyTask(taskId)) { + standbyCount++; + hasStandby = true; + } + + assertFalse(clientState + " has both active and standby task " + taskId, hasActive && hasStandby); + } + + assertEquals("Task " + taskId + " should have 1 active task", 1, activeCount); + if (replica != null) { + assertEquals("Task " + taskId + " has wrong replica count", replica.intValue(), standbyCount); + } + } + } + + static Map<UUID, Integer> clientTaskCount(final Map<UUID, ClientState> clientStateMap, + final Function<ClientState, Integer> taskFunc) { + return clientStateMap.entrySet().stream().collect(Collectors.toMap(Entry::getKey, v -> taskFunc.apply(v.getValue()))); + } + + static Map<UUID, Map<String, Optional<String>>> getProcessRacksForAllProcess() { + return mkMap( + mkEntry(UUID_1, mkMap(mkEntry("1", Optional.of(RACK_0)))), + mkEntry(UUID_2, mkMap(mkEntry("1", Optional.of(RACK_1)))), + mkEntry(UUID_3, mkMap(mkEntry("1", Optional.of(RACK_2)))), + mkEntry(UUID_4, mkMap(mkEntry("1", Optional.of(RACK_3)))), + mkEntry(UUID_5, mkMap(mkEntry("1", Optional.of(RACK_4)))), + mkEntry(UUID_6, mkMap(mkEntry("1", Optional.of(RACK_0)))), + mkEntry(UUID_7, mkMap(mkEntry("1", Optional.of(RACK_1)))) + ); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java index 63698be5e286..55b0e3d00385 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java @@ -24,35 +24,17 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.utils.Utils.mkSortedSet; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_0_0; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_0_1; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_0_2; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_0_NAME; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_1_0; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_1_1; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_1_2; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.CHANGELOG_TP_1_NAME; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CLIENT_TAGS; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_1; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_2; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NODE_3; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.NO_RACK_NODE; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.PI_0_0; import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.PI_0_1; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.PI_0_2; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.PI_1_0; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.PI_1_1; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.PI_1_2; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_1; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_2; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_3; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.RACK_4; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_1; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_2; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.REPLICA_3; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2; @@ -60,12 +42,8 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1; import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TP_0_0; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TP_0_1; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TP_0_2; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TP_0_NAME; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TP_1_0; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TP_1_1; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TP_1_2; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3; @@ -73,12 +51,23 @@ import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_5; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_6; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_7; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.clientTaskCount; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.configProps; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getProcessRacksForAllProcess; +import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomClientState; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomCluster; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getRandomProcessRacks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskChangelogMapForAllTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getTopologyGroupTaskMap; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForChangelog; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.mockInternalTopicManagerForRandomChangelog; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.verifyStandbySatisfyRackReplica; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.hasItems; import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -93,26 +82,20 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import 
java.util.Random; import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -121,10 +104,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StreamsConfig.InternalConfig; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.InternalTopicManager; -import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockInternalTopicManager; @@ -138,14 +118,9 @@ @RunWith(Parameterized.class) public class RackAwareTaskAssignorTest { - private static final String USER_END_POINT = "localhost:8080"; - private static final String APPLICATION_ID = "stream-partition-assignor-test"; - private static final String TOPIC_PREFIX = "topic"; - private static final String CHANGELOG_TOPIC_PREFIX = "changelog-topic"; - private static final String RACK_PREFIX = "rack"; private final MockTime time = new MockTime(); - private final StreamsConfig streamsConfig = new StreamsConfig(configProps()); + private final StreamsConfig streamsConfig = new StreamsConfig(configProps(true)); private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); private int trafficCost; @@ -180,26 +155,37 @@ public void setUp() { } } - private Map configProps() { - return configProps(0); + private AssignmentConfigs getRackAwareEnabledConfig() { + return new AssignorConfiguration( + new 
StreamsConfig(configProps(true)).originals()).assignmentConfigs(); } - private Map configProps(final int standbyNum) { - final Map configurationMap = new HashMap<>(); - configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); - configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT); - configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, standbyNum); - - final ReferenceContainer referenceContainer = new ReferenceContainer(); - /* - referenceContainer.mainConsumer = consumer; - referenceContainer.adminClient = adminClient; - referenceContainer.taskManager = taskManager; - referenceContainer.streamsMetadataState = streamsMetadataState; - referenceContainer.time = time; - */ - configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer); - return configurationMap; + private AssignmentConfigs getRackAwareEnabledConfigWithStandby(final int replicaNum) { + return new AssignorConfiguration( + new StreamsConfig(configProps(true, replicaNum)).originals()).assignmentConfigs(); + } + + private AssignmentConfigs getRackAwareDisabledConfig() { + return new AssignorConfiguration( + new StreamsConfig(configProps(false)).originals()).assignmentConfigs(); + } + + @Test + public void shouldDisableAssignorFromConfig() { + final RackAwareTaskAssignor assignor = spy(new RackAwareTaskAssignor( + getClusterForTopic0(), + getTaskTopicPartitionMapForTask0(true), + mkMap(), + getTopologyGroupTaskMap(), + getProcessRacksForProcess0(), + mockInternalTopicManager, + getRackAwareDisabledConfig() + )); + + // False since partitionWithoutInfo10 is missing in cluster metadata + assertFalse(assignor.canEnableRackAwareAssignor()); + verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(false)); + verify(assignor, never()).populateTopicsToDescribe(anySet(), eq(true)); } @Test @@ -211,7 +197,7 @@ public void shouldDisableActiveWhenMissingClusterInfo() { getTopologyGroupTaskMap(), 
getProcessRacksForProcess0(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() )); // False since partitionWithoutInfo10 is missing in cluster metadata @@ -230,7 +216,7 @@ public void shouldDisableActiveWhenRackMissingInNode() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() )); // False since nodeMissingRack has one node which doesn't have rack @@ -249,7 +235,7 @@ public void shouldReturnInvalidClientRackWhenRackMissingInClientConsumer() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(true), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); // False since process1 doesn't have rackId assertFalse(assignor.validClientRack()); @@ -264,7 +250,7 @@ public void shouldReturnFalseWhenRackMissingInProcess() { getTopologyGroupTaskMap(), getProcessWithNoConsumerRacks(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); // False since process1 doesn't have rackId assertFalse(assignor.validClientRack()); @@ -280,7 +266,7 @@ public void shouldPopulateRacksForProcess() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final Map racksForProcess = assignor.racksForProcess(); assertEquals(mkMap(mkEntry(UUID_1, RACK_1)), racksForProcess); @@ -302,7 +288,7 @@ public void shouldReturnInvalidClientRackWhenRackDiffersInSameProcess() { getTopologyGroupTaskMap(), processRacks, mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); assertFalse(assignor.validClientRack()); @@ 
-317,7 +303,7 @@ public void shouldEnableRackAwareAssignorWithoutDescribingTopics() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); // partitionWithoutInfo00 has rackInfo in cluster metadata @@ -333,7 +319,7 @@ public void shouldEnableRackAwareAssignorWithCacheResult() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() )); // partitionWithoutInfo00 has rackInfo in cluster metadata @@ -365,7 +351,7 @@ public void shouldEnableRackAwareAssignorWithDescribingTopics() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); assertTrue(assignor.canEnableRackAwareAssignor()); @@ -392,7 +378,6 @@ public void shouldEnableRackAwareAssignorWithStandbyDescribingTopics() { ) ).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(CHANGELOG_TP_0_NAME)); - final StreamsConfig streamsConfig1 = new StreamsConfig(configProps(1)); final RackAwareTaskAssignor assignor = spy(new RackAwareTaskAssignor( getClusterWithNoNode(), getTaskTopicPartitionMapForTask0(), @@ -400,7 +385,7 @@ public void shouldEnableRackAwareAssignorWithStandbyDescribingTopics() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, - new AssignorConfiguration(streamsConfig1.originals()).assignmentConfigs() + getRackAwareEnabledConfigWithStandby(1) )); assertTrue(assignor.canEnableRackAwareAssignor()); @@ -430,7 +415,6 @@ public void shouldDisableRackAwareAssignorWithStandbyDescribingTopicsFailure() { doThrow(new TimeoutException("Timeout describing topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton( CHANGELOG_TP_0_NAME)); - final StreamsConfig 
streamsConfig1 = new StreamsConfig(configProps(1)); final RackAwareTaskAssignor assignor = spy(new RackAwareTaskAssignor( getClusterWithNoNode(), getTaskTopicPartitionMapForTask0(), @@ -438,7 +422,7 @@ public void shouldDisableRackAwareAssignorWithStandbyDescribingTopicsFailure() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, - new AssignorConfiguration(streamsConfig1.originals()).assignmentConfigs() + getRackAwareEnabledConfigWithStandby(1) )); assertFalse(assignor.canEnableRackAwareAssignor()); @@ -459,7 +443,7 @@ public void shouldDisableRackAwareAssignorWithDescribingTopicsFailure() { getTopologyGroupTaskMap(), getProcessRacksForProcess0(), spyTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() )); assertFalse(assignor.canEnableRackAwareAssignor()); @@ -471,13 +455,13 @@ public void shouldDisableRackAwareAssignorWithDescribingTopicsFailure() { @Test public void shouldOptimizeEmptyActiveTasks() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); @@ -502,13 +486,13 @@ public void shouldOptimizeEmptyActiveTasks() { @Test public void shouldOptimizeActiveTasks() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new 
ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); @@ -558,7 +542,7 @@ public void shouldOptimizeRandomActive() { getTopologyGroupTaskMap(), getRandomProcessRacks(clientSize, nodeSize), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final SortedMap clientStateMap = getRandomClientState(clientSize, tpSize, 1); @@ -589,7 +573,7 @@ public void shouldMaintainOriginalAssignment() { getTopologyGroupTaskMap(), getRandomProcessRacks(clientSize, nodeSize), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final SortedMap clientStateMap = getRandomClientState(clientSize, tpSize, 1); @@ -619,13 +603,13 @@ public void shouldMaintainOriginalAssignment() { @Test public void shouldOptimizeActiveTasksWithMoreClients() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); @@ -664,13 +648,13 @@ public void shouldOptimizeActiveTasksWithMoreClients() { @Test public void shouldOptimizeActiveTasksWithMoreClientsWithMoreThanOneTask() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), 
EMPTY_CLIENT_TAGS, 1); @@ -709,13 +693,13 @@ public void shouldOptimizeActiveTasksWithMoreClientsWithMoreThanOneTask() { @Test public void shouldBalanceAssignmentWithMoreCost() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); @@ -751,13 +735,13 @@ public void shouldBalanceAssignmentWithMoreCost() { @Test public void shouldThrowIfMissingCallcanEnableRackAwareAssignor() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); @@ -780,13 +764,13 @@ public void shouldThrowIfMissingCallcanEnableRackAwareAssignor() { @Test public void shouldThrowIfTaskInMultipleClients() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); @@ -811,13 +795,13 @@ public void shouldThrowIfTaskInMultipleClients() { @Test public void 
shouldThrowIfTaskMissingInClients() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManager, - new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs() + getRackAwareEnabledConfig() ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1); @@ -841,13 +825,13 @@ public void shouldThrowIfTaskMissingInClients() { @Test public void shouldNotCrashForEmptyStandby() { final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), mkMap(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManagerForChangelog(), - new AssignorConfiguration(new StreamsConfig(configProps(1)).originals()).assignmentConfigs() + getRackAwareEnabledConfigWithStandby(1) ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1); @@ -875,15 +859,14 @@ public void shouldNotCrashForEmptyStandby() { @Test public void shouldOptimizeStandbyTasksWhenTasksAllMovable() { final int replicaCount = 2; - final AssignmentConfigs assignorConfiguration = new AssignorConfiguration(new StreamsConfig(configProps(replicaCount)).originals()).assignmentConfigs(); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), getTaskChangelogMapForAllTasks(), getTopologyGroupTaskMap(), getProcessRacksForAllProcess(), mockInternalTopicManagerForChangelog(), - assignorConfiguration + getRackAwareEnabledConfigWithStandby(replicaCount) ); final ClientState clientState1 = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, 1, UUID_1); @@ -929,7 +912,7 @@ public 
void shouldOptimizeStandbyTasksWhenTasksAllMovable() { // can only be reduced to 50 since there are moving constraints final long cost = assignor.optimizeStandbyTasks(clientStateMap, 10, 1, (source, destination, task, clients) -> true); - assertEquals(30, cost); + assertEquals(20, cost); // Don't validate tasks in different racks after moving verifyStandbySatisfyRackReplica(taskIds, assignor.racksForProcess(), clientStateMap, replicaCount, true, standbyTaskCount); } @@ -937,9 +920,9 @@ public void shouldOptimizeStandbyTasksWhenTasksAllMovable() { @Test public void shouldOptimizeStandbyTasksWithMovingConstraint() { final int replicaCount = 2; - final AssignmentConfigs assignorConfiguration = new AssignorConfiguration(new StreamsConfig(configProps(replicaCount)).originals()).assignmentConfigs(); + final AssignmentConfigs assignorConfiguration = getRackAwareEnabledConfigWithStandby(replicaCount); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( - getClusterForTopic0And1(), + getClusterForAllTopics(), getTaskTopicPartitionMapForAllTasks(), getTaskChangelogMapForAllTasks(), getTopologyGroupTaskMap(), @@ -1005,8 +988,7 @@ public void shouldOptimizeRandomStandby() { final int maxCapacity = 3; final SortedMap> taskTopicPartitionMap = getTaskTopicPartitionMap( tpSize, false); - final AssignmentConfigs assignorConfiguration = new AssignorConfiguration( - new StreamsConfig(configProps(replicaCount)).originals()).assignmentConfigs(); + final AssignmentConfigs assignorConfiguration = getRackAwareEnabledConfigWithStandby(replicaCount); final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor( getRandomCluster(nodeSize, tpSize), @@ -1045,115 +1027,6 @@ public void shouldOptimizeRandomStandby() { replicaCount, false, standbyTaskCount); } - private List getRandomNodes(final int nodeSize) { - final List nodeList = new ArrayList<>(nodeSize); - for (int i = 0; i < nodeSize; i++) { - nodeList.add(new Node(i, "node" + i, 1, RACK_PREFIX + i)); - } - 
Collections.shuffle(nodeList); - return nodeList; - } - - private Cluster getRandomCluster(final int nodeSize, final int tpSize) { - final List nodeList = getRandomNodes(nodeSize); - final Set partitionInfoSet = new HashSet<>(); - for (int i = 0; i < tpSize; i++) { - final Node firstNode = nodeList.get(i % nodeSize); - final Node secondNode = nodeList.get((i + 1) % nodeSize); - final Node[] replica = new Node[] {firstNode, secondNode}; - partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + i, 0, firstNode, replica, replica)); - } - - return new Cluster( - "cluster", - new HashSet<>(nodeList), - partitionInfoSet, - Collections.emptySet(), - Collections.emptySet() - ); - } - - private Map>> getRandomProcessRacks(final int clientSize, final int nodeSize) { - final List racks = new ArrayList<>(nodeSize); - for (int i = 0; i < nodeSize; i++) { - racks.add(RACK_PREFIX + i); - } - Collections.shuffle(racks); - final Map>> processRacks = new HashMap<>(); - for (int i = 0; i < clientSize; i++) { - final String rack = racks.get(i % nodeSize); - processRacks.put(uuidForInt(i), mkMap(mkEntry("1", Optional.of(rack)))); - } - return processRacks; - } - - private SortedMap> getTaskTopicPartitionMap(final int tpSize, final boolean changelog) { - final SortedMap> taskTopicPartitionMap = new TreeMap<>(); - final String topicName = changelog ? 
CHANGELOG_TOPIC_PREFIX : TOPIC_PREFIX; - for (int i = 0; i < tpSize; i++) { - taskTopicPartitionMap.put(new TaskId(i, 0), mkSet( - new TopicPartition(topicName + i, 0), - new TopicPartition(topicName + (i + 1) % tpSize, 0) - )); - } - return taskTopicPartitionMap; - } - - private InternalTopicManager mockInternalTopicManagerForRandomChangelog(final int nodeSize, final int tpSize) { - final Set changelogNames = new HashSet<>(); - final List nodeList = getRandomNodes(nodeSize); - final Map> topicPartitionInfo = new HashMap<>(); - for (int i = 0; i < tpSize; i++) { - final String topicName = CHANGELOG_TOPIC_PREFIX + i; - changelogNames.add(topicName); - - final Node firstNode = nodeList.get(i % nodeSize); - final Node secondNode = nodeList.get((i + 1) % nodeSize); - final TopicPartitionInfo info = new TopicPartitionInfo(0, firstNode, Arrays.asList(firstNode, secondNode), Collections.emptyList()); - - topicPartitionInfo.computeIfAbsent(topicName, tp -> new ArrayList<>()).add(info); - } - - final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); - doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames); - return spyTopicManager; - } - - private SortedMap getRandomClientState(final int clientSize, final int tpSize, final int maxCapacity) { - final SortedMap clientStates = new TreeMap<>(); - final List taskIds = new ArrayList<>(tpSize); - for (int i = 0; i < tpSize; i++) { - taskIds.add(new TaskId(i, 0)); - } - Collections.shuffle(taskIds); - final Random random = new Random(); - for (int i = 0; i < clientSize; i++) { - final int capacity = random.nextInt(maxCapacity) + 1; - final ClientState clientState = new ClientState(emptySet(), emptySet(), emptyMap(), EMPTY_CLIENT_TAGS, capacity, uuidForInt(i)); - clientStates.put(uuidForInt(i), clientState); - } - Iterator> iterator = clientStates.entrySet().iterator(); - for (final TaskId taskId : taskIds) { - if (iterator.hasNext()) { - 
iterator.next().getValue().assignActive(taskId); - } else { - iterator = clientStates.entrySet().iterator(); - iterator.next().getValue().assignActive(taskId); - } - } - return clientStates; - } - - private Cluster getClusterForTopic0And1() { - return new Cluster( - "cluster", - mkSet(NODE_0, NODE_1, NODE_2, NODE_3), - mkSet(PI_0_0, PI_0_1, PI_0_2, PI_1_0, PI_1_1, PI_1_2), - Collections.emptySet(), - Collections.emptySet() - ); - } - private Cluster getClusterForTopic0() { return new Cluster( "cluster", @@ -1188,18 +1061,6 @@ private Cluster getClusterWithNoNode() { ); } - private Map>> getProcessRacksForAllProcess() { - return mkMap( - mkEntry(UUID_1, mkMap(mkEntry("1", Optional.of(RACK_0)))), - mkEntry(UUID_2, mkMap(mkEntry("1", Optional.of(RACK_1)))), - mkEntry(UUID_3, mkMap(mkEntry("1", Optional.of(RACK_2)))), - mkEntry(UUID_4, mkMap(mkEntry("1", Optional.of(RACK_3)))), - mkEntry(UUID_5, mkMap(mkEntry("1", Optional.of(RACK_4)))), - mkEntry(UUID_6, mkMap(mkEntry("1", Optional.of(RACK_0)))), - mkEntry(UUID_7, mkMap(mkEntry("1", Optional.of(RACK_1)))) - ); - } - private Map>> getProcessRacksForProcess0() { return getProcessRacksForProcess0(false); } @@ -1235,105 +1096,4 @@ private Map> getTaskTopicPartitionMapForTask0(final } return Collections.singletonMap(TASK_0_0, topicPartitions); } - - private Map> getTaskTopicPartitionMapForAllTasks() { - return mkMap( - mkEntry(TASK_0_0, mkSet(TP_0_0)), - mkEntry(TASK_0_1, mkSet(TP_0_1)), - mkEntry(TASK_0_2, mkSet(TP_0_2)), - mkEntry(TASK_1_0, mkSet(TP_1_0)), - mkEntry(TASK_1_1, mkSet(TP_1_1)), - mkEntry(TASK_1_2, mkSet(TP_1_2)) - ); - } - - private Map> getTaskChangelogMapForAllTasks() { - return mkMap( - mkEntry(TASK_0_0, mkSet(CHANGELOG_TP_0_0)), - mkEntry(TASK_0_1, mkSet(CHANGELOG_TP_0_1)), - mkEntry(TASK_0_2, mkSet(CHANGELOG_TP_0_2)), - mkEntry(TASK_1_0, mkSet(CHANGELOG_TP_1_0)), - mkEntry(TASK_1_1, mkSet(CHANGELOG_TP_1_1)), - mkEntry(TASK_1_2, mkSet(CHANGELOG_TP_1_2)) - ); - } - - private InternalTopicManager 
mockInternalTopicManagerForChangelog() { - final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); - doReturn( - mkMap( - mkEntry( - CHANGELOG_TP_0_NAME, Arrays.asList( - new TopicPartitionInfo(0, NODE_0, Arrays.asList(REPLICA_0), Collections.emptyList()), - new TopicPartitionInfo(1, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList()), - new TopicPartitionInfo(2, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList()) - ) - ), - mkEntry( - CHANGELOG_TP_1_NAME, Arrays.asList( - new TopicPartitionInfo(0, NODE_2, Arrays.asList(REPLICA_2), Collections.emptyList()), - new TopicPartitionInfo(1, NODE_3, Arrays.asList(REPLICA_3), Collections.emptyList()), - new TopicPartitionInfo(2, NODE_0, Arrays.asList(REPLICA_0), Collections.emptyList()) - ) - ) - ) - ).when(spyTopicManager).getTopicPartitionInfo(mkSet(CHANGELOG_TP_0_NAME, CHANGELOG_TP_1_NAME)); - return spyTopicManager; - } - - private Map> getTopologyGroupTaskMap() { - return Collections.singletonMap(SUBTOPOLOGY_0, Collections.singleton(new TaskId(1, 1))); - } - - private void verifyStandbySatisfyRackReplica(final Set taskIds, - final Map racksForProcess, - final Map clientStateMap, - final int replica, - final boolean relaxRackCheck, - final Map standbyTaskCount) { - if (standbyTaskCount != null) { - for (final Entry entry : clientStateMap.entrySet()) { - final int expected = standbyTaskCount.get(entry.getKey()); - final int actual = entry.getValue().standbyTaskCount(); - assertEquals("StandbyTaskCount for " + entry.getKey() + " doesn't match", expected, actual); - } - } - for (final TaskId taskId : taskIds) { - int activeCount = 0; - int standbyCount = 0; - final Map racks = new HashMap<>(); - for (final Map.Entry entry : clientStateMap.entrySet()) { - final UUID processId = entry.getKey(); - final ClientState clientState = entry.getValue(); - - if (!relaxRackCheck && clientState.hasAssignedTask(taskId)) { - final String rack = racksForProcess.get(processId); - assertThat("Task " + 
taskId + " appears in both " + processId + " and " + racks.get(rack), racks.keySet(), not(hasItems(rack))); - racks.put(rack, processId); - } - - boolean hasActive = false; - if (clientState.hasActiveTask(taskId)) { - activeCount++; - hasActive = true; - } - - boolean hasStandby = false; - if (clientState.hasStandbyTask(taskId)) { - standbyCount++; - hasStandby = true; - } - - assertFalse(clientState + " has both active and standby task " + taskId, hasActive && hasStandby); - } - - assertEquals("Task " + taskId + " should have 1 active task", 1, activeCount); - assertEquals("Task " + taskId + " has wrong replica count", replica, standbyCount); - } - } - - private Map clientTaskCount(final Map clientStateMap, - final Function taskFunc) { - return clientStateMap.entrySet().stream().collect(Collectors.toMap(Entry::getKey, v -> taskFunc.apply(v.getValue()))); - } }