Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

More code cleanup. Changed the visibility of variables in ByteBufferM…

…essageSet to private with accessor methods. Deleted unused code
  • Loading branch information...
commit 4c49c4e2a9490255c35e21f89a6f4545de87cc5a 1 parent 2715ec7
@nehanarkhede authored
View
8 core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -34,10 +34,12 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
private var current: Iterator[MessageOffset] = null
private var currentDataChunk: FetchedDataChunk = null
private var currentTopicInfo: PartitionTopicInfo = null
- private var consumedOffset: Long = Long.MaxValue
+ private var consumedOffset: Long = -1L
override def next(): Message = {
val message = super.next
+ if(consumedOffset < 0)
+ throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
currentTopicInfo.resetConsumeOffset(consumedOffset)
if(logger.isTraceEnabled)
logger.trace("Setting consumed offset to %d".format(consumedOffset))
@@ -63,8 +65,8 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
} else {
currentTopicInfo = currentDataChunk.topicInfo
if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
- logger.error("consumed offset: " + currentTopicInfo.getConsumeOffset + " doesn't match fetch offset: " +
- currentDataChunk.fetchOffset + " for " + currentTopicInfo + "; consumer may lose data")
+ logger.error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+ .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
}
current = currentDataChunk.messages.iterator
View
11 core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -54,17 +54,6 @@ private[consumer] class PartitionTopicInfo(val topic: String,
}
/**
- * Record the given number of bytes as having been consumed
- */
- def consumed(messageSize: Long): Unit = {
- if(logger.isTraceEnabled)
- logger.trace("Current consumed offset = " + consumedOffset.get)
- val newOffset = consumedOffset.addAndGet(messageSize)
- if (logger.isDebugEnabled)
- logger.debug("updated consume offset of ( %s ) to %d".format(this, newOffset))
- }
-
- /**
* Enqueue a message set for processing
* @return the number of valid bytes
*/
View
8 core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -29,17 +29,19 @@ private[javaapi] object Implicits {
implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
kafka.javaapi.message.ByteBufferMessageSet = {
- new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.initialOffset,
+ new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
messageSet.getErrorCode, messageSet.getDeepIterate)
}
implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
- logger.debug("Implicit instantiation of Java Sync Producer")
+ if(logger.isDebugEnabled)
+ logger.debug("Implicit instantiation of Java Sync Producer")
new kafka.javaapi.producer.SyncProducer(producer)
}
implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
- logger.debug("Implicit instantiation of Sync Producer")
+ if(logger.isDebugEnabled)
+ logger.debug("Implicit instantiation of Sync Producer")
producer.underlying
}
View
16 core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -20,10 +20,10 @@ import kafka.common.ErrorMapping
import org.apache.log4j.Logger
import kafka.message._
-class ByteBufferMessageSet(val buffer: ByteBuffer,
- val initialOffset: Long = 0L,
- val errorCode: Int = ErrorMapping.NoError,
- val deepIterate: Boolean = true) extends MessageSet {
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+ private val initialOffset: Long = 0L,
+ private val errorCode: Int = ErrorMapping.NoError,
+ private val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
initialOffset,
@@ -55,6 +55,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
def serialized():ByteBuffer = underlying.serialized
+ def getInitialOffset = initialOffset
+
+ def getDeepIterate = deepIterate
+
+ def getBuffer = buffer
+
+ def getErrorCode = errorCode
+
override def iterator: java.util.Iterator[MessageOffset] = new java.util.Iterator[MessageOffset] {
val underlyingIterator = underlying.iterator
override def hasNext(): Boolean = {
View
12 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -33,10 +33,10 @@ import kafka.utils.IteratorTemplate
* Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
*
*/
-class ByteBufferMessageSet(val buffer: ByteBuffer,
- val initialOffset: Long = 0L,
- val errorCode: Int = ErrorMapping.NoError,
- val deepIterate: Boolean = true) extends MessageSet {
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+ private val initialOffset: Long = 0L,
+ private val errorCode: Int = ErrorMapping.NoError,
+ private val deepIterate: Boolean = true) extends MessageSet {
private val logger = Logger.getLogger(getClass())
private var validByteCount = -1L
private var shallowValidByteCount = -1L
@@ -61,6 +61,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
}, 0L, ErrorMapping.NoError, true)
}
+ def getInitialOffset = initialOffset
+
def getDeepIterate = deepIterate
def getBuffer = buffer
@@ -105,7 +107,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer,
ErrorMapping.maybeThrowException(errorCode)
new IteratorTemplate[MessageOffset] {
var iter = buffer.slice()
- var currValidBytes = 0
+ var currValidBytes = 0L
override def makeNext(): MessageOffset = {
// read the size of the item
View
20 core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
@@ -49,7 +49,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
messages = getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
Thread.sleep(200)
- sent2.buffer.rewind
+ sent2.getBuffer.rewind
var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
@@ -61,7 +61,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.send(topic, sent3)
Thread.sleep(200)
- sent3.buffer.rewind
+ sent3.getBuffer.rewind
var fetched3: ByteBufferMessageSet = null
while(fetched3 == null || fetched3.validBytes == 0)
fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
@@ -93,7 +93,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
messages = getMessageList(Seq.empty[Message]: _*))
producer.send(topic, sent2)
Thread.sleep(200)
- sent2.buffer.rewind
+ sent2.getBuffer.rewind
var fetched2 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
TestUtils.checkEquals(sent2.iterator, fetched2.iterator)
@@ -105,7 +105,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.send(topic, sent3)
Thread.sleep(200)
- sent3.buffer.rewind
+ sent3.getBuffer.rewind
var fetched3: ByteBufferMessageSet = null
while(fetched3 == null || fetched3.validBytes == 0)
fetched3 = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
@@ -140,7 +140,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
- set.buffer.rewind
+ set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
}
@@ -213,7 +213,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
- set.buffer.rewind
+ set.getBuffer.rewind
fetches += new FetchRequest(topic, 0, 0, 10000)
}
@@ -286,7 +286,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
- set.buffer.rewind
+ set.getBuffer.rewind
fetches.add(new FetchRequest(topic, 0, 0, 10000))
}
@@ -317,7 +317,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
new Message(("b_" + topic).getBytes)))
messages += topic -> set
producer.send(topic, set)
- set.buffer.rewind
+ set.getBuffer.rewind
fetches.add(new FetchRequest(topic, 0, 0, 10000))
}
@@ -353,7 +353,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.multiSend(produceList.toArray)
for (messageSet <- messages.values)
- messageSet.buffer.rewind
+ messageSet.getBuffer.rewind
// wait a bit for produced message to be available
Thread.sleep(200)
@@ -386,7 +386,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
producer.multiSend(produceList.toArray)
for (messageSet <- messages.values)
- messageSet.buffer.rewind
+ messageSet.getBuffer.rewind
// wait a bit for produced message to be available
Thread.sleep(200)
View
4 core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
@@ -34,7 +34,7 @@ class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestC
messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
- buffer.put(messageList.buffer)
+ buffer.put(messageList.getBuffer)
buffer.putShort(4)
val messageListPlus = new ByteBufferMessageSet(buffer)
assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
@@ -46,7 +46,7 @@ class ByteBufferMessageSetTest extends kafka.javaapi.message.BaseMessageSetTestC
messages = getMessageList(new Message("hello".getBytes()),
new Message("there".getBytes())))
val buffer = ByteBuffer.allocate(messageList.sizeInBytes.toInt + 2)
- buffer.put(messageList.buffer)
+ buffer.put(messageList.getBuffer)
buffer.putShort(4)
val messageListPlus = new ByteBufferMessageSet(buffer, 0, 0, true)
assertEquals("Adding invalid bytes shouldn't change byte count", messageList.validBytes, messageListPlus.validBytes)
Please sign in to comment.
Something went wrong with that request. Please try again.