Skip to content

Commit

Permalink
KAFKA-15022: [6/N] add rack aware assignor configs and update standby…
Browse files Browse the repository at this point in the history
… optimizer (apache#14150)

Part of KIP-925.

- Add configs for rack aware assignor
- Update standby optimizer in RackAwareTaskAssignor to have more rounds
- Refactor some method in RackAwareTaskAssignorTest to test utils so that they can also be used in HighAvailabilityTaskAssignorTest and other tests

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
lihaosky committed Aug 7, 2023
1 parent ac6a536 commit 8dec3e6
Show file tree
Hide file tree
Showing 6 changed files with 605 additions and 373 deletions.
40 changes: 40 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -755,6 +756,29 @@ public class StreamsConfig extends AbstractConfig {
public static final String DEFAULT_CLIENT_SUPPLIER_CONFIG = "default.client.supplier";
public static final String DEFAULT_CLIENT_SUPPLIER_DOC = "Client supplier class that implements the <code>org.apache.kafka.streams.KafkaClientSupplier</code> interface.";

public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_NONE = "NONE";
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC = "MIN_TRAFFIC";

/** {@code } rack.aware.assignment.strategy */
@SuppressWarnings("WeakerAccess")
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG = "rack.aware.assignment.strategy";
public static final String RACK_AWARE_ASSIGNMENT_STRATEGY_DOC = "The strategy we use for rack aware assignment. Rack aware assignment will take <code>client.rack</code> and <code>racks</code> of <code>TopicPartition</code> into account when assigning"
+ " tasks to minimize cross rack traffic. Valid settings are : <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE + "</code> (default), which will disable rack aware assignment; <code>" + RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC
+ "</code>, which will compute minimum cross rack traffic assignment.";

@SuppressWarnings("WeakerAccess")
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG = "rack.aware.assignment.traffic_cost";
public static final String RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC = "Cost associated with cross rack traffic. This config and <code>rack.aware.assignment.non_overlap_cost</code> controls whether the "
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "</code> will "
+ "optimize for minimizing cross rack traffic. The default value is null which means it will use default traffic cost values in different assignors.";

@SuppressWarnings("WeakerAccess")
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG = "rack.aware.assignment.non_overlap_cost";
public static final String RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC = "Cost associated with moving tasks from existing assignment. This config and <code>" + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG + "</code> controls whether the "
+ "optimization algorithm favors minimizing cross rack traffic or minimize the movement of tasks in existing assignment. If set a larger value <code>" + RackAwareTaskAssignor.class.getName() + "<code/> will "
+ "optimize to maintain the existing assignment. The default value is null which means it will use default non_overlap cost values in different assignors.";


/**
* {@code topology.optimization}
* @deprecated since 2.7; use {@link #TOPOLOGY_OPTIMIZATION_CONFIG} instead
Expand Down Expand Up @@ -890,12 +914,28 @@ public class StreamsConfig extends AbstractConfig {
in(AT_LEAST_ONCE, EXACTLY_ONCE, EXACTLY_ONCE_BETA, EXACTLY_ONCE_V2),
Importance.MEDIUM,
PROCESSING_GUARANTEE_DOC)
.define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
Type.INT,
null,
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC)
.define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
Type.STRING,
RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC),
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_STRATEGY_DOC)
.define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
Type.LIST,
Collections.emptyList(),
atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE),
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_TAGS_DOC)
.define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
Type.INT,
null,
Importance.MEDIUM,
RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC)
.define(REPLICATION_FACTOR_CONFIG,
Type.INT,
-1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,25 +267,46 @@ public static class AssignmentConfigs {
public final int numStandbyReplicas;
public final long probingRebalanceIntervalMs;
public final List<String> rackAwareAssignmentTags;
public final Integer rackAwareAssignmentTrafficCost;
public final Integer rackAwareAssignmentNonOverlapCost;
public final String rackAwareAssignmentStrategy;

private AssignmentConfigs(final StreamsConfig configs) {
acceptableRecoveryLag = configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG);
maxWarmupReplicas = configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG);
numStandbyReplicas = configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
probingRebalanceIntervalMs = configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG);
rackAwareAssignmentTags = configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG);
rackAwareAssignmentTrafficCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
rackAwareAssignmentNonOverlapCost = configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
rackAwareAssignmentStrategy = configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG);
}

AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags) {
this(acceptableRecoveryLag, maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs, rackAwareAssignmentTags,
null, null, StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE);
}

AssignmentConfigs(final Long acceptableRecoveryLag,
final Integer maxWarmupReplicas,
final Integer numStandbyReplicas,
final Long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags,
final Integer rackAwareAssignmentTrafficCost,
final Integer rackAwareAssignmentNonOverlapCost,
final String rackAwareAssignmentStrategy) {
this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
this.rackAwareAssignmentTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, rackAwareAssignmentTrafficCost);
this.rackAwareAssignmentNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, rackAwareAssignmentNonOverlapCost);
this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, rackAwareAssignmentStrategy);
}

private static <T> T validated(final String configKey, final T value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
Expand All @@ -62,6 +63,9 @@ boolean canMove(final ClientState source,
private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);

private static final int SOURCE_ID = -1;
// This is number is picked based on testing. Usually the optimization for standby assignment
// stops after 3 rounds
private static final int STANDBY_OPTIMIZER_MAX_ITERATION = 4;

private final Cluster fullMetadata;
private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
Expand Down Expand Up @@ -95,13 +99,9 @@ public boolean validClientRack() {
}

public synchronized boolean canEnableRackAwareAssignor() {
/*
TODO: enable this after we add the config
if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
canEnable = false;
if (StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
return false;
}
*/
if (canEnable != null) {
return canEnable;
}
Expand Down Expand Up @@ -340,6 +340,9 @@ public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks,
return 0;
}

log.info("Assignment before active task optimization is {}\n with cost {}", clientStates,
activeTasksCost(activeTasks, clientStates, trafficCost, nonOverlapCost));

final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
final List<TaskId> taskIdList = new ArrayList<>(activeTasks);
final Map<TaskId, UUID> taskClientMap = new HashMap<>();
Expand All @@ -353,6 +356,7 @@ public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks,
assignTaskFromMinCostFlow(graph, clientList, taskIdList, clientStates, originalAssignedTaskNumber,
taskClientMap, ClientState::assignActive, ClientState::unassignActive, ClientState::hasActiveTask);

log.info("Assignment after active task optimization is {}\n with cost {}", clientStates, cost);
return cost;
}

