diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index c207e95d5d337..f85296efe8d60 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -178,17 +178,23 @@ class ReliableKafkaReceiver[
   /** Store a Kafka message and the associated metadata as a tuple. */
   private def storeMessageAndMetadata(
-      msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
+      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
     val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
-    blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
-    topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
+    val data = (msgAndMetadata.key, msgAndMetadata.message)
+    val metadata = (topicAndPartition, msgAndMetadata.offset)
+    blockGenerator.addDataWithCallback(data, metadata)
+  }
+
+  /** Update stored offset */
+  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+    topicPartitionOffsetMap.put(topicAndPartition, offset)
   }
 
   /**
    * Remember the current offsets for each topic and partition. This is called when a block is
    * generated.
    */
-  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
+  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
     // Get a snapshot of current offset map and store with related block id.
     val offsetSnapshot = topicPartitionOffsetMap.toMap
     blockOffsetMap.put(blockId, offsetSnapshot)
@@ -250,17 +256,25 @@ class ReliableKafkaReceiver[
   /** Class to handle blocks generated by the block generator. */
   private final class GeneratedBlockHandler extends BlockGeneratorListener {
-    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+    def onAddData(data: Any, metadata: Any): Unit = {
+      // Update the offset of the data that was added to the generator
+      if (metadata != null) {
+        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
+        updateOffset(topicAndPartition, offset)
+      }
+    }
+
+    def onGenerateBlock(blockId: StreamBlockId): Unit = {
       // Remember the offsets of topics/partitions when a block has been generated
       rememberBlockOffsets(blockId)
     }
 
-    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
       // Store block and commit the blocks offset
       storeBlockAndCommitOffset(blockId, arrayBuffer)
     }
 
-    override def onError(message: String, throwable: Throwable): Unit = {
+    def onError(message: String, throwable: Throwable): Unit = {
       reportError(message, throwable)
     }
   }
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 6386602ef8a43..60e6f8deb0d13 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.Random;
 
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
 import scala.Predef;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -43,15 +45,17 @@ public class JavaKafkaStreamSuite implements Serializable {
   private transient JavaStreamingContext ssc = null;
-  private Random random = new Random();
+  private transient Random random = new Random();
   private transient KafkaStreamSuiteBase suiteBase = null;
 
   @Before
   public void setUp() {
     suiteBase = new KafkaStreamSuiteBase() { };
-    suiteBase.beforeFunction();
+    suiteBase.setupKafka();
     System.clearProperty("spark.driver.port");
-    ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, new Duration(500));
   }
 
   @After
@@ -59,7 +63,7 @@ public void tearDown() {
     ssc.stop();
     ssc = null;
     System.clearProperty("spark.driver.port");
-    suiteBase.afterFunction();
+    suiteBase.tearDownKafka();
   }
 
   @Test
@@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
     suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
     suiteBase.produceAndSendMessage(topic,
-        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
-        Predef.<Tuple2<String, Object>>conforms()));
+      JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
+        Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 1bb8e0175b97c..6e24b6f7ffb3b 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -42,15 +42,13 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
+/**
+ * This is an abstract base class for Kafka test suites. It has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ */
 abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   import KafkaTestUtils._
 
-  val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-  val batchDuration = Milliseconds(500)
-  var ssc: StreamingContext = _
-
   var zkAddress: String = _
   var zkClient: ZkClient = _
 
@@ -64,7 +62,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
 
-  def beforeFunction() {
+  def setupKafka() {
     // Zookeeper server startup
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
@@ -100,12 +98,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
     logInfo("==================== 4 ====================")
   }
 
-  def afterFunction() {
-    if (ssc != null) {
-      ssc.stop()
-      ssc = null
-    }
-
+  def tearDownKafka() {
     if (producer != null) {
       producer.close()
       producer = null
@@ -146,21 +139,31 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
     val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    if (producer == null) {
-      producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
-    }
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
     producer.send(createTestMessage(topic, sent): _*)
+    producer.close()
     logInfo("==================== 6 ====================")
   }
 }
 
 class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+  var ssc: StreamingContext = _
+
+  before {
+    setupKafka()
+  }
 
-  before { beforeFunction() }
-  after { afterFunction() }
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    tearDownKafka()
+  }
 
   test("Kafka input stream") {
-    ssc = new StreamingContext(sparkConf, batchDuration)
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index b546d22ca6c38..8489d64762a2b 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.streaming.kafka
 
+
 import java.io.File
 
 import scala.collection.mutable
@@ -24,50 +25,67 @@ import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
+import com.google.common.io.Files
 import kafka.serializer.StringDecoder
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually
 
+import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
 class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
-  val topic = "topic"
+
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
   val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+
+  var topic: String = _
   var groupId: String = _
   var kafkaParams: Map[String, String] = _
+  var ssc: StreamingContext = _
+  var tempDirectory: File = null
 
   before {
-    beforeFunction()  // call this first to start ZK and Kafka
+    setupKafka()
+    topic = s"test-topic-${Random.nextInt(10000)}"
     groupId = s"test-consumer-${Random.nextInt(10000)}"
     kafkaParams = Map(
       "zookeeper.connect" -> zkAddress,
       "group.id" -> groupId,
       "auto.offset.reset" -> "smallest"
     )
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    tempDirectory = Files.createTempDir()
+    ssc.checkpoint(tempDirectory.getAbsolutePath)
   }
 
   after {
-    afterFunction()
+    if (ssc != null) {
+      ssc.stop()
+    }
+    if (tempDirectory != null && tempDirectory.exists()) {
+      FileUtils.deleteDirectory(tempDirectory)
+      tempDirectory = null
+    }
+    tearDownKafka()
   }
 
-  test("Reliable Kafka input stream") {
-    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
-      s"test-checkpoint${Random.nextInt(10000)}"
-    Utils.registerShutdownDeleteDir(new File(checkpointDir))
-    ssc.checkpoint(checkpointDir)
+
+  test("Reliable Kafka input stream with single topic") {
     createTopic(topic)
     produceAndSendMessage(topic, data)
 
+    // Verify that no offset has been committed for this group/topic/partition before starting.
+    assert(getCommitOffset(groupId, topic, 0) === None)
+
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      Map(topic -> 1),
-      StorageLevel.MEMORY_ONLY)
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
     val result = new mutable.HashMap[String, Long]()
     stream.map { case (k, v) => v }.foreachRDD { r =>
       val ret = r.collect()
@@ -77,53 +95,36 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
       }
     }
     ssc.start()
-    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+
+    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
       // A basic process verification for ReliableKafkaReceiver.
       // Verify whether received message number is equal to the sent message number.
       assert(data.size === result.size)
       // Verify whether each message is the same as the data to be verified.
       data.keys.foreach { k => assert(data(k) === result(k).toInt) }
+      // Verify that the committed offset reflects all the messages that were sent.
+      assert(getCommitOffset(groupId, topic, 0) === Some(29L))
+
     }
     ssc.stop()
   }
 
+/*
   test("Verify the offset commit") {
     // Verify the correctness of offset commit mechanism.
-    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
-      s"test-checkpoint${Random.nextInt(10000)}"
-    Utils.registerShutdownDeleteDir(new File(checkpointDir))
-    ssc.checkpoint(checkpointDir)
-
     createTopic(topic)
     produceAndSendMessage(topic, data)
 
-    // Verify whether the offset of this group/topic/partition is 0 before starting.
-    assert(getCommitOffset(groupId, topic, 0) === 0L)
-
     // Do this to consume all the message of this group/topic.
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      Map(topic -> 1),
-      StorageLevel.MEMORY_ONLY)
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
     stream.foreachRDD(_ => Unit)
     ssc.start()
-    eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
-      // Verify the offset number whether it is equal to the total message number.
-      assert(getCommitOffset(groupId, topic, 0) === 29L)
+    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
     }
     ssc.stop()
   }
-
-  test("Verify multiple topics offset commit") {
-    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
-      s"test-checkpoint${Random.nextInt(10000)}"
-    Utils.registerShutdownDeleteDir(new File(checkpointDir))
-    ssc.checkpoint(checkpointDir)
-
+*/
+  test("Reliable Kafka input stream with multiple topics") {
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     topics.foreach { case (t, _) =>
       createTopic(t)
@@ -131,30 +132,27 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
     }
 
     // Before started, verify all the group/topic/partition offsets are 0.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
+    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
 
     // Consuming all the data sent to the broker which will potential commit the offsets internally.
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
+      ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
     stream.foreachRDD(_ => Unit)
     ssc.start()
-    eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
+    eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
       // Verify the offset for each group/topic to see whether they are equal to the expected one.
-      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
+      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
     }
     ssc.stop()
   }
 
+
   /** Getting partition offset from Zookeeper. */
-  private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
+  private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
     assert(zkClient != null, "Zookeeper client is not initialized")
-
     val topicDirs = new ZKGroupTopicDirs(groupId, topic)
     val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-
-    ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L)
+    val offset = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
+    offset
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index b1e9cb7673f2c..d286fadd50096 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -27,13 +27,38 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
-  /** Called when a new block is generated */
+  /**
+   * Called after a data item is added into the BlockGenerator. The data addition and this
+   * callback are synchronized with the block generation and its associated callback,
+   * so block generation waits for the active data addition+callback to complete. This is
+   * useful for updating metadata on successful buffering of a data item, specifically
+   * metadata that will be useful when the block is generated. Any long blocking operation
+   * in this callback will hurt the throughput.
+   */
+  def onAddData(data: Any, metadata: Any)
+
+  /**
+   * Called when a new block of data is generated by the block generator. The block generation
+   * and this callback are synchronized with the data addition and its associated callback, so
+   * the data addition waits for the block generation+callback to complete. This is useful
+   * for updating metadata when a block has been generated, specifically metadata that will
+   * be useful when the block has been successfully stored. Any long blocking operation in this
+   * callback will hurt the throughput.
+   */
   def onGenerateBlock(blockId: StreamBlockId)
 
-  /** Called when a new block needs to be pushed */
+  /**
+   * Called when a new block is ready to be pushed. Callers are supposed to store the block into
+   * Spark in this method. Internally this is called from a single thread that is not
+   * synchronized with any other callbacks. Hence it is okay to do long blocking operations
+   * in this callback.
+   */
   def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
 
-  /** Called when an error has occurred in BlockGenerator */
+  /**
+   * Called when an error has occurred in the BlockGenerator. Can be called from many places,
+   * so it is better not to do any long blocking operation in this callback.
+   */
   def onError(message: String, throwable: Throwable)
 }
 
@@ -84,16 +109,27 @@ private[streaming] class BlockGenerator(
    * Push a single data item into the buffer. All received data items
    * will be periodically pushed into BlockManager.
    */
-  def += (data: Any): Unit = synchronized {
+  def addData(data: Any): Unit = synchronized {
     waitToPush()
     currentBuffer += data
   }
 
+  /**
+   * Push a single data item into the buffer. After buffering the data, the
+   * `BlockGeneratorListener.onAddData` callback will be called. All received data items
+   * will be periodically pushed into BlockManager.
+   */
+  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
+    waitToPush()
+    currentBuffer += data
+    listener.onAddData(data, metadata)
+  }
+
   /** Change the buffer to which single records are added to. */
-  private def updateCurrentBuffer(time: Long): Unit = {
+  private def updateCurrentBuffer(time: Long): Unit = synchronized {
     try {
       val newBlockBuffer = currentBuffer
-      synchronized { currentBuffer = new ArrayBuffer[Any] }
+      currentBuffer = new ArrayBuffer[Any]
       if (newBlockBuffer.size > 0) {
         val blockId = StreamBlockId(receiverId, time - blockInterval)
         val newBlock = new Block(blockId, newBlockBuffer)
@@ -131,8 +167,8 @@ private[streaming] class BlockGenerator(
     } catch {
       case ie: InterruptedException =>
         logInfo("Block pushing thread was interrupted")
-      case t: Throwable =>
-        reportError("Error in block pushing thread", t)
+      case e: Exception =>
+        reportError("Error in block pushing thread", e)
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index a13689a9254a6..704019a55aff3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -27,10 +27,10 @@ import akka.actor.{Actor, Props}
 import akka.pattern.ask
 import com.google.common.base.Throwables
 import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.WriteAheadLogFileSegment
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
@@ -99,6 +99,8 @@ private[streaming] class ReceiverSupervisorImpl(
   /** Divides received data records into data blocks for pushing in BlockManager. */
   private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+    def onAddData(data: Any, metadata: Any): Unit = { }
+
     def onGenerateBlock(blockId: StreamBlockId): Unit = { }
 
     def onError(message: String, throwable: Throwable) {
@@ -112,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   /** Push a single record of received data into block generator. */
   def pushSingle(data: Any) {
-    blockGenerator += (data)
+    blockGenerator.addData(data)
   }
 
   /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index c189f1fa9ef47..9bad105f5a83b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
     blockGenerator.start()
     var count = 0
     while(System.currentTimeMillis - startTime < waitTime) {
-      blockGenerator += count
+      blockGenerator.addData(count)
      generatedData += count
       count += 1
       Thread.sleep(10)
@@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
     blockGenerator.start()
     var count = 0
     while(System.currentTimeMillis - startTime < waitTime) {
-      blockGenerator += count
+      blockGenerator.addData(count)
       generatedData += count
       count += 1
       Thread.sleep(1)
@@ -299,6 +299,8 @@ class ReceiverSuite extends FunSuite with Timeouts {
     val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
     val errors = new ArrayBuffer[Throwable]
 
+    def onAddData(data: Any, metadata: Any) { }
+
     def onGenerateBlock(blockId: StreamBlockId) { }
 
     def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
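Note on the new callback flow: the patch routes per-record metadata through BlockGenerator.addDataWithCallback into BlockGeneratorListener.onAddData, snapshots it in onGenerateBlock, and commits it in onPushBlock (ReliableKafkaReceiver uses this to tie Kafka offsets to generated blocks). The Scala below is a minimal, self-contained sketch of that flow for illustration only; GeneratorSketch, ListenerSketch and the manual generateBlock() call are simplified stand-ins, not Spark's private[streaming] BlockGenerator, which swaps buffers on a recurring timer and pushes blocks from a dedicated thread.

// Illustrative sketch only: a simplified model of the addDataWithCallback / onAddData /
// onGenerateBlock / onPushBlock flow introduced above, not Spark's actual classes.
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

trait ListenerSketch {
  def onAddData(data: Any, metadata: Any): Unit
  def onGenerateBlock(blockId: Long): Unit
  def onPushBlock(blockId: Long, buffer: ArrayBuffer[Any]): Unit
}

class GeneratorSketch(listener: ListenerSketch) {
  private var currentBuffer = new ArrayBuffer[Any]
  private var nextBlockId = 0L

  // Like addDataWithCallback: buffering the record and invoking onAddData happen under the
  // same lock as block generation, so a block boundary never separates a record from the
  // metadata update that accompanied it.
  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
    currentBuffer += data
    listener.onAddData(data, metadata)
  }

  // Like the now-synchronized updateCurrentBuffer: swap the buffer and notify onGenerateBlock
  // under the lock, then push the completed block outside of it.
  def generateBlock(): Unit = {
    val (blockId, block) = synchronized {
      val buffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      nextBlockId += 1
      listener.onGenerateBlock(nextBlockId)
      (nextBlockId, buffer)
    }
    if (block.nonEmpty) listener.onPushBlock(blockId, block)
  }
}

object OffsetTrackingSketch extends App {
  // (topic, partition) -> latest buffered offset, mirroring topicPartitionOffsetMap.
  val pendingOffsets = mutable.HashMap[(String, Int), Long]()
  // blockId -> offset snapshot, mirroring blockOffsetMap.
  val blockOffsets = mutable.HashMap[Long, Map[(String, Int), Long]]()

  val generator = new GeneratorSketch(new ListenerSketch {
    def onAddData(data: Any, metadata: Any): Unit = {
      val (tp, offset) = metadata.asInstanceOf[((String, Int), Long)]
      pendingOffsets(tp) = offset
    }
    def onGenerateBlock(blockId: Long): Unit = {
      blockOffsets(blockId) = pendingOffsets.toMap // snapshot offsets for this block
      pendingOffsets.clear()
    }
    def onPushBlock(blockId: Long, buffer: ArrayBuffer[Any]): Unit = {
      // The real receiver stores the block here and only then commits the offsets to ZooKeeper.
      println(s"stored block $blockId with ${buffer.size} records, offsets ${blockOffsets(blockId)}")
    }
  })

  (0L until 5L).foreach { offset =>
    generator.addDataWithCallback(s"message-$offset", (("topic1", 0), offset))
  }
  generator.generateBlock()
}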