Merge pull request alteryx#8 from tdas/kafka-refactor3
Refactor 3 = Refactor 2 + refactored KafkaStreamSuite further to eliminate KafkaTestUtils, and made the Java test suite more robust.
jerryshao committed Nov 14, 2014
2 parents 2a20a01 + eae4ad6 commit 5461f1c
Showing 7 changed files with 212 additions and 184 deletions.

ReliableKafkaReceiver.scala
@@ -178,17 +178,23 @@ class ReliableKafkaReceiver[

/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
val data = (msgAndMetadata.key, msgAndMetadata.message)
val metadata = (topicAndPartition, msgAndMetadata.offset)
blockGenerator.addDataWithCallback(data, metadata)
}

/** Update stored offset */
private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
topicPartitionOffsetMap.put(topicAndPartition, offset)
}

/**
* Remember the current offsets for each topic and partition. This is called when a block is
* generated.
*/
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
// Get a snapshot of current offset map and store with related block id.
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
@@ -221,8 +227,9 @@ class ReliableKafkaReceiver[

ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
} catch {
case t: Throwable => logWarning(s"Exception during commit offset $offset for topic" +
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
case e: Exception =>
logWarning(s"Exception during commit offset $offset for topic" +
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
}

logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
@@ -250,17 +257,25 @@ class ReliableKafkaReceiver[
/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
def onAddData(data: Any, metadata: Any): Unit = {
// Update the offset of the data that was added to the generator
if (metadata != null) {
val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
updateOffset(topicAndPartition, offset)
}
}

def onGenerateBlock(blockId: StreamBlockId): Unit = {
// Remember the offsets of topics/partitions when a block has been generated
rememberBlockOffsets(blockId)
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
// Store block and commit the blocks offset
storeBlockAndCommitOffset(blockId, arrayBuffer)
}

override def onError(message: String, throwable: Throwable): Unit = {
def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}
}
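
The receiver change above routes every message through the block generator's callbacks instead of writing offsets directly: onAddData records an offset only once the message is actually buffered, onGenerateBlock snapshots the current offsets for the newly cut block, and onPushBlock stores the block and then commits those offsets. A minimal, self-contained sketch of that flow; the trait and names below (OffsetTrackingListener, TopicPartition) are illustrative stand-ins, not Spark's BlockGenerator API:

import scala.collection.mutable

// Sketch only: a simplified stand-in for BlockGeneratorListener, showing how
// offsets travel with the data through the callbacks.
object OffsetTrackingSketch {
  type TopicPartition = (String, Int)

  trait OffsetTrackingListener {
    def onAddData(data: Any, metadata: Any): Unit
    def onGenerateBlock(blockId: Long): Unit
    def onPushBlock(blockId: Long, buffer: Seq[Any]): Unit
  }

  def main(args: Array[String]): Unit = {
    // Offsets seen so far, and the per-block snapshots taken when a block is cut.
    val currentOffsets = mutable.HashMap.empty[TopicPartition, Long]
    val blockOffsets = mutable.HashMap.empty[Long, Map[TopicPartition, Long]]

    val listener = new OffsetTrackingListener {
      // Called when a record is buffered: remember its offset.
      def onAddData(data: Any, metadata: Any): Unit = {
        if (metadata != null) {
          val (tp, offset) = metadata.asInstanceOf[(TopicPartition, Long)]
          currentOffsets.put(tp, offset)
        }
      }
      // Called when a block is generated: snapshot the offsets it covers.
      def onGenerateBlock(blockId: Long): Unit = {
        blockOffsets.put(blockId, currentOffsets.toMap)
      }
      // Called after the block is pushed: store it, then commit its offsets.
      def onPushBlock(blockId: Long, buffer: Seq[Any]): Unit = {
        println(s"stored block $blockId with ${buffer.size} records; " +
          s"commit offsets ${blockOffsets(blockId)}")
      }
    }

    listener.onAddData("message-1", (("topic1", 0), 42L))
    listener.onGenerateBlock(1L)
    listener.onPushBlock(1L, Seq("message-1"))
  }
}
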
JavaKafkaStreamSuite.java
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
@@ -43,23 +45,25 @@

public class JavaKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private Random random = new Random();
private transient Random random = new Random();
private transient KafkaStreamSuiteBase suiteBase = null;

@Before
public void setUp() {
suiteBase = new KafkaStreamSuiteBase() { };
suiteBase.beforeFunction();
suiteBase.setupKafka();
System.clearProperty("spark.driver.port");
ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
ssc = new JavaStreamingContext(sparkConf, new Duration(500));
}

@After
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
suiteBase.afterFunction();
suiteBase.tearDownKafka();
}

@Test
@@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
@@ -123,11 +127,16 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
);

ssc.start();
ssc.awaitTermination(3000);

long startTime = System.currentTimeMillis();
boolean sizeMatches = false;
while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
sizeMatches = sent.size() == result.size();
Thread.sleep(200);
}
Assert.assertEquals(sent.size(), result.size());
for (String k : sent.keySet()) {
Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
}
ssc.stop();
}
}
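
The Java suite no longer relies on a fixed ssc.awaitTermination(3000); it polls for the expected number of results with a 20-second deadline and only asserts once the poll finishes, so a slow broker or consumer start does not fail the test. The same bounded-wait idea as a standalone Scala sketch (illustrative only, not part of this patch):

import java.util.concurrent.atomic.AtomicInteger

// Illustrative only: retry a check until it passes or a deadline expires,
// mirroring the bounded polling loop added to the Java test above.
object BoundedWaitSketch {
  def waitUntil(timeoutMs: Long, pollMs: Long = 200L)(condition: => Boolean): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var ok = condition
    while (!ok && System.currentTimeMillis() < deadline) {
      Thread.sleep(pollMs)
      ok = condition
    }
    ok
  }

  def main(args: Array[String]): Unit = {
    val received = new AtomicInteger(0)
    // Simulate results arriving asynchronously, as they do from the stream.
    new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(300); received.set(3) }
    }).start()

    // Wait up to 20 seconds for all expected results, then assert exactly once.
    val sizeMatches = waitUntil(timeoutMs = 20000L) { received.get == 3 }
    assert(sizeMatches, "expected results did not arrive before the deadline")
    println("all results arrived")
  }
}
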
KafkaStreamSuite.scala
@@ -42,15 +42,12 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
import KafkaTestUtils._
/**
* This is an abstract base class for Kafka testsuites. This has the functionality to set up
* and tear down local Kafka servers, and to push data using Kafka producers.
*/
abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {

val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
val batchDuration = Milliseconds(500)
var ssc: StreamingContext = _

var zkAddress: String = _
var zkClient: ZkClient = _

@@ -64,7 +61,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
private var server: KafkaServer = _
private var producer: Producer[String, String] = _

def beforeFunction() {
def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
@@ -80,7 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
val brokerProps = getBrokerConfig(brokerPort, zkAddress)
val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
@@ -100,12 +97,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
logInfo("==================== 4 ====================")
}

def afterFunction() {
if (ssc != null) {
ssc.stop()
ssc = null
}

def tearDownKafka() {
if (producer != null) {
producer.close()
producer = null
@@ -141,101 +133,43 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
logInfo("==================== 5 ====================")
// wait until metadata is propagated
waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
waitUntilMetadataIsPropagated(topic, 0)
}

def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
if (producer == null) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
}
producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
producer.send(createTestMessage(topic, sent): _*)
producer.close()
logInfo("==================== 6 ====================")
}
}

class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {

before { beforeFunction() }
after { afterFunction() }

test("Kafka input stream") {
ssc = new StreamingContext(sparkConf, batchDuration)
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
produceAndSendMessage(topic, sent)

val kafkaParams = Map("zookeeper.connect" -> zkAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]()
stream.map { case (k, v) => v }
.countByValue()
.foreachRDD { r =>
val ret = r.collect()
ret.toMap.foreach { kv =>
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
result.put(kv._1, count)
}
}
ssc.start()
eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
assert(sent.size === result.size)
sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
}

ssc.stop()
}
}


object KafkaTestUtils {

def getBrokerConfig(port: Int, zkConnect: String): Properties = {
private def getBrokerConfig(): Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
props.put("port", port.toString)
props.put("port", brokerPort.toString)
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
props.put("zookeeper.connect", zkConnect)
props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props
}

def getProducerConfig(brokerList: String): Properties = {
private def getProducerConfig(): Properties = {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
val props = new Properties()
props.put("metadata.broker.list", brokerList)
props.put("metadata.broker.list", brokerAddr)
props.put("serializer.class", classOf[StringEncoder].getName)
props
}

def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
val startTime = System.currentTimeMillis()
while (true) {
if (condition())
return true
if (System.currentTimeMillis() > startTime + waitTime)
return false
Thread.sleep(waitTime.min(100L))
private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
assert(
server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
s"Partition [$topic, $partition] metadata not propagated after timeout"
)
}
// Should never go to here
throw new RuntimeException("unexpected error")
}

def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
timeout: Long) {
assert(waitUntilTrue(() =>
servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
TopicAndPartition(topic, partition))), timeout),
s"Partition [$topic, $partition] metadata not propagated after timeout")
}

class EmbeddedZookeeper(val zkConnect: String) {
@@ -261,3 +195,53 @@ object KafkaTestUtils {
}
}
}


class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
var ssc: StreamingContext = _

before {
setupKafka()
}

after {
if (ssc != null) {
ssc.stop()
ssc = null
}
tearDownKafka()
}

test("Kafka input stream") {
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
produceAndSendMessage(topic, sent)

val kafkaParams = Map("zookeeper.connect" -> zkAddress,
"group.id" -> s"test-consumer-${Random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]()
stream.map(_._2).countByValue().foreachRDD { r =>
val ret = r.collect()
ret.toMap.foreach { kv =>
val count = result.getOrElseUpdate(kv._1, 0) + kv._2
result.put(kv._1, count)
}
}
ssc.start()
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
assert(sent.size === result.size)
sent.keys.foreach { k =>
assert(sent(k) === result(k).toInt)
}
}
ssc.stop()
}
}
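
Both the metadata-propagation check and the stream assertion above now use ScalaTest's Eventually, which retries a block of assertions at a fixed interval until it passes or the timeout expires, replacing the hand-rolled waitUntilTrue helper. A standalone sketch of the pattern, assuming the ScalaTest 2.x FunSuite/Eventually API used elsewhere in this suite:

import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Millis, Seconds, Span}

// Standalone sketch of the eventually-based waiting pattern; the counter here is
// a stand-in for the asynchronously populated result map in the real suite.
class EventuallySketchSuite extends FunSuite with Eventually {
  test("assertion eventually holds") {
    val count = new AtomicInteger(0)
    new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(200); count.set(3) }
    }).start()

    // Re-run the block every 100 ms until it passes, failing after 5 seconds.
    eventually(timeout(Span(5, Seconds)), interval(Span(100, Millis))) {
      assert(count.get === 3)
    }
  }
}
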