Expand All @@ -368,46 +372,63 @@ public long optimizeStandbyTasks(final SortedMap<UUID, ClientState> clientStates

final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
final SortedSet<TaskId> standbyTasks = new TreeSet<>();
for (int i = 0; i < clientList.size(); i++) {
final ClientState clientState1 = clientStates.get(clientList.get(i));
standbyTasks.addAll(clientState1.standbyTasks());
for (int j = i + 1; j < clientList.size(); j++) {
final ClientState clientState2 = clientStates.get(clientList.get(j));

final String rack1 = racksForProcess.get(clientState1.processId());
final String rack2 = racksForProcess.get(clientState2.processId());
// Cross rack traffic can not be reduced if racks are the same
if (rack1.equals(rack2)) {
continue;
}

final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2);
final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1);

// There's no needed to optimize if one is empty because the optimization
// can only swap tasks to keep the client's load balanced
if (movable1.isEmpty() || movable2.isEmpty()) {
continue;
}

final List<TaskId> taskIdList = Stream.concat(movable1.stream(), movable2.stream())
.sorted()
.collect(Collectors.toList());
clientStates.values().forEach(clientState -> standbyTasks.addAll(clientState.standbyTasks()));

log.info("Assignment before standby task optimization is {}\n with cost {}", clientStates,
standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost));

