Skip to content

Commit

Permalink
tmp: enlarge
Browse files Browse the repository at this point in the history
  • Loading branch information
maiha committed Dec 3, 2015
1 parent 0d00b08 commit 1ecfcd3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
15 changes: 12 additions & 3 deletions src/main/scala/sc/ala/kafka/utils/KafkaUtils.scala
Expand Up @@ -16,6 +16,8 @@ import scala.concurrent._
import scala.concurrent.duration._
import collection.JavaConversions._

import pl.project13.scala.rainbow._

abstract class KafkaUtils extends Api {
def zkClient: ZkClient

Expand All @@ -32,7 +34,14 @@ abstract class KafkaUtils extends Api {

def leaderBrokers(topic: String): Map[Int, Broker] = {
val resolve = brokers.groupBy(_.id).map{ case (k, list) => (k, list.head) }
leaders(topic).filter(_._2.isDefined).mapValues(_.map(resolve).get)
try {
println(s"ERROR: topic=$topic, leaders=${leaders(topic)}".red)
leaders(topic).filter(_._2.isDefined).mapValues(_.map(resolve).get)
} catch {
case err: java.util.NoSuchElementException =>
println(s"ERROR: topic=$topic, leaders=${leaders(topic)}".red)
throw err
}
}

def partitions(topic: String): Seq[Int] =
Expand Down Expand Up @@ -65,7 +74,7 @@ abstract class KafkaUtils extends Api {
leaderBrokers(topic).get(partition) match {
case Some(b) =>
val f = Future{ new CountingConsumer(b.host, b.port, topic, partition).count() }
Await.result(f, 10.seconds)
Await.result(f, 60.seconds)
case None =>
throw new RuntimeException(s"leader not found: ($topic, $partition)")
}
Expand All @@ -75,7 +84,7 @@ abstract class KafkaUtils extends Api {
import scala.concurrent.ExecutionContext.Implicits.global
val sources = leaderBrokers(topic) // Map(0 -> id:1,host:ubuntu,port:9092)
val fetches = sources.map { case (p, b) => Future { (p, new CountingConsumer(b.host, b.port, topic, p).count()) } }
Await.result(Future.sequence(fetches), 10.seconds).toMap
Await.result(Future.sequence(fetches), 60.seconds).toMap
}

def count(topic: String): Long = counts(topic).values.sum
Expand Down
Expand Up @@ -24,7 +24,7 @@ class CountingConsumer(host: String, port: Int, topic: String, partition: Int) e
private def countingImpl(consumer: SimpleConsumer): Long = {
var numMessagesConsumed = 0

val maxMessages = 100000
val maxMessages = 100000000
val skipMessageOnError = true
val replicaId = -1

Expand Down

0 comments on commit 1ecfcd3

Please sign in to comment.