Code refactor and add some unit tests
jerryshao committed Nov 11, 2014
1 parent dd9aeeb commit 77c3e50
Showing 4 changed files with 213 additions and 12 deletions.
@@ -47,12 +47,12 @@ class KafkaInputDStream[
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
reliableStoreEnabled: Boolean,
reliableReceiveEnabled: Boolean,
storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

def getReceiver(): Receiver[(K, V)] = {
if (!reliableStoreEnabled) {
if (!reliableReceiveEnabled) {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[Receiver[(K, V)]]
} else {
@@ -71,7 +71,6 @@ class ReliableKafkaReceiver[
val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
}
println(s"offset map: ${topicPartitionOffsetMap.mkString(":")}")
}

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
@@ -80,7 +79,6 @@ class ReliableKafkaReceiver[
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
topicPartitionOffsetMap.clear()
println(s"block generated: $blockId, offset snapshot: ${offsetSnapshot.mkString(":")}")
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
@@ -101,8 +99,7 @@ class ReliableKafkaReceiver[

/** Manage the BlockGenerator in the receiver itself, to better control block storing and
* offset committing. */
@volatile private lazy val blockGenerator =
new BlockGenerator(blockGeneratorListener, streamId, env.conf)
private lazy val blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)

override def onStop(): Unit = {
if (consumerConnector != null) {
@@ -134,6 +131,9 @@ class ReliableKafkaReceiver[
props.setProperty(AUTO_OFFSET_COMMIT, "false")

val consumerConfig = new ConsumerConfig(props)

assert(consumerConfig.autoCommitEnable == false)

logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
consumerConnector = Consumer.create(consumerConfig)
logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
@@ -204,7 +204,7 @@ class ReliableKafkaReceiver[
s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
}

println(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
logInfo(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
s"partition ${topicAndPart.partition}")
}
}
@@ -93,12 +93,27 @@ class KafkaStreamSuite extends TestSuiteBase {
}

override def afterFunction() {
producer.close()
server.shutdown()
if (producer != null) {
producer.close()
producer = null
}

if (server != null) {
server.shutdown()
server = null
}

brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }

zkClient.close()
zookeeper.shutdown()
if (zkClient != null) {
zkClient.close()
zkClient = null
}

if (zookeeper != null) {
zookeeper.shutdown()
zookeeper = null
}

super.afterFunction()
}
@@ -155,7 +170,9 @@ class KafkaStreamSuite extends TestSuiteBase {

def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
if (producer == null) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
}
producer.send(createTestMessage(topic, sent): _*)
logInfo("==================== 6 ====================")
}
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.kafka

import scala.collection.mutable

import kafka.serializer.StringDecoder
import kafka.utils.{ZkUtils, ZKGroupTopicDirs}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext

class ReliableKafkaStreamSuite extends KafkaStreamSuite {
import KafkaTestUtils._

test("Reliable Kafka input stream") {
val ssc = new StreamingContext(master, framework, batchDuration)
val topic = "test"
val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
createTopic(topic)
produceAndSendMessage(topic, sent)

val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> s"test-consumer-${random.nextInt(10000)}",
"auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]()
stream.map { case (k, v) => v }
.foreachRDD { r =>
val ret = r.collect()
ret.foreach { v =>
val count = result.getOrElseUpdate(v, 0) + 1
result.put(v, count)
}
}
ssc.start()
ssc.awaitTermination(3000)

assert(sent.size === result.size)
sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }

ssc.stop()
}

test("Verify the offset commit") {
val ssc = new StreamingContext(master, framework, batchDuration)
val topic = "test"
val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
createTopic(topic)
produceAndSendMessage(topic, sent)

val groupId = s"test-consumer-${random.nextInt(10000)}"

val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> groupId,
"auto.offset.reset" -> "smallest")

assert(getCommitOffset(groupId, topic, 0) === 0L)

val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
ssc.awaitTermination(3000)
ssc.stop()

assert(getCommitOffset(groupId, topic, 0) === 29L)
}

test("Verify multiple topics offset commit") {
val ssc = new StreamingContext(master, framework, batchDuration)
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
topics.foreach { case (t, _) =>
createTopic(t)
produceAndSendMessage(t, sent)
}

val groupId = s"test-consumer-${random.nextInt(10000)}"

val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> groupId,
"auto.offset.reset" -> "smallest")

topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }

val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
ssc.awaitTermination(3000)
ssc.stop()

topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
}

test("Verify offset commit when exception is met") {
val sparkConf = new SparkConf()
.setMaster(master)
.setAppName(framework)
var ssc = new StreamingContext(
sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
batchDuration)
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
topics.foreach { case (t, _) =>
createTopic(t)
produceAndSendMessage(t, sent)
}

val groupId = s"test-consumer-${random.nextInt(10000)}"

val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
"group.id" -> groupId,
"auto.offset.reset" -> "smallest")

KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
StorageLevel.MEMORY_ONLY).foreachRDD(_ => throw new Exception)
try {
ssc.start()
ssc.awaitTermination(1000)
} catch {
case e: Exception =>
if (ssc != null) {
ssc.stop()
ssc = null
}
}
// The job failed before the block was put into the BlockManager, so the offset is not updated.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }

// Restart to see if data is consumed from last checkpoint.
ssc = new StreamingContext(sparkConf, batchDuration)
KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
StorageLevel.MEMORY_ONLY).foreachRDD(_ => Unit)
ssc.start()
ssc.awaitTermination(3000)
ssc.stop()

topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
}

private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
assert(zkClient != null, "Zookeeper client is not initialized")

val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"

ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L)
}
}
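
For context, below is a minimal, hypothetical usage sketch of the KafkaUtils.createReliableStream API that the new test suite exercises. The application name, topic, consumer group, Zookeeper address, batch interval, and storage level are illustrative placeholders, not values taken from this commit.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReliableKafkaExample")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // The reliable receiver commits offsets to Zookeeper itself, so Kafka's
    // auto offset commit is disabled internally by the receiver.
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> "reliable-example-consumer",
      "auto.offset.reset" -> "smallest")

    // Placeholder topic name and receiver thread count.
    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      Map("example-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    // Print the message payloads of each batch.
    stream.map { case (_, value) => value }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}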
