Skip to content

Commit

Permalink
Merge pull request #5 from tdas/kafka-refactor
Browse files Browse the repository at this point in the history
Refactored Kafka receiver logic and Kafka testsuites
  • Loading branch information
jerryshao committed Nov 13, 2014
2 parents e501b3c + b2b2f84 commit 9f636b3
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 236 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@
package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}

import scala.collection.Map
import scala.collection.mutable
import scala.reflect.{classTag, ClassTag}
import scala.collection.{Map, mutable}
import scala.reflect.{ClassTag, classTag}

import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.{SparkEnv, Logging}
import org.apache.spark.storage.{StreamBlockId, StorageLevel}
import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils


/**
* ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
* It is turned off by default and will be enabled when
Expand All @@ -60,10 +59,8 @@ class ReliableKafkaReceiver[
extends Receiver[(K, V)](storageLevel) with Logging {

private val groupId = kafkaParams("group.id")

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

private def conf() = SparkEnv.get.conf
private def conf = SparkEnv.get.conf

/** High level consumer to connect to Kafka. */
private var consumerConnector: ConsumerConnector = null
Expand All @@ -86,58 +83,8 @@ class ReliableKafkaReceiver[
*/
private var blockGenerator: BlockGenerator = null

/** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
private final class OffsetCheckpointListener extends BlockGeneratorListener {

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
// Get a snapshot of current offset map and store with related block id. Since this hook
// function is called in synchronized block, so we can get the snapshot without explicit lock.
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
topicPartitionOffsetMap.clear()
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])

// Commit and remove the related offsets.
Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
commitOffset(offsetMap)
}
blockOffsetMap.remove(blockId)
}

override def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}
}

override def onStop(): Unit = {
if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}

if (zkClient != null) {
zkClient.close()
zkClient = null
}

if (blockGenerator != null) {
blockGenerator.stop()
blockGenerator = null
}

if (topicPartitionOffsetMap != null) {
topicPartitionOffsetMap.clear()
topicPartitionOffsetMap = null
}

if (blockOffsetMap != null) {
blockOffsetMap.clear()
blockOffsetMap = null
}
}
/** Threadpool running the handlers for receiving message from multiple topics and partitions. */
private var messageHandlerThreadPool: ThreadPoolExecutor = null

override def onStart(): Unit = {
logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
Expand All @@ -149,7 +96,7 @@ class ReliableKafkaReceiver[
blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

// Initialize the block generator for storing Kafka message.
blockGenerator = new BlockGenerator(new OffsetCheckpointListener, streamId, conf())
blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)

if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
Expand All @@ -174,7 +121,9 @@ class ReliableKafkaReceiver[
zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

// start BlockGenerator
messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
topics.values.sum, "KafkaMessageHandler")

blockGenerator.start()

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
Expand All @@ -188,40 +137,70 @@ class ReliableKafkaReceiver[
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")

try {
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream =>
executorPool.submit(new MessageHandler(stream))
}
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream =>
messageHandlerThreadPool.submit(new MessageHandler(stream))
}
} finally {
executorPool.shutdown()
}
println("Starting")
}

/** A inner class to handle received Kafka message. */
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
override def run(): Unit = {
logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext()) {
val msgAndMetadata = streamIterator.next()
val topicAndPartition = TopicAndPartition(
msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator.synchronized {
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
}
}
} catch {
case e: Throwable => logError("Error handling message; existing", e)
}
override def onStop(): Unit = {
if (messageHandlerThreadPool != null) {
messageHandlerThreadPool.shutdown()
messageHandlerThreadPool = null
}

if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}

if (zkClient != null) {
zkClient.close()
zkClient = null
}

if (blockGenerator != null) {
blockGenerator.stop()
blockGenerator = null
}

if (topicPartitionOffsetMap != null) {
topicPartitionOffsetMap.clear()
topicPartitionOffsetMap = null
}

if (blockOffsetMap != null) {
blockOffsetMap.clear()
blockOffsetMap = null
}
}

/** Store a Kafka message and the associated metadata as a tuple */
private def storeMessageAndMetadata(
msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
}

/** Remember the current offsets for each topic and partition. This is called when a block is generated */
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
// Get a snapshot of current offset map and store with related block id. Since this hook
// function is called in synchronized block, so we can get the snapshot without explicit lock.
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
topicPartitionOffsetMap.clear()
}

/** Store the ready-to-be-stored block and commit the related offsets to zookeeper */
private def storeBlockAndCommitOffset(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
blockOffsetMap.remove(blockId)
}

/**
* Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
* metadata schema in Zookeeper.
Expand All @@ -248,4 +227,40 @@ class ReliableKafkaReceiver[
s"partition ${topicAndPart.partition}")
}
}

/** Class to handle received Kafka message. */
private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
override def run(): Unit = {
while (!isStopped) {
println(s"Starting message process thread ${Thread.currentThread().getId}.")
try {
val streamIterator = stream.iterator()
while (streamIterator.hasNext) {
storeMessageAndMetadata(streamIterator.next)
}
} catch {
case e: Exception =>
logError("Error handling message", e)
}
}
}
}

/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
// Remember the offsets of topics/partitions when a block has been generated
rememberBlockOffsets(blockId)
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
// Store block and commit the blocks offset
storeBlockAndCommitOffset(blockId, arrayBuffer)
}

override def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Random;

import scala.Predef;
import scala.Tuple2;
Expand All @@ -42,25 +43,23 @@
import org.junit.After;
import org.junit.Before;

public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
public class JavaKafkaStreamSuite extends KafkaStreamSuiteBase implements Serializable {
private transient JavaStreamingContext ssc = null;
private Random random = new Random();

@Before
@Override
public void setUp() {
testSuite.beforeFunction();
beforeFunction();
System.clearProperty("spark.driver.port");
//System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc = new JavaStreamingContext(sparkConf(), batchDuration());
}

@After
@Override
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
testSuite.afterFunction();
afterFunction();
}

@Test
Expand All @@ -74,15 +73,15 @@ public void testKafkaStream() throws InterruptedException {
sent.put("b", 3);
sent.put("c", 10);

testSuite.createTopic(topic);
createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
testSuite.produceAndSendMessage(topic,
produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
kafkaParams.put("zookeeper.connect", zkAddress());
kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");

JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
Expand Down
Loading

0 comments on commit 9f636b3

Please sign in to comment.