diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 103f6cf575d4..e24b7bead949 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -57,6 +57,7 @@ class ControllerContext(val zkUtils: ZkUtils, var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet + var leaderIneligibleBrokerId: Int = -1 private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty @@ -742,6 +743,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq) controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch] controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int] + controllerContext.leaderIneligibleBrokerId = config.leaderIneligibleBrokerId + // update the leader and isr cache for all existing partitions from Zookeeper updateLeaderAndIsrCache() // start the channel manager diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala index 5eed3829ff3c..28589f37f4ea 100644 --- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala +++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala @@ -54,6 +54,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi controllerContext.partitionReplicaAssignment.get(topicAndPartition) match { case Some(assignedReplicas) => val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + val candidates = liveAssignedReplicas.filter( _ != config.leaderIneligibleBrokerId ) val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r)) val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion @@ -70,24 +71,33 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s" .format(topicAndPartition, liveAssignedReplicas.mkString(","))) - liveAssignedReplicas.isEmpty match { + candidates.isEmpty match { case true => throw new NoReplicaOnlineException(("No replica for partition " + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + - " Assigned replicas are: [%s]".format(assignedReplicas)) + " Assigned replicas are: [%s]".format(assignedReplicas) + + " Ineligible replicas are: [%s]".format(config.leaderIneligibleBrokerId)) case false => ControllerStats.uncleanLeaderElectionRate.mark() - val newLeader = liveAssignedReplicas.head + val newLeader = candidates.head warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss." .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(","))) new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1) } case false => - val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r)) - val newLeader = liveReplicasInIsr.head - debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." - .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) - new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) + val liveReplicasInIsr = candidates.filter(r => liveBrokersInIsr.contains(r)) + + if (liveReplicasInIsr.isEmpty) { + throw new NoReplicaOnlineException(("No replica for partition " + + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + + " Assigned replicas are: [%s]".format(assignedReplicas) + + " Ineligible replicas are: [%s]".format(config.leaderIneligibleBrokerId)) + } else { + val newLeader = liveReplicasInIsr.head + debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader." + .format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(","))) + new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1) + } } info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition)) (newLeaderAndIsr, liveAssignedReplicas) @@ -114,12 +124,12 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r)) - val newLeaderOpt = aliveReassignedInSyncReplicas.headOption + val newLeaderOpt = aliveReassignedInSyncReplicas.filter(_ != controllerContext.leaderIneligibleBrokerId).headOption newLeaderOpt match { case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas) case None => - reassignedInSyncReplicas.size match { + reassignedInSyncReplicas.filter(_ != controllerContext.leaderIneligibleBrokerId).size match { case 0 => throw new NoReplicaOnlineException("List of reassigned replicas for partition " + " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr)) @@ -142,7 +152,14 @@ with Logging { def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = { val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) - val preferredReplica = assignedReplicas.head + val candidates = assignedReplicas.filter(_ != controllerContext.leaderIneligibleBrokerId) + if (candidates.isEmpty) { + throw new NoReplicaOnlineException(("No replica for partition " + + "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) + + " Assigned replicas are: [%s]".format(assignedReplicas) + + " Ineligible replicas are: [%s]".format(controllerContext.leaderIneligibleBrokerId)) + } + val preferredReplica = candidates.head // check if preferred replica is the current leader val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader if (currentLeader == preferredReplica) { @@ -184,14 +201,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext) val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r)) - val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId)) - val newLeaderOpt = newIsr.headOption - newLeaderOpt match { + val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId) && + brokerId != controllerContext.leaderIneligibleBrokerId) + + liveAssignedReplicas.find(newIsr.contains) match { case Some(newLeader) => - debug("Partition %s : current leader = %d, new leader = %d" - .format(topicAndPartition, currentLeader, newLeader)) - (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), - liveAssignedReplicas) + debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader)) + (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas) case None => throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" + " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(","))) diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index ec03b84395bd..4e17ac467254 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -285,7 +285,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { case _ => debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas)) // make the first replica in the list of assigned replicas, the leader - val leader = liveAssignedReplicas.head + val leader = liveAssignedReplicas.filter(brokerId => brokerId != controllerContext.leaderIneligibleBrokerId).head val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList), controller.epoch) debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch)) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index f545ed7dcbab..c90a2b2babad 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -51,6 +51,7 @@ object Defaults { val NumIoThreads = 8 val BackgroundThreads = 10 val QueuedMaxRequests = 500 + val LeaderIneligibleBrokerId = -1 /************* Authorizer Configuration ***********/ val AuthorizerClassName = "" @@ -208,6 +209,7 @@ object KafkaConfig { val BackgroundThreadsProp = "background.threads" val QueuedMaxRequestsProp = "queued.max.requests" val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG + val LeaderIneligibleBrokerIdProp = "leader.ineligible.broker.id" /************* Authorizer Configuration ***********/ val AuthorizerClassNameProp = "authorizer.class.name" /** ********* Socket Server Configuration ***********/ @@ -568,6 +570,7 @@ object KafkaConfig { .define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc) .define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc) .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc) + .define(LeaderIneligibleBrokerIdProp, INT, Defaults.LeaderIneligibleBrokerId, HIGH, BrokerIdDoc) /************* Authorizer Configuration ***********/ .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc) @@ -772,6 +775,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp) val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp) val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp) + val leaderIneligibleBrokerId = getInt(KafkaConfig.LeaderIneligibleBrokerIdProp) /************* Authorizer Configuration ***********/ val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 8f7015298e05..3809a68d8c57 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -448,6 +448,7 @@ class KafkaConfigTest { case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") + case KafkaConfig.LeaderIneligibleBrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number") case KafkaConfig.AuthorizerClassNameProp => //ignore string