diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java index 22496562662e..b494dc307c9d 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java @@ -35,10 +35,24 @@ import static java.lang.Math.min; /** - * Assigns partitions to members of a consumer group ensuring a balanced distribution with - * considerations for sticky assignments and rack-awareness. - * The order of priority of properties during the assignment will be: - * balance > rack matching (when applicable) > stickiness. + * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * all its members subscribed to the same set of topics. + * It is optimized since the assignment can be done in fewer, less complicated steps compared to when + * the subscriptions are different across the members. + * + * Assignments are done according to the following principles: + * + * + *
  • Balance: Ensure partitions are distributed equally among all members. + * The difference in assignments sizes between any two members + * should not exceed one partition.
  • + *
  • Rack Matching: When feasible, aim to assign partitions to members + * located on the same rack thus avoiding cross-zone traffic.
  • + *
  • Stickiness: Minimize partition movements among members by retaining + * as much of the existing assignment as possible.
  • + * + * The assignment builder prioritizes the properties in the following order: + * Balance > Rack Matching > Stickiness. */ public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); @@ -55,17 +69,19 @@ public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractA */ private final Set subscriptionIds; /** - * Rack information. + * Rack information and helper methods. */ private final RackInfo rackInfo; /** - * The number of members to receive an extra partition beyond the minimum quota, - * to account for the distribution of the remaining partitions. + * The number of members to receive an extra partition beyond the minimum quota. + * Minimum Quota = Total Partitions / Total Members + * Example: If there are 11 partitions to be distributed among 3 members, + * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. */ private int remainingMembersToGetAnExtraPartition; /** * Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members eligible for an extra partition. + * including members that are eligible to receive an extra partition. */ private final Map potentiallyUnfilledMembers; /** @@ -75,16 +91,16 @@ public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractA private Map unfilledMembers; /** * The partitions that still need to be assigned. + * Initially this contains all the subscribed topics' partitions. */ private List unassignedPartitions; /** - * The new assignment that will be returned. + * The target assignment. */ - private final Map newAssignment; + private final Map targetAssignment; /** - * Tracks the current owner of each partition. - * Current refers to the existing assignment. - * Only used when the rack awareness strategy is used. + * Tracks the existing owner of each partition. + * Only populated when the rack awareness strategy is used. */ private final Map currentPartitionOwners; @@ -95,7 +111,7 @@ public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractA this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); this.potentiallyUnfilledMembers = new HashMap<>(); this.unfilledMembers = new HashMap<>(); - this.newAssignment = new HashMap<>(); + this.targetAssignment = new HashMap<>(); // Without rack-aware strategy, tracking current owners of unassigned partitions is unnecessary // as all sticky partitions are retained until a member meets its quota. this.currentPartitionOwners = rackInfo.useRackStrategy ? new HashMap<>() : Collections.emptyMap(); @@ -105,10 +121,11 @@ public class OptimizedUniformAssignmentBuilder extends UniformAssignor.AbstractA * Here's the step-by-step breakdown of the assignment process: * *
  • Compute the quotas of partitions for each member based on the total partitions and member count.
  • + *
  • Initialize unassigned partitions to all the topic partitions and + * remove partitions from the list as and when they are assigned.
  • *
  • For existing assignments, retain partitions based on the determined quota and member's rack compatibility.
  • *
  • If a partition's rack mismatches with its owner, track it for future use.
  • *
  • Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.
  • - *
  • Derive the unassigned partitions by taking the difference between total partitions and the sticky assignments.
  • *
  • Depending on members needing extra partitions, select members from the potentially unfilled list * and add them to the unfilled list.
  • *
  • Proceed with a round-robin assignment adhering to rack awareness. @@ -143,11 +160,14 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; assignmentSpec.members().keySet().forEach(memberId -> - newAssignment.put(memberId, new MemberAssignment(new HashMap<>()) + targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) )); - Set allAssignedStickyPartitions = computeAssignedStickyPartitions(minQuota); - unassignedPartitions = computeUnassignedPartitions(allAssignedStickyPartitions); + // sorted list of all partitions. + unassignedPartitions = allTopicIdPartitions(subscriptionIds, subscribedTopicDescriber); + + assignStickyPartitions(minQuota); + unfilledMembers = computeUnfilledMembers(); if (!unassignedPartitionsCountEqualsRemainingAssignmentsCount()) { @@ -157,57 +177,60 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { if (rackInfo.useRackStrategy) rackAwareRoundRobinAssignment(); unassignedPartitionsRoundRobinAssignment(); - return new GroupAssignment(newAssignment); + return new GroupAssignment(targetAssignment); } /** - * Retrieves a set of partitions that were currently assigned to members and will be retained in the new assignment, - * by ensuring that the partitions are still relevant based on current topic metadata and subscriptions. - * If rack awareness is enabled, it ensures that a partition's rack matches the member's rack. + * Retains a set of partitions from the existing assignment and includes them in the target assignment. + * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. + * In addition, if rack awareness is enabled, it is ensured that a partition's rack matches the member's rack. * - *

    For each member, it: - *

      - *
    • Finds the valid current assignment considering topic subscriptions and metadata.
    • - *
    • If current assignments exist, retains up to the minimum quota of assignments.
    • - *
    • If there are members that should get an extra partition, - * assigns the next partition after the retained ones.
    • - *
    • For members with assignments not exceeding the minimum quota, - * it identifies them as potentially unfilled members and tracks the remaining quota.
    • - *
    - * - * @return A set containing all the sticky partitions that have been retained in the new assignment. + *

    For each member: + *

      + *
    1. Find the valid current assignment considering topic subscriptions, metadata and rack information.
    2. + *
    3. If the current assignments exist, retain partitions up to the minimum quota.
    4. + *
    5. If the current assignment size is greater than the minimum quota and + * there are members that could get an extra partition, assign the next partition as well.
    6. + *
    7. Finally, if the member's current assignment size is less than the minimum quota, + * add them to the potentially unfilled members map and track the number of remaining + * partitions required to meet the quota.
    8. + *
    + *

    */ - private Set computeAssignedStickyPartitions(int minQuota) { - Set allAssignedStickyPartitions = new HashSet<>(); - + private void assignStickyPartitions(int minQuota) { assignmentSpec.members().forEach((memberId, assignmentMemberSpec) -> { - // Remove all the topics that aren't in the subscriptions or the topic metadata anymore. - // If rack awareness is enabled, only add partitions if the members rack matches the partitions rack. List validCurrentAssignment = validCurrentAssignment( memberId, assignmentMemberSpec.assignedPartitions() ); int currentAssignmentSize = validCurrentAssignment.size(); + // Number of partitions required to meet the minimum quota. int remaining = minQuota - currentAssignmentSize; if (currentAssignmentSize > 0) { int retainedPartitionsCount = min(currentAssignmentSize, minQuota); IntStream.range(0, retainedPartitionsCount).forEach(i -> { - newAssignment.get(memberId) - .targetPartitions() - .computeIfAbsent(validCurrentAssignment.get(i).topicId(), __ -> new HashSet<>()) - .add(validCurrentAssignment.get(i).partition()); - allAssignedStickyPartitions.add(validCurrentAssignment.get(i)); + TopicIdPartition topicIdPartition = validCurrentAssignment.get(i); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); }); // The extra partition is located at the last index from the previous step. if (remaining < 0 && remainingMembersToGetAnExtraPartition > 0) { - newAssignment.get(memberId) - .targetPartitions() - .computeIfAbsent(validCurrentAssignment.get(retainedPartitionsCount).topicId(), __ -> new HashSet<>()) - .add(validCurrentAssignment.get(retainedPartitionsCount).partition()); - allAssignedStickyPartitions.add(validCurrentAssignment.get(retainedPartitionsCount)); + TopicIdPartition topicIdPartition = validCurrentAssignment.get(retainedPartitionsCount); + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); + unassignedPartitions.remove(topicIdPartition); remainingMembersToGetAnExtraPartition--; } } @@ -217,30 +240,28 @@ private Set computeAssignedStickyPartitions(int minQuota) { } }); - - return allAssignedStickyPartitions; } /** * Filters the current assignment of partitions for a given member. * - * If a partition is assigned to a member not subscribed to its topic or - * if the rack-aware strategy is to be used but there is a mismatch, - * the partition is excluded from the valid assignment and stored for future consideration. + * Any partition that still belongs to the member's subscribed topics list is considered valid. + * If rack aware strategy can be used: Only partitions with matching rack are valid and non-matching partitions are + * tracked with their current owner for future use. * * @param memberId The Id of the member whose assignment is being validated. - * @param assignedPartitions The partitions currently assigned to the member. + * @param currentAssignment The partitions currently assigned to the member. * * @return List of valid partitions after applying the filters. */ private List validCurrentAssignment( String memberId, - Map> assignedPartitions + Map> currentAssignment ) { List validCurrentAssignmentList = new ArrayList<>(); - assignedPartitions.forEach((topicId, currentAssignment) -> { + currentAssignment.forEach((topicId, partitions) -> { if (subscriptionIds.contains(topicId)) { - currentAssignment.forEach(partition -> { + partitions.forEach(partition -> { TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) { currentPartitionOwners.put(topicIdPartition, memberId); @@ -257,14 +278,13 @@ private List validCurrentAssignment( } /** - * This method iterates over the unassigned partitions and attempts to allocate them - * to members while considering their rack affiliations. + * Allocates the unassigned partitions to unfilled members present in the same rack in a round-robin fashion. */ private void rackAwareRoundRobinAssignment() { Queue roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); // Sort partitions in ascending order by number of potential members with matching racks. - // Partitions with no potential members aren't included in this list. + // Partitions with no potential members in the same rack aren't included in this list. List sortedPartitions = rackInfo.sortPartitionsByRackMembers(unassignedPartitions); sortedPartitions.forEach(partition -> { @@ -279,7 +299,7 @@ private void rackAwareRoundRobinAssignment() { unassignedPartitions.remove(partition); } - // Only re-add to the end of the queue if it's still in the unfilledMembers map + // Only re-add the member to the end of the queue if it's still available for assignment. if (unfilledMembers.containsKey(memberId)) { roundRobinMembers.add(memberId); } @@ -288,12 +308,13 @@ private void rackAwareRoundRobinAssignment() { } /** - * Allocates the unassigned partitions to available members. + * Allocates the unassigned partitions to unfilled members in a round-robin fashion. * * If the rack-aware strategy is enabled, partitions are attempted to be assigned back to their current owners first. + * This is because pure stickiness without rack matching is not considered initially. * - * If a partition couldn't be assigned to its current owner due to quotas or - * if the rack-aware strategy is not enabled, the partitions are allocated to members in a round-robin fashion. + * If a partition couldn't be assigned to its current owner due to the quotas OR + * if the rack-aware strategy is not enabled, the partitions are allocated to the unfilled members. */ private void unassignedPartitionsRoundRobinAssignment() { Queue roundRobinMembers = new LinkedList<>(unfilledMembers.keySet()); @@ -327,22 +348,23 @@ private void unassignedPartitionsRoundRobinAssignment() { } /** - * Assigns the specified partition to the given member. + * Assigns the specified partition to the given member and updates the unfilled members map. * *

    - * If the member has met their allocation quota, the member is removed from the - * tracking map of members with their remaining allocations. + * If the member has met their allocation quota, the member is removed from the unfilled members map. * Otherwise, the count of remaining partitions that can be assigned to the member is updated. *

    * - * @param memberId The Id of the member to which the partition will be assigned. - * @param partition The partition to be assigned. + * @param memberId The Id of the member to which the partition will be assigned. + * @param topicIdPartition The topicIdPartition to be assigned. */ - private void assignPartitionToMember(String memberId, TopicIdPartition partition) { - newAssignment.get(memberId) - .targetPartitions() - .computeIfAbsent(partition.topicId(), __ -> new HashSet<>()) - .add(partition.partition()); + private void assignPartitionToMember(String memberId, TopicIdPartition topicIdPartition) { + addPartitionToAssignment( + topicIdPartition.partition(), + topicIdPartition.topicId(), + memberId, + targetAssignment + ); int remainingPartitionCount = unfilledMembers.get(memberId) - 1; if (remainingPartitionCount == 0) { @@ -355,7 +377,14 @@ private void assignPartitionToMember(String memberId, TopicIdPartition partition /** * Determines which members can still be assigned partitions to meet the full quota. * - * @return A map of member IDs and their capacity for additional partitions. + * The map contains: + *
      + *
    1. Members that have met the minimum quota but will receive an extra partition.
    2. + *
    3. Members that have not yet met the minimum quota. If there are still members that could receive + * an extra partition, the remaining value is updated to include this.
    4. + *
    + * + * @return A map of member Ids and the required partitions to meet the quota. */ private Map computeUnfilledMembers() { Map unfilledMembers = new HashMap<>(); @@ -373,35 +402,9 @@ private Map computeUnfilledMembers() { return unfilledMembers; } - /** - * This method compares the full list of partitions against the set of already - * assigned sticky partitions to identify those that still need to be allocated. - * - * @param allAssignedStickyPartitions Set of partitions that have already been assigned. - * @return List of unassigned partitions. - */ - private List computeUnassignedPartitions(Set allAssignedStickyPartitions) { - List unassignedPartitions = new ArrayList<>(); - List sortedAllTopics = new ArrayList<>(subscriptionIds); - Collections.sort(sortedAllTopics); - - if (allAssignedStickyPartitions.isEmpty()) { - return allTopicIdPartitions(sortedAllTopics, subscribedTopicDescriber); - } - - sortedAllTopics.forEach(topic -> - IntStream.range(0, subscribedTopicDescriber.numPartitions(topic)) - .mapToObj(i -> new TopicIdPartition(topic, i)) - .filter(partition -> !allAssignedStickyPartitions.contains(partition)) - .forEach(unassignedPartitions::add) - ); - - return unassignedPartitions; - } - /** * This method is a correctness check to validate that the number of unassigned partitions - * is equal to the sum of all the remaining partitions count + * is equal to the sum of all the remaining partitions counts. */ private boolean unassignedPartitionsCountEqualsRemainingAssignmentsCount() { int totalRemaining = unfilledMembers.values().stream().reduce(0, Integer::sum); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java index b8c86b08fe6d..e7ca94322b52 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java @@ -34,7 +34,7 @@ import java.util.stream.IntStream; /** - * The Uniform Assignor distributes Kafka topic partitions among group members for balanced assignment. + * The Uniform Assignor distributes topic partitions among group members for a balanced assignment. * The assignor employs two different strategies based on the nature of topic * subscriptions across the group members: *
      @@ -64,11 +64,11 @@ public String name() { /** * Perform the group assignment given the current members and - * topic metadata. + * topics metadata. * - * @param assignmentSpec The member assignment spec. + * @param assignmentSpec The assignment specification that included member metadata. * @param subscribedTopicDescriber The topic and cluster metadata describer {@link SubscribedTopicDescriber}. - * @return The new assignment for the group. + * @return The new target assignment for the group. */ @Override public GroupAssignment assign( @@ -85,7 +85,7 @@ public GroupAssignment assign( + "optimized assignment algorithm"); assignmentBuilder = new OptimizedUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber); } else { - assignmentBuilder = new GeneralUniformAssignmentBuilder(assignmentSpec, subscribedTopicDescriber); + assignmentBuilder = new GeneralUniformAssignmentBuilder(); LOG.debug("Detected that all members are subscribed to a different set of topics, invoking the " + "general assignment algorithm"); } @@ -94,10 +94,10 @@ public GroupAssignment assign( } /** - * Determines if all members are subscribed to the same list of topic IDs. + * Determines if all members are subscribed to the same list of topic Ids. * - * @param members A map of member identifiers to their respective {@code AssignmentMemberSpec}. - * @return true if all members have the same subscription list of topic IDs, + * @param members Members mapped to their respective {@code AssignmentMemberSpec}. + * @return true if all members have the same subscription list of topic Ids, * false otherwise. */ private boolean allSubscriptionsEqual(Map members) { @@ -111,10 +111,9 @@ private boolean allSubscriptionsEqual(Map members) } /** - * The assignment builder is used to construct the final assignment in a series of steps that - * are determined by the type of subscriptions. + * The assignment builder is used to construct the target assignment. * - * There are common methods present that are used by any type of assignment strategy. + * This class contains common utility methods and a class for obtaining and storing rack information. */ protected static abstract class AbstractAssignmentBuilder { protected abstract GroupAssignment buildAssignment(); @@ -140,20 +139,36 @@ protected static boolean useRackAwareAssignment( } } + /** + * Adds the topic's partition to the member's target assignment. + */ + protected static void addPartitionToAssignment( + int partition, + Uuid topicId, + String memberId, + Map targetAssignment + ) { + targetAssignment.get(memberId) + .targetPartitions() + .computeIfAbsent(topicId, __ -> new HashSet<>()) + .add(partition); + } + /** * Constructs a list of {@code TopicIdPartition} for each topic Id based on its partition count. * - * @param allTopicIds The list of subscribed topic Ids. + * @param allTopicIds The subscribed topic Ids. * @param subscribedTopicDescriber Utility to fetch the partition count for a given topic. * - * @return List of generated {@code TopicIdPartition} for all provided topic Ids. + * @return List of sorted {@code TopicIdPartition} for all provided topic Ids. */ protected static List allTopicIdPartitions( Collection allTopicIds, SubscribedTopicDescriber subscribedTopicDescriber ) { List allTopicIdPartitions = new ArrayList<>(); - allTopicIds.forEach(topic -> + // Sorted so that partitions from each topic can be distributed amongst its subscribers equally. + allTopicIds.stream().sorted().forEach(topic -> IntStream.range(0, subscribedTopicDescriber.numPartitions(topic)) .forEach(i -> allTopicIdPartitions.add(new TopicIdPartition(topic, i)) ) @@ -178,7 +193,7 @@ protected static class RackInfo { /** * Number of members with the same rack as the partition. */ - private final Map numMembersWithSameRackByPartition; + private final Map numMembersWithSameRackAsPartition; /** * Indicates if a rack aware assignment can be done. * True if racks are defined for both members and partitions and there is an intersection between the sets. @@ -186,7 +201,7 @@ protected static class RackInfo { protected final boolean useRackStrategy; /** - * Constructs rack information based on assignment specification and subscribed topics. + * Constructs rack information based on the assignment specification and subscribed topics. * * @param assignmentSpec The current assignment specification. * @param subscribedTopicDescriber Topic and partition metadata of the subscribed topics. @@ -232,24 +247,24 @@ public RackInfo( useRackStrategy = false; } - numMembersWithSameRackByPartition = partitionRacks.entrySet().stream() + numMembersWithSameRackAsPartition = partitionRacks.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().stream() .map(r -> membersByRack.getOrDefault(r, Collections.emptyList()).size()) .reduce(0, Integer::sum))); } /** - * Determines if there's a mismatch between the memberId's rack and the partition's replica racks. + * Determines if there's a mismatch between the member's rack and the partition's replica racks. * - *

      Mismatch conditions (returns {@code true}): + *

      Racks are considered mismatched under the following conditions: (returns {@code true}): *

        - *
      • Member lacks an associated rack.
      • - *
      • Partition lacks associated replica racks.
      • - *
      • Member's rack isn't among the partition's replica racks.
      • + *
      • Member lacks an associated rack.
      • + *
      • Partition lacks associated replica racks.
      • + *
      • Member's rack isn't among the partition's replica racks.
      • *
      * - * @param memberId The memberId identifier. - * @param tp The topic partition in question. + * @param memberId The member Id. + * @param tp The topic partition. * @return {@code true} for a mismatch; {@code false} if member and partition racks exist and align. */ protected boolean racksMismatch(String memberId, TopicIdPartition tp) { @@ -259,22 +274,21 @@ protected boolean racksMismatch(String memberId, TopicIdPartition tp) { } /** - * Sorts the given list of partitions based on the number of members available for each partition - * in a rack-aware manner. + * Sort partitions in ascending order by number of members with matching racks. * * @param partitions The list of partitions to be sorted. * @return A sorted list of partitions with potential members in the same rack. */ protected List sortPartitionsByRackMembers(List partitions) { - if (numMembersWithSameRackByPartition.isEmpty()) + if (numMembersWithSameRackAsPartition.isEmpty()) return partitions; return partitions.stream() .filter(tp -> { - Integer count = numMembersWithSameRackByPartition.get(tp); + Integer count = numMembersWithSameRackAsPartition.get(tp); return count != null && count > 0; }) - .sorted(Comparator.comparing(tp -> numMembersWithSameRackByPartition.getOrDefault(tp, 0))) + .sorted(Comparator.comparing(tp -> numMembersWithSameRackAsPartition.getOrDefault(tp, 0))) .collect(Collectors.toList()); } @@ -286,6 +300,7 @@ public String toString() { ")"; } } + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index f888c88ac7a8..75bda57de210 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -1015,8 +1015,8 @@ public void testReassignmentWhenOneSubscriptionRemovedWithMemberAndPartitionRack * - the assignment is fully balanced (the numbers of topic partitions assigned to members differ by at most one), or * - there is no topic partition that can be moved from one member to another with 2+ fewer topic partitions. * - * @param members members data structure from assignmentSpec - * @param computedGroupAssignment given assignment for balance check + * @param members Members data structure from the assignment Spec. + * @param computedGroupAssignment Assignment computed by the uniform assignor. */ private void checkValidityAndBalance( Map members, @@ -1074,6 +1074,9 @@ private void checkValidityAndBalance( } } + /** + * Verifies that the expected assignment is equal to the computed assignment for every member in the group. + */ private void assertAssignment( Map>> expectedAssignment, GroupAssignment computedGroupAssignment