Address some comments
jerryshao committed Nov 13, 2014
1 parent 9f636b3 commit 2a20a01
Showing 4 changed files with 26 additions and 25 deletions.
ReliableKafkaReceiver.scala
@@ -83,7 +83,7 @@ class ReliableKafkaReceiver[
*/
private var blockGenerator: BlockGenerator = null

- /** Threadpool running the handlers for receiving message from multiple topics and partitions. */
+ /** Thread pool running the handlers for receiving message from multiple topics and partitions. */
private var messageHandlerThreadPool: ThreadPoolExecutor = null

override def onStart(): Unit = {
@@ -142,7 +142,6 @@ class ReliableKafkaReceiver[
messageHandlerThreadPool.submit(new MessageHandler(stream))
}
}
- println("Starting")
}

override def onStop(): Unit = {
@@ -177,25 +176,28 @@ class ReliableKafkaReceiver[
}
}

- /** Store a Kafka message and the associated metadata as a tuple */
+ /** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
}

- /** Remember the current offsets for each topic and partition. This is called when a block is generated */
+ /**
+  * Remember the current offsets for each topic and partition. This is called when a block is
+  * generated.
+  */
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
- // Get a snapshot of current offset map and store with related block id. Since this hook
- // function is called in synchronized block, so we can get the snapshot without explicit lock.
+ // Get a snapshot of current offset map and store with related block id.
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
topicPartitionOffsetMap.clear()
}

- /** Store the ready-to-be-stored block and commit the related offsets to zookeeper */
- private def storeBlockAndCommitOffset(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+ private def storeBlockAndCommitOffset(
+     blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
blockOffsetMap.remove(blockId)
@@ -232,7 +234,6 @@ class ReliableKafkaReceiver[
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
override def run(): Unit = {
while (!isStopped) {
- println(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext) {
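The receiver methods touched above implement a per-block offset bookkeeping scheme: every stored message records its latest offset per topic/partition, rememberBlockOffsets snapshots and clears that map when a block is generated, and storeBlockAndCommitOffset stores the block and commits the snapshot to ZooKeeper. Below is a minimal, self-contained Scala sketch of that flow; TopicAndPartition and StreamBlockId here are simplified local stand-ins for the real Kafka/Spark types in the diff, and the commit callback is hypothetical.

import scala.collection.mutable

// Simplified stand-ins for the real Kafka/Spark types, to keep the sketch self-contained.
case class TopicAndPartition(topic: String, partition: Int)
case class StreamBlockId(streamId: Int, uniqueId: Long)

class OffsetBookkeeping {
  // Offsets seen since the last block was generated.
  private val topicPartitionOffsetMap = mutable.HashMap.empty[TopicAndPartition, Long]
  // Offset snapshots keyed by the block that contains those messages.
  private val blockOffsetMap = mutable.HashMap.empty[StreamBlockId, Map[TopicAndPartition, Long]]

  // Called for every received message, analogous to storeMessageAndMetadata.
  def recordOffset(tp: TopicAndPartition, offset: Long): Unit = synchronized {
    topicPartitionOffsetMap.put(tp, offset)
  }

  // Called when a block is generated, analogous to rememberBlockOffsets.
  def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
    blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
    topicPartitionOffsetMap.clear()
  }

  // Called after the block has been stored, analogous to storeBlockAndCommitOffset;
  // in the real receiver the commit function writes the offsets to ZooKeeper.
  def commitBlockOffsets(blockId: StreamBlockId)(commit: Map[TopicAndPartition, Long] => Unit): Unit = {
    blockOffsetMap.remove(blockId).foreach(commit)
  }
}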
JavaKafkaStreamSuite.java
@@ -33,8 +33,6 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
- import org.apache.spark.streaming.Duration;
- import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -43,23 +41,25 @@
import org.junit.After;
import org.junit.Before;

- public class JavaKafkaStreamSuite extends KafkaStreamSuiteBase implements Serializable {
-   private transient JavaStreamingContext ssc = null;
-   private Random random = new Random();
+ public class JavaKafkaStreamSuite implements Serializable {
+   private transient JavaStreamingContext ssc = null;
+   private Random random = new Random();
+   private transient KafkaStreamSuiteBase suiteBase = null;

@Before
public void setUp() {
- beforeFunction();
+ suiteBase = new KafkaStreamSuiteBase() { };
+ suiteBase.beforeFunction();
System.clearProperty("spark.driver.port");
- ssc = new JavaStreamingContext(sparkConf(), batchDuration());
+ ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
}

@After
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
- afterFunction();
+ suiteBase.afterFunction();
}

@Test
@@ -73,14 +73,14 @@ public void testKafkaStream() throws InterruptedException {
sent.put("b", 3);
sent.put("c", 10);

- createTopic(topic);
+ suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
- produceAndSendMessage(topic,
+ suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("zookeeper.connect", zkAddress());
+ kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");

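The Java suite above stops extending KafkaStreamSuiteBase and instead composes an anonymous instance of it (new KafkaStreamSuiteBase() { }), driving its beforeFunction/afterFunction from JUnit's @Before/@After. A rough Scala analogue of that composition shape, using illustrative names (SuiteHelper, SomeJUnitStyleSuite) rather than the project's actual API:

// Shared test plumbing exposed as plain methods, so it can be driven either by
// ScalaTest lifecycle hooks or by JUnit-annotated methods in a Java suite.
abstract class SuiteHelper {
  def beforeFunction(): Unit = { /* start test fixtures */ }
  def afterFunction(): Unit = { /* tear them down */ }
}

class SomeJUnitStyleSuite {
  // Compose an anonymous instance instead of extending the helper,
  // mirroring new KafkaStreamSuiteBase() { } in the Java test.
  private var helper: SuiteHelper = _

  def setUp(): Unit = {
    helper = new SuiteHelper {}
    helper.beforeFunction()
  }

  def tearDown(): Unit = {
    helper.afterFunction()
  }
}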
KafkaStreamSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

- abstract class KafkaStreamSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
import KafkaTestUtils._

val sparkConf = new SparkConf()
@@ -154,7 +154,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with BeforeAndAfter with Logging {
}
}

- class KafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
+ class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {

before { beforeFunction() }
after { afterFunction() }
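In the two Scala suite files (this one and the next), the BeforeAndAfter trait moves out of the abstract base and into the concrete suites, which now wire up the shared beforeFunction/afterFunction helpers themselves. A minimal ScalaTest sketch of that layout, assuming the FunSuite and BeforeAndAfter APIs of that ScalaTest generation; SharedKafkaBase and ExampleSuite are illustrative names.

import org.scalatest.{BeforeAndAfter, FunSuite}

// The base keeps only shared helpers and no lifecycle trait, so a Java suite
// can also reuse it by composition.
abstract class SharedKafkaBase extends FunSuite {
  def beforeFunction(): Unit = { /* start embedded ZooKeeper/Kafka, build SparkConf */ }
  def afterFunction(): Unit = { /* stop them */ }
}

// Concrete Scala suites mix in BeforeAndAfter themselves and hook the helpers up,
// as KafkaStreamSuite and ReliableKafkaStreamSuite do after this change.
class ExampleSuite extends SharedKafkaBase with BeforeAndAfter {
  before { beforeFunction() }
  after { afterFunction() }

  test("example") {
    assert(1 + 1 == 2)
  }
}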
ReliableKafkaStreamSuite.scala
@@ -26,13 +26,14 @@ import scala.util.Random

import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+ import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils

- class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
+ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
val topic = "topic"
val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
var groupId: String = _
@@ -85,7 +86,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
}
ssc.stop()
}
- /*
test("Verify the offset commit") {
// Verify the correctness of offset commit mechanism.
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
@@ -147,7 +147,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
}
ssc.stop()
}
- */
+
/** Getting partition offset from Zookeeper. */
private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
assert(zkClient != null, "Zookeeper client is not initialized")
