Skip to content

Commit

Permalink
KAFKA-10149: Allow auto preferred leader election when there are ongo…
Browse files Browse the repository at this point in the history
…ing partition reassignments (apache#12543) (apache#85)

Reviewers: Justine Olshan <jolshan@confluent.io>, Chris Egerton <fearthecellos@gmail.com>

Co-authored-by: Shenglong Zhang <4953397+songnon@users.noreply.github.com>
  • Loading branch information
rutvijmehta-harness and songnon committed Feb 9, 2024
1 parent 683cc75 commit ff97dc8
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 3 deletions.
3 changes: 0 additions & 3 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Expand Up @@ -1260,10 +1260,7 @@ class KafkaController(val config: KafkaConfig,
// check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
// that need to be on this broker
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
// do this check only if the broker is live and there are no partitions being reassigned currently
// and preferred replica election is not in progress
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp =>
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
controllerContext.allTopics.contains(tp.topic) &&
canPreferredReplicaBeLeader(tp)
Expand Down
Expand Up @@ -490,6 +490,87 @@ class ControllerIntegrationTest extends QuorumTestHarness {
"failed to get expected partition state upon broker startup")
}

/**
 * Checks that automatic preferred replica leader election still happens for a partition
 * while a different partition has a reassignment that cannot complete.
 *
 * Layout used by the test:
 *  - partition `tp` is assigned to [leaderBrokerId, controllerId]
 *  - partition `reassigningTp` is assigned to [controllerId] and is reassigned to [otherBrokerId]
 *    while otherBrokerId is down, so the reassignment stays pending.
 */
@Test
def testAutoPreferredReplicaLeaderElectionWithOtherReassigningPartitions(): Unit = {
  // Stops the broker with the given id and blocks until it has fully shut down.
  def stopBroker(brokerId: Int): Unit = {
    servers(brokerId).shutdown()
    servers(brokerId).awaitShutdown()
  }

  servers = makeServers(3, autoLeaderRebalanceEnable = true)
  val controllerId = TestUtils.waitUntilControllerElected(zkClient)

  // The two brokers that are not the controller: the first one becomes the preferred leader of tp,
  // the remaining one is the target of the reassignment.
  val nonControllerIds = servers.map(_.config.brokerId).filterNot(_ == controllerId)
  val leaderBrokerId = nonControllerIds.head
  val otherBrokerId = nonControllerIds.filterNot(_ == leaderBrokerId).head

  // Create tp on [leaderBrokerId, controllerId].
  val tp = new TopicPartition("t", 0)
  val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
  TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)

  // Create reassigningTp on [controllerId].
  val reassigningTp = new TopicPartition("reassigning", 0)
  val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
  TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)

  // Take the preferred leader of tp down; leadership is expected to move to the controller broker.
  stopBroker(leaderBrokerId)
  val epochAfterLeaderShutdown = LeaderAndIsr.InitialLeaderEpoch + 1
  waitForPartitionState(tp, firstControllerEpoch, controllerId, epochAfterLeaderShutdown,
    "failed to get expected partition state upon broker shutdown")

  // Take otherBrokerId down and request reassigningTp: [controllerId] -> [otherBrokerId].
  // The target replica is offline, so the reassignment gets stuck.
  stopBroker(otherBrokerId)
  val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List.empty, List.empty))
  val targetReplicas = reassignment.map { case (partition, replicaAssignment) =>
    (partition, replicaAssignment.replicas)
  }
  zkClient.createPartitionReassignment(targetReplicas)
  val epochDuringStuckReassignment = LeaderAndIsr.InitialLeaderEpoch + 1
  waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, epochDuringStuckReassignment,
    "failed to get expected partition state during partition reassignment with offline replica")

  // Bring the preferred leader back: it must regain leadership of tp automatically,
  // regardless of the unrelated reassignment that is still pending.
  servers(leaderBrokerId).startup()
  val epochAfterPreferredElection = LeaderAndIsr.InitialLeaderEpoch + 2
  waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, epochAfterPreferredElection,
    "failed to get expected partition state upon leader broker startup")

  // Bring otherBrokerId back: the pending reassignment must now run to completion.
  servers(otherBrokerId).startup()
  val epochAfterReassignment = LeaderAndIsr.InitialLeaderEpoch + 4
  waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, epochAfterReassignment,
    "failed to get expected partition state upon other broker startup")
  TestUtils.waitUntilTrue(
    () => zkClient.getFullReplicaAssignmentForTopics(Set(reassigningTp.topic)) == reassignment,
    "failed to get updated partition assignment on topic znode after partition reassignment")
  TestUtils.waitUntilTrue(
    () => !zkClient.reassignPartitionsInProgress,
    "failed to remove reassign partitions path after completion")
}

/**
 * Checks that automatic preferred replica leader election is applied to a partition
 * that is itself in the middle of a reassignment which cannot complete yet.
 *
 * Partition `tp` starts on [controllerId, leaderBrokerId] and is reassigned to
 * [leaderBrokerId, controllerId, otherBrokerId] while otherBrokerId is down.
 */
@Test
def testAutoPreferredReplicaLeaderElectionWithSamePartitionBeingReassigned(): Unit = {
  servers = makeServers(3, autoLeaderRebalanceEnable = true)
  val controllerId = TestUtils.waitUntilControllerElected(zkClient)

  // The two brokers that are not the controller: the first one is the preferred leader of the
  // target assignment, the remaining one is the replica that is offline during the reassignment.
  val nonControllerIds = servers.map(_.config.brokerId).filterNot(_ == controllerId)
  val leaderBrokerId = nonControllerIds.head
  val otherBrokerId = nonControllerIds.filterNot(_ == leaderBrokerId).head

  // Create tp on [controllerId, leaderBrokerId].
  val tp = new TopicPartition("t", 0)
  val assignment = Map(tp.partition -> Seq(controllerId, leaderBrokerId))
  TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)

  // Take otherBrokerId down, then request the reassignment
  // [controllerId, leaderBrokerId] -> [leaderBrokerId, controllerId, otherBrokerId].
  // One target replica is offline, so the reassignment gets stuck.
  val offlineBroker = servers(otherBrokerId)
  offlineBroker.shutdown()
  offlineBroker.awaitShutdown()
  val targetAssignment = ReplicaAssignment(Seq(leaderBrokerId, controllerId, otherBrokerId), List.empty, List.empty)
  val reassignment = Map(tp -> targetAssignment)
  val targetReplicas = reassignment.map { case (partition, replicaAssignment) =>
    (partition, replicaAssignment.replicas)
  }
  zkClient.createPartitionReassignment(targetReplicas)

  // leaderBrokerId is the preferred replica of the new assignment and must be elected leader
  // of tp automatically while the reassignment is still in flight.
  val epochAfterPreferredElection = LeaderAndIsr.InitialLeaderEpoch + 2
  waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, epochAfterPreferredElection,
    "failed to get expected partition state after auto preferred replica leader election")

  // Bring otherBrokerId back: the pending reassignment must now run to completion.
  servers(otherBrokerId).startup()
  val epochAfterReassignment = LeaderAndIsr.InitialLeaderEpoch + 3
  waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, epochAfterReassignment,
    "failed to get expected partition state upon broker startup")
  TestUtils.waitUntilTrue(
    () => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
    "failed to get updated partition assignment on topic znode after partition reassignment")
  TestUtils.waitUntilTrue(
    () => !zkClient.reassignPartitionsInProgress,
    "failed to remove reassign partitions path after completion")
}

@Test
def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = {
servers = makeServers(2)
Expand Down

0 comments on commit ff97dc8

Please sign in to comment.