Commit ec2e95e

Removed the receiver's locks and essentially reverted to Saisai's original design.

tdas committed Nov 13, 2014
1 parent 2a20a01 commit ec2e95e
Showing 7 changed files with 157 additions and 98 deletions.
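
For context on the design this commit reverts to: each Kafka message's (TopicAndPartition, offset) pair is handed to BlockGenerator.addDataWithCallback, and the receiver's offset maps are then only touched from the generator's listener callbacks (onAddData, onGenerateBlock, onPushBlock), which is what makes the receiver-side synchronized blocks removable, assuming the generator serializes those callbacks. Below is a minimal, self-contained sketch of that flow using simplified stand-in types; OffsetTracker, its commit parameter, and this TopicAndPartition are illustrative only, not the real Spark/Kafka classes.

```scala
// Minimal sketch of the callback-driven offset tracking in this commit.
// All names here are simplified stand-ins, not the real Spark/Kafka classes;
// `commit` is whatever persists offsets (ZooKeeper in the real receiver).
import scala.collection.mutable

case class TopicAndPartition(topic: String, partition: Int)

class OffsetTracker(commit: Map[TopicAndPartition, Long] => Unit) {
  // Latest offset seen per topic/partition since the last block was cut.
  private val currentOffsets = mutable.HashMap.empty[TopicAndPartition, Long]
  // Offsets frozen per generated block, keyed by block id.
  private val blockOffsets = mutable.HashMap.empty[String, Map[TopicAndPartition, Long]]

  // Mirrors BlockGeneratorListener.onAddData: record the message's offset.
  def onAddData(tp: TopicAndPartition, offset: Long): Unit = {
    currentOffsets.put(tp, offset)
  }

  // Mirrors onGenerateBlock: snapshot the offsets that went into this block.
  def onGenerateBlock(blockId: String): Unit = {
    blockOffsets.put(blockId, currentOffsets.toMap)
    currentOffsets.clear()
  }

  // Mirrors onPushBlock: once the block is stored reliably, commit its offsets.
  def onPushBlock(blockId: String): Unit = {
    blockOffsets.remove(blockId).foreach(commit)
  }
}
```

In the receiver itself the commit step corresponds to storeBlockAndCommitOffset in the diff below, with the committed offsets ending up in ZooKeeper, which is what the test suite's getCommitOffset reads back.
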
@@ -178,17 +178,23 @@ class ReliableKafkaReceiver[

/** Store a Kafka message and the associated metadata as a tuple. */
private def storeMessageAndMetadata(
msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
val data = (msgAndMetadata.key, msgAndMetadata.message)
val metadata = (topicAndPartition, msgAndMetadata.offset)
blockGenerator.addDataWithCallback(data, metadata)
}

/** Update stored offset */
private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
topicPartitionOffsetMap.put(topicAndPartition, offset)
}

/**
* Remember the current offsets for each topic and partition. This is called when a block is
* generated.
*/
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
// Take a snapshot of the current offset map and store it with the block id.
val offsetSnapshot = topicPartitionOffsetMap.toMap
blockOffsetMap.put(blockId, offsetSnapshot)
@@ -250,17 +256,25 @@ class ReliableKafkaReceiver[
/** Class to handle blocks generated by the block generator. */
private final class GeneratedBlockHandler extends BlockGeneratorListener {

override def onGenerateBlock(blockId: StreamBlockId): Unit = {
def onAddData(data: Any, metadata: Any): Unit = {
// Update the offset of the data that was added to the generator
if (metadata != null) {
val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
updateOffset(topicAndPartition, offset)
}
}

def onGenerateBlock(blockId: StreamBlockId): Unit = {
// Remember the offsets of topics/partitions when a block has been generated
rememberBlockOffsets(blockId)
}

override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
// Store the block and commit its offsets
storeBlockAndCommitOffset(blockId, arrayBuffer)
}

override def onError(message: String, throwable: Throwable): Unit = {
def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}
}
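
Since addDataWithCallback carries the metadata as Any, onAddData above has to recover the (TopicAndPartition, Long) pair with a null check and a cast. The following is a small, self-contained illustration of that pack/unpack convention; the object and method names are hypothetical and the values are placeholders.

```scala
// Illustration of the metadata convention used by the handler above: the
// receiver packs (TopicAndPartition, offset) as Any alongside the data, and
// the callback casts it back before updating the offset map.
object MetadataConventionSketch {
  case class TopicAndPartition(topic: String, partition: Int)

  // What storeMessageAndMetadata hands to addDataWithCallback: the (key, message)
  // pair goes into the block, the (TopicAndPartition, offset) pair rides along as Any.
  def pack(topic: String, partition: Int, offset: Long,
           key: String, message: String): (Any, Any) = {
    val data = (key, message)
    val metadata = (TopicAndPartition(topic, partition), offset)
    (data, metadata)
  }

  // What onAddData does on the other side: guard against null, then cast back.
  def unpack(metadata: Any): Option[(TopicAndPartition, Long)] =
    if (metadata != null) Some(metadata.asInstanceOf[(TopicAndPartition, Long)])
    else None
}
```
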
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Random;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
@@ -43,23 +45,25 @@

public class JavaKafkaStreamSuite implements Serializable {
private transient JavaStreamingContext ssc = null;
private Random random = new Random();
private transient Random random = new Random();
private transient KafkaStreamSuiteBase suiteBase = null;

@Before
public void setUp() {
suiteBase = new KafkaStreamSuiteBase() { };
suiteBase.beforeFunction();
suiteBase.setupKafka();
System.clearProperty("spark.driver.port");
ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]").setAppName(this.getClass().getSimpleName());
ssc = new JavaStreamingContext(sparkConf, new Duration(500));
}

@After
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
suiteBase.afterFunction();
suiteBase.tearDownKafka();
}

@Test
@@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
suiteBase.produceAndSendMessage(topic,
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));
JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
Predef.<Tuple2<String, Object>>conforms()));

HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
@@ -42,15 +42,13 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils

/**
* This is an abstract base class for Kafka test suites. It provides functionality to set up
* and tear down local Kafka servers, and to push data using Kafka producers.
*/
abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
import KafkaTestUtils._

val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
val batchDuration = Milliseconds(500)
var ssc: StreamingContext = _

var zkAddress: String = _
var zkClient: ZkClient = _

@@ -64,7 +62,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
private var server: KafkaServer = _
private var producer: Producer[String, String] = _

def beforeFunction() {
def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
@@ -100,12 +98,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
logInfo("==================== 4 ====================")
}

def afterFunction() {
if (ssc != null) {
ssc.stop()
ssc = null
}

def tearDownKafka() {
if (producer != null) {
producer.close()
producer = null
@@ -146,21 +139,31 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {

def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
if (producer == null) {
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
}
producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
producer.send(createTestMessage(topic, sent): _*)
producer.close()
logInfo("==================== 6 ====================")
}
}

class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
var ssc: StreamingContext = _

before {
setupKafka()
}

before { beforeFunction() }
after { afterFunction() }
after {
if (ssc != null) {
ssc.stop()
ssc = null
}
tearDownKafka()
}

test("Kafka input stream") {
ssc = new StreamingContext(sparkConf, batchDuration)
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
ssc = new StreamingContext(sparkConf, Milliseconds(500))
val topic = "topic1"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
createTopic(topic)
@@ -17,57 +17,75 @@

package org.apache.spark.streaming.kafka


import java.io.File

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random

import com.google.common.io.Files
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.util.Utils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
val topic = "topic"

val sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName(this.getClass.getSimpleName)
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val data = Map("a" -> 10, "b" -> 10, "c" -> 10)

var topic: String = _
var groupId: String = _
var kafkaParams: Map[String, String] = _
var ssc: StreamingContext = _
var tempDirectory: File = null

before {
beforeFunction() // call this first to start ZK and Kafka
setupKafka()
topic = s"test-topic-${Random.nextInt(10000)}"
groupId = s"test-consumer-${Random.nextInt(10000)}"
kafkaParams = Map(
"zookeeper.connect" -> zkAddress,
"group.id" -> groupId,
"auto.offset.reset" -> "smallest"
)

ssc = new StreamingContext(sparkConf, Milliseconds(500))
tempDirectory = Files.createTempDir()
ssc.checkpoint(tempDirectory.getAbsolutePath)
}

after {
afterFunction()
if (ssc != null) {
ssc.stop()
}
if (tempDirectory != null && tempDirectory.exists()) {
FileUtils.deleteDirectory(tempDirectory)
tempDirectory = null
}
tearDownKafka()
}

test("Reliable Kafka input stream") {
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc = new StreamingContext(sparkConf, batchDuration)
val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
s"test-checkpoint${Random.nextInt(10000)}"
Utils.registerShutdownDeleteDir(new File(checkpointDir))
ssc.checkpoint(checkpointDir)

test("Reliable Kafka input stream with single topic") {
createTopic(topic)
produceAndSendMessage(topic, data)

// Verify that no offset has been committed for this group/topic/partition before starting.
assert(getCommitOffset(groupId, topic, 0) === None)

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
val result = new mutable.HashMap[String, Long]()
stream.map { case (k, v) => v }.foreachRDD { r =>
val ret = r.collect()
@@ -77,84 +95,64 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
}
}
ssc.start()
eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {

eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
// Basic end-to-end verification for ReliableKafkaReceiver.
// Verify that the number of distinct keys received matches the number sent.
assert(data.size === result.size)
// Verify that the count received for each key matches the count sent.
data.keys.foreach { k => assert(data(k) === result(k).toInt) }
// Verify that the committed offset reflects all of the messages that were sent.
assert(getCommitOffset(groupId, topic, 0) === Some(29L))

}
ssc.stop()
}
/*
test("Verify the offset commit") {
// Verify the correctness of offset commit mechanism.
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc = new StreamingContext(sparkConf, batchDuration)
val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
s"test-checkpoint${Random.nextInt(10000)}"
Utils.registerShutdownDeleteDir(new File(checkpointDir))
ssc.checkpoint(checkpointDir)

createTopic(topic)
produceAndSendMessage(topic, data)
// Verify whether the offset of this group/topic/partition is 0 before starting.
assert(getCommitOffset(groupId, topic, 0) === 0L)

// Do this to consume all the message of this group/topic.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
Map(topic -> 1),
StorageLevel.MEMORY_ONLY)
ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
// Verify the offset number whether it is equal to the total message number.
assert(getCommitOffset(groupId, topic, 0) === 29L)
eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
}
ssc.stop()
}

test("Verify multiple topics offset commit") {
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
ssc = new StreamingContext(sparkConf, batchDuration)
val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
s"test-checkpoint${Random.nextInt(10000)}"
Utils.registerShutdownDeleteDir(new File(checkpointDir))
ssc.checkpoint(checkpointDir)

*/
test("Reliable Kafka input stream with multiple topics") {
val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
topics.foreach { case (t, _) =>
createTopic(t)
produceAndSendMessage(t, data)
}

// Before starting, verify that no offsets have been committed for any group/topic/partition.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }

// Consume all the data sent to the broker, which will potentially commit the offsets internally.
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topics,
StorageLevel.MEMORY_ONLY)
ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
stream.foreachRDD(_ => Unit)
ssc.start()
eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
// Verify that the committed offset for each group/topic matches the expected value.
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
}
ssc.stop()
}


/** Get the committed offset of a partition from ZooKeeper, if any. */
private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
assert(zkClient != null, "Zookeeper client is not initialized")

val topicDirs = new ZKGroupTopicDirs(groupId, topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"

ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L)
val offset = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
offset
}
}
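
getCommitOffset now returns Option[Long], so the tests can distinguish "no offset committed yet" (None) from a committed offset of 0. Below is a short sketch of what it reads and how the raw ZooKeeper node data maps onto the Option; the literal path is the conventional Kafka 0.8 consumer offset layout and is an assumption here, since the real code derives it from ZKGroupTopicDirs.

```scala
// Sketch of the ZooKeeper lookup behind getCommitOffset. The literal path
// layout is assumed (conventional Kafka 0.8 consumer layout); the real test
// builds it via ZKGroupTopicDirs.consumerOffsetDir.
object OffsetLookupSketch {
  def consumerOffsetPath(groupId: String, topic: String, partition: Int): String =
    s"/consumers/$groupId/offsets/$topic/$partition"

  // None means the group has never committed an offset for this partition,
  // which the tests assert before the stream starts instead of conflating it with 0L.
  def parseOffset(rawNodeData: Option[String]): Option[Long] =
    rawNodeData.map(_.toLong)
}
```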