diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index b4a205fb66fb..00851d6ca248 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1260,10 +1260,7 @@ class KafkaController(val config: KafkaConfig,
       // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
       // that need to be on this broker
       if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-        // do this check only if the broker is live and there are no partitions being reassigned currently
-        // and preferred replica election is not in progress
         val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp =>
-          controllerContext.partitionsBeingReassigned.isEmpty &&
           !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
           controllerContext.allTopics.contains(tp.topic) &&
           canPreferredReplicaBeLeader(tp)
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 532ff1a946e9..a195be37dbc9 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -490,6 +490,87 @@ class ControllerIntegrationTest extends QuorumTestHarness {
       "failed to get expected partition state upon broker startup")
   }
 
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithOtherReassigningPartitions(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+
+    // Partition tp: [leaderBrokerId, controllerId] (leaderBrokerId is the preferred leader this test expects to be restored)
+    // Partition reassigningTp: [controllerId] (its reassignment is deliberately left pending for most of the test)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(leaderBrokerId, controllerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+    val reassigningTp = new TopicPartition("reassigning", 0)
+    val reassigningTpAssignment = Map(reassigningTp.partition -> Seq(controllerId))
+    TestUtils.createTopic(zkClient, reassigningTp.topic, partitionReplicaAssignment = reassigningTpAssignment, servers = servers)
+
+    // Shut down broker leaderBrokerId so that controllerId takes over as leader of tp. NOTE(review): servers(id) assumes broker ids match list positions - confirm.
+    servers(leaderBrokerId).shutdown()
+    servers(leaderBrokerId).awaitShutdown()
+    waitForPartitionState(tp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state upon broker shutdown")
+
+    // Shut down broker otherBrokerId, then request a reassignment of reassigningTp from [controllerId] to [otherBrokerId].
+    // With its only target replica offline, the reassignment cannot finish and stays pending.
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    val reassignment = Map(reassigningTp -> ReplicaAssignment(Seq(otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+    waitForPartitionState(reassigningTp, firstControllerEpoch, controllerId, LeaderAndIsr.InitialLeaderEpoch + 1,
+      "failed to get expected partition state during partition reassignment with offline replica")
+
+    // Restart broker leaderBrokerId and verify that it is automatically re-elected as the (preferred) leader of tp
+    // even though the reassignment of reassigningTp is still pending.
+    servers(leaderBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state upon leader broker startup")
+
+    // Restart broker otherBrokerId and verify that the previously stuck reassignment now completes and is cleaned up.
+    servers(otherBrokerId).startup()
+    waitForPartitionState(reassigningTp, firstControllerEpoch, otherBrokerId, LeaderAndIsr.InitialLeaderEpoch + 4,
+      "failed to get expected partition state upon other broker startup")
+    TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(reassigningTp.topic)) == reassignment,
+      "failed to get updated partition assignment on topic znode after partition reassignment")
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      "failed to remove reassign partitions path after completion")
+  }
+
+  @Test
+  def testAutoPreferredReplicaLeaderElectionWithSamePartitionBeingReassigned(): Unit = {
+    servers = makeServers(3, autoLeaderRebalanceEnable = true)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val leaderBrokerId = servers.map(_.config.brokerId).filter(_ != controllerId).head
+    val otherBrokerId = servers.map(_.config.brokerId).filter(e => e != controllerId && e != leaderBrokerId).head
+
+    // Partition tp: [controllerId, leaderBrokerId] (controllerId is listed first; the reassignment below moves leaderBrokerId to the front)
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, leaderBrokerId))
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down broker otherBrokerId, then request a reassignment of tp from [controllerId, leaderBrokerId] to [leaderBrokerId, controllerId, otherBrokerId].
+    // With target replica otherBrokerId offline, the reassignment cannot finish and stays pending.
+    servers(otherBrokerId).shutdown()
+    servers(otherBrokerId).awaitShutdown()
+    val reassignment = Map(tp -> ReplicaAssignment(Seq(leaderBrokerId, controllerId, otherBrokerId), List(), List()))
+    zkClient.createPartitionReassignment(reassignment.map { case (k, v) => k -> v.replicas })
+
+    // Verify that broker leaderBrokerId is automatically elected as the (preferred) leader of tp
+    // even though the reassignment of tp itself is still pending.
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 2,
+      "failed to get expected partition state after auto preferred replica leader election")
+
+    // Restart broker otherBrokerId and verify that the previously stuck reassignment now completes and is cleaned up.
+    servers(otherBrokerId).startup()
+    waitForPartitionState(tp, firstControllerEpoch, leaderBrokerId, LeaderAndIsr.InitialLeaderEpoch + 3,
+      "failed to get expected partition state upon broker startup")
+    TestUtils.waitUntilTrue(() => zkClient.getFullReplicaAssignmentForTopics(Set(tp.topic)) == reassignment,
+      "failed to get updated partition assignment on topic znode after partition reassignment")
+    TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+      "failed to remove reassign partitions path after completion")
+  }
+
   @Test
   def testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionDisabled(): Unit = {
     servers = makeServers(2)