
Changing the MessageSet to return an iterator over (message, offset) pairs, where offset represents the offset to be used for the next message.
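
In practice the new iterator lets a caller walk a fetched set and compute the offset for the next fetch request in one pass. A minimal sketch (the MessageOffset pairing comes from this commit; the helper name and surrounding loop are illustrative, and for a ByteBufferMessageSet the offset carried by each pair is the running count of valid bytes consumed within the set):

    import kafka.message.{ByteBufferMessageSet, MessageOffset}

    // Hypothetical helper: consume a fetched set and return the offset to use
    // for the next fetch. Each MessageOffset carries the message plus the byte
    // count consumed so far, so the next fetch offset is the request offset
    // plus the last pair's offset.
    def nextFetchOffset(fetchOffset: Long, messageSet: ByteBufferMessageSet): Long = {
      var next = fetchOffset
      for (messageAndOffset <- messageSet) {
        println("payload bytes: " + messageAndOffset.message.payloadSize)
        next = fetchOffset + messageAndOffset.offset
      }
      next
    }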
Commit ebf9b1cf62cc4a5cfaea6e97f35464b4bb9c9d59 (1 parent: cb83183), Neha Narkhede, committed Jul 20, 2011
Showing with 143 additions and 103 deletions.
  1. +7 −6 contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
  2. +3 −3 core/src/main/scala/kafka/consumer/ConsumerIterator.scala
  3. +2 −2 core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
  4. +5 −5 core/src/main/scala/kafka/javaapi/message/MessageSet.scala
  5. +2 −2 core/src/main/scala/kafka/log/Log.scala
  6. +33 −28 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  7. +6 −5 core/src/main/scala/kafka/message/FileMessageSet.scala
  8. +22 −0 core/src/main/scala/kafka/message/MessageOffset.scala
  9. +4 −4 core/src/main/scala/kafka/message/MessageSet.scala
  10. +4 −4 core/src/main/scala/kafka/producer/SyncProducer.scala
  11. +2 −2 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
  12. +5 −5 core/src/main/scala/kafka/utils/DumpLogSegments.scala
  13. +4 −1 core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
  14. +5 −5 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
  15. +10 −3 core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
  16. +2 −2 core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
  17. +4 −4 core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
  18. +5 −6 core/src/test/scala/unit/kafka/log/LogTest.scala
  19. +1 −1 core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
  20. +1 −1 core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
  21. +4 −4 core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
  22. +6 −6 core/src/test/scala/unit/kafka/producer/ProducerTest.scala
  23. +1 −1 core/src/test/scala/unit/kafka/utils/TestUtils.scala
  24. +3 −2 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
  25. +2 −1 perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java
13 contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -13,6 +13,7 @@
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
+import kafka.message.MessageOffset;
import kafka.message.MessageSet;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -43,7 +44,7 @@
protected long _count; /*current count*/
protected MultiFetchResponse _response = null; /*fetch response*/
- protected Iterator<Message> _messageIt = null; /*message iterator*/
+ protected Iterator<MessageOffset> _messageIt = null; /*message iterator*/
protected int _retry = 0;
protected long _requestTime = 0; /*accumulative request time*/
@@ -122,7 +123,7 @@ public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException
while ( !gotNext && iter.hasNext()) {
ByteBufferMessageSet msgSet = iter.next();
if ( hasError(msgSet)) return false;
- _messageIt = (Iterator<Message>) msgSet.iterator();
+ _messageIt = (Iterator<MessageOffset>) msgSet.iterator();
gotNext = get(key, value);
}
}
@@ -171,17 +172,17 @@ public void close() throws IOException {
protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
if (_messageIt != null && _messageIt.hasNext()) {
- Message msg = _messageIt.next();
+ MessageOffset msgAndOffset = _messageIt.next();
- ByteBuffer buf = msg.payload();
+ ByteBuffer buf = msgAndOffset.message().payload();
int origSize = buf.remaining();
byte[] bytes = new byte[origSize];
buf.get(bytes, buf.position(), origSize);
value.set(bytes, 0, origSize);
- key.set(_index, _offset, msg.checksum());
+ key.set(_index, _offset, msgAndOffset.message().checksum());
- _offset += MessageSet.entrySize(msg); //increase offset
+ _offset += msgAndOffset.offset(); //increase offset
_count ++; //increase count
return true;
6 core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -19,8 +19,8 @@ package kafka.consumer
import kafka.utils.IteratorTemplate
import org.apache.log4j.Logger
import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.message.{MessageSet, Message}
import kafka.cluster.Partition
+import kafka.message.{MessageOffset, MessageSet, Message}
/**
* An iterator that blocks until a value can be read from the supplied queue.
@@ -31,7 +31,7 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
extends IteratorTemplate[Message] {
private val logger = Logger.getLogger(classOf[ConsumerIterator])
- private var current: Iterator[Message] = null
+ private var current: Iterator[MessageOffset] = null
private var currentDataChunk: FetchedDataChunk = null
private var setConsumedOffset: Boolean = false
private var currentTopicInfo: PartitionTopicInfo = null
@@ -76,7 +76,7 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
setConsumedOffset = true
}
- item
+ item.message
}
}
4 core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -56,13 +56,13 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
def serialized():ByteBuffer = underlying.serialized
- override def iterator: java.util.Iterator[Message] = new java.util.Iterator[Message] {
+ override def iterator: java.util.Iterator[MessageOffset] = new java.util.Iterator[MessageOffset] {
val underlyingIterator = underlying.iterator
override def hasNext(): Boolean = {
underlyingIterator.hasNext
}
- override def next(): Message = {
+ override def next(): MessageOffset = {
underlyingIterator.next
}
10 core/src/main/scala/kafka/javaapi/message/MessageSet.scala
@@ -16,8 +16,8 @@
package kafka.javaapi.message
-import kafka.message.{InvalidMessageException, Message}
import java.nio.channels.WritableByteChannel
+import kafka.message.{MessageOffset, InvalidMessageException, Message}
/**
* A set of messages. A message set has a fixed serialized form, though the container
@@ -26,12 +26,12 @@ import java.nio.channels.WritableByteChannel
* 4 byte size containing an integer N
* N message bytes as described in the message class
*/
-abstract class MessageSet extends java.lang.Iterable[Message] {
+abstract class MessageSet extends java.lang.Iterable[MessageOffset] {
/**
* Provides an iterator over the messages in this set
*/
- def iterator: java.util.Iterator[Message]
+ def iterator: java.util.Iterator[MessageOffset]
/**
* Gives the total size of this message set in bytes
@@ -45,8 +45,8 @@ abstract class MessageSet extends java.lang.Iterable[Message] {
def validate(): Unit = {
val thisIterator = this.iterator
while(thisIterator.hasNext) {
- val message = thisIterator.next
- if(!message.isValid)
+ val messageAndOffset = thisIterator.next
+ if(!messageAndOffset.message.isValid)
throw new InvalidMessageException
}
}
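
The serialized form described in the comment above (a 4-byte size containing an integer N, followed by N message bytes, repeated) is exactly what the iterators in the Scala ByteBufferMessageSet further down walk. A standalone sketch of that framing against plain java.nio, independent of the Kafka classes (truncation handling mirrors the iterators' behavior of stopping at an incomplete trailing entry):

    import java.nio.ByteBuffer

    // Sketch: split a message-set buffer into its raw [size][bytes] entries.
    // A negative size or a truncated trailing entry ends iteration, as the
    // real iterators do.
    def frames(buf: ByteBuffer): List[ByteBuffer] = {
      val iter = buf.slice()
      var out = List.empty[ByteBuffer]
      var done = false
      while (!done && iter.remaining >= 4) {
        val size = iter.getInt()
        if (size < 0 || iter.remaining < size) {
          done = true
        } else {
          val entry = iter.slice()
          entry.limit(size)
          iter.position(iter.position + size)
          out = entry :: out
        }
      }
      out.reverse
    }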
4 core/src/main/scala/kafka/log/Log.scala
@@ -198,8 +198,8 @@ private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int,
def append(messages: MessageSet): Unit = {
// validate the messages
var numberOfMessages = 0
- for(message <- messages) {
- if(!message.isValid)
+ for(messageAndOffset <- messages) {
+ if(!messageAndOffset.message.isValid)
throw new InvalidMessageException()
numberOfMessages += 1;
}
61 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -16,14 +16,12 @@
package kafka.message
-import java.nio._
-import java.nio.channels._
import scala.collection.mutable
-import kafka.message._
-import kafka.utils._
import org.apache.log4j.Logger
-import collection.{JavaConversions, mutable}
import kafka.common.{InvalidMessageSizeException, ErrorMapping}
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+import kafka.utils.IteratorTemplate
/**
* A sequence of messages stored in a byte buffer
@@ -116,18 +114,18 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
channel.write(buffer.duplicate)
- override def iterator: Iterator[Message] = deepIterate match {
+ override def iterator: Iterator[MessageOffset] = deepIterate match {
case true => deepIterator
case false => shallowIterator
}
- def shallowIterator(): Iterator[Message] = {
+ def shallowIterator(): Iterator[MessageOffset] = {
ErrorMapping.maybeThrowException(errorCode)
- new IteratorTemplate[Message] {
+ new IteratorTemplate[MessageOffset] {
var iter = buffer.slice()
var currValidBytes = 0
- override def makeNext(): Message = {
+ override def makeNext(): MessageOffset = {
// read the size of the item
if(iter.remaining < 4) {
shallowValidByteCount = currValidBytes
@@ -137,42 +135,46 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
if(size < 0 || iter.remaining < size) {
shallowValidByteCount = currValidBytes
if (currValidBytes == 0 || size < 0)
- throw new InvalidMessageSizeException("invalid message size:" + size + " only received bytes:" + iter.remaining
- + " at " + currValidBytes + " possible causes (1) a single message larger than the fetch size; (2) log corruption")
+ throw new InvalidMessageSizeException("invalid message size: %d only received bytes: %d " +
+ " at %d possible causes (1) a single message larger than the fetch size; (2) log corruption "
+ .format(size, iter.remaining, currValidBytes))
return allDone()
}
currValidBytes += 4 + size
val message = iter.slice()
message.limit(size)
iter.position(iter.position + size)
- new Message(message)
+ new MessageOffset(new Message(message), currValidBytes)
}
}
}
- def deepIterator(): Iterator[Message] = {
+ def deepIterator(): Iterator[MessageOffset] = {
ErrorMapping.maybeThrowException(errorCode)
- new IteratorTemplate[Message] {
+ new IteratorTemplate[MessageOffset] {
var topIter = buffer.slice()
var currValidBytes = 0
- var innerIter:Iterator[Message] = null
+ var innerIter:Iterator[MessageOffset] = null
def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
- def makeNextOuter: Message = {
+ def makeNextOuter: MessageOffset = {
if (topIter.remaining < 4) {
deepValidByteCount = currValidBytes
return allDone()
}
val size = topIter.getInt()
- logger.trace("Remaining bytes in iterator = " + topIter.remaining)
- logger.trace("size of data = " + size)
+ if(logger.isTraceEnabled) {
+ logger.trace("Remaining bytes in iterator = " + topIter.remaining)
+ logger.trace("size of data = " + size)
+ }
if(size < 0 || topIter.remaining < size) {
deepValidByteCount = currValidBytes
if (currValidBytes == 0 || size < 0)
- throw new InvalidMessageSizeException("invalid message size:" + size + " only received bytes:" + topIter.remaining
- + " at " + currValidBytes + " possible causes (1) a single message larger than the fetch size; (2) log corruption")
+ throw new InvalidMessageSizeException("invalid message size: %d only received bytes: %d " +
+ " at %d possible causes (1) a single message larger than the fetch size; (2) log corruption "
+ .format(size, topIter.remaining, currValidBytes))
return allDone()
}
val message = topIter.slice()
@@ -182,26 +184,29 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
newMessage.compressionCodec match {
case NoCompressionCodec =>
if(logger.isDebugEnabled)
- logger.debug("Message is uncompressed")
+ logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
innerIter = null
currValidBytes += 4 + size
- newMessage
+ new MessageOffset(newMessage, currValidBytes)
case _ =>
if(logger.isDebugEnabled)
- logger.debug("Message is compressed")
+ logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
innerIter = CompressionUtils.decompress(newMessage).deepIterator
makeNext()
}
}
- override def makeNext(): Message = {
- logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
+ override def makeNext(): MessageOffset = {
+ if(logger.isDebugEnabled)
+ logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
innerDone match {
case true => makeNextOuter
case false => {
- val message = innerIter.next
- currValidBytes += message.serializedSize
- message
+ val messageAndOffset = innerIter.next
+ //TODO: Alternatively check if messageAndOffset.offset can be used
+// currValidBytes += messageAndOffset.message.serializedSize
+ currValidBytes += messageAndOffset.offset
+ new MessageOffset(messageAndOffset.message, currValidBytes)
}
}
}
11 core/src/main/scala/kafka/message/FileMessageSet.scala
@@ -95,7 +95,8 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
* Return a message set which is a view into this set starting from the given offset and with the given size limit.
*/
def read(readOffset: Long, size: Long): MessageSet = {
- new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark), false, new AtomicBoolean(false))
+ new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
+ false, new AtomicBoolean(false))
}
/**
@@ -107,11 +108,11 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
/**
* Get an iterator over the messages in the set
*/
- override def iterator: Iterator[Message] = {
- new IteratorTemplate[Message] {
+ override def iterator: Iterator[MessageOffset] = {
+ new IteratorTemplate[MessageOffset] {
var location = offset
- override def makeNext(): Message = {
+ override def makeNext(): MessageOffset = {
// read the size of the item
val sizeBuffer = ByteBuffer.allocate(4)
channel.read(sizeBuffer, location)
@@ -132,7 +133,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
// increment the location and return the item
location += size + 4
- new Message(buffer)
+ new MessageOffset(new Message(buffer), location)
}
}
}
22 core/src/main/scala/kafka/message/MessageOffset.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.message
+
+/**
+ * Represents message and offset of the next message. This is used in the MessageSet to iterate over it
+ */
+case class MessageOffset(val message: Message, val offset: Long)
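
Since MessageOffset is a case class, it comes with structural equality and an extractor for free. A quick illustrative use (the payload bytes and the offset value 15 are arbitrary):

    import kafka.message.{Message, MessageOffset}

    val pair = MessageOffset(new Message("hello".getBytes), 15L)
    val MessageOffset(msg, nextOffset) = pair           // case-class extractor
    assert(pair == MessageOffset(msg, nextOffset))      // structural equality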
8 core/src/main/scala/kafka/message/MessageSet.scala
@@ -63,7 +63,7 @@ object MessageSet {
* 4 byte size containing an integer N
* N message bytes as described in the message class
*/
-abstract class MessageSet extends Iterable[Message] {
+abstract class MessageSet extends Iterable[MessageOffset] {
/** Write the messages in this set to the given channel starting at the given offset byte.
* Less than the complete amount may be written, but no more than maxSize can be. The number
@@ -73,7 +73,7 @@ abstract class MessageSet extends Iterable[Message] {
/**
* Provides an iterator over the messages in this set
*/
- def iterator: Iterator[Message]
+ def iterator: Iterator[MessageOffset]
/**
* Gives the total size of this message set in bytes
@@ -85,8 +85,8 @@ abstract class MessageSet extends Iterable[Message] {
* match the payload for any message.
*/
def validate(): Unit = {
- for(message <- this)
- if(!message.isValid)
+ for(messageAndOffset <- this)
+ if(!messageAndOffset.message.isValid)
throw new InvalidMessageException
}
8 core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -56,8 +56,8 @@ class SyncProducer(val config: SyncProducerConfig) {
val request = MultiProducerRequest.readFrom(buffer)
for (produce <- request.produces) {
try {
- for (message <- produce.messages)
- if (!message.isValid)
+ for (messageAndOffset <- produce.messages)
+ if (!messageAndOffset.message.isValid)
logger.trace("topic " + produce.topic + " is invalid")
}
catch {
@@ -135,8 +135,8 @@ class SyncProducer(val config: SyncProducerConfig) {
}
private def verifyMessageSize(messages: ByteBufferMessageSet) {
- for (message <- messages)
- if (message.payloadSize > config.maxMessageSize)
+ for (messageAndOffset <- messages)
+ if (messageAndOffset.message.payloadSize > config.maxMessageSize)
throw new MessageSizeTooLargeException
}
4 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
@@ -82,8 +82,8 @@ object SimpleConsumerShell {
for (messages <- messageSets) {
println("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
var consumed = 0
- for(message <- messages) {
- println("consumed: " + Utils.toString(message.payload, "UTF-8"))
+ for(messageAndOffset <- messages) {
+ println("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
consumed += 1
}
if(consumed > 0)
10 core/src/main/scala/kafka/utils/DumpLogSegments.scala
@@ -35,15 +35,15 @@ object DumpLogSegments {
var offset = file.getName().split("\\.")(0).toLong
println("Starting offset: " + offset)
val messageSet = new FileMessageSet(file, false)
- for(message <- messageSet) {
+ for(messageAndOffset <- messageSet) {
println("----------------------------------------------")
- if (message.isValid)
+ if (messageAndOffset.message.isValid)
println("offset:\t" + offset)
else
- println("offset:\t" + offset + "\t invalid")
+ println("offset:\t %d \t invalid".format(offset))
if (!isNoPrint)
- println("payload:\t" + Utils.toString(message.payload, "UTF-8"))
- offset += MessageSet.entrySize(message)
+ println("payload:\t" + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
+ offset += messageAndOffset.offset
}
}
}
5 core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
@@ -176,6 +176,9 @@ class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness
Thread.sleep(750)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
- TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
+ TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
+ messages(topic).map(m => m.message).iterator),
+ resp.map(m => m.message).iterator)
+// TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
}
}
10 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -56,9 +56,9 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
assertTrue(fetched.iterator.hasNext)
- val fetchedMessage = fetched.iterator.next
+ val fetchedMessageAndOffset = fetched.iterator.next
val stringDecoder = new StringDecoder
- val fetchedStringMessage = stringDecoder.toEvent(fetchedMessage)
+ val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
assertEquals("test-message", fetchedStringMessage)
}
@@ -77,9 +77,9 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
var fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
assertTrue(fetched.iterator.hasNext)
- val fetchedMessage = fetched.iterator.next
+ val fetchedMessageAndOffset = fetched.iterator.next
val stringDecoder = new StringDecoder
- val fetchedStringMessage = stringDecoder.toEvent(fetchedMessage)
+ val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
assertEquals("test-message", fetchedStringMessage)
}
@@ -99,7 +99,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
}
// wait a bit for produced message to be available
- Thread.sleep(200)
+ Thread.sleep(700)
val response = consumer.multifetch(fetches: _*)
for((topic, resp) <- topics.zip(response.toList))
TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
13 core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
@@ -26,26 +26,33 @@ trait BaseMessageSetTestCases extends JUnitSuite {
val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
def createMessageSet(messages: Seq[Message], compressed: CompressionCodec = NoCompressionCodec): MessageSet
+ def toMessageIterator(messageSet: MessageSet): Iterator[Message] = {
+ import scala.collection.JavaConversions._
+ val messages = asIterable(messageSet)
+ messages.map(m => m.message).iterator
+ }
@Test
def testWrittenEqualsRead {
import scala.collection.JavaConversions._
val messageSet = createMessageSet(messages)
- TestUtils.checkEquals(asList(messages).iterator, messageSet.iterator)
+ TestUtils.checkEquals(messages.iterator, toMessageIterator(messageSet))
}
@Test
def testIteratorIsConsistent() {
+ import scala.collection.JavaConversions._
val m = createMessageSet(messages)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(m.iterator, m.iterator)
+ TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
}
@Test
def testIteratorIsConsistentWithCompression() {
+ import scala.collection.JavaConversions._
val m = createMessageSet(messages, DefaultCompressionCodec)
// two iterators over the same set should give the same results
- TestUtils.checkEquals(m.iterator, m.iterator)
+ TestUtils.checkEquals(asIterator(m.iterator), asIterator(m.iterator))
}
@Test
4 core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
@@ -43,8 +43,8 @@ class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestC
@Test
def testValidBytesWithCompression () {
val messageList = new ByteBufferMessageSet(compressionCodec = DefaultCompressionCodec,
- messages = getMessageList(new Message("hello".getBytes()),
- new Message("there".getBytes())))
+ messages = getMessageList(new Message("hello".getBytes()),
+ new Message("there".getBytes())))
val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
buffer.put(messageList.buffer)
buffer.putShort(4)
8 core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
@@ -407,10 +407,10 @@ class ProducerTest extends JUnitSuite {
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet2.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
} catch {
case e: Exception => fail("Not expected")
}
@@ -440,9 +440,9 @@ class ProducerTest extends JUnitSuite {
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
} catch {
case e: Exception => fail("Not expected")
}
11 core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -72,7 +72,7 @@ class LogTest extends JUnitSuite {
val messages = log.read(0, 1024)
var current = 0
for(curr <- messages) {
- assertEquals("Read message should equal written", message, curr)
+ assertEquals("Read message should equal written", message, curr.message)
current += 1
}
assertEquals(10, current)
@@ -113,12 +113,11 @@ class LogTest extends JUnitSuite {
var reads = 0
var current = 0
var offset = 0L
+ var readOffset = 0L
while(current < numMessages) {
- val messages = log.read(offset, 1024*1024)
- for(message <- messages) {
- current += 1
- offset += MessageSet.entrySize(message)
- }
+ val messages = log.read(readOffset, 1024*1024)
+ readOffset += messages.last.offset
+ current += messages.size
if(reads > 2*numMessages)
fail("Too many read attempts.")
reads += 1
2 core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
@@ -142,7 +142,7 @@ class KafkaLog4jAppenderTest extends JUnitSuite {
var count = 0
for(message <- messages) {
count = count + 1
- offset += MessageSet.entrySize(message)
+ offset += message.offset
}
assertEquals(5, count)
2 core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
@Test
def testWrittenEqualsRead {
val messageSet = createMessageSet(messages)
- checkEquals(messages.iterator, messageSet.iterator)
+ checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
}
@Test
8 core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
@@ -59,17 +59,17 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
buffer.rewind()
messageSet.channel.write(buffer)
// appending those bytes should not change the contents
- checkEquals(messages.iterator, messageSet.iterator)
+ checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
assertEquals("Unexpected number of bytes truncated", size.longValue, messageSet.recover())
assertEquals("File pointer should now be at the end of the file.", originalPosition, messageSet.channel.position)
// nor should recovery change the contents
- checkEquals(messages.iterator, messageSet.iterator)
+ checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
}
@Test
def testIterationDoesntChangePosition() {
val position = messageSet.channel.position
- checkEquals(messages.iterator, messageSet.iterator)
+ checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
assertEquals(position, messageSet.channel.position)
}
@@ -79,7 +79,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
checkEquals(messageSet.iterator, read.iterator)
val items = read.iterator.toList
val first = items.head
- val read2 = messageSet.read(MessageSet.entrySize(first), messageSet.sizeInBytes)
+ val read2 = messageSet.read(first.offset, messageSet.sizeInBytes)
checkEquals(items.tail.iterator, read2.iterator)
}
12 core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -413,10 +413,10 @@ class ProducerTest extends JUnitSuite {
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet2.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
} catch {
case e: Exception => fail("Not expected", e)
}
@@ -450,9 +450,9 @@ class ProducerTest extends JUnitSuite {
// cross check if brokers got the messages
val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test1".getBytes), messageSet1.next)
+ Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
} catch {
case e: Exception => fail("Not expected")
}
@@ -483,7 +483,7 @@ class ProducerTest extends JUnitSuite {
// cross check if brokers got the messages
val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
- Assert.assertEquals(new Message("test".getBytes), messageSet1.next)
+ Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message)
// shutdown server2
server2.shutdown
@@ -506,7 +506,7 @@ class ProducerTest extends JUnitSuite {
// cross check if brokers got the messages
val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
- Assert.assertEquals(new Message("test".getBytes), messageSet2.next)
+ Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message)
} catch {
case e: Exception => fail("Not expected", e)
2 core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -209,7 +209,7 @@ object TestUtils {
* Throw an exception if an iterable has different length than expected
*
*/
- def checkLength(s1: Iterator[Message], expectedLength:Integer) {
+ def checkLength[T](s1: Iterator[T], expectedLength:Integer) {
var n = 0
while (s1.hasNext) {
n+=1
5 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
@@ -21,6 +21,7 @@
import kafka.javaapi.MultiFetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageOffset;
import scala.collection.Iterator;
import kafka.api.FetchRequest;
@@ -31,8 +32,8 @@
{
private static void printMessages(ByteBufferMessageSet messageSet)
{
- for (Message message : messageSet) {
- System.out.println(ExampleUtils.getMessage(message));
+ for (MessageOffset messageAndOffset : messageSet) {
+ System.out.println(ExampleUtils.getMessage(messageAndOffset.message()));
}
}
3 perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java
@@ -27,6 +27,7 @@
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
+import kafka.message.MessageOffset;
public class SimplePerfConsumer extends Thread
{
@@ -77,7 +78,7 @@ public void run() {
offset+= messages.validBytes();
bytesRec.getAndAdd(messages.sizeInBytes());
- Iterator<Message> it = messages.iterator();
+ Iterator<MessageOffset> it = messages.iterator();
while(it.hasNext())
{
it.next();
