Refactored KafkaStreamSuiteBase to eliminate KafkaTestUtils and made the Java test more robust.
tdas committed Nov 13, 2014
1 parent fab14c7 commit eae4ad6
Showing 2 changed files with 73 additions and 87 deletions.
@@ -127,11 +127,16 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
);

ssc.start();
- ssc.awaitTermination(3000);

+ long startTime = System.currentTimeMillis();
+ boolean sizeMatches = false;
+ while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
+   sizeMatches = sent.size() == result.size();
+   Thread.sleep(200);
+ }
Assert.assertEquals(sent.size(), result.size());
for (String k : sent.keySet()) {
Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
}
ssc.stop();
}
}
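
The Java change swaps a fixed `ssc.awaitTermination(3000)` for a polling loop that keeps re-checking the received counts for up to 20 seconds before asserting. The Scala file below expresses the same retry-until-deadline idea with ScalaTest's `Eventually` trait; here is a minimal standalone sketch of that pattern (the background thread and counter are hypothetical stand-ins for the streaming job, not part of the commit):

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch extends App {
  val received = new AtomicInteger(0)

  // Hypothetical background work, standing in for the streaming job that
  // fills the result map in the real test.
  new Thread(new Runnable {
    def run(): Unit = { Thread.sleep(500); received.set(3) }
  }).start()

  // Re-evaluates the block every `interval` until the assertion passes, and
  // fails only if `timeout` elapses first -- the behavior the Java test above
  // hand-rolls with a while loop and Thread.sleep(200).
  eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
    assert(received.get() === 3)
  }
}
```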
@@ -46,8 +46,7 @@ import org.apache.spark.util.Utils
* This is an abstract base class for Kafka testsuites. This has the functionality to set up
* and tear down local Kafka servers, and to push data using Kafka producers.
*/
- abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
-   import KafkaTestUtils._
+ abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {

var zkAddress: String = _
var zkClient: ZkClient = _
@@ -78,7 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
-       val brokerProps = getBrokerConfig(brokerPort, zkAddress)
+       val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
@@ -134,111 +133,43 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
logInfo("==================== 5 ====================")
// wait until metadata is propagated
-     waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+     waitUntilMetadataIsPropagated(topic, 0)
}

def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
-     val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-     producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+     producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
producer.send(createTestMessage(topic, sent): _*)
producer.close()
logInfo("==================== 6 ====================")
}
}

- class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
-   var ssc: StreamingContext = _
-
-   before {
-     setupKafka()
-   }
-
-   after {
-     if (ssc != null) {
-       ssc.stop()
-       ssc = null
-     }
-     tearDownKafka()
-   }
-
-   test("Kafka input stream") {
-     val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-     ssc = new StreamingContext(sparkConf, Milliseconds(500))
-     val topic = "topic1"
-     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-     createTopic(topic)
-     produceAndSendMessage(topic, sent)
-
-     val kafkaParams = Map("zookeeper.connect" -> zkAddress,
-       "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
-       "auto.offset.reset" -> "smallest")
-
-     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-       ssc,
-       kafkaParams,
-       Map(topic -> 1),
-       StorageLevel.MEMORY_ONLY)
-     val result = new mutable.HashMap[String, Long]()
-     stream.map { case (k, v) => v }
-       .countByValue()
-       .foreachRDD { r =>
-         val ret = r.collect()
-         ret.toMap.foreach { kv =>
-           val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-           result.put(kv._1, count)
-         }
-       }
-     ssc.start()
-     eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
-       assert(sent.size === result.size)
-       sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
-     }
-
-     ssc.stop()
-   }
- }


- object KafkaTestUtils {
-
-   def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+   private def getBrokerConfig(): Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("port", brokerPort.toString)
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
props.put("zookeeper.connect", zkConnect)
props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props
}

-   def getProducerConfig(brokerList: String): Properties = {
+   private def getProducerConfig(): Properties = {
+     val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
val props = new Properties()
props.put("metadata.broker.list", brokerList)
props.put("metadata.broker.list", brokerAddr)
props.put("serializer.class", classOf[StringEncoder].getName)
props
}

-   def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
-     val startTime = System.currentTimeMillis()
-     while (true) {
-       if (condition())
-         return true
-       if (System.currentTimeMillis() > startTime + waitTime)
-         return false
-       Thread.sleep(waitTime.min(100L))
+   private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
+     eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
+       assert(
+         server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+         s"Partition [$topic, $partition] metadata not propagated after timeout"
+       )
      }
-     // Should never go to here
-     throw new RuntimeException("unexpected error")
    }

-   def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
-       timeout: Long) {
-     assert(waitUntilTrue(() =>
-       servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
-         TopicAndPartition(topic, partition))), timeout),
-       s"Partition [$topic, $partition] metadata not propagated after timeout")
-   }

class EmbeddedZookeeper(val zkConnect: String) {
@@ -264,3 +195,53 @@ object KafkaTestUtils {
}
}
}


+ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+   var ssc: StreamingContext = _
+
+   before {
+     setupKafka()
+   }
+
+   after {
+     if (ssc != null) {
+       ssc.stop()
+       ssc = null
+     }
+     tearDownKafka()
+   }
+
+   test("Kafka input stream") {
+     val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+     ssc = new StreamingContext(sparkConf, Milliseconds(500))
+     val topic = "topic1"
+     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+     createTopic(topic)
+     produceAndSendMessage(topic, sent)
+
+     val kafkaParams = Map("zookeeper.connect" -> zkAddress,
+       "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+       "auto.offset.reset" -> "smallest")
+
+     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+       ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+     val result = new mutable.HashMap[String, Long]()
+     stream.map(_._2).countByValue().foreachRDD { r =>
+       val ret = r.collect()
+       ret.toMap.foreach { kv =>
+         val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+         result.put(kv._1, count)
+       }
+     }
+     ssc.start()
+     eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+       assert(sent.size === result.size)
+       sent.keys.foreach { k =>
+         assert(sent(k) === result(k).toInt)
+       }
+     }
+     ssc.stop()
+   }
+ }
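
With the helpers promoted into KafkaStreamSuiteBase, a concrete suite no longer imports anything from a KafkaTestUtils object; it just mixes in the base class and calls the inherited methods. A minimal sketch of a hypothetical additional suite on the same base (the topic name and message counts are illustrative, not part of the commit):

```scala
import scala.util.Random

import org.scalatest.BeforeAndAfter

// Hypothetical extra suite, showing the post-refactor surface: the embedded
// ZooKeeper/Kafka lifecycle and the producer helpers are all inherited.
class KafkaRoundTripSuite extends KafkaStreamSuiteBase with BeforeAndAfter {

  before { setupKafka() }    // start embedded ZooKeeper and a Kafka broker
  after { tearDownKafka() }  // stop them again after each test

  test("messages can be produced to a fresh topic") {
    val topic = s"round-trip-${Random.nextInt(10000)}"
    val sent = Map("x" -> 2, "y" -> 7)
    createTopic(topic)                  // blocks until metadata has propagated
    produceAndSendMessage(topic, sent)  // sends via the base class's producer
    // A real test would now consume with KafkaUtils.createStream and verify
    // the counts inside eventually { ... }, as KafkaStreamSuite does above.
  }
}
```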
