Initial commit for reliable Kafka receiver
jerryshao committed Nov 11, 2014
1 parent 6e03de3 commit dd9aeeb
Showing 7 changed files with 441 additions and 8 deletions.
@@ -18,12 +18,8 @@
package org.apache.spark.streaming.kafka

import scala.collection.Map
import scala.reflect.{classTag, ClassTag}
import scala.reflect.ClassTag

import java.util.Properties
import java.util.concurrent.Executors

import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

@@ -51,11 +47,16 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
+    reliableStoreEnabled: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    if (!reliableStoreEnabled) {
+      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+        .asInstanceOf[Receiver[(K, V)]]
+    } else {
+      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+        .asInstanceOf[Receiver[(K, V)]]
+    }
   }
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka

import scala.collection.Map
import scala.reflect.{classTag, ClassTag}

import java.util.Properties
import java.util.concurrent.Executors

import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class KafkaReceiver[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends Receiver[Any](storageLevel) with Logging {

// Connection to Kafka
var consumerConnector: ConsumerConnector = null

def onStop() {
if (consumerConnector != null) {
consumerConnector.shutdown()
}
}

def onStart() {

logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))

val zkConnect = kafkaParams("zookeeper.connect")
// Create the connection to the cluster
logInfo("Connecting to Zookeeper: " + zkConnect)
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + zkConnect)

// When auto.offset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
if (kafkaParams.contains("auto.offset.reset")) {
tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
}

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]

// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

val executorPool = Executors.newFixedThreadPool(topics.values.sum)
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
} finally {
executorPool.shutdown() // Just causes threads to terminate after work is done
}
}

// Handles Kafka Messages
private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
try {
for (msgAndMetadata <- stream) {
store((msgAndMetadata.key, msgAndMetadata.message))
}
} catch {
case e: Throwable => logError("Error handling message; exiting", e)
}
}
}

// It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
// is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
//
// The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
// from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
// 'smallest'/'largest':
// scalastyle:off
// https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
// scalastyle:on
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
val dir = "/consumers/" + groupId
logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
try {
zk.deleteRecursive(dir)
} catch {
case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
} finally {
zk.close()
}
}
}
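The KafkaReceiver above hands each record to Spark via store() as soon as it is pulled from the stream and leaves offset tracking to the high-level consumer, so records fetched but not yet stored can be lost if the receiver dies. The ReliableKafkaReceiver that getReceiver() instantiates is part of this commit but not shown in this excerpt; the snippet below is only a conceptual sketch of the usual remedy (store first, then commit offsets) and is not the code added here. The handler class name, the storeRecord and commitOffsets callbacks, and the offset arithmetic are illustrative assumptions.

// Conceptual sketch only -- NOT the ReliableKafkaReceiver introduced by this commit.
// Assumption: "reliable" means an offset is committed (e.g. to ZooKeeper) only after
// the corresponding record has been handed to Spark via store().
import kafka.common.TopicAndPartition
import kafka.consumer.KafkaStream

class OffsetTrackingMessageHandler[K, V](
    stream: KafkaStream[K, V],
    storeRecord: ((K, V)) => Unit,                        // the receiver's store()
    commitOffsets: Map[TopicAndPartition, Long] => Unit   // hypothetical commit hook
  ) extends Runnable {

  override def run(): Unit = {
    for (msgAndMetadata <- stream) {
      // 1. Hand the record to Spark first.
      storeRecord((msgAndMetadata.key, msgAndMetadata.message))
      // 2. Only then mark the next offset of this partition as safe to commit.
      val tp = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
      commitOffsets(Map(tp -> (msgAndMetadata.offset + 1)))
    }
  }
}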
@@ -70,7 +70,7 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)
   }

/**
@@ -144,4 +144,71 @@ object KafkaUtils {
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

def createReliableStream(
ssc: StreamingContext,
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
: ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
createReliableStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, storageLevel)
}

def createReliableStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
}

def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairReceiverInputDStream[String, String] = {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}

def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[String, String] = {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}

def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext,
keyTypeClass: Class[K],
valueTypeClass: Class[V],
keyDecoderClass: Class[U],
valueDecoderClass: Class[T],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)

createReliableStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
}
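For context, here is a minimal, hypothetical caller of the new createReliableStream overloads added above. The application name, ZooKeeper quorum, group id, and topic name are invented for illustration; the call shape follows the (ssc, zkQuorum, groupId, topics, storageLevel) overload shown in this hunk.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits (pre-1.3 style)
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReliableKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // topics maps each topic name to the number of consumer threads for it,
    // mirroring the Map[String, Int] parameter of the overloads above.
    val stream = KafkaUtils.createReliableStream(
      ssc,
      "zkhost1:2181,zkhost2:2181",      // zkQuorum (hypothetical)
      "reliable-example-group",         // groupId (hypothetical)
      Map("example-topic" -> 1),        // topic -> receiver thread count
      StorageLevel.MEMORY_AND_DISK_SER_2)

    // The stream yields (key, message) pairs decoded as Strings; count words in the messages.
    stream.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Apart from the method name, usage mirrors KafkaUtils.createStream; the difference is that the underlying KafkaInputDStream is constructed with reliableStoreEnabled = true, as shown in the first hunk.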