diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index b86c2e10a729..7c7688d00ebb 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -18,7 +18,6 @@ package kafka.cluster import kafka.log.Log -import kafka.server.{KafkaConfig, ReplicaFetcherThread} import java.lang.IllegalStateException import kafka.utils.Logging @@ -28,7 +27,6 @@ class Replica(val brokerId: Int, var log: Option[Log] = None, var leoUpdateTime: Long = -1L) extends Logging { private var logEndOffset: Long = -1L - private var replicaFetcherThread: ReplicaFetcherThread = null def logEndOffset(newLeo: Option[Long] = None): Long = { isLocal match { @@ -88,32 +86,6 @@ class Replica(val brokerId: Int, } } - def startReplicaFetcherThread(leaderBroker: Broker, config: KafkaConfig) { - val name = "Replica-Fetcher-%d-%s-%d".format(brokerId, topic, partition.partitionId) - replicaFetcherThread = new ReplicaFetcherThread(name, this, leaderBroker, config) - replicaFetcherThread.setDaemon(true) - replicaFetcherThread.start() - } - - def stopReplicaFetcherThread() { - if(replicaFetcherThread != null) { - replicaFetcherThread.shutdown() - replicaFetcherThread = null - } - } - - def getIfFollowerAndLeader(): (Boolean, Int) = { - replicaFetcherThread != null match { - case true => (true, replicaFetcherThread.getLeader().id) - case false => (false, -1) - } - } - - def close() { - if(replicaFetcherThread != null) - replicaFetcherThread.shutdown() - } - override def equals(that: Any): Boolean = { if(!(that.isInstanceOf[Replica])) return false diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 958e16566e2d..248231066541 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -60,6 +60,8 @@ object ErrorMapping { def maybeThrowException(code: Short) = if(code != 0) throw codeToException(code).newInstance() + + def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance() } class InvalidTopicException(message: String) extends RuntimeException(message) { diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 2b34af0c9bfe..857601ae4ca0 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -405,6 +405,19 @@ private[kafka] class Log(val dir: File, val maxSize: Long, val flushInterval: In ret } + /** + * Truncate all segments in the log and start a new segment on a new offset + */ + def truncateAndStartWithNewOffset(newOffset: Long) { + lock synchronized { + val deletedSegments = segments.trunc(segments.view.size) + val newFile = new File(dir, Log.nameFromOffset(newOffset)) + debug("tuncate and start log '" + name + "' to " + newFile.getName()) + segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset)) + deleteSegments(deletedSegments) + } + } + /* Attemps to delete all provided segments from a log and returns how many it was able to */ def deleteSegments(segments: Seq[LogSegment]): Int = { var total = 0 diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala new file mode 100644 index 000000000000..fd62f499557b --- /dev/null +++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import scala.collection.mutable +import kafka.utils.Logging +import kafka.cluster.Broker + +abstract class AbstractFetcherManager(name: String, numReplicaFetchers: Int = 1) extends Logging { + // map of (source brokerid, fetcher Id per source broker) => fetcher + private val fetcherThreadMap = new mutable.HashMap[Tuple2[Int, Int], AbstractFetcherThread] + private val mapLock = new Object + this.logIdent = name + " " + + private def getFetcherId(topic: String, partitionId: Int) : Int = { + (topic.hashCode() + 31 * partitionId) % numReplicaFetchers + } + + // to be defined in subclass to create a specific fetcher + def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread + + def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker) { + mapLock synchronized { + var fetcherThread: AbstractFetcherThread = null + val key = (sourceBroker.id, getFetcherId(topic, partitionId)) + fetcherThreadMap.get(key) match { + case Some(f) => fetcherThread = f + case None => + fetcherThread = createFetcherThread(key._2, sourceBroker) + fetcherThreadMap.put(key, fetcherThread) + fetcherThread.start + } + fetcherThread.addPartition(topic, partitionId, initialOffset) + info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId %d" + .format(topic, partitionId, initialOffset, sourceBroker.id, key._2)) + } + } + + def removeFetcher(topic: String, partitionId: Int) { + info("%s removing fetcher on topic %s, partition %d".format(name, topic, partitionId)) + mapLock synchronized { + for ((key, fetcher) <- fetcherThreadMap) { + fetcher.removePartition(topic, partitionId) + if (fetcher.partitionCount <= 0) { + fetcher.shutdown + fetcherThreadMap.remove(key) + } + } + } + } + + def fetcherSourceBroker(topic: String, partitionId: Int): Option[Int] = { + mapLock synchronized { + for ( ((sourceBrokerId, _), fetcher) <- fetcherThreadMap) + if (fetcher.hasPartition(topic, partitionId)) + return Some(sourceBrokerId) + } + None + } + + def shutdown() = { + info("shutting down") + mapLock synchronized { + for ( (_, fetcher) <- fetcherThreadMap) { + fetcher.shutdown + } + } + info("shutdown completes") + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala new file mode 100644 index 000000000000..fcfb6e335bcb --- /dev/null +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.CountDownLatch +import kafka.cluster.Broker +import kafka.consumer.SimpleConsumer +import java.util.concurrent.atomic.AtomicBoolean +import kafka.utils.Logging +import kafka.common.ErrorMapping +import kafka.api.{PartitionData, FetchRequestBuilder} +import scala.collection.mutable +import kafka.message.ByteBufferMessageSet + +/** + * Abstract class for fetching data from multiple partitions from the same broker. + */ +abstract class AbstractFetcherThread(val name: String, sourceBroker: Broker, socketTimeout: Int, socketBufferSize: Int, + fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1) + extends Thread(name) with Logging { + private val isRunning: AtomicBoolean = new AtomicBoolean(true) + private val shutdownLatch = new CountDownLatch(1) + private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map + private val fetchMapLock = new Object + val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize) + this.logIdent = name + " " + info("starting") + // callbacks to be defined in subclass + + // process fetched data and return the new fetch offset + def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) + + // handle a partition whose offset is out of range and return a new fetch offset + def handleOffsetOutOfRange(topic: String, partitionId: Int): Long + + // any logic for partitions whose leader has changed + def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit + + override def run() { + try { + while(isRunning.get()) { + val builder = new FetchRequestBuilder(). + clientId(name). + replicaId(fetcherBrokerId). + maxWait(maxWait). + minBytes(minBytes) + + fetchMapLock synchronized { + for ( ((topic, partitionId), offset) <- fetchMap ) + builder.addFetch(topic, partitionId, offset.longValue, fetchSize) + } + + val fetchRequest = builder.build() + val response = simpleConsumer.fetch(fetchRequest) + trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) + + var partitionsWithNewLeader : List[Tuple2[String, Int]] = Nil + // process fetched data + fetchMapLock synchronized { + for ( topicData <- response.data ) { + for ( partitionData <- topicData.partitionData) { + val topic = topicData.topic + val partitionId = partitionData.partition + val key = (topic, partitionId) + val currentOffset = fetchMap.get(key) + if (currentOffset.isDefined) { + partitionData.error match { + case ErrorMapping.NoError => + processPartitionData(topic, currentOffset.get, partitionData) + val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes + fetchMap.put(key, newOffset) + case ErrorMapping.OffsetOutOfRangeCode => + val newOffset = handleOffsetOutOfRange(topic, partitionId) + fetchMap.put(key, newOffset) + warn("current offset %d for topic %s partition %d out of range; reset offset to %d" + .format(currentOffset.get, topic, partitionId, newOffset)) + case ErrorMapping.NotLeaderForPartitionCode => + partitionsWithNewLeader ::= key + case _ => + error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.host), + ErrorMapping.exceptionFor(partitionData.error)) + } + } + } + } + } + if (partitionsWithNewLeader.size > 0) { + debug("changing leaders for %s".format(partitionsWithNewLeader)) + handlePartitionsWithNewLeader(partitionsWithNewLeader) + } + } + } catch { + case e: InterruptedException => info("replica fetcher runnable interrupted. Shutting down") + case e1 => error("error in replica fetcher runnable", e1) + } + shutdownComplete() + } + + def addPartition(topic: String, partitionId: Int, initialOffset: Long) { + fetchMapLock synchronized { + fetchMap.put((topic, partitionId), initialOffset) + } + } + + def removePartition(topic: String, partitionId: Int) { + fetchMapLock synchronized { + fetchMap.remove((topic, partitionId)) + } + } + + def hasPartition(topic: String, partitionId: Int): Boolean = { + fetchMapLock synchronized { + fetchMap.get((topic, partitionId)).isDefined + } + } + + def partitionCount() = { + fetchMapLock synchronized { + fetchMap.size + } + } + + private def shutdownComplete() = { + simpleConsumer.close() + shutdownLatch.countDown + } + + def shutdown() { + isRunning.set(false) + interrupt() + shutdownLatch.await() + info("shutdown completed") + } +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index ce5e95b56feb..3d2fb5bd33b5 100644 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -136,4 +136,7 @@ class KafkaConfig(props: Properties) extends ZKConfig(props) { val replicaMinBytes = Utils.getInt(props, "replica.fetch.min.bytes", 4086) + /* number of fetcher threads used to replicate messages from a source broker. + * Increasing this value can increase the degree of I/O parallelism in the follower broker. */ + val numReplicaFetchers = Utils.getInt(props, "replica.fetchers", 1) } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala new file mode 100644 index 000000000000..92a1ff7c4b04 --- /dev/null +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.cluster.Broker + +class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) + extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) { + + def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { + new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr) + } + +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 7cdf5e6f8ef5..24b9e8dd6411 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -17,67 +17,41 @@ package kafka.server -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.CountDownLatch -import kafka.api.FetchRequestBuilder -import kafka.utils.Logging -import kafka.cluster.{Broker, Replica} -import kafka.consumer.SimpleConsumer - -class ReplicaFetcherThread(name: String, replica: Replica, leaderBroker: Broker, config: KafkaConfig) - extends Thread(name) with Logging { - val isRunning: AtomicBoolean = new AtomicBoolean(true) - private val shutdownLatch = new CountDownLatch(1) - private val replicaConsumer = new SimpleConsumer(leaderBroker.host, leaderBroker.port, - config.replicaSocketTimeoutMs, config.replicaSocketBufferSize) - - override def run() { - try { - info("Starting replica fetch thread %s for topic %s partition %d".format(name, replica.topic, replica.partition.partitionId)) - while(isRunning.get()) { - val builder = new FetchRequestBuilder(). - clientId(name). - replicaId(replica.brokerId). - maxWait(config.replicaMaxWaitTimeMs). - minBytes(config.replicaMinBytes) - - // TODO: KAFKA-339 Keep this simple single fetch for now. Change it to fancier multi fetch when message - // replication actually works - val fetchOffset = replica.logEndOffset() - trace("Follower %d issuing fetch request for topic %s partition %d to leader %d from offset %d" - .format(replica.brokerId, replica.topic, replica.partition.partitionId, leaderBroker.id, fetchOffset)) - builder.addFetch(replica.topic, replica.partition.partitionId, fetchOffset, config.replicaFetchSize) - - val fetchRequest = builder.build() - val response = replicaConsumer.fetch(fetchRequest) - // TODO: KAFKA-339 Check for error. Don't blindly read the messages - // append messages to local log - replica.log.get.append(response.messageSet(replica.topic, replica.partition.partitionId)) - // record the hw sent by the leader for this partition - val followerHighWatermark = replica.logEndOffset().min(response.data.head.partitionData.head.hw) - replica.highWatermark(Some(followerHighWatermark)) - trace("Follower %d set replica highwatermark for topic %s partition %d to %d" - .format(replica.brokerId, replica.topic, replica.partition.partitionId, replica.highWatermark())) - } - }catch { - case e: InterruptedException => warn("Replica fetcher thread %s interrupted. Shutting down".format(name)) - case e1 => error("Error in replica fetcher thread. Shutting down due to ", e1) - } - shutdownComplete() +import kafka.api.{OffsetRequest, PartitionData} +import kafka.cluster.Broker +import kafka.message.ByteBufferMessageSet + +class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager) + extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs, + socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize, + fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs, + minBytes = brokerConfig.replicaMinBytes) { + + // process fetched data and return the new fetch offset + def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) = { + val partitionId = partitionData.partition + val replica = replicaMgr.getReplica(topic, partitionId).get + val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] + + if (fetchOffset != replica.logEndOffset()) + throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset())) + replica.log.get.append(messageSet) + replica.highWatermark(Some(partitionData.hw)) + trace("follower %d set replica highwatermark for topic %s partition %d to %d" + .format(replica.brokerId, topic, partitionId, partitionData.hw)) } - private def shutdownComplete() = { - replicaConsumer.close() - shutdownLatch.countDown + // handle a partition whose offset is out of range and return a new fetch offset + def handleOffsetOutOfRange(topic: String, partitionId: Int): Long = { + // This means the local replica is out of date. Truncate the log and catch up from beginning. + val offsets = simpleConsumer.getOffsetsBefore(topic, partitionId, OffsetRequest.EarliestTime, 1) + val replica = replicaMgr.getReplica(topic, partitionId).get + replica.log.get.truncateAndStartWithNewOffset(offsets(0)) + return offsets(0) } - def getLeader(): Broker = leaderBroker - - def shutdown() { - info("Shutting down replica fetcher thread") - isRunning.set(false) - interrupt() - shutdownLatch.await() - info("Replica fetcher thread shutdown completed") + // any logic for partitions whose leader has changed + def handlePartitionsWithNewLeader(partitions: List[Tuple2[String, Int]]): Unit = { + // no handler needed since the controller will make the changes accordingly } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0012f1dbcb95..c4aa7178445f 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -32,6 +32,8 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex private var leaderReplicas = new ListBuffer[Partition]() private val leaderReplicaLock = new ReentrantLock() private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true) + private val replicaFetcherManager = new ReplicaFetcherManager(config, this) + // start ISR expiration thread isrExpirationScheduler.startUp isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs) @@ -139,7 +141,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) { // stop replica fetcher thread, if any - replica.stopReplicaFetcherThread() + replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) // read and cache the ISR replica.partition.leaderId(Some(replica.brokerId)) replica.partition.updateISR(currentISRInZk.toSet) @@ -153,7 +155,7 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex } def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) { - info("Broker %d becoming follower to leader %d for topic %s partition %d" + info("broker %d intending to follow leader %d for topic %s partition %d" .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) // remove this replica's partition from the ISR expiration queue try { @@ -169,13 +171,15 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex } // get leader for this replica val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head - val isReplicaAFollower = replica.getIfFollowerAndLeader() + val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId) // Become follower only if it is not already following the same leader - if(!(isReplicaAFollower._1 && (isReplicaAFollower._2 == leaderBroker.id))) { + if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) { + info("broker %d becoming follower to leader %d for topic %s partition %d" + .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId)) // stop fetcher thread to previous leader - replica.stopReplicaFetcherThread() + replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId) // start fetcher thread to current leader - replica.startReplicaFetcherThread(leaderBroker, config) + replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker) } } @@ -244,6 +248,6 @@ class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) ex def close() { isrExpirationScheduler.shutdown() - allReplicas.foreach(_._2.assignedReplicas().foreach(_.close())) + replicaFetcherManager.shutdown() } } diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala index 0c55dd87657d..7fe4ca31b24b 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala @@ -23,17 +23,15 @@ import kafka.utils.TestUtils._ import kafka.producer.ProducerData import kafka.serializer.StringEncoder import kafka.admin.CreateTopicCommand -import kafka.cluster.{Replica, Partition, Broker} -import kafka.utils.{MockTime, TestUtils} +import kafka.utils.TestUtils import junit.framework.Assert._ -import java.io.File -import kafka.log.Log class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { val props = createBrokerConfigs(2) val configs = props.map(p => new KafkaConfig(p) { override val flushInterval = 1}) var brokers: Seq[KafkaServer] = null - val topic = "foobar" + val topic1 = "foo" + val topic2 = "bar" override def setUp() { super.setUp() @@ -41,45 +39,36 @@ class ReplicaFetchTest extends JUnit3Suite with ZooKeeperTestHarness { } override def tearDown() { - super.tearDown() brokers.foreach(_.shutdown()) + super.tearDown() } def testReplicaFetcherThread() { val partition = 0 - val testMessageList = List("test1", "test2", "test3", "test4") - val leaderBrokerId = configs.head.brokerId - val followerBrokerId = configs.last.brokerId - val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port) + val testMessageList1 = List("test1", "test2", "test3", "test4") + val testMessageList2 = List("test5", "test6", "test7", "test8") // create a topic and partition and await leadership - CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":")) - TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000) + for (topic <- List(topic1,topic2)) { + CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":")) + TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000) + } // send test messages to leader val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder) - producer.send(new ProducerData[String, String](topic, "test", testMessageList)) - - // create a tmp directory - val tmpLogDir = TestUtils.tempDir() - val replicaLogDir = new File(tmpLogDir, topic + "-" + partition) - replicaLogDir.mkdirs() - val replicaLog = new Log(replicaLogDir, 500, 500, false) - - // create replica fetch thread - val time = new MockTime - val testPartition = new Partition(topic, partition, time) - testPartition.leaderId(Some(leaderBrokerId)) - val testReplica = new Replica(followerBrokerId, testPartition, topic, Some(replicaLog)) - val replicaFetchThread = new ReplicaFetcherThread("replica-fetcher", testReplica, leaderBroker, configs.last) - - // start a replica fetch thread to the above broker - replicaFetchThread.start() - - Thread.sleep(700) - replicaFetchThread.shutdown() + producer.send(new ProducerData[String, String](topic1, testMessageList1), + new ProducerData[String, String](topic2, testMessageList2)) + producer.close() - assertEquals(60L, testReplica.log.get.logEndOffset) - replicaLog.close() + def condition(): Boolean = { + var result = true + for (topic <- List(topic1, topic2)) { + val expectedOffset = brokers.head.getLogManager().getLog(topic, partition).get.logEndOffset + result = result && expectedOffset > 0 && brokers.foldLeft(true) { (total, item) => total && + (expectedOffset == item.getLogManager().getLog(topic, partition).get.logEndOffset) } + } + result + } + assertTrue("broker logs should be identical", waitUntilTrue(condition, 6000)) } } \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 8d307b4eba56..a3f22a0978b7 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -397,6 +397,18 @@ object TestUtils extends Logging { } } + def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = { + val startTime = System.currentTimeMillis() + while (true) { + if (condition()) + return true + if (System.currentTimeMillis() > startTime + waitTime) + return false + Thread.sleep(100) + } + // should never hit here + throw new RuntimeException("unexpected error") + } } object ControllerTestUtils{ @@ -442,9 +454,6 @@ object ControllerTestUtils{ } } - - - object TestZKUtils { val zookeeperConnect = "127.0.0.1:2182" }