
Commit 019ef69

using MultiFetch in the follower; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; KAFKA-339

git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/branches/0.8@1353086 13f79535-47bb-0310-9956-ffa450edef68
Jun Rao committed Jun 23, 2012
1 parent 8953fa9 commit 019ef69
Showing 11 changed files with 359 additions and 130 deletions.
28 changes: 0 additions & 28 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -18,7 +18,6 @@
package kafka.cluster

import kafka.log.Log
import kafka.server.{KafkaConfig, ReplicaFetcherThread}
import java.lang.IllegalStateException
import kafka.utils.Logging

@@ -28,7 +27,6 @@ class Replica(val brokerId: Int,
var log: Option[Log] = None,
var leoUpdateTime: Long = -1L) extends Logging {
private var logEndOffset: Long = -1L
private var replicaFetcherThread: ReplicaFetcherThread = null

def logEndOffset(newLeo: Option[Long] = None): Long = {
isLocal match {
@@ -88,32 +86,6 @@ class Replica(val brokerId: Int,
}
}

def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) {
val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId)
replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config)
replicaFetcherThread.setDaemon(true)
replicaFetcherThread.start()
}

def stopReplicaFetcherThread() {
if(replicaFetcherThread != null) {
replicaFetcherThread.shutdown()
replicaFetcherThread = null
}
}

def getIfFollowerAndLeader(): (Boolean, Int) = {
replicaFetcherThread != null match {
case true => (true, replicaFetcherThread.getLeader().id)
case false => (false, -1)
}
}

def close() {
if(replicaFetcherThread != null)
replicaFetcherThread.shutdown()
}

override def equals(that: Any): Boolean = {
if(!(that.isInstanceOf[Replica]))
return false
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -60,6 +60,8 @@ object ErrorMapping {
def maybeThrowException(code: Short) =
if(code != 0)
throw codeToException(code).newInstance()

def exceptionFor(code: Short): Throwable = codeToException(code).newInstance()
}

class InvalidTopicException(message: String) extends RuntimeException(message) {
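The new exceptionFor helper returns the mapped exception without throwing it, so callers can attach it to a log entry; a minimal hypothetical sketch contrasting the two styles (the object and method names are illustrative, not part of this patch):

import kafka.common.ErrorMapping

object ErrorMappingUsage {
  def handle(code: Short) {
    // throwing style: raises the mapped exception when code != 0
    try {
      ErrorMapping.maybeThrowException(code)
    } catch {
      case t: Throwable => System.err.println("caught: " + t)
    }
    // logging style: obtain the same mapped Throwable without throwing
    if (code != ErrorMapping.NoError)
      System.err.println("would log: " + ErrorMapping.exceptionFor(code))
  }
}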
13 changes: 13 additions & 0 deletions core/src/main/scala/kafka/log/Log.scala
@@ -405,6 +405,19 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In
ret
}

/**
* Truncate all segments in the log and start a new segment at the given offset
*/
def truncateAndStartWithNewOffset(newOffset: Long) {
lock synchronized {
val deletedSegments = segments.trunc(segments.view.size)
val newFile = new File(dir, Log.nameFromOffset(newOffset))
debug("tuncate and start log '" + name + "' to " + newFile.getName())
segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset))
deleteSegments(deletedSegments)
}
}

/* Attempts to delete all provided segments from a log and returns how many it was able to delete */
def deleteSegments(segments: Seq[LogSegment]): Int = {
var total = 0
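truncateAndStartWithNewOffset is what lets a follower abandon a divergent log wholesale; a hedged sketch of the intended use when a fetch offset turns out to be out of range on the leader (the wrapper object and method are assumptions; the real caller lives elsewhere in this patch):

import kafka.log.Log

object FollowerLogReset {
  // discard all local segments and restart the log at the leader's start offset;
  // the returned value becomes the follower's new fetch offset
  def reset(log: Log, leaderStartOffset: Long): Long = {
    log.truncateAndStartWithNewOffset(leaderStartOffset)
    leaderStartOffset
  }
}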
85 changes: 85 additions & 0 deletions core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import scala.collection.mutable
import kafka.utils.Logging
import kafka.cluster.Broker

abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging {
// map of (source broker id, fetcher id within that source broker) => fetcher thread
private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread]
private val mapLock = new Object
this.logIdent = name + " "

private def getFetcherId(topic: String, partitionId: Int) : Int = {
(topic.hashCode() + 31 * partitionId) % numReplicaFetchers
}

// to be defined in subclass to create a specific fetcher
def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread

def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) {
mapLock synchronized {
var fetcherThread: AbstractFetcherThread = null
val key = (sourceBroker.id, getFetcherId(topic, partitionId))
fetcherThreadMap.get(key) match {
case Some(f) => fetcherThread = f
case None =>
fetcherThread = createFetcherThread(key._2, sourceBroker)
fetcherThreadMap.put(key, fetcherThread)
fetcherThread.start
}
fetcherThread.addPartition(topic, partitionId, initialOffset)
info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d"
.format(topic, partitionId, initialOffset, sourceBroker.id, key._2))
}
}

def removeFetcher(topic: String, partitionId: Int) {
info("removing fetcher on topic %s, partition %d".format(topic, partitionId))
mapLock synchronized {
for ((_, fetcher) <- fetcherThreadMap)
fetcher.removePartition(topic, partitionId)
// collect idle fetchers first; removing map entries while iterating the map is unsafe
val idleFetchers = fetcherThreadMap.filter(_._2.partitionCount <= 0)
for ((key, fetcher) <- idleFetchers) {
fetcher.shutdown
fetcherThreadMap.remove(key)
}
}
}

def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = {
mapLock synchronized {
for ( ((sourceBrokerId, _), fetcher) <- fetcherThreadMap)
if (fetcher.hasPartition(topic, partitionId))
return Some(sourceBrokerId)
}
None
}

def shutdown() = {
info("shutting down")
mapLock synchronized {
for ( (_, fetcher) <- fetcherThreadMap) {
fetcher.shutdown
}
}
info("shutdown completes")
}
}
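The (topic, partition) to fetcher-id mapping is deterministic, so a given partition always lands on the same thread for a given source broker. A standalone sketch mirroring the private getFetcherId hashing (the object, topic name, and thread count are illustrative):

object FetcherIdExample {
  // mirrors getFetcherId above; note that a negative topic.hashCode() can yield a
  // negative id, which is still a usable key into the fetcher thread map
  def fetcherIdFor(topic: String, partitionId: Int, numFetchers: Int): Int =
    (topic.hashCode() + 31 * partitionId) % numFetchers

  def main(args: Array[String]) {
    for (p <- 0 to 5)
      println("partition %d -> fetcher %d".format(p, fetcherIdFor("events", p, 3)))
  }
}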
149 changes: 149 additions & 0 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import java.util.concurrent.CountDownLatch
import kafka.cluster.Broker
import kafka.consumer.SimpleConsumer
import java.util.concurrent.atomic.AtomicBoolean
import kafka.utils.Logging
import kafka.common.ErrorMapping
import kafka.api.{PartitionData, FetchRequestBuilder}
import scala.collection.mutable
import kafka.message.ByteBufferMessageSet

/**
* Abstract class for fetching data from multiple partitions from the same broker.
*/
abstract class AbstractFetcherThread(val name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int,
fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1)
extends Thread(name) with Logging {
private val isRunning: AtomicBoolean = new AtomicBoolean(true)
private val shutdownLatch = new CountDownLatch(1)
private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
private val fetchMapLock = new Object
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
this.logIdent = name + " "
info("starting")
// callbacks to be defined in subclass

// process fetched data; the fetch offset is advanced by the caller based on the bytes consumed
def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData)

// handle a partition whose offset is out of range and return a new fetch offset
def handleOffsetOutOfRange(topic: String, partitionId: Int): Long

// any logic for partitions whose leader has changed
def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit

override def run() {
try {
while(isRunning.get()) {
val builder = new FetchRequestBuilder().
clientId(name).
replicaId(fetcherBrokerId).
maxWait(maxWait).
minBytes(minBytes)

fetchMapLock synchronized {
for ( ((topic, partitionId), offset) <- fetchMap )
builder.addFetch(topic, partitionId, offset.longValue, fetchSize)
}

val fetchRequest = builder.build()
trace("issuing fetch request %s to broker %d".format(fetchRequest, sourceBroker.id))
val response = simpleConsumer.fetch(fetchRequest)

var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil
// process fetched data
fetchMapLock synchronized {
for ( topicData <- response.data ) {
for ( partitionData <- topicData.partitionData) {
val topic = topicData.topic
val partitionId = partitionData.partition
val key = (topic, partitionId)
val currentOffset = fetchMap.get(key)
if (currentOffset.isDefined) {
partitionData.error match {
case ErrorMapping.NoError =>
processPartitionData(topic, currentOffset.get, partitionData)
val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
fetchMap.put(key, newOffset)
case ErrorMapping.OffsetOutOfRangeCode =>
val newOffset = handleOffsetOutOfRange(topic, partitionId)
fetchMap.put(key, newOffset)
warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
.format(currentOffset.get, topic, partitionId, newOffset))
case ErrorMapping.NotLeaderForPartitionCode =>
partitionsWithNewLeader ::= key
case _ =>
error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host),
ErrorMapping.exceptionFor(partitionData.error))
}
}
}
}
}
if (partitionsWithNewLeader.size > 0) {
debug("changing leaders for %s".format(partitionsWithNewLeader))
handlePartitionsWithNewLeader(partitionsWithNewLeader)
}
}
} catch {
case e: InterruptedException => info("fetcher runnable interrupted; shutting down")
case e1 => error("error in fetcher runnable", e1)
}
shutdownComplete()
}

def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
fetchMapLock synchronized {
fetchMap.put((topic, partitionId), initialOffset)
}
}

def removePartition(topic: String, partitionId: Int) {
fetchMapLock synchronized {
fetchMap.remove((topic, partitionId))
}
}

def hasPartition(topic: String, partitionId: Int): Boolean = {
fetchMapLock synchronized {
fetchMap.get((topic, partitionId)).isDefined
}
}

def partitionCount() = {
fetchMapLock synchronized {
fetchMap.size
}
}

private def shutdownComplete() = {
simpleConsumer.close()
shutdownLatch.countDown
}

def shutdown() {
isRunning.set(false)
interrupt()
shutdownLatch.await()
info("shutdown completed")
}
}
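A concrete fetcher only has to supply the three callbacks above; a minimal hypothetical subclass for illustration (the class name and constructor values are assumptions, not part of this patch):

import kafka.api.PartitionData
import kafka.cluster.Broker

class LoggingFetcherThread(name: String, sourceBroker: Broker)
  extends AbstractFetcherThread(name, sourceBroker, socketTimeout = 30 * 1000,
                                socketBufferSize = 64 * 1024, fetchSize = 1024 * 1024) {

  // just log each fetched chunk; a real fetcher would append to a local log
  def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
    info("fetched data for topic %s at offset %d".format(topic, fetchOffset))
  }

  // restart from the beginning when the fetch offset is out of range
  def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = 0L

  // ignore leader changes in this sketch
  def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = {}
}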
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -136,4 +136,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) {

val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086)

/* number of fetcher threads used to replicate messages from a source broker.
* Increasing this value can increase the degree of I/O parallelism in the follower broker. */
val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1)
}
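A hedged sketch of enabling more follower fetch parallelism through the new property (the other property values are placeholders; a real deployment needs its own brokerid and zk.connect):

import java.util.Properties
import kafka.server.KafkaConfig

object ReplicaFetcherConfigExample {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("brokerid", "0")                // placeholder broker id
    props.put("zk.connect", "localhost:2181") // placeholder ZooKeeper connect string
    props.put("replica.fetchers", "3")        // three fetcher threads per source broker
    val config = new KafkaConfig(props)
    println("numReplicaFetchers = " + config.numReplicaFetchers)
  }
}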
29 changes: 29 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -0,0 +1,29 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.server

import kafka.cluster.Broker

class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {

def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
}

}
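Elsewhere in this patch, ReplicaManager is expected to drive this manager as leadership changes; a hypothetical wiring sketch (the object and method names are assumptions, since that file is not shown in this excerpt):

import kafka.cluster.{Broker, Replica}

object FollowerWiring {
  // when this broker becomes a follower for a partition, restart fetching from
  // the new leader at the local replica's current log end offset
  def makeFollower(mgr: ReplicaFetcherManager, replica: Replica, leader: Broker) {
    mgr.removeFetcher(replica.topic, replica.partition.partitionId)
    mgr.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leader)
  }
}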
