Address the comments
jerryshao committed Nov 11, 2014
1 parent 16bfe78 commit a949741
Showing 3 changed files with 24 additions and 128 deletions.
KafkaUtils.scala
@@ -70,7 +70,8 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)
val WALEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, WALEnabled, storageLevel)
}
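With this change the write-ahead-log backed (reliable) receiver is selected through configuration rather than through a separate createReliableStream API: createStream now reads spark.streaming.receiver.writeAheadLog.enable and threads it into KafkaInputDStream. A minimal usage sketch of that conf-driven switch, with placeholder master, topic, group id and ZooKeeper quorum, and assuming a checkpoint directory is set so the WAL has somewhere to write:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// The WAL flag below is what createStream now reads to decide whether the
// reliable receiver path is used; there is no separate createReliableStream call.
val conf = new SparkConf()
  .setMaster("local[2]")                        // placeholder master
  .setAppName("kafka-wal-sketch")               // placeholder app name
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("/tmp/kafka-wal-checkpoint")     // WAL segments are written under the checkpoint dir

val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",      // placeholder ZooKeeper quorum
  "group.id" -> "example-consumer-group",       // placeholder consumer group
  "auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("test" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)

The updated tests later in this commit rely on the same pattern: a SparkConf with the WAL flag set, followed by a plain createStream call.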

/**
@@ -143,121 +144,4 @@ object KafkaUtils {
createStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}

/**
* Create a reliable input stream that pulls messages from a Kafka Broker.
* @param ssc StreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def createReliableStream(
ssc: StreamingContext,
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
: ReceiverInputDStream[(String, String)] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
"zookeeper.connection.timeout.ms" -> "10000")
createReliableStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics, storageLevel)
}

/**
* Create a reliable input stream that pulls messages from a Kafka Broker.
* @param ssc StreamingContext object
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
def createReliableStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag](
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
}

/**
* Create a reliable Java input stream that pulls messages from a Kafka Broker.
* Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..)
* @param groupId The group id for this consumer
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
*/
def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt]
): JavaPairReceiverInputDStream[String, String] = {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}

/**
* Create a reliable Java input stream that pulls messages from a Kafka Broker.
* @param jssc JavaStreamingContext object
* @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
*/
def createReliableStream(
jssc: JavaStreamingContext,
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[String, String] = {
createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}

/**
* Create a reliable Java input stream that pulls messages from a Kafka Broker.
* @param jssc JavaStreamingContext object
* @param keyTypeClass Key type of RDD
* @param valueTypeClass value type of RDD
* @param keyDecoderClass Type of kafka key decoder
* @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters,
* see http://kafka.apache.org/08/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread
* @param storageLevel RDD storage level.
*/
def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
jssc: JavaStreamingContext,
keyTypeClass: Class[K],
valueTypeClass: Class[V],
keyDecoderClass: Class[U],
valueDecoderClass: Class[T],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel
): JavaPairReceiverInputDStream[K, V] = {
implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)

implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)

createReliableStream[K, V, U, T](
jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
}
ReliableKafkaReceiver.scala
@@ -82,7 +82,6 @@ class ReliableKafkaReceiver[
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
// TODO. this should be replaced to reliable store after WAL is ready.
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])

// Commit and remove the related offsets.
@@ -120,7 +119,7 @@ class ReliableKafkaReceiver[

if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
"otherwise we cannot enable reliable offset commit mechanism")
"otherwise we will manually set it to false to turn off auto offset commit in Kafka")
}

val props = new Properties()
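The warning above is followed by construction of the consumer Properties, where the receiver overrides the auto-commit setting itself so that offsets are only committed after a block has been durably stored. A sketch of that override, assuming AUTO_OFFSET_COMMIT refers to Kafka 0.8's "auto.commit.enable" consumer property (the constant's value is not shown in this diff):

import java.util.Properties

// Sketch only: build the consumer config and force automatic offset commit off,
// so the receiver can commit offsets manually after reliable storage.
// Mapping AUTO_OFFSET_COMMIT to "auto.commit.enable" is an assumption here.
def consumerProps(kafkaParams: Map[String, String]): Properties = {
  val props = new Properties()
  kafkaParams.foreach { case (key, value) => props.put(key, value) }
  props.setProperty("auto.commit.enable", "false")  // override whatever the caller configured
  props
}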
ReliableKafkaStreamSuite.scala
@@ -30,7 +30,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
import KafkaTestUtils._

test("Reliable Kafka input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val sparkConf = new SparkConf()
.setMaster(master)
.setAppName(framework)
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, batchDuration)
val topic = "test"
val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
createTopic(topic)
@@ -40,7 +44,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
@@ -64,7 +68,11 @@ }
}

test("Verify the offset commit") {
val ssc = new StreamingContext(master, framework, batchDuration)
val sparkConf = new SparkConf()
.setMaster(master)
.setAppName(framework)
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, batchDuration)
val topic = "test"
val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
createTopic(topic)
@@ -78,7 +86,7 @@

assert(getCommitOffset(groupId, topic, 0) === 0L)

val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
@@ -92,7 +100,7 @@ }
}

test("Verify multiple topics offset commit") {
val ssc = new StreamingContext(master, framework, batchDuration)
val sparkConf = new SparkConf()
.setMaster(master)
.setAppName(framework)
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, batchDuration)
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
topics.foreach { case (t, _) =>
@@ -108,7 +120,7 @@ }

topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }

val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
@@ -125,6 +137,7 @@ }
val sparkConf = new SparkConf()
.setMaster(master)
.setAppName(framework)
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
var ssc = new StreamingContext(
sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
batchDuration)
@@ -141,7 +154,7 @@ }
"group.id" -> groupId,
"auto.offset.reset" -> "smallest")

KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
@@ -161,7 +174,7 @@

// Restart to see if data is consumed from last checkpoint.
ssc = new StreamingContext(sparkConf, batchDuration)
KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
