From ea873e424424747c8807adbc0c217a5fbf80e060 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Wed, 12 Nov 2014 11:04:21 +0800
Subject: [PATCH] Further address the comments

---
 .../streaming/kafka/KafkaInputDStream.scala   |  26 ++--
 .../spark/streaming/kafka/KafkaReceiver.scala | 135 ------------------
 .../spark/streaming/kafka/KafkaUtils.scala    |   4 +-
 .../kafka/ReliableKafkaReceiver.scala         |  42 +++---
 .../streaming/receiver/BlockGenerator.scala   |   6 -
 .../receiver/ReceiverSupervisorImpl.scala     |   2 -
 .../spark/streaming/ReceiverSuite.scala       |   2 -
 7 files changed, 42 insertions(+), 175 deletions(-)
 delete mode 100644 external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 5a8f4a07d0a3d..15ba9c8212aa1 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.util.Properties
+
 import scala.collection.Map
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
+import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -28,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -47,17 +51,16 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
-    reliableReceiveEnabled: Boolean,
+    useReliableReceiver: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    if (!reliableReceiveEnabled) {
+    if (!useReliableReceiver) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-        .asInstanceOf[Receiver[(K, V)]]
     } else {
       new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-        .asInstanceOf[Receiver[(K, V)]]
+
     }
   }
 }
@@ -70,14 +73,15 @@ class KafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends Receiver[Any](storageLevel) with Logging {
+  ) extends Receiver[(K, V)](storageLevel) with Logging {
 
   // Connection to Kafka
-  var consumerConnector : ConsumerConnector = null
+  var consumerConnector: ConsumerConnector = null
 
   def onStop() {
     if (consumerConnector != null) {
       consumerConnector.shutdown()
+      consumerConnector = null
     }
   }
 
@@ -103,11 +107,11 @@ class KafkaReceiver[
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[V]]
 
-    // Create Threads for each Topic/Message Stream we are listening
+    // Create threads for each topic/message Stream we are listening
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
    try {
       // Start the messages handler for each partition
       topicMessageStreams.values.foreach { streams =>
@@ -118,8 +122,8 @@
     }
   }
 
-  // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+  // Handles Kafka messages
+  private class MessageHandler(stream: KafkaStream[K, V])
     extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
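
The hunks above make the receiver choice transparent to callers: both receivers now extend Receiver[(K, V)], so the asInstanceOf casts disappear, and KafkaInputDStream simply picks an implementation from the useReliableReceiver flag. For context, the sketch below shows how a driver program would opt into the reliable path; it only has to flip the write-ahead-log flag before creating the stream. The object name, checkpoint directory, ZooKeeper address, group id, and topic are placeholders, not part of this patch.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    // With the flag on, KafkaUtils.createStream hands back a stream backed by
    // ReliableKafkaReceiver instead of the plain KafkaReceiver.
    val conf = new SparkConf()
      .setAppName("ReliableKafkaExample")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // The write-ahead log needs a checkpoint directory (placeholder path).
    ssc.checkpoint("hdfs:///tmp/kafka-wal-checkpoint")

    // ZooKeeper quorum, group id and topic map are placeholders.
    val stream = KafkaUtils.createStream(
      ssc, "zkhost:2181", "example-group", Map("example-topic" -> 1))
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
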
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala
deleted file mode 100644
index f686fbca1a509..0000000000000
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.kafka
-
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import scala.collection.Map
-import scala.reflect.{classTag, ClassTag}
-
-import kafka.consumer._
-import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient._
-
-import org.apache.spark.Logging
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.receiver.Receiver
-
-private[streaming]
-class KafkaReceiver[
-    K: ClassTag,
-    V: ClassTag,
-    U <: Decoder[_]: ClassTag,
-    T <: Decoder[_]: ClassTag](
-    kafkaParams: Map[String, String],
-    topics: Map[String, Int],
-    storageLevel: StorageLevel
-  ) extends Receiver[Any](storageLevel) with Logging {
-
-  // Connection to Kafka
-  var consumerConnector: ConsumerConnector = null
-
-  def onStop() {
-    if (consumerConnector != null) {
-      consumerConnector.shutdown()
-    }
-  }
-
-  def onStart() {
-
-    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
-
-    // Kafka connection properties
-    val props = new Properties()
-    kafkaParams.foreach(param => props.put(param._1, param._2))
-
-    val zkConnect = kafkaParams("zookeeper.connect")
-    // Create the connection to the cluster
-    logInfo("Connecting to Zookeeper: " + zkConnect)
-    val consumerConfig = new ConsumerConfig(props)
-    consumerConnector = Consumer.create(consumerConfig)
-    logInfo("Connected to " + zkConnect)
-
-    // When auto.offset.reset is defined, it is our responsibility to try and whack the
-    // consumer group zk node.
-    if (kafkaParams.contains("auto.offset.reset")) {
-      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
-    }
-
-    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[K]]
-    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
-      .newInstance(consumerConfig.props)
-      .asInstanceOf[Decoder[V]]
-
-    // Create Threads for each Topic/Message Stream we are listening
-    val topicMessageStreams = consumerConnector.createMessageStreams(
-      topics, keyDecoder, valueDecoder)
-
-    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
-    try {
-      // Start the messages handler for each partition
-      topicMessageStreams.values.foreach { streams =>
-        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
-      }
-    } finally {
-      executorPool.shutdown() // Just causes threads to terminate after work is done
-    }
-  }
-
-  // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
-    extends Runnable {
-    def run() {
-      logInfo("Starting MessageHandler.")
-      try {
-        for (msgAndMetadata <- stream) {
-          store((msgAndMetadata.key, msgAndMetadata.message))
-        }
-      } catch {
-        case e: Throwable => logError("Error handling message; exiting", e)
-      }
-    }
-  }
-
-  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
-  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
-  //
-  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
-  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
-  // 'smallest'/'largest':
-  // scalastyle:off
-  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
-  // scalastyle:on
-  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
-    val dir = "/consumers/" + groupId
-    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
-    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
-    try {
-      zk.deleteRecursive(dir)
-    } catch {
-      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
-    } finally {
-      zk.close()
-    }
-  }
-}
\ No newline at end of file
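
The deleted standalone KafkaReceiver.scala and the copy that now lives in KafkaInputDStream.scala share one non-obvious technique: the key and value decoders are built reflectively from the ClassTag, through the single VerifiableProperties constructor that Kafka decoders expose. A minimal standalone sketch of that construction follows; the DecoderFactory object is illustrative only and not part of Spark or this patch.

import java.util.Properties

import scala.reflect.{classTag, ClassTag}

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties

object DecoderFactory {
  // Instantiate any Decoder[T] the same way the receiver does: look up the
  // runtime class from the ClassTag and call its VerifiableProperties constructor.
  def create[T, D <: Decoder[T]: ClassTag](props: VerifiableProperties): Decoder[T] = {
    classTag[D].runtimeClass
      .getConstructor(classOf[VerifiableProperties])
      .newInstance(props)
      .asInstanceOf[Decoder[T]]
  }

  def main(args: Array[String]): Unit = {
    // Build a StringDecoder from empty properties and round-trip a value.
    val decoder = create[String, StringDecoder](new VerifiableProperties(new Properties()))
    println(decoder.fromBytes("hello".getBytes("UTF-8")))
  }
}
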
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index c7b3c4d33f796..b4ac929e0c070 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -70,8 +70,8 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    val WALEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, WALEnabled, storageLevel)
+    val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
 
   /**
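
The walEnabled rename is only a camelCase cleanup; the behaviour stays a plain boolean lookup with a false default, so existing applications keep the unreliable receiver unless they set the flag explicitly. A tiny sketch of the same lookup against a standalone SparkConf, for reference:

import org.apache.spark.SparkConf

object WalFlagCheck {
  def main(args: Array[String]): Unit = {
    // Same lookup KafkaUtils performs: the default is false, so the plain
    // KafkaReceiver remains the out-of-the-box behaviour.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val walEnabled = conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
    println(s"reliable receiver selected: $walEnabled")
  }
}
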
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
index 938fcc1fc6a32..4a2e414365715 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -35,6 +35,19 @@ import org.apache.spark.storage.{StreamBlockId, StorageLevel}
 import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
 import org.apache.spark.util.Utils
 
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages topic-partition/offset itself and updates the offset information
+ * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
+ * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
+ *
+ * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
+ * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
+ * will not take effect.
+ */
 private[streaming]
 class ReliableKafkaReceiver[
     K: ClassTag,
@@ -44,7 +57,13 @@ class ReliableKafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel)
-  extends Receiver[Any](storageLevel) with Logging {
+  extends Receiver[(K, V)](storageLevel) with Logging {
+
+  private val groupId = kafkaParams("group.id")
+
+  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+
+  private def conf() = SparkEnv.get.conf
 
   /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null
@@ -52,12 +71,6 @@ class ReliableKafkaReceiver[
   /** zkClient to connect to Zookeeper to commit the offsets. */
   private var zkClient: ZkClient = null
 
-  private val groupId = kafkaParams("group.id")
-
-  private def conf() = SparkEnv.get.conf
-
-  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
-
   /**
    * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    * synchronized block, so mutable HashMap will not meet concurrency issue.
@@ -75,12 +88,6 @@ class ReliableKafkaReceiver[
 
   /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
   private final class OffsetCheckpointListener extends BlockGeneratorListener {
-    override def onStoreData(data: Any, metadata: Any): Unit = {
-      if (metadata != null) {
-        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
-        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
-      }
-    }
 
     override def onGenerateBlock(blockId: StreamBlockId): Unit = {
       // Get a snapshot of current offset map and store with related block id. Since this hook
@@ -91,7 +98,7 @@ class ReliableKafkaReceiver[
     }
 
     override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
+      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
 
       // Commit and remove the related offsets.
       Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
@@ -202,9 +209,10 @@ class ReliableKafkaReceiver[
         for (msgAndMetadata <- stream) {
           val topicAndPartition = TopicAndPartition(
             msgAndMetadata.topic, msgAndMetadata.partition)
-          val metadata = (topicAndPartition, msgAndMetadata.offset)
-
-          blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
+          blockGenerator.synchronized {
+            blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
+            topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
+          }
         }
       } catch {
         case e: Throwable => logError("Error handling message; existing", e)
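
The class comment added above describes the contract: offsets are tracked per topic-partition, snapshotted per block when the BlockGenerator cuts a block, and committed only after that block has been stored reliably. Decoupled from Kafka and ZooKeeper, the bookkeeping looks roughly like the sketch below; OffsetTracker and its println "commit" are illustrative only, while the real receiver keys the map by TopicAndPartition and writes the offsets to ZooKeeper.

import scala.collection.mutable

// Simplified model of the checkpoint-by-block bookkeeping: record() is called
// per message, onBlockGenerated() when a block is cut, and onBlockStored() once
// the block is safely persisted (for example, written to the write-ahead log).
class OffsetTracker[Block, Partition] {
  private val currentOffsets = mutable.HashMap.empty[Partition, Long]
  private val offsetsByBlock = mutable.HashMap.empty[Block, Map[Partition, Long]]

  def record(partition: Partition, offset: Long): Unit = synchronized {
    currentOffsets.put(partition, offset)
  }

  def onBlockGenerated(block: Block): Unit = synchronized {
    // Snapshot the offsets belonging to the finished block, then start fresh.
    offsetsByBlock.put(block, currentOffsets.toMap)
    currentOffsets.clear()
  }

  def onBlockStored(block: Block): Unit = synchronized {
    // Only now is it safe to advance the consumer's position.
    offsetsByBlock.remove(block).foreach { snapshot =>
      snapshot.foreach { case (partition, offset) => println(s"commit $partition -> $offset") }
    }
  }
}

If the process dies before onBlockStored runs, the snapshot is never committed and the same data is simply re-read after restart, which is the point the scaladoc makes about eliminating the data loss of the plain KafkaReceiver.
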
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 6e214d7d28a9a..e563c89dc8115 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -27,11 +27,6 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
-  /** Called when new data is added into BlockGenerator, this hook function will be called each
-   * time new data is added into BlockGenerator, any heavy or blocking operation will hurt the
-   * throughput */
-  def onStoreData(data: Any, metadata: Any)
-
   /** Called when a new block is generated */
   def onGenerateBlock(blockId: StreamBlockId)
 
@@ -92,7 +87,6 @@ private[streaming] class BlockGenerator(
   def += (data: Any, metadata: Any = null): Unit = synchronized {
     waitToPush()
     currentBuffer += data
-    listener.onStoreData(data, metadata)
   }
 
   /** Change the buffer to which single records are added to. */
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5f1784a532573..a13689a9254a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -99,8 +99,6 @@ private[streaming] class ReceiverSupervisorImpl(
 
   /** Divides received data records into data blocks for pushing in BlockManager. */
   private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
-    def onStoreData(data: Any, metadata: Any): Unit = { }
-
     def onGenerateBlock(blockId: StreamBlockId): Unit = { }
 
     def onError(message: String, throwable: Throwable) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 5133590d36c2e..c189f1fa9ef47 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -299,8 +299,6 @@ class ReceiverSuite extends FunSuite with Timeouts {
     val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
     val errors = new ArrayBuffer[Throwable]
 
-    def onStoreData(data: Any, metadata: Any) { }
-
    def onGenerateBlock(blockId: StreamBlockId) { }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
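
With onStoreData gone, BlockGeneratorListener is back to block-level callbacks only, and per-record bookkeeping moves to the caller's side of +=, as the synchronized block in ReliableKafkaReceiver's message handler shows. Below is a sketch of a listener written against the slimmed-down trait; the class is hypothetical and, since the trait is private[streaming], it would have to live inside Spark's streaming packages.

package org.apache.spark.streaming.receiver

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StreamBlockId

// Hypothetical listener: only block-level hooks remain after this patch.
private[streaming] class LoggingBlockGeneratorListener extends BlockGeneratorListener {
  def onGenerateBlock(blockId: StreamBlockId): Unit = {
    println(s"block cut: $blockId")
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit = {
    println(s"pushing ${arrayBuffer.size} records for block $blockId")
  }

  def onError(message: String, throwable: Throwable): Unit = {
    println(s"block generator error: $message")
  }
}

Dropping the per-record hook also removes a callback from the hot path of +=, which the deleted scaladoc itself warned was throughput-sensitive.
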