Skip to content

Commit

Permalink
first draft, assign sticky partitions and sorting topics
Browse files Browse the repository at this point in the history
  • Loading branch information
rreddy-22 committed Sep 7, 2023
1 parent 61ca0a1 commit fee503b
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,137 @@

package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.common.TopicIdPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class GeneralUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder {
private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
/**
* The assignment specification which includes member metadata.
*/
private final AssignmentSpec assignmentSpec;
/**
* The topic and partition metadata describer.
*/
private final SubscribedTopicDescriber subscribedTopicDescriber;
/**
* The set of all the topic Ids that the consumer group is subscribed to.
*/
private final Set<Uuid> subscriptionIds;
/**
* Rack information.
*/
private final RackInfo rackInfo;
/**
* The partitions that still need to be assigned.
*/
private List<TopicIdPartition> unassignedPartitions;
/**
* Tracks the current owner of each partition.
* Current refers to the existing assignment.
*/
private final Map<TopicIdPartition, String> currentPartitionOwners;
/**
* The new assignment that will be returned.
*/
private final Map<String, MemberAssignment> newAssignment;

public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) {
this.assignmentSpec = assignmentSpec;
this.subscribedTopicDescriber = subscribedTopicDescriber;
this.subscriptionIds = assignmentSpec.members().values().stream()
.flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
.collect(Collectors.toSet());
this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds);
this.unassignedPartitions = new ArrayList<>();
this.currentPartitionOwners = new HashMap<>();
this.newAssignment = new HashMap<>();
}

@Override
protected GroupAssignment buildAssignment() {
return null;
if (subscriptionIds.isEmpty()) {
LOG.info("The subscription list is empty, returning an empty assignment");
return new GroupAssignment(Collections.emptyMap());
}

assignmentSpec.members().keySet().forEach(memberId ->
newAssignment.put(memberId, new MemberAssignment(new HashMap<>())
));
// When rack awareness is enabled, this only contains sticky partitions with matching rack.
// Otherwise, it contains all sticky partitions.
Set<TopicIdPartition> allAssignedStickyPartitions = computeAssignedStickyPartitions();
return new GroupAssignment(newAssignment);
}

/**
* Topic Ids are sorted in descending order based on the value:
* totalPartitions/number of subscribed members.
* If the above value is the same then topic Ids are sorted in ascending order of number of subscribers.
*/
List<Uuid> sortedTopics() {
Map<Uuid, List<String>> membersPerTopic = new HashMap<>();
assignmentSpec.members().forEach((memberId, memberMetadata) -> {
Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
topics.forEach(topicId ->
membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId)
);
});

Comparator<Uuid> comparator = Comparator.comparingDouble((Uuid topicId) -> {
int totalPartitions = subscribedTopicDescriber.numPartitions(topicId);
int totalSubscribers = membersPerTopic.get(topicId).size();
return (double) totalPartitions / totalSubscribers;
}).reversed().thenComparingInt(topicId -> membersPerTopic.get(topicId).size());

return membersPerTopic.keySet().stream()
.sorted(comparator)
.collect(Collectors.toList());
}

/**
* Gets a set of partitions that are retained from the existing assignment. This includes:
* - Partitions from topics present in both the new subscriptions and the topic metadata.
* - If using a rack-aware strategy, only partitions where members are in the same rack are retained.
*
* @return A set containing all the sticky partitions that have been retained in the new assignment.
*/
private Set<TopicIdPartition> computeAssignedStickyPartitions() {
Set<TopicIdPartition> allAssignedStickyPartitions = new HashSet<>();

assignmentSpec.members().forEach((memberId, assignmentMemberSpec) ->
assignmentMemberSpec.assignedPartitions().forEach((topicId, currentAssignment) -> {
if (subscriptionIds.contains(topicId)) {
currentAssignment.forEach(partition -> {
TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition);
if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) {
currentPartitionOwners.put(topicIdPartition, memberId);
} else {
newAssignment.get(memberId)
.targetPartitions()
.computeIfAbsent(topicId, __ -> new HashSet<>())
.add(partition);
allAssignedStickyPartitions.add(topicIdPartition);
}
});
} else {
LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list");
}
})
);
return allAssignedStickyPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public GroupAssignment assign(
+ "optimized assignment algorithm");
assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
} else {
assignmentBuilder = new GeneralUniformAssignmentBuilder();
assignmentBuilder = new GeneralUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber);
LOG.debug("Detected that all members are subscribed to a different set of topics, invoking the "
+ "general assignment algorithm");
}
Expand Down Expand Up @@ -286,7 +286,6 @@ public String toString() {
")";
}
}

}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

import static org.apache.kafka.coordinator.group.RecordHelpersTest.mkMapOfPartitionRacks;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class GeneralUniformAssignmentBuilderTest {
private final UniformAssignor assignor = new UniformAssignor();
private final Uuid topic1Uuid = Uuid.fromString("T1-A4s3VTwiI5CTbEp6POw");
private final Uuid topic2Uuid = Uuid.fromString("T2-B4s3VTwiI5YHbPp6YUe");
private final Uuid topic3Uuid = Uuid.fromString("T3-CU8fVTLCz5YMkLoDQsa");
private final String topic1Name = "topic1";
private final String topic2Name = "topic2";
private final String topic3Name = "topic3";
private final String memberA = "A";
private final String memberB = "B";
private final String memberC = "C";

@Test
public void testSortedTopicsThreeMembersThreeTopicsWithMemberAndPartitionRacks() {
Map<Uuid, TopicMetadata> topicMetadata = new HashMap<>();
topicMetadata.put(topic1Uuid, new TopicMetadata(
topic1Uuid,
topic1Name,
6,
mkMapOfPartitionRacks(6)
));
topicMetadata.put(topic2Uuid, new TopicMetadata(
topic2Uuid,
topic2Name,
3,
mkMapOfPartitionRacks(3)
));
topicMetadata.put(topic3Uuid, new TopicMetadata(
topic3Uuid,
topic3Name,
2,
mkMapOfPartitionRacks(2)
));

Map<String, AssignmentMemberSpec> members = new TreeMap<>();
members.put(memberA, new AssignmentMemberSpec(
Optional.empty(),
Optional.of("rack1"),
Collections.singletonList(topic1Uuid),
Collections.emptyMap()
));
members.put(memberB, new AssignmentMemberSpec(
Optional.empty(),
Optional.of("rack2"),
Arrays.asList(topic1Uuid, topic2Uuid),
Collections.emptyMap()
));
members.put(memberC, new AssignmentMemberSpec(
Optional.empty(),
Optional.of("rack3"),
Arrays.asList(topic1Uuid, topic3Uuid),
Collections.emptyMap()
));

AssignmentSpec assignmentSpec = new AssignmentSpec(members);
SubscribedTopicMetadata subscribedTopicMetadata = new SubscribedTopicMetadata(topicMetadata);
GeneralUniformAssignmentBuilder generalAssignmentBuilder =
new GeneralUniformAssignmentBuilder(assignmentSpec, subscribedTopicMetadata);

// T1 = 6/3 || T2 = 3/1 || T3 = 2/1
List<Uuid> sortedTopics = generalAssignmentBuilder.sortedTopics();
assertEquals(Arrays.asList(topic2Uuid, topic3Uuid, topic1Uuid), sortedTopics);
}
}

0 comments on commit fee503b

Please sign in to comment.