Skip to content

Commit

Permalink
Merge pull request apache#2 from jasonaliyetti/ineligible-leader
Browse files (browse the repository at this point in the history)
Add ability to prevent broker from taking leadership for any partition
  • Loading branch information
lentztopher committed Oct 7, 2016
2 parents 89bfe41 + 94f554c commit 07e597a
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 19 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Expand Up @@ -57,6 +57,7 @@ class ControllerContext(val zkUtils: ZkUtils,
var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
var leaderIneligibleBrokerId: Int = -1

private var liveBrokersUnderlying: Set[Broker] = Set.empty
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
Expand Down Expand Up @@ -742,6 +743,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
controllerContext.leaderIneligibleBrokerId = config.leaderIneligibleBrokerId

// update the leader and isr cache for all existing partitions from Zookeeper
updateLeaderAndIsrCache()
// start the channel manager
Expand Down
52 changes: 34 additions & 18 deletions core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
Expand Up @@ -54,6 +54,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) =>
val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val candidates = liveAssignedReplicas.filter( _ != config.leaderIneligibleBrokerId )
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
Expand All @@ -70,24 +71,33 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext, confi

debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
.format(topicAndPartition, liveAssignedReplicas.mkString(",")))
liveAssignedReplicas.isEmpty match {
candidates.isEmpty match {
case true =>
throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
" Assigned replicas are: [%s]".format(assignedReplicas) +
" Ineligible replicas are: [%s]".format(config.leaderIneligibleBrokerId))
case false =>
ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicas.head
val newLeader = candidates.head
warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
.format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
}
case false =>
val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
val newLeader = liveReplicasInIsr.head
debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
.format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
val liveReplicasInIsr = candidates.filter(r => liveBrokersInIsr.contains(r))

if (liveReplicasInIsr.isEmpty) {
throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas) +
" Ineligible replicas are: [%s]".format(config.leaderIneligibleBrokerId))
} else {
val newLeader = liveReplicasInIsr.head
debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader."
.format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
}
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
(newLeaderAndIsr, liveAssignedReplicas)
Expand All @@ -114,12 +124,12 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
currentLeaderAndIsr.isr.contains(r))
val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
val newLeaderOpt = aliveReassignedInSyncReplicas.filter(_ != controllerContext.leaderIneligibleBrokerId).headOption
newLeaderOpt match {
case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
case None =>
reassignedInSyncReplicas.size match {
reassignedInSyncReplicas.filter(_ != controllerContext.leaderIneligibleBrokerId).size match {
case 0 =>
throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
Expand All @@ -142,7 +152,14 @@ with Logging {

def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val preferredReplica = assignedReplicas.head
val candidates = assignedReplicas.filter(_ != controllerContext.leaderIneligibleBrokerId)
if (candidates.isEmpty) {
throw new NoReplicaOnlineException(("No replica for partition " +
"%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas) +
" Ineligible replicas are: [%s]".format(controllerContext.leaderIneligibleBrokerId))
}
val preferredReplica = candidates.head
// check if preferred replica is the current leader
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
if (currentLeader == preferredReplica) {
Expand Down Expand Up @@ -184,14 +201,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))

val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
val newLeaderOpt = newIsr.headOption
newLeaderOpt match {
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId) &&
brokerId != controllerContext.leaderIneligibleBrokerId)

liveAssignedReplicas.find(newIsr.contains) match {
case Some(newLeader) =>
debug("Partition %s : current leader = %d, new leader = %d"
.format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
case None =>
throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
" shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
Expand Down
Expand Up @@ -285,7 +285,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
case _ =>
debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
// make the first live assigned replica that is not the leader-ineligible broker the leader
val leader = liveAssignedReplicas.head
val leader = liveAssignedReplicas.filter(brokerId => brokerId != controllerContext.leaderIneligibleBrokerId).head
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
controller.epoch)
debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Expand Up @@ -51,6 +51,7 @@ object Defaults {
val NumIoThreads = 8
val BackgroundThreads = 10
val QueuedMaxRequests = 500
val LeaderIneligibleBrokerId = -1

/************* Authorizer Configuration ***********/
val AuthorizerClassName = ""
Expand Down Expand Up @@ -208,6 +209,7 @@ object KafkaConfig {
val BackgroundThreadsProp = "background.threads"
val QueuedMaxRequestsProp = "queued.max.requests"
val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
val LeaderIneligibleBrokerIdProp = "leader.ineligible.broker.id"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
/** ********* Socket Server Configuration ***********/
Expand Down Expand Up @@ -568,6 +570,7 @@ object KafkaConfig {
.define(BackgroundThreadsProp, INT, Defaults.BackgroundThreads, atLeast(1), HIGH, BackgroundThreadsDoc)
.define(QueuedMaxRequestsProp, INT, Defaults.QueuedMaxRequests, atLeast(1), HIGH, QueuedMaxRequestsDoc)
.define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
.define(LeaderIneligibleBrokerIdProp, INT, Defaults.LeaderIneligibleBrokerId, HIGH, BrokerIdDoc)

/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, LOW, AuthorizerClassNameDoc)
Expand Down Expand Up @@ -772,6 +775,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
val numIoThreads = getInt(KafkaConfig.NumIoThreadsProp)
val messageMaxBytes = getInt(KafkaConfig.MessageMaxBytesProp)
val requestTimeoutMs = getInt(KafkaConfig.RequestTimeoutMsProp)
val leaderIneligibleBrokerId = getInt(KafkaConfig.LeaderIneligibleBrokerIdProp)

/************* Authorizer Configuration ***********/
val authorizerClassName: String = getString(KafkaConfig.AuthorizerClassNameProp)
Expand Down
Expand Up @@ -448,6 +448,7 @@ class KafkaConfigTest {
case KafkaConfig.BackgroundThreadsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.QueuedMaxRequestsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
case KafkaConfig.RequestTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
case KafkaConfig.LeaderIneligibleBrokerIdProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")

case KafkaConfig.AuthorizerClassNameProp => //ignore string

Expand Down

0 comments on commit 07e597a

Please sign in to comment.