diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index bfee7603fc0ed..c207e95d5d337 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -83,7 +83,7 @@ class ReliableKafkaReceiver[
    */
   private var blockGenerator: BlockGenerator = null
 
-  /** Threadpool running the handlers for receiving message from multiple topics and partitions. */
+  /** Thread pool running the handlers for receiving message from multiple topics and partitions. */
   private var messageHandlerThreadPool: ThreadPoolExecutor = null
 
   override def onStart(): Unit = {
@@ -142,7 +142,6 @@ class ReliableKafkaReceiver[
         messageHandlerThreadPool.submit(new MessageHandler(stream))
       }
     }
-    println("Starting")
   }
 
   override def onStop(): Unit = {
@@ -177,7 +176,7 @@ class ReliableKafkaReceiver[
     }
   }
 
-  /** Store a Kafka message and the associated metadata as a tuple */
+  /** Store a Kafka message and the associated metadata as a tuple. */
   private def storeMessageAndMetadata(
       msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
     val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
@@ -185,17 +184,20 @@ class ReliableKafkaReceiver[
     topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
   }
 
-  /** Remember the current offsets for each topic and partition. This is called when a block is generated */
+  /**
+   * Remember the current offsets for each topic and partition. This is called when a block is
+   * generated.
+   */
   private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
-    // Get a snapshot of current offset map and store with related block id. Since this hook
-    // function is called in synchronized block, so we can get the snapshot without explicit lock.
+    // Get a snapshot of current offset map and store with related block id.
     val offsetSnapshot = topicPartitionOffsetMap.toMap
     blockOffsetMap.put(blockId, offsetSnapshot)
     topicPartitionOffsetMap.clear()
   }
 
-  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper */
-  private def storeBlockAndCommitOffset(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+  private def storeBlockAndCommitOffset(
+      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
     store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
     Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
     blockOffsetMap.remove(blockId)
@@ -232,7 +234,6 @@ class ReliableKafkaReceiver[
   private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
     override def run(): Unit = {
       while (!isStopped) {
-        println(s"Starting message process thread ${Thread.currentThread().getId}.")
         try {
           val streamIterator = stream.iterator()
           while (streamIterator.hasNext) {
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 6dfb34424011e..6386602ef8a43 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -33,8 +33,6 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -43,15 +41,17 @@
 import org.junit.After;
 import org.junit.Before;
 
-public class JavaKafkaStreamSuite extends KafkaStreamSuiteBase implements Serializable {
-  private transient JavaStreamingContext ssc = null;
-  private Random random = new Random();
+public class JavaKafkaStreamSuite implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private Random random = new Random();
+  private transient KafkaStreamSuiteBase suiteBase = null;
 
   @Before
   public void setUp() {
-    beforeFunction();
+    suiteBase = new KafkaStreamSuiteBase() { };
+    suiteBase.beforeFunction();
     System.clearProperty("spark.driver.port");
-    ssc = new JavaStreamingContext(sparkConf(), batchDuration());
+    ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
   }
 
   @After
@@ -59,7 +59,7 @@ public void tearDown() {
     ssc.stop();
     ssc = null;
     System.clearProperty("spark.driver.port");
-    afterFunction();
+    suiteBase.afterFunction();
   }
 
   @Test
@@ -73,14 +73,14 @@ public void testKafkaStream() throws InterruptedException {
     sent.put("b", 3);
     sent.put("c", 10);
 
-    createTopic(topic);
+    suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    produceAndSendMessage(topic,
+    suiteBase.produceAndSendMessage(topic,
        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
        Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("zookeeper.connect", zkAddress());
+    kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
     kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index d65f9b4ec7d20..1bb8e0175b97c 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
-abstract class KafkaStreamSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   import KafkaTestUtils._
 
   val sparkConf = new SparkConf()
@@ -154,7 +154,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with BeforeAndAfter with Lo
   }
 }
 
-class KafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
+class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
   before { beforeFunction() }
   after { afterFunction() }
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 7d90b567442ae..b546d22ca6c38 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -26,13 +26,14 @@ import scala.util.Random
 
 import kafka.serializer.StringDecoder
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.util.Utils
 
-class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
+class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
   val topic = "topic"
   val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
   var groupId: String = _
@@ -85,7 +86,6 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
     }
     ssc.stop()
   }
-/*
   test("Verify the offset commit") {
     // Verify the correctness of offset commit mechanism.
     sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
@@ -147,7 +147,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with Eventually {
     }
     ssc.stop()
   }
-*/
+
   /** Getting partition offset from Zookeeper. */
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
     assert(zkClient != null, "Zookeeper client is not initialized")
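
Note for reviewers: the offset bookkeeping that the receiver hunks above touch (`storeMessageAndMetadata`, `rememberBlockOffsets`, `storeBlockAndCommitOffset`) is easier to see outside the diff. Below is a minimal, self-contained Scala sketch of that pattern; it is not the Spark code. `TopicAndPartition`, `StreamBlockId`, and `OffsetBookkeeping` are simplified stand-ins, and `commitOffset` just prints where the real receiver writes the offsets to ZooKeeper.

```scala
import scala.collection.mutable

// Simplified stand-ins for the Kafka/Spark types used by the real receiver.
case class TopicAndPartition(topic: String, partition: Int)
case class StreamBlockId(streamId: Int, uniqueId: Long)

class OffsetBookkeeping {
  // Latest offset seen per topic/partition since the last block was generated.
  private val topicPartitionOffsetMap = mutable.HashMap.empty[TopicAndPartition, Long]
  // Offset snapshots keyed by the block they belong to.
  private val blockOffsetMap = mutable.HashMap.empty[StreamBlockId, Map[TopicAndPartition, Long]]

  /** Record the offset of each message as it is consumed. */
  def storeMessageOffset(tp: TopicAndPartition, offset: Long): Unit = synchronized {
    topicPartitionOffsetMap.put(tp, offset)
  }

  /** Snapshot the current offsets when a block is generated, then reset the map. */
  def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
    blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
    topicPartitionOffsetMap.clear()
  }

  /** Once a block has been durably stored, commit its offsets and drop the entry. */
  def storeBlockAndCommitOffset(blockId: StreamBlockId): Unit = {
    blockOffsetMap.remove(blockId).foreach(commitOffset)
  }

  private def commitOffset(offsets: Map[TopicAndPartition, Long]): Unit = {
    // The real receiver writes these offsets to ZooKeeper; printing stands in for that here.
    offsets.foreach { case (tp, offset) => println(s"commit $tp -> $offset") }
  }
}

object OffsetBookkeepingDemo extends App {
  val bookkeeping = new OffsetBookkeeping
  bookkeeping.storeMessageOffset(TopicAndPartition("topic", 0), 41L)
  bookkeeping.storeMessageOffset(TopicAndPartition("topic", 0), 42L)
  val block = StreamBlockId(0, 1L)
  bookkeeping.rememberBlockOffsets(block)      // snapshot taken when the block is generated
  bookkeeping.storeBlockAndCommitOffset(block) // commits TopicAndPartition(topic,0) -> 42
}
```

The two-map structure is what ties offset commits to stored blocks: offsets are committed only for data that has already been handed to Spark, which is the property the reliable receiver and the re-enabled "Verify the offset commit" test rely on.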