boolean taskMoved = true;
int round = 0;
while (taskMoved && round < STANDBY_OPTIMIZER_MAX_ITERATION) {
taskMoved = false;
round++;
for (int i = 0; i < clientList.size(); i++) {
final ClientState clientState1 = clientStates.get(clientList.get(i));
for (int j = i + 1; j < clientList.size(); j++) {
final ClientState clientState2 = clientStates.get(clientList.get(j));

final String rack1 = racksForProcess.get(clientState1.processId());
final String rack2 = racksForProcess.get(clientState2.processId());
// Cross rack traffic can not be reduced if racks are the same
if (rack1.equals(rack2)) {
continue;
}

final Map<TaskId, UUID> taskClientMap = new HashMap<>();
final List<UUID> clients = Stream.of(clientList.get(i), clientList.get(j)).sorted().collect(
Collectors.toList());
final Map<UUID, Integer> originalAssignedTaskNumber = new HashMap<>();
final List<TaskId> movable1 = getMovableTasks.apply(clientState1, clientState2);
final List<TaskId> movable2 = getMovableTasks.apply(clientState2, clientState1);

final Graph<Integer> graph = constructTaskGraph(clients, taskIdList, clientStates, taskClientMap, originalAssignedTaskNumber,
ClientState::hasStandbyTask, trafficCost, nonOverlapCost, true, true);
graph.solveMinCostFlow();
// There's no needed to optimize if one is empty because the optimization
// can only swap tasks to keep the client's load balanced
if (movable1.isEmpty() || movable2.isEmpty()) {
continue;
}

assignTaskFromMinCostFlow(graph, clients, taskIdList, clientStates, originalAssignedTaskNumber,
taskClientMap, ClientState::assignStandby, ClientState::unassignStandby, ClientState::hasStandbyTask);
final List<TaskId> taskIdList = Stream.concat(movable1.stream(),
movable2.stream())
.sorted()
.collect(Collectors.toList());

final Map<TaskId, UUID> taskClientMap = new HashMap<>();
final List<UUID> clients = Stream.of(clientList.get(i), clientList.get(j))
.sorted().collect(
Collectors.toList());
final Map<UUID, Integer> originalAssignedTaskNumber = new HashMap<>();

final Graph<Integer> graph = constructTaskGraph(clients, taskIdList,
clientStates, taskClientMap, originalAssignedTaskNumber,
ClientState::hasStandbyTask, trafficCost, nonOverlapCost, true, true);
graph.solveMinCostFlow();

taskMoved |= assignTaskFromMinCostFlow(graph, clients, taskIdList, clientStates,
originalAssignedTaskNumber,
taskClientMap, ClientState::assignStandby, ClientState::unassignStandby,
ClientState::hasStandbyTask);
}
}
}
return standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost);
final long cost = standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost);
log.info("Assignment after {} rounds of standby task optimization is {}\n with cost {}", round, clientStates, cost);
return cost;
}

private Graph<Integer> constructTaskGraph(final List<UUID> clientList,
Expand Down Expand Up @@ -473,16 +494,17 @@ private Graph<Integer> constructTaskGraph(final List<UUID> clientList,
return graph;
}

private void assignTaskFromMinCostFlow(final Graph<Integer> graph,
final List<UUID> clientList,
final List<TaskId> taskIdList,
final Map<UUID, ClientState> clientStates,
final Map<UUID, Integer> originalAssignedTaskNumber,
final Map<TaskId, UUID> taskClientMap,
final BiConsumer<ClientState, TaskId> assignTask,
final BiConsumer<ClientState, TaskId> unassignTask,
final BiPredicate<ClientState, TaskId> hasAssignedTask) {
private boolean assignTaskFromMinCostFlow(final Graph<Integer> graph,
final List<UUID> clientList,
final List<TaskId> taskIdList,
final Map<UUID, ClientState> clientStates,
final Map<UUID, Integer> originalAssignedTaskNumber,
final Map<TaskId, UUID> taskClientMap,
final BiConsumer<ClientState, TaskId> assignTask,
final BiConsumer<ClientState, TaskId> unAssignTask,
final BiPredicate<ClientState, TaskId> hasAssignedTask) {
int tasksAssigned = 0;
boolean taskMoved = false;
for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) {
final TaskId taskId = taskIdList.get(taskNodeId);
final Map<Integer, Graph<Integer>.Edge> edges = graph.edges(taskNodeId);
Expand All @@ -498,8 +520,9 @@ private void assignTaskFromMinCostFlow(final Graph<Integer> graph,
break;
}

unassignTask.accept(clientStates.get(originalProcessId), taskId);
unAssignTask.accept(clientStates.get(originalProcessId), taskId);
assignTask.accept(clientStates.get(processId), taskId);
taskMoved = true;
}
}
}
Expand Down Expand Up @@ -534,5 +557,7 @@ private void assignTaskFromMinCostFlow(final Graph<Integer> graph,
+ " are assigned to it after assignment");
}
}

return taskMoved;
}
}
Loading

0 comments on commit 8dec3e6

Please sign in to comment.