Skip to content

Commit

Permalink
KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor (a…
Browse files Browse the repository at this point in the history
…pache#14178)

Part of KIP-925.

Use rack aware assignor in StickyTaskAssignor.

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
lihaosky authored and rreddy-22 committed Sep 20, 2023
1 parent 9baf34f commit 91f32a1
Show file tree
Hide file tree
Showing 13 changed files with 935 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,16 @@ private boolean assignTasksToClients(final Cluster fullMetadata,

final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful);

final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(fullMetadata, partitionsForTask,
changelogTopics.changelogPartionsForTask(), tasksForTopicGroup, racksForProcessConsumer, internalTopicManager, assignmentConfigs);
final RackAwareTaskAssignor rackAwareTaskAssignor = new RackAwareTaskAssignor(
fullMetadata,
partitionsForTask,
changelogTopics.changelogPartionsForTask(),
tasksForTopicGroup,
racksForProcessConsumer,
internalTopicManager,
assignmentConfigs,
time
);
final boolean probingRebalanceNeeded = taskAssignor.assign(clientStates,
allTasks,
statefulTasks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ public ClientState(final Set<TaskId> previousActiveTasks,
this.processId = processId;
}

// For testing only
public ClientState(final ClientState clientState) {
this(
new HashSet<>(clientState.previousActiveTasks.taskIds()),
new HashSet<>(clientState.previousStandbyTasks.taskIds()),
clientState.taskLagTotals,
clientState.clientTags,
clientState.capacity,
clientState.processId
);
}

int capacity() {
return capacity;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> statefulTaskIds,
final RackAwareTaskAssignor rackAwareTaskAssignor,
final AssignmentConfigs configs) {
delegate.assign(clients, allTaskIds, statefulTaskIds, rackAwareTaskAssignor, configs);
// Pass null for RackAwareTaskAssignor to disable it if we fallback
delegate.assign(clients, allTaskIds, statefulTaskIds, null, configs);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@
import java.util.function.Function;

import static org.apache.kafka.common.utils.Utils.diff;
import static org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.STATELESS_NON_OVERLAP_COST;
import static org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.STATELESS_TRAFFIC_COST;
import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignActiveTaskMovements;
import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignStandbyTaskMovements;

public class HighAvailabilityTaskAssignor implements TaskAssignor {
private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 10;
private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 1;
private static final int STATELESS_TRAFFIC_COST = 1;
private static final int STATELESS_NON_OVERLAP_COST = 0;

@Override
public boolean assign(final Map<UUID, ClientState> clients,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
Expand All @@ -60,8 +61,11 @@ boolean canMove(final ClientState source,
final Map<UUID, ClientState> clientStateMap);
}

private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
// For stateless tasks, it's ok to move them around. So we have 0 non_overlap_cost
public static final int STATELESS_TRAFFIC_COST = 1;
public static final int STATELESS_NON_OVERLAP_COST = 0;

private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
private static final int SOURCE_ID = -1;
// This is number is picked based on testing. Usually the optimization for standby assignment
// stops after 3 rounds
Expand All @@ -75,6 +79,7 @@ boolean canMove(final ClientState source,
private final Map<UUID, String> racksForProcess;
private final InternalTopicManager internalTopicManager;
private final boolean validClientRack;
private final Time time;
private Boolean canEnable = null;

public RackAwareTaskAssignor(final Cluster fullMetadata,
Expand All @@ -83,14 +88,16 @@ public RackAwareTaskAssignor(final Cluster fullMetadata,
final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
final Map<UUID, Map<String, Optional<String>>> racksForProcessConsumer,
final InternalTopicManager internalTopicManager,
final AssignmentConfigs assignmentConfigs) {
final AssignmentConfigs assignmentConfigs,
final Time time) {
this.fullMetadata = fullMetadata;
this.partitionsForTask = partitionsForTask;
this.changelogPartitionsForTask = changelogPartitionsForTask;
this.internalTopicManager = internalTopicManager;
this.assignmentConfigs = assignmentConfigs;
this.racksForPartition = new HashMap<>();
this.racksForProcess = new HashMap<>();
this.time = Objects.requireNonNull(time, "Time was not specified");
validClientRack = validateClientRack(racksForProcessConsumer);
}

Expand Down Expand Up @@ -346,6 +353,7 @@ public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks,
log.info("Assignment before active task optimization is {}\n with cost {}", clientStates,
activeTasksCost(activeTasks, clientStates, trafficCost, nonOverlapCost));

final long startTime = time.milliseconds();
final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
final List<TaskId> taskIdList = new ArrayList<>(activeTasks);
final Map<TaskId, UUID> taskClientMap = new HashMap<>();
Expand All @@ -359,7 +367,8 @@ public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks,
assignTaskFromMinCostFlow(graph, clientList, taskIdList, clientStates, originalAssignedTaskNumber,
taskClientMap, ClientState::assignActive, ClientState::unassignActive, ClientState::hasActiveTask);

log.info("Assignment after active task optimization is {}\n with cost {}", clientStates, cost);
final long duration = time.milliseconds() - startTime;
log.info("Assignment after {} milliseconds for active task optimization is {}\n with cost {}", duration, clientStates, cost);
return cost;
}

Expand All @@ -373,6 +382,7 @@ public long optimizeStandbyTasks(final SortedMap<UUID, ClientState> clientStates
.sorted()
.collect(Collectors.toList());

final long startTime = time.milliseconds();
final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
final SortedSet<TaskId> standbyTasks = new TreeSet<>();
clientStates.values().forEach(clientState -> standbyTasks.addAll(clientState.standbyTasks()));
Expand Down Expand Up @@ -430,7 +440,9 @@ public long optimizeStandbyTasks(final SortedMap<UUID, ClientState> clientStates
}
}
final long cost = standByTasksCost(standbyTasks, clientStates, trafficCost, nonOverlapCost);
log.info("Assignment after {} rounds of standby task optimization is {}\n with cost {}", round, clientStates, cost);

final long duration = time.milliseconds() - startTime;
log.info("Assignment after {} rounds and {} milliseconds for standby task optimization is {}\n with cost {}", round, duration, clientStates, cost);
return cost;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import static org.apache.kafka.common.utils.Utils.diff;
import static org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.STATELESS_NON_OVERLAP_COST;
import static org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor.STATELESS_TRAFFIC_COST;

import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
Expand All @@ -36,11 +43,18 @@
public class StickyTaskAssignor implements TaskAssignor {

private static final Logger log = LoggerFactory.getLogger(StickyTaskAssignor.class);

// For stateful tasks, by default we want to maintain stickiness. So we have higher non_overlap_cost
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;

private Map<UUID, ClientState> clients;
private Set<TaskId> allTaskIds;
private Set<TaskId> standbyTaskIds;
private Set<TaskId> statefulTaskIds;
private final Map<TaskId, UUID> previousActiveTaskAssignment = new HashMap<>();
private final Map<TaskId, Set<UUID>> previousStandbyTaskAssignment = new HashMap<>();
private RackAwareTaskAssignor rackAwareTaskAssignor; // nullable if passed from FallbackPriorTaskAssignor
private AssignmentConfigs configs;
private TaskPairs taskPairs;

private final boolean mustPreserveActiveTaskAssignment;
Expand All @@ -61,19 +75,55 @@ public boolean assign(final Map<UUID, ClientState> clients,
final AssignmentConfigs configs) {
this.clients = clients;
this.allTaskIds = allTaskIds;
this.standbyTaskIds = statefulTaskIds;
this.statefulTaskIds = statefulTaskIds;
this.rackAwareTaskAssignor = rackAwareTaskAssignor;
this.configs = configs;

final int maxPairs = allTaskIds.size() * (allTaskIds.size() - 1) / 2;
taskPairs = new TaskPairs(maxPairs);
mapPreviousTaskAssignment(clients);

assignActive();
optimizeActive();
assignStandby(configs.numStandbyReplicas);
optimizeStandby();
return false;
}

private void optimizeStandby() {
if (configs.numStandbyReplicas > 0 && rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;
final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);

rackAwareTaskAssignor.optimizeStandbyTasks(clientStates, trafficCost, nonOverlapCost, (s, d, t, c) -> true);
}
}

private void optimizeActive() {
if (rackAwareTaskAssignor != null && rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
final int trafficCost = configs.rackAwareAssignmentTrafficCost == null ?
DEFAULT_STATEFUL_TRAFFIC_COST : configs.rackAwareAssignmentTrafficCost;
final int nonOverlapCost = configs.rackAwareAssignmentNonOverlapCost == null ?
DEFAULT_STATEFUL_NON_OVERLAP_COST : configs.rackAwareAssignmentNonOverlapCost;

final SortedSet<TaskId> statefulTasks = new TreeSet<>(statefulTaskIds);
final TreeMap<UUID, ClientState> clientStates = new TreeMap<>(clients);

rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);

final TreeSet<TaskId> statelessTasks = (TreeSet<TaskId>) diff(TreeSet::new, allTaskIds, statefulTasks);
if (!statelessTasks.isEmpty()) {
rackAwareTaskAssignor.optimizeActiveTasks(statelessTasks, clientStates,
STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST);
}
}
}

private void assignStandby(final int numStandbyReplicas) {
for (final TaskId taskId : standbyTaskIds) {
for (final TaskId taskId : statefulTaskIds) {
for (int i = 0; i < numStandbyReplicas; i++) {
final Set<UUID> ids = findClientsWithoutAssignedTask(taskId);
if (ids.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public EmbeddedKafkaCluster(final int numBrokers,
final List<Properties> brokerConfigOverrides,
final long mockTimeMillisStart,
final long mockTimeNanoStart) {
if (!brokerConfigOverrides.isEmpty() && brokerConfigOverrides.size() != numBrokers) {
throw new IllegalArgumentException("Size of brokerConfigOverrides " + brokerConfigOverrides.size()
+ " must match broker number " + numBrokers);
}
brokers = new KafkaEmbedded[numBrokers];
this.brokerConfig = brokerConfig;
time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
Expand Down

0 comments on commit 91f32a1

Please sign in to comment.