Skip to content

Commit

Permalink
ByteBufferMessageSet constructor refactored to be able to change clas…
Browse files Browse the repository at this point in the history
…s variables to immutable vals
  • Loading branch information
Neha Narkhede committed Jul 18, 2011
1 parent b882ccc commit a8369d0
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 67 deletions.
Expand Up @@ -48,14 +48,6 @@ class ByteBufferMessageSet protected () extends MessageSet {

def validBytes: Int = underlying.validBytes

def enableDeepIteration() = {
underlying.enableDeepIteration
}

def disableDeepIteration() = {
underlying.disableDeepIteration
}

def serialized():ByteBuffer = underlying.serialized

override def iterator: java.util.Iterator[Message] = new java.util.Iterator[Message] {
Expand Down
84 changes: 35 additions & 49 deletions core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
Expand Up @@ -32,64 +32,50 @@ import kafka.common.{InvalidMessageSizeException, ErrorMapping}
* Option 2: Give it a list of messages (scala/java) along with instructions relating to serialization format. Producers will use this method.
*
*/
class ByteBufferMessageSet protected () extends MessageSet {
class ByteBufferMessageSet(val buffer: ByteBuffer,
val errorCode: Int = ErrorMapping.NoError,
val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
private var validByteCount = -1
private var buffer: ByteBuffer = null
private var errorCode: Int = ErrorMapping.NoError
private var shallowValidByteCount = -1
private var deepValidByteCount = -1
private var deepIterate = true

def this(buffer: ByteBuffer, errorCode: Int, deepIterate: Boolean = true) = {
this()
this.buffer = buffer
this.errorCode = errorCode
this.deepIterate = deepIterate
}

def this(buffer: ByteBuffer) = this(buffer, ErrorMapping.NoError, true)

def this(compressionEnabled: Boolean, messages: Message*) {
this()
if (compressionEnabled) {
val message = CompressionUtils.compress(messages)
buffer = ByteBuffer.allocate(message.serializedSize)
message.serializeTo(buffer)
buffer.rewind
}
else {
buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(buffer)
}
buffer.rewind
}
this(
compressionEnabled match {
case true =>
val message = CompressionUtils.compress(messages)
val buffer = ByteBuffer.allocate(message.serializedSize)
message.serializeTo(buffer)
buffer.rewind
buffer
case false =>
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(buffer)
}
buffer.rewind
buffer
}, ErrorMapping.NoError, true)
}

def this(compressionEnabled: Boolean, messages: Iterable[Message]) {
this()
if (compressionEnabled) {
val message = CompressionUtils.compress(messages)
buffer = ByteBuffer.allocate(message.serializedSize)
message.serializeTo(buffer)
buffer.rewind
}
else {
buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(buffer)
}
buffer.rewind
}
}

def enableDeepIteration() = {
deepIterate = true
}

def disableDeepIteration() = {
deepIterate = false
this(
compressionEnabled match {
case true =>
val message = CompressionUtils.compress(messages)
val buffer = ByteBuffer.allocate(message.serializedSize)
message.serializeTo(buffer)
buffer.rewind
buffer
case false =>
val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
for (message <- messages) {
message.serializeTo(buffer)
}
buffer.rewind
buffer
}, ErrorMapping.NoError, true)
}

def getDeepIterate = deepIterate
Expand Down
Expand Up @@ -153,7 +153,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
val fetches = new mutable.ArrayBuffer[FetchRequest]
for(topic <- topics) {
val set = new ByteBufferMessageSet(true, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
set.enableDeepIteration
messages += topic -> set
producer.send(topic, set)
set.getBuffer.rewind
Expand Down Expand Up @@ -216,7 +215,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
val set = new ByteBufferMessageSet(false, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
set.enableDeepIteration
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
Expand All @@ -241,7 +239,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
var produceList: List[ProducerRequest] = Nil
for(topic <- topics) {
val set = new ByteBufferMessageSet(true, new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
set.enableDeepIteration
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
Expand Down
Expand Up @@ -88,7 +88,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with

// send an empty messageset first
val sent2 = new ByteBufferMessageSet(true, getMessageList(Seq.empty[Message]: _*))
sent2.enableDeepIteration
producer.send(topic, sent2)
Thread.sleep(200)
sent2.buffer.rewind
Expand All @@ -99,7 +98,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
// send some messages
val sent3 = new ByteBufferMessageSet(true, getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
sent3.enableDeepIteration
producer.send(topic, sent3)

Thread.sleep(200)
Expand Down Expand Up @@ -207,7 +205,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
for(topic <- topics) {
val set = new ByteBufferMessageSet(true, getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
set.enableDeepIteration
messages += topic -> set
producer.send(topic, set)
set.buffer.rewind
Expand Down Expand Up @@ -310,7 +307,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
for(topic <- topics) {
val set = new ByteBufferMessageSet(true, getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
set.enableDeepIteration
messages += topic -> set
producer.send(topic, set)
set.buffer.rewind
Expand Down Expand Up @@ -373,7 +369,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
for(topic <- topics) {
val set = new ByteBufferMessageSet(true, getMessageList(new Message(("a_" + topic).getBytes),
new Message(("b_" + topic).getBytes)))
set.enableDeepIteration
messages += topic -> set
produceList ::= new ProducerRequest(topic, 0, set)
fetches += new FetchRequest(topic, 0, 0, 10000)
Expand Down
Expand Up @@ -35,8 +35,6 @@ class CompressionUtilTest extends TestCase {

TestUtils.checkLength(decompressedMessages.iterator,2)

decompressedMessages.enableDeepIteration

TestUtils.checkLength(decompressedMessages.elements,3)

TestUtils.checkEquals(messages.iterator, decompressedMessages.elements)
Expand Down

0 comments on commit a8369d0

Please sign in to comment.