diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 28ac5929df44a..92dd3f878a71f 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -18,12 +18,8 @@
 package org.apache.spark.streaming.kafka
 
 import scala.collection.Map
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
 
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -51,11 +47,16 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
+    reliableStoreEnabled: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    if (!reliableStoreEnabled) {
+      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+        .asInstanceOf[Receiver[(K, V)]]
+    } else {
+      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+        .asInstanceOf[Receiver[(K, V)]]
+    }
   }
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala
new file mode 100644
index 0000000000000..fa3b2458904b6
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.Map
+import scala.reflect.{classTag, ClassTag}
+
+import java.util.Properties
+import java.util.concurrent.Executors
+
+import kafka.consumer._
+import kafka.serializer.Decoder
+import kafka.utils.VerifiableProperties
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
+
+import org.apache.spark.Logging
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+
+private[streaming]
+class KafkaReceiver[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel
+  ) extends Receiver[Any](storageLevel) with Logging {
+
+  // Connection to Kafka
+  var consumerConnector: ConsumerConnector = null
+
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }
+
+  def onStart() {
+
+    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
+
+    // Kafka connection properties
+    val props = new Properties()
+    kafkaParams.foreach(param => props.put(param._1, param._2))
+
+    val zkConnect = kafkaParams("zookeeper.connect")
+    // Create the connection to the cluster
+    logInfo("Connecting to Zookeeper: " + zkConnect)
+    val consumerConfig = new ConsumerConfig(props)
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo("Connected to " + zkConnect)
+
+    // When auto.offset.reset is defined, it is our responsibility to try and whack the
+    // consumer group zk node.
+    if (kafkaParams.contains("auto.offset.reset")) {
+      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
+    }
+
+    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[K]]
+    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[V]]
+
+    // Create threads for each topic/message stream we are listening to
+    val topicMessageStreams = consumerConnector.createMessageStreams(
+      topics, keyDecoder, valueDecoder)
+
+    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    try {
+      // Start the messages handler for each partition
+      topicMessageStreams.values.foreach { streams =>
+        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
+      }
+    } finally {
+      executorPool.shutdown() // Just causes threads to terminate after work is done
+    }
+  }
+
+  // Handles Kafka messages
+  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+    extends Runnable {
+    def run() {
+      logInfo("Starting MessageHandler.")
+      try {
+        for (msgAndMetadata <- stream) {
+          store((msgAndMetadata.key, msgAndMetadata.message))
+        }
+      } catch {
+        case e: Throwable => logError("Error handling message; exiting", e)
+      }
+    }
+  }
+
+  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
+  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+  //
+  // The kafka high level consumer doesn't expose setting offsets currently; this is a trick copied
+  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
+  // 'smallest'/'largest':
+  // scalastyle:off
+  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+  // scalastyle:on
+  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    val dir = "/consumers/" + groupId
+    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
+    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+    try {
+      zk.deleteRecursive(dir)
+    } catch {
+      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
+    } finally {
+      zk.close()
+    }
+  }
+}
\ No newline at end of file
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index ec812e1ef3b04..2eabdd170ddb6 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -70,7 +70,7 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)
   }
 
   /**
@@ -144,4 +144,71 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  def createReliableStream(
+      ssc: StreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: Map[String, Int],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
+    : ReceiverInputDStream[(String, String)] = {
+    val kafkaParams = Map[String, String](
+      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+      "zookeeper.connection.timeout.ms" -> "10000")
+    createReliableStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topics, storageLevel)
+  }
+
+  def createReliableStream[
+      K: ClassTag,
+      V: ClassTag,
+      U <: Decoder[_]: ClassTag,
+      T <: Decoder[_]: ClassTag](
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      topics: Map[String, Int],
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[(K, V)] = {
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
+  }
+
+  def createReliableStream(
+      jssc: JavaStreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt]
+    ): JavaPairReceiverInputDStream[String, String] = {
+    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  def createReliableStream(
+      jssc: JavaStreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairReceiverInputDStream[String, String] = {
+    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      storageLevel)
+  }
+
+  def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+      jssc: JavaStreamingContext,
+      keyTypeClass: Class[K],
+      valueTypeClass: Class[V],
+      keyDecoderClass: Class[U],
+      valueDecoderClass: Class[T],
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairReceiverInputDStream[K, V] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
+
+    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
+
+    createReliableStream[K, V, U, T](
+      jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+  }
 }
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
new file mode 100644
index 0000000000000..6773db67db24c
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.{ConcurrentHashMap, Executors}
+
+import scala.collection.Map
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
+import kafka.serializer.Decoder
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.storage.{StreamBlockId, StorageLevel}
+import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
+
+private[streaming]
+class ReliableKafkaReceiver[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel)
+  extends Receiver[Any](storageLevel) with Logging {
+
+  /** High level consumer to connect to Kafka */
+  private var consumerConnector: ConsumerConnector = null
+
+  /** zkClient to connect to Zookeeper to commit the offsets */
+  private var zkClient: ZkClient = null
+
+  private val groupId = kafkaParams("group.id")
+
+  private lazy val env = SparkEnv.get
+
+  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+
+  /** A HashMap to track the offset for each topic/partition. This map is only accessed inside
+    * synchronized blocks, so a plain mutable HashMap will not run into concurrency issues. */
+  private lazy val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+
+  /** A concurrent HashMap to store the stream block id and the related offset snapshot */
+  private lazy val blockOffsetMap =
+    new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]
+
+  private lazy val blockGeneratorListener = new BlockGeneratorListener {
+    override def onStoreData(data: Any, metadata: Any): Unit = {
+      if (metadata != null) {
+        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
+        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
+      }
+      logDebug(s"offset map: ${topicPartitionOffsetMap.mkString(":")}")
+    }
+
+    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+      // Take a snapshot of the current offset map and store it with the related block id. This
+      // hook is called inside a synchronized block, so the snapshot can be taken without a lock.
+      val offsetSnapshot = topicPartitionOffsetMap.toMap
+      blockOffsetMap.put(blockId, offsetSnapshot)
+      topicPartitionOffsetMap.clear()
+      logDebug(s"block generated: $blockId, offset snapshot: ${offsetSnapshot.mkString(":")}")
+    }
+
+    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+      // TODO: this should be replaced with a reliable store once the WAL is ready.
+      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
+
+      // Commit and remove the related offsets.
+      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
+        commitOffset(offsetMap)
+      }
+      blockOffsetMap.remove(blockId)
+    }
+
+    override def onError(message: String, throwable: Throwable): Unit = {
+      reportError(message, throwable)
+    }
+  }
+
+  /** Manage the BlockGenerator in the receiver itself to better control block storing and
+    * offset committing. */
+  @volatile private lazy val blockGenerator =
+    new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+
+  override def onStop(): Unit = {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+      consumerConnector = null
+    }
+
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+
+    blockGenerator.stop()
+  }
+
+  override def onStart(): Unit = {
+    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
+
+    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
+      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
+        "otherwise we cannot enable the reliable offset commit mechanism")
+    }
+
+    val props = new Properties()
+    kafkaParams.foreach(param => props.put(param._1, param._2))
+    // Set "auto.commit.enable" to "false" even if the user has explicitly set it to true: this
+    // property must be false so that Kafka's auto commit mechanism is turned off and offsets
+    // are only committed by this receiver.
+    props.setProperty(AUTO_OFFSET_COMMIT, "false")
+
+    val consumerConfig = new ConsumerConfig(props)
+    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
+
+    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
+      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+
+    // Start the BlockGenerator
+    blockGenerator.start()
+
+    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[K]]
+
+    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+      .newInstance(consumerConfig.props)
+      .asInstanceOf[Decoder[V]]
+
+    val topicMessageStreams = consumerConnector.createMessageStreams(
+      topics, keyDecoder, valueDecoder)
+
+    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+
+    try {
+      topicMessageStreams.values.foreach { streams =>
+        streams.foreach { stream =>
+          executorPool.submit(new Runnable {
+            override def run(): Unit = {
+              logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
+              try {
+                for (msgAndMetadata <- stream) {
+                  val topicAndPartition = TopicAndPartition(
+                    msgAndMetadata.topic, msgAndMetadata.partition)
+                  val metadata = (topicAndPartition, msgAndMetadata.offset)
+
+                  blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
+                }
+              } catch {
+                case e: Throwable => logError("Error handling message; exiting", e)
+              }
+            }
+          })
+        }
+      }
+    } finally {
+      executorPool.shutdown()
+    }
+  }
+
+  /**
+   * Commit the offsets of Kafka's topics/partitions. The commit mechanism follows Kafka 0.8.x's
+   * metadata schema in Zookeeper.
+   */
+  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
+    if (zkClient == null) {
+      logError("zkClient should have been initialized in onStart() before committing offsets")
+      return
+    }
+
+    for ((topicAndPart, offset) <- offsetMap) {
+      try {
+        val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
+        val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
+
+        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+      } catch {
+        case t: Throwable => logWarning(s"Exception during commit offset $offset for topic " +
+          s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
+      }
+
+      logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
+        s"partition ${topicAndPart.partition}")
+    }
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 0316b6862f195..6e214d7d28a9a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -27,8 +27,17 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
+  /** Called each time new data is added into the BlockGenerator. This hook runs for every
+    * single record, so any heavy or blocking operation in it will hurt the receiving
+    * throughput. */
+  def onStoreData(data: Any, metadata: Any)
+
+  /** Called when a new block is generated */
+  def onGenerateBlock(blockId: StreamBlockId)
+
   /** Called when a new block needs to be pushed */
   def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+
   /** Called when an error has occurred in BlockGenerator */
   def onError(message: String, throwable: Throwable)
 }
@@ -80,9 +89,10 @@ private[streaming] class BlockGenerator(
    * Push a single data item into the buffer. All received data items
    * will be periodically pushed into BlockManager.
    */
-  def += (data: Any): Unit = synchronized {
+  def += (data: Any, metadata: Any = null): Unit = synchronized {
     waitToPush()
     currentBuffer += data
+    listener.onStoreData(data, metadata)
   }
 
   /** Change the buffer to which single records are added to. */
@@ -93,6 +103,7 @@ private[streaming] class BlockGenerator(
       if (newBlockBuffer.size > 0) {
         val blockId = StreamBlockId(receiverId, time - blockInterval)
         val newBlock = new Block(blockId, newBlockBuffer)
+        listener.onGenerateBlock(blockId)
         blocksForPushing.put(newBlock) // put is blocking when queue is full
         logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
       }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5360412330d37..5f1784a532573 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -99,6 +99,10 @@ private[streaming] class ReceiverSupervisorImpl(
 
   /** Divides received data records into data blocks for pushing in BlockManager. */
   private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+    def onStoreData(data: Any, metadata: Any): Unit = { }
+
+    def onGenerateBlock(blockId: StreamBlockId): Unit = { }
+
     def onError(message: String, throwable: Throwable) {
       reportError(message, throwable)
     }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 0f6a9489dbe0d..5133590d36c2e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -299,6 +299,10 @@ class ReceiverSuite extends FunSuite with Timeouts {
     val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
     val errors = new ArrayBuffer[Throwable]
 
+    def onStoreData(data: Any, metadata: Any) { }
+
+    def onGenerateBlock(blockId: StreamBlockId) { }
+
     def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
       val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
       arrayBuffers += bufferOfInts
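
For context, here is a minimal usage sketch of the new createReliableStream entry point (illustration only, not part of this patch). It assumes a ZooKeeper quorum reachable at localhost:2181 and a topic named "test" consumed with one thread; the application name, group id, and topic are made-up example values. The call mirrors the existing createStream API but routes through ReliableKafkaReceiver, which commits offsets to ZooKeeper only after the corresponding block has been stored.

// Hypothetical driver program exercising KafkaUtils.createReliableStream (example values only).
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReliableKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Returns a DStream of (key, message) pairs; offsets are committed per stored block.
    val messages = KafkaUtils.createReliableStream(
      ssc,
      "localhost:2181",        // zkQuorum (assumed local ZooKeeper)
      "reliable-word-count",   // groupId (example value)
      Map("test" -> 1),        // topic -> number of consumer threads
      StorageLevel.MEMORY_AND_DISK_SER_2)

    val wordCounts = messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Note that, as the TODO in onPushBlock points out, blocks are still handed to the regular store(...) path until the write ahead log work lands, so end-to-end reliability ultimately depends on that follow-up.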