Further address the comments
jerryshao committed Nov 12, 2014
1 parent 98f3d07 commit ea873e4
Showing 7 changed files with 42 additions and 175 deletions.
@@ -17,9 +17,12 @@

package org.apache.spark.streaming.kafka

import java.util.Properties

import scala.collection.Map
import scala.reflect.ClassTag
import scala.reflect.{classTag, ClassTag}

import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

@@ -28,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.util.Utils

/**
* Input stream that pulls messages from a Kafka Broker.
@@ -47,17 +51,16 @@ class KafkaInputDStream[
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
reliableReceiveEnabled: Boolean,
useReliableReceiver: Boolean,
storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {

def getReceiver(): Receiver[(K, V)] = {
if (!reliableReceiveEnabled) {
if (!useReliableReceiver) {
new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[Receiver[(K, V)]]
} else {
new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
.asInstanceOf[Receiver[(K, V)]]
}
}
}

@@ -70,14 +73,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
) extends Receiver[Any](storageLevel) with Logging {
) extends Receiver[(K, V)](storageLevel) with Logging {

// Connection to Kafka
var consumerConnector : ConsumerConnector = null
var consumerConnector: ConsumerConnector = null

def onStop() {
if (consumerConnector != null) {
consumerConnector.shutdown()
consumerConnector = null
}
}

@@ -103,11 +107,11 @@ class KafkaReceiver[
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]

// Create Threads for each Topic/Message Stream we are listening
// Create threads for each topic/message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)

val executorPool = Executors.newFixedThreadPool(topics.values.sum)
val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
@@ -118,8 +122,8 @@
}
}

// Handles Kafka Messages
private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
// Handles Kafka messages
private class MessageHandler(stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")

This file was deleted.

@@ -70,8 +70,8 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
val WALEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, WALEnabled, storageLevel)
val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
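The walEnabled flag read here is what flips createStream onto the reliable path: when spark.streaming.receiver.writeAheadLog.enable is true, KafkaInputDStream.getReceiver() returns a ReliableKafkaReceiver instead of the plain KafkaReceiver. A minimal usage sketch follows; the application name, ZooKeeper quorum, group id and topic are placeholder values, not anything from this commit:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Turning on the receiver write-ahead log selects ReliableKafkaReceiver;
    // leaving it false (the default) keeps the original KafkaReceiver path.
    val conf = new SparkConf()
      .setAppName("ReliableKafkaSketch")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Placeholder connection values; the (ssc, zkQuorum, groupId, topics) overload
    // of createStream is used with an explicit storage level.
    val stream = KafkaUtils.createStream(
      ssc, "zkhost:2181", "example-group", Map("example-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

Note that on the reliable path the receiver also forces auto.commit.enable to false (see the scaladoc added below), so an auto-commit setting passed through kafkaParams does not take effect there.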

/**
@@ -35,6 +35,19 @@ import org.apache.spark.storage.{StreamBlockId, StorageLevel}
import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
import org.apache.spark.util.Utils


/**
* ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
* It is turned off by default and will be enabled when
* spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
* is that this receiver manages topic-partition/offset itself and updates the offset information
* after data is reliably stored as write-ahead log. Offsets will only be updated when data is
* reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
*
* Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
* commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
* will not take effect.
*/
private[streaming]
class ReliableKafkaReceiver[
K: ClassTag,
@@ -44,20 +57,20 @@ class ReliableKafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel)
extends Receiver[Any](storageLevel) with Logging {
extends Receiver[(K, V)](storageLevel) with Logging {

private val groupId = kafkaParams("group.id")

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

private def conf() = SparkEnv.get.conf

/** High level consumer to connect to Kafka. */
private var consumerConnector: ConsumerConnector = null

/** zkClient to connect to Zookeeper to commit the offsets. */
private var zkClient: ZkClient = null

private val groupId = kafkaParams("group.id")

private def conf() = SparkEnv.get.conf

private val AUTO_OFFSET_COMMIT = "auto.commit.enable"

/**
* A HashMap to manage the offset for each topic/partition, this HashMap is called in
* synchronized block, so mutable HashMap will not meet concurrency issue.
@@ -75,12 +88,6 @@ class ReliableKafkaReceiver[

/** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
private final class OffsetCheckpointListener extends BlockGeneratorListener {
override def onStoreData(data: Any, metadata: Any): Unit = {
if (metadata != null) {
val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
}
}

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
// Get a snapshot of current offset map and store with related block id. Since this hook
@@ -91,7 +98,7 @@
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])

// Commit and remove the related offsets.
Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
@@ -202,9 +209,10 @@
for (msgAndMetadata <- stream) {
val topicAndPartition = TopicAndPartition(
msgAndMetadata.topic, msgAndMetadata.partition)
val metadata = (topicAndPartition, msgAndMetadata.offset)

blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
blockGenerator.synchronized {
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
}
}
} catch {
case e: Throwable => logError("Error handling message; existing", e)
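The blockGenerator.synchronized block above is the crux of this revision: instead of attaching (topic-partition, offset) metadata to each record through the now-removed onStoreData hook, the handler appends the record and updates topicPartitionOffsetMap atomically under the generator's lock, so a record and its offset always land in the same block snapshot. Below is a simplified, self-contained sketch of that bookkeeping; the class and method names are illustrative stand-ins, not Spark's API:

    import scala.collection.mutable

    // Stand-ins for kafka.common.TopicAndPartition and Spark's StreamBlockId.
    case class TopicPartition(topic: String, partition: Int)
    case class BlockId(id: Long)

    class OffsetTrackerSketch(commitOffsets: Map[TopicPartition, Long] => Unit) {
      // Offsets seen since the last block was cut (role of topicPartitionOffsetMap).
      private val unflushedOffsets = mutable.HashMap[TopicPartition, Long]()
      // Offsets frozen per generated block, committed only after a reliable store
      // (role of blockOffsetMap).
      private val blockOffsets = mutable.HashMap[BlockId, Map[TopicPartition, Long]]()

      // Message-handler path: in the receiver this runs together with the
      // `blockGenerator += (key, message)` call, under the same lock.
      def recordAdded(tp: TopicPartition, offset: Long): Unit = synchronized {
        unflushedOffsets.put(tp, offset)
      }

      // onGenerateBlock analogue: snapshot and reset the per-partition offsets.
      def blockGenerated(blockId: BlockId): Unit = synchronized {
        blockOffsets.put(blockId, unflushedOffsets.toMap)
        unflushedOffsets.clear()
      }

      // onPushBlock analogue: commit the snapshot only after the block has been
      // stored reliably (e.g. written to the write-ahead log), then drop it.
      def blockStored(blockId: BlockId): Unit = synchronized {
        blockOffsets.remove(blockId).foreach(commitOffsets)
      }
    }

Keeping the record append and the offset update under one lock is what keeps a block's data consistent with the offsets later committed for it, without needing a per-record listener callback.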
@@ -27,11 +27,6 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}

/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
/** Called when new data is added into BlockGenerator, this hook function will be called each
* time new data is added into BlockGenerator, any heavy or blocking operation will hurt the
* throughput */
def onStoreData(data: Any, metadata: Any)

/** Called when a new block is generated */
def onGenerateBlock(blockId: StreamBlockId)

@@ -92,7 +87,6 @@ private[streaming] class BlockGenerator(
def += (data: Any, metadata: Any = null): Unit = synchronized {
waitToPush()
currentBuffer += data
listener.onStoreData(data, metadata)
}

/** Change the buffer to which single records are added to. */
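With onStoreData gone, BlockGeneratorListener as of this commit is reduced to block-level callbacks plus error reporting (the three methods visible in this diff and in the test suite below). A sketch of an implementation against the trimmed trait follows; since the trait is private[streaming], the sketch assumes it lives somewhere under Spark's own org.apache.spark.streaming package, and the class name and comments are illustrative only:

    // Assumed location so that the private[streaming] trait is visible.
    package org.apache.spark.streaming.sketch

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.storage.StreamBlockId
    import org.apache.spark.streaming.receiver.BlockGeneratorListener

    // Post-change callback surface: the per-record hook is gone, so all
    // bookkeeping happens at block granularity.
    class BlockListenerSketch extends BlockGeneratorListener {
      def onGenerateBlock(blockId: StreamBlockId): Unit = {
        // A block has just been cut; ReliableKafkaReceiver snapshots its
        // topic-partition offsets for this blockId at this point.
      }
      def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit = {
        // The block's records arrive here; store them reliably, then commit any
        // metadata remembered for this blockId.
      }
      def onError(message: String, throwable: Throwable): Unit = {
        // Surface the failure to the receiver/supervisor.
      }
    }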
@@ -99,8 +99,6 @@ private[streaming] class ReceiverSupervisorImpl(

/** Divides received data records into data blocks for pushing in BlockManager. */
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
def onStoreData(data: Any, metadata: Any): Unit = { }

def onGenerateBlock(blockId: StreamBlockId): Unit = { }

def onError(message: String, throwable: Throwable) {
@@ -299,8 +299,6 @@ class ReceiverSuite extends FunSuite with Timeouts {
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]

def onStoreData(data: Any, metadata: Any) { }

def onGenerateBlock(blockId: StreamBlockId) { }

def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {