Skip to content

Commit

Permalink
KAFKA-4513: Support migration of old consumers to new consumers witho…
Browse files Browse the repository at this point in the history
…ut downtime
  • Loading branch information
onurkaraman committed Feb 20, 2017
1 parent 2c91b32 commit 3b54480
Show file tree
Hide file tree
Showing 5 changed files with 442 additions and 89 deletions.
26 changes: 25 additions & 1 deletion core/src/main/scala/kafka/consumer/ConsumerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,31 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(

/** Select a strategy for assigning partitions to consumer streams. Possible values: range, roundrobin */
val partitionAssignmentStrategy = props.getString("partition.assignment.strategy", DefaultPartitionAssignmentStrategy)


/** Specify whether the ZookeeperConsumerConnector should participate in coordination migration. */
val coordinationMigrationEnabled = props.getBoolean("coordination.migration.enabled", false)

/** The poll interval for the embedded KafkaConsumer participating in coordination migration. */
val coordinationMigrationPollIntervalMs = props.getInt("coordination.migration.poll.interval.ms", 1000)

/** Specify consumer configs for the embedded KafkaConsumer by prepending "coordination.migration." to config names.
* The following consumer config must be provided:
* coordination.migration.bootstrap.servers */
val coordinationMigrationEmbeddedConsumerConfigs = {
import collection.JavaConverters._
val consumerConfigs = org.apache.kafka.clients.consumer.ConsumerConfig.configNames().asScala
props.props.asScala.flatMap { case (key, value) =>
val prefix = "coordination.migration."
val containsPrefix = key.indexOf(prefix) > -1
if (containsPrefix) {
val suffix = key.substring(key.indexOf(prefix) + prefix.size)
if (consumerConfigs.contains(suffix))
Option(suffix -> value)
else None
} else None
}
}

validate(this)
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package kafka.consumer

import kafka.server.{AbstractFetcherManager, AbstractFetcherThread, BrokerAndInitialOffset}
import kafka.cluster.{BrokerEndPoint, Cluster}
import kafka.cluster.BrokerEndPoint
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time

Expand All @@ -44,7 +44,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(Time.SYSTEM.milliseconds),
config.clientId, config.numConsumerFetchers) {
private var partitionMap: immutable.Map[TopicPartition, PartitionTopicInfo] = null
private var cluster: Cluster = null
private val noLeaderPartitionSet = new mutable.HashSet[TopicPartition]
private val lock = new ReentrantLock
private val cond = lock.newCondition()
Expand Down Expand Up @@ -119,13 +118,12 @@ class ConsumerFetcherManager(private val consumerIdString: String,
config, sourceBroker, partitionMap, this)
}

def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
def startConnections(topicInfos: Iterable[PartitionTopicInfo]) {
leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
leaderFinderThread.start()

inLock(lock) {
partitionMap = topicInfos.map(tpi => (new TopicPartition(tpi.topic, tpi.partitionId), tpi)).toMap
this.cluster = cluster
noLeaderPartitionSet ++= topicInfos.map(tpi => new TopicPartition(tpi.topic, tpi.partitionId))
cond.signalAll()
}
Expand Down

0 comments on commit 3b54480

Please sign in to comment.