Skip to content

Commit

Permalink
KAFKA-15022: [4/N] use client tag assignor for rack aware standby tas…
Browse files Browse the repository at this point in the history
…k assignment (apache#14097)

Part of KIP-925.

For rack aware standby task assignment, we can either use the already existing "rack tags" or as a fall-back the newly added "rack.id". This PR unifies both without the need to change the actual standby task assignment logic.

Reviewers: Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
lihaosky authored and jeqo committed Aug 15, 2023
1 parent a260dbb commit 353c43d
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 192 deletions.
Expand Up @@ -128,7 +128,7 @@ private static class ClientMetadata {
private final ClientState state;
private final SortedSet<String> consumers;

ClientMetadata(final String endPoint, final Map<String, String> clientTags) {
ClientMetadata(final UUID processId, final String endPoint, final Map<String, String> clientTags) {

// get the host info, or null if no endpoint is configured (ie endPoint == null)
hostInfo = HostInfo.buildFromEndpoint(endPoint);
Expand All @@ -137,7 +137,7 @@ private static class ClientMetadata {
consumers = new TreeSet<>();

// initialize the client state with client tags
state = new ClientState(clientTags);
state = new ClientState(processId, clientTags);
}

void addConsumer(final String consumerMemberId, final List<TopicPartition> ownedPartitions) {
Expand Down Expand Up @@ -340,7 +340,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
futureMetadataVersion = usedVersion;
processId = FUTURE_ID;
if (!clientMetadataMap.containsKey(FUTURE_ID)) {
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(null, Collections.emptyMap()));
clientMetadataMap.put(FUTURE_ID, new ClientMetadata(FUTURE_ID, null, Collections.emptyMap()));
}
} else {
processId = info.processId();
Expand All @@ -350,7 +350,7 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr

// create the new client metadata if necessary
if (clientMetadata == null) {
clientMetadata = new ClientMetadata(info.userEndPoint(), info.clientTags());
clientMetadata = new ClientMetadata(info.processId(), info.userEndPoint(), info.clientTags());
clientMetadataMap.put(info.processId(), clientMetadata);
}

Expand Down
Expand Up @@ -59,25 +59,31 @@ public class ClientState {
private final ClientStateTask revokingActiveTasks = new ClientStateTask(null, new TreeMap<>());

private int capacity;
private UUID processId;

public ClientState() {
this(0);
this(null, 0);
}

public ClientState(final Map<String, String> clientTags) {
this(0, clientTags);
public ClientState(final UUID processId, final Map<String, String> clientTags) {
this(processId, 0, clientTags);
}

ClientState(final int capacity) {
this(capacity, Collections.emptyMap());
this(null, capacity);
}

ClientState(final int capacity, final Map<String, String> clientTags) {
ClientState(final UUID processId, final int capacity) {
this(processId, capacity, Collections.emptyMap());
}

ClientState(final UUID processId, final int capacity, final Map<String, String> clientTags) {
previousStandbyTasks.taskIds(new TreeSet<>());
previousActiveTasks.taskIds(new TreeSet<>());
taskOffsetSums = new TreeMap<>();
taskLagTotals = new TreeMap<>();
this.capacity = capacity;
this.processId = processId;
this.clientTags = unmodifiableMap(clientTags);
}

Expand All @@ -99,6 +105,10 @@ int capacity() {
return capacity;
}

UUID processId() {
return processId;
}

public void incrementCapacity() {
capacity++;
}
Expand Down
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.slf4j.Logger;
Expand All @@ -42,6 +45,19 @@
class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);

private final BiFunction<UUID, ClientState, Map<String, String>> clientTagFunction;
private final Function<AssignmentConfigs, List<String>> tagsFunction;

public ClientTagAwareStandbyTaskAssignor() {
this((uuid, clientState) -> clientState.clientTags(), assignmentConfigs -> assignmentConfigs.rackAwareAssignmentTags);
}

public ClientTagAwareStandbyTaskAssignor(final BiFunction<UUID, ClientState, Map<String, String>> clientTagFunction,
final Function<AssignmentConfigs, List<String>> tagsFunction) {
this.clientTagFunction = clientTagFunction;
this.tagsFunction = tagsFunction;
}

/**
* The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions.
* For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}.
Expand All @@ -56,7 +72,7 @@ public boolean assign(final Map<UUID, ClientState> clients,
final Set<TaskId> statefulTaskIds,
final AssignorConfiguration.AssignmentConfigs configs) {
final int numStandbyReplicas = configs.numStandbyReplicas;
final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
final Set<String> rackAwareAssignmentTags = new HashSet<>(tagsFunction.apply(configs));

final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
numStandbyReplicas,
Expand Down Expand Up @@ -128,8 +144,8 @@ private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID

@Override
public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
final Map<String, String> sourceClientTags = source.clientTags();
final Map<String, String> destinationClientTags = destination.clientTags();
final Map<String, String> sourceClientTags = clientTagFunction.apply(source.processId(), source);
final Map<String, String> destinationClientTags = clientTagFunction.apply(destination.processId(), destination);

for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
Expand All @@ -141,31 +157,31 @@ public boolean isAllowedTaskMovement(final ClientState source, final ClientState
}

// Visible for testing
static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues) {
void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues) {
for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
final UUID clientId = clientStateEntry.getKey();
final ClientState clientState = clientStateEntry.getValue();

clientState.clientTags().forEach((tagKey, tagValue) -> {
clientTagFunction.apply(clientId, clientState).forEach((tagKey, tagValue) -> {
tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId);
});
}
}

// Visible for testing
static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStandbyClients,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId,
final UUID activeTaskClient,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TaskId, Integer> tasksToRemainingStandbys,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<TaskId, UUID> pendingStandbyTasksToClientId) {
void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStandbyClients,
final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
final TaskId activeTaskId,
final UUID activeTaskClient,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TaskId, Integer> tasksToRemainingStandbys,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<TaskId, UUID> pendingStandbyTasksToClientId) {
standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());

// We set countOfUsedClients as 1 because client where active task is located has to be considered as used.
Expand Down Expand Up @@ -199,9 +215,10 @@ static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStand
numRemainingStandbys--;

log.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. " +
"Standby task client tags are {}.",
numberOfStandbyClients - numRemainingStandbys, numberOfStandbyClients, activeTaskId,
clientStates.get(activeTaskClient).clientTags(), clientStateOnUsedTagDimensions.clientTags());
"Standby task client tags are {}.",
numberOfStandbyClients - numRemainingStandbys, numberOfStandbyClients, activeTaskId,
clientTagFunction.apply(activeTaskClient, clientStates.get(activeTaskClient)),
clientTagFunction.apply(clientStateOnUsedTagDimensions.processId(), clientStateOnUsedTagDimensions));

clientStateOnUsedTagDimensions.assignStandby(activeTaskId);
lastUsedClient = clientOnUnusedTagDimensions;
Expand All @@ -218,7 +235,7 @@ static void assignStandbyTasksToClientsWithDifferentTags(final int numberOfStand
"Standby task assignment will fall back to assigning standby tasks to the least loaded clients.",
numRemainingStandbys, numberOfStandbyClients,
activeTaskId, rackAwareAssignmentTags,
clientStates.get(activeTaskClient).clientTags());
clientTagFunction.apply(activeTaskClient, clientStates.get(activeTaskClient)));

} else {
tasksToRemainingStandbys.remove(activeTaskId);
Expand All @@ -230,14 +247,14 @@ private static boolean isClientUsedOnAnyOfTheTagEntries(final UUID client,
return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client));
}

private static void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient,
private void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient,
final int countOfUsedClients,
final Set<String> rackAwareAssignmentTags,
final Map<UUID, ClientState> clientStates,
final Map<TagEntry, Set<UUID>> tagEntryToClients,
final Map<String, Set<String>> tagKeyToValues,
final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags();
final Map<String, String> usedClientTags = clientTagFunction.apply(usedClient, clientStates.get(usedClient));

for (final Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
final String tagKey = usedClientTagEntry.getKey();
Expand Down
Expand Up @@ -134,7 +134,7 @@ private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientSt
return;
}

final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(configs);
final StandbyTaskAssignor standbyTaskAssignor = StandbyTaskAssignorFactory.create(configs, null);

standbyTaskAssignor.assign(clientStates, allTaskIds, statefulTasks, configs);

Expand Down

0 comments on commit 353c43d

Please sign in to comment.