Merged the compression feature and updated version to 0.7

2 parents 6e489aa + 2934228, commit 709afe4ec75489bc00a44335de8821fa726bb97e, committed by nehanarkhede on Jul 29, 2011
Showing with 2,761 additions and 1,059 deletions.
  1. +3 −0 .gitignore
  2. +2 −1 CONTRIBUTORS
  3. +5 −0 bin/kafka-replay-log-producer.sh
  4. +4 −1 config/consumer.properties
  5. +9 −7 config/log4j.properties
  6. +14 −13 contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
  7. +1 −1 contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
  8. +3 −1 contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
  9. +2 −1 contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
  10. +3 −3 core/src/main/scala/kafka/api/MultiFetchResponse.scala
  11. +5 −5 core/src/main/scala/kafka/api/OffsetRequest.scala
  12. +3 −3 core/src/main/scala/kafka/api/ProducerRequest.scala
  13. +12 −12 core/src/main/scala/kafka/common/ErrorMapping.scala
  14. +1 −1 core/src/main/scala/kafka/common/InvalidConfigException.scala
  15. +1 −1 core/src/main/scala/kafka/common/InvalidPartitionException.scala
  16. +1 −1 core/src/main/scala/kafka/common/UnavailableProducerException.scala
  17. +25 −0 core/src/main/scala/kafka/common/UnknownCodecException.scala
  18. +25 −0 core/src/main/scala/kafka/common/UnknownMagicByteException.scala
  19. +3 −3 core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
  20. +21 −21 core/src/main/scala/kafka/consumer/ConsumerConfig.scala
  21. +1 −1 core/src/main/scala/kafka/consumer/ConsumerConnector.scala
  22. +25 −18 core/src/main/scala/kafka/consumer/ConsumerIterator.scala
  23. +2 −3 core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
  24. +9 −7 core/src/main/scala/kafka/consumer/FetcherRunnable.scala
  25. +10 −15 core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
  26. +2 −2 core/src/main/scala/kafka/consumer/SimpleConsumer.scala
  27. +1 −1 core/src/main/scala/kafka/consumer/TopicCount.scala
  28. +4 −4 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
  29. +9 −6 core/src/main/scala/kafka/javaapi/Implicits.scala
  30. +3 −3 core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala
  31. +1 −1 core/src/main/scala/kafka/javaapi/ProducerRequest.scala
  32. +1 −1 core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
  33. +1 −1 core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
  34. +45 −20 core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
  35. +6 −6 core/src/main/scala/kafka/javaapi/message/MessageSet.scala
  36. +1 −1 core/src/main/scala/kafka/javaapi/producer/Producer.scala
  37. +1 −1 core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
  38. +1 −1 core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
  39. +11 −8 core/src/main/scala/kafka/log/Log.scala
  40. +26 −0 core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
  41. +121 −41 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
  42. +35 −0 core/src/main/scala/kafka/message/CompressionCodec.scala
  43. +143 −0 core/src/main/scala/kafka/message/CompressionUtils.scala
  44. +12 −11 core/src/main/scala/kafka/message/FileMessageSet.scala
  45. +109 −20 core/src/main/scala/kafka/message/Message.scala
  46. +22 −0 core/src/main/scala/kafka/message/MessageAndOffset.scala
  47. +6 −6 core/src/main/scala/kafka/message/MessageSet.scala
  48. +1 −1 core/src/main/scala/kafka/network/InvalidRequestException.scala
  49. +1 −1 core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala
  50. +1 −1 core/src/main/scala/kafka/producer/ConfigBrokerPartitionInfo.scala
  51. +1 −1 core/src/main/scala/kafka/producer/DefaultPartitioner.scala
  52. +3 −3 core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala
  53. +1 −1 core/src/main/scala/kafka/producer/Partitioner.scala
  54. +19 −6 core/src/main/scala/kafka/producer/ProducerConfig.scala
  55. +1 −1 core/src/main/scala/kafka/producer/ProducerData.scala
  56. +16 −5 core/src/main/scala/kafka/producer/ProducerPool.scala
  57. +4 −4 core/src/main/scala/kafka/producer/SyncProducer.scala
  58. +2 −1 core/src/main/scala/kafka/producer/SyncProducerConfig.scala
  59. +17 −17 core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
  60. +9 −9 core/src/main/scala/kafka/producer/async/AsyncProducer.scala
  61. +1 −1 core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala
  62. +1 −1 core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
  63. +1 −1 core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
  64. +42 −7 core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
  65. +1 −1 core/src/main/scala/kafka/producer/async/EventHandler.scala
  66. +1 −1 core/src/main/scala/kafka/producer/async/MissingConfigException.scala
  67. +1 −1 core/src/main/scala/kafka/producer/async/QueueClosedException.scala
  68. +1 −1 core/src/main/scala/kafka/producer/async/QueueFullException.scala
  69. +1 −1 core/src/main/scala/kafka/serializer/Encoder.scala
  70. +1 −1 core/src/main/scala/kafka/server/KafkaConfig.scala
  71. +1 −0 core/src/main/scala/kafka/server/KafkaRequestHandlers.scala
  72. +2 −2 core/src/main/scala/kafka/server/KafkaServer.scala
  73. +5 −3 core/src/main/scala/kafka/server/KafkaServerStartable.scala
  74. +2 −2 core/src/main/scala/kafka/server/KafkaZooKeeper.scala
  75. +1 −1 core/src/main/scala/kafka/server/MessageSetSend.scala
  76. +1 −1 core/src/main/scala/kafka/server/MultiMessageSetSend.scala
  77. +1 −1 core/src/main/scala/kafka/tools/ConsumerPerformance.scala
  78. +15 −5 core/src/main/scala/kafka/tools/ConsumerShell.scala
  79. +18 −4 core/src/main/scala/kafka/tools/ProducerPerformance.scala
  80. +3 −3 core/src/main/scala/kafka/tools/ProducerShell.scala
  81. +197 −0 core/src/main/scala/kafka/tools/ReplayLogProducer.scala
  82. +2 −2 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
  83. +1 −1 core/src/main/scala/kafka/utils/Annotations.scala
  84. +5 −5 core/src/main/scala/kafka/utils/DumpLogSegments.scala
  85. +1 −1 core/src/main/scala/kafka/utils/Throttler.scala
  86. +5 −5 core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
  87. +19 −2 core/src/main/scala/kafka/utils/Utils.scala
  88. +12 −12 core/src/main/scala/kafka/utils/ZkUtils.scala
  89. BIN core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka
  90. +1 −1 core/src/test/scala/other/kafka/DeleteZKPath.scala
  91. +4 −3 core/src/test/scala/other/kafka/TestLogPerformance.scala
  92. +1 −1 core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
  93. +132 −18 core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
  94. +76 −0 core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
  95. +1 −1 core/src/test/scala/unit/kafka/integration/FetcherTest.scala
  96. +17 −10 core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
  97. +17 −4 core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
  98. +119 −8 core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
  99. +126 −18 core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
  100. +200 −14 core/src/test/scala/unit/kafka/javaapi/integration/PrimitiveApiTest.scala
  101. +25 −5 core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala
  102. +44 −15 core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
  103. +26 −23 core/src/test/scala/unit/kafka/javaapi/producer/ProducerTest.scala
  104. +16 −9 core/src/test/scala/unit/kafka/javaapi/producer/SyncProducerTest.scala
  105. +19 −22 core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
  106. +8 −9 core/src/test/scala/unit/kafka/log/LogTest.scala
  107. +1 −1 core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala
  108. +1 −1 core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
  109. +10 −6 core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
  110. +41 −0 core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala
  111. +11 −11 core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
  112. +3 −3 core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
  113. +24 −17 core/src/test/scala/unit/kafka/producer/ProducerTest.scala
  114. +10 −10 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
  115. +7 −7 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
  116. +46 −8 core/src/test/scala/unit/kafka/utils/TestUtils.scala
  117. +1 −1 core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
  118. +1 −1 core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
  119. +3 −2 examples/src/main/java/kafka/examples/SimpleConsumerDemo.java
  120. BIN perf/lib/jopt-simple-3.1.jar
  121. +357 −349 perf/src/main/java/kafka/perf/KafkaPerfSimulator.java
  122. +16 −0 perf/src/main/java/kafka/perf/KafkaSimulatorMXBean.java
  123. +130 −99 perf/src/main/java/kafka/perf/PerfTimer.java
  124. +20 −3 perf/src/main/java/kafka/perf/consumer/SimplePerfConsumer.java
  125. +27 −6 perf/src/main/java/kafka/perf/producer/Producer.java
  126. +1 −1 project/build.properties
  127. +48 −0 system_test/producer_perf/bin/run-compression-test.sh
  128. +2 −2 system_test/producer_perf/bin/run-test.sh
3 .gitignore
@@ -6,3 +6,6 @@ src_managed/
project/boot/
project/plugins/project/
project/sbt_project_definition.iml
+.settings
+.idea
+.project
3 CONTRIBUTORS
@@ -4,6 +4,7 @@ Jun Rao
Neha Narkhede
Fatih Emekci
Lin Guo
+Shirshanka Das
Roshan Sumbaly
Sam Shah
-Chris Burroughs
+Chris Burroughs
5 bin/kafka-replay-log-producer.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+
+base_dir=$(dirname $0)
+export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
+$base_dir/kafka-run-class.sh kafka.tools.ReplayLogProducer $@
5 config/consumer.properties
@@ -9,4 +9,7 @@ zk.connect=127.0.0.1:2181
zk.connectiontimeout.ms=1000000
#consumer group id
-groupid=group1
+groupid=test-consumer-group
+
+#consumer timeout
+#consumer.timeout.ms=5000
16 config/log4j.properties
@@ -1,19 +1,21 @@
-log4j.rootLogger=OFF, fileAppender, stdout
+log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
-log4j.appender.fileAppender=org.apache.log4j.FileAppender
-log4j.appender.fileAppender.File=kafka-request.log
-log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
-log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+#log4j.appender.fileAppender=org.apache.log4j.FileAppender
+#log4j.appender.fileAppender.File=kafka-request.log
+#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
# Turn on all our debugging info
log4j.logger.kafka=INFO,stdout
-log4j.logger.kafka.request.logger=TRACE,fileAppender
-log4j.additivity.kafka.request.logger=false
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout
+#log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout
+#log4j.logger.kafka.request.logger=TRACE,fileAppender
+#log4j.additivity.kafka.request.logger=false
#log4j.logger.kafka.network.Processor=TRACE,fileAppender
#log4j.additivity.kafka.network.Processor=false
#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
27 contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
@@ -13,6 +13,7 @@
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
+import kafka.message.MessageAndOffset;
import kafka.message.MessageSet;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
@@ -43,7 +44,7 @@
protected long _count; /*current count*/
protected MultiFetchResponse _response = null; /*fetch response*/
- protected Iterator<Message> _messageIt = null; /*message iterator*/
+ protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
protected int _retry = 0;
protected long _requestTime = 0; /*accumulative request time*/
@@ -122,7 +123,7 @@ public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException
while ( !gotNext && iter.hasNext()) {
ByteBufferMessageSet msgSet = iter.next();
if ( hasError(msgSet)) return false;
- _messageIt = (Iterator<Message>) msgSet.iterator();
+ _messageIt = (Iterator<MessageAndOffset>) msgSet.iterator();
gotNext = get(key, value);
}
}
@@ -171,17 +172,17 @@ public void close() throws IOException {
protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
if (_messageIt != null && _messageIt.hasNext()) {
- Message msg = _messageIt.next();
+ MessageAndOffset msgAndOffset = _messageIt.next();
- ByteBuffer buf = msg.payload();
+ ByteBuffer buf = msgAndOffset.message().payload();
int origSize = buf.remaining();
byte[] bytes = new byte[origSize];
buf.get(bytes, buf.position(), origSize);
value.set(bytes, 0, origSize);
- key.set(_index, _offset, msg.checksum());
+ key.set(_index, _offset, msgAndOffset.message().checksum());
- _offset += MessageSet.entrySize(msg); //increase offset
+ _offset += msgAndOffset.offset(); //increase offset
_count ++; //increase count
return true;
@@ -198,14 +199,14 @@ protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
long[] range = new long[2];
long[] startOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
- OffsetRequest.EARLIEST_TIME(), 1);
+ OffsetRequest.EarliestTime(), 1);
if (startOffsets.length != 1)
throw new IOException("input:" + _input + " Expect one smallest offset but get "
+ startOffsets.length);
range[0] = startOffsets[0];
long[] endOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
- OffsetRequest.LATEST_TIME(), 1);
+ OffsetRequest.LatestTime(), 1);
if (endOffsets.length != 1)
throw new IOException("input:" + _input + " Expect one latest offset but get "
+ endOffsets.length);
@@ -234,8 +235,8 @@ protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
*/
protected boolean hasError(ByteBufferMessageSet messages)
throws IOException {
- int errorCode = messages.errorCode();
- if (errorCode == ErrorMapping.OFFSET_OUT_OF_RANGE_CODE()) {
+ int errorCode = messages.getErrorCode();
+ if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
/* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
Kafka server may delete old files from time to time */
System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
@@ -246,12 +247,12 @@ protected boolean hasError(ByteBufferMessageSet messages)
_offsetRange = getOffsetRange();
_offset = _offsetRange[0];
return false;
- } else if (errorCode == ErrorMapping.INVALID_MESSAGE_CODE()) {
+ } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
throw new IOException(_input + " current offset=" + _offset
+ " : invalid offset.");
- } else if (errorCode == ErrorMapping.WRONG_PARTITION_CODE()) {
+ } else if (errorCode == ErrorMapping.WrongPartitionCode()) {
throw new IOException(_input + " : wrong partition");
- } else if (errorCode != ErrorMapping.NO_ERROR()) {
+ } else if (errorCode != ErrorMapping.NoError()) {
throw new IOException(_input + " current offset=" + _offset
+ " error:" + errorCode);
} else
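For context, a minimal sketch of reading a payload out of the MessageAndOffset wrapper that the iterators now hand back (the helper name and the Scala form are illustrative; the ETL code above does the equivalent in Java):

    import java.nio.ByteBuffer
    import kafka.message.MessageAndOffset

    // Illustrative helper: copy the payload bytes out of a MessageAndOffset.
    def payloadBytes(msgAndOffset: MessageAndOffset): Array[Byte] = {
      val buf: ByteBuffer = msgAndOffset.message.payload
      val bytes = new Array[Byte](buf.remaining)
      buf.get(bytes)                     // read the remaining payload bytes
      bytes
    }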
2 contrib/hadoop-consumer/src/main/java/kafka/etl/UndefinedPropertyException.java
@@ -24,4 +24,4 @@ public UndefinedPropertyException(String message) {
super(message);
}
-}
+}
4 contrib/hadoop-consumer/src/main/java/kafka/etl/impl/DataGenerator.java
@@ -26,6 +26,8 @@
import java.util.Random;
import java.util.Map.Entry;
import java.util.Properties;
+
+import kafka.message.NoCompressionCodec;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -96,7 +98,7 @@ public void run() throws Exception {
}
// send events
System.out.println(" send " + list.size() + " " + _topic + " count events to " + _uri);
- _producer.send(_topic, new ByteBufferMessageSet(list));
+ _producer.send(_topic, new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, list));
// close the producer
_producer.close();
3 contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/KafkaRecordWriter.java
@@ -19,6 +19,7 @@
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.javaapi.producer.SyncProducer;
+import kafka.message.NoCompressionCodec;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -47,7 +48,7 @@ public KafkaRecordWriter(SyncProducer producer, String topic, int queueSize)
protected void sendMsgList()
{
if (msgList.size() > 0) {
- ByteBufferMessageSet msgSet = new ByteBufferMessageSet(msgList);
+ ByteBufferMessageSet msgSet = new ByteBufferMessageSet(kafka.message.NoCompressionCodec$.MODULE$, msgList);
producer.send(topic, msgSet);
msgList.clear();
totalSize = 0;
6 core/src/main/scala/kafka/api/MultiFetchResponse.scala
@@ -17,11 +17,11 @@
package kafka.api
import java.nio._
-import scala.collection.mutable
+import collection.mutable
import kafka.utils.IteratorTemplate
import kafka.message._
-class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int) extends Iterable[ByteBufferMessageSet] {
+class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int, val offsets: Array[Long]) extends Iterable[ByteBufferMessageSet] {
private val messageSets = new mutable.ListBuffer[ByteBufferMessageSet]
for(i <- 0 until numSets) {
@@ -31,7 +31,7 @@ class MultiFetchResponse(val buffer: ByteBuffer, val numSets: Int) extends Itera
val payloadSize = size - 2
copy.limit(payloadSize)
buffer.position(buffer.position + payloadSize)
- messageSets += new ByteBufferMessageSet(copy, errorCode)
+ messageSets += new ByteBufferMessageSet(copy, offsets(i), errorCode)
}
def iterator : Iterator[ByteBufferMessageSet] = {
10 core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -23,10 +23,10 @@ import java.nio.channels.WritableByteChannel
import kafka.common.ErrorMapping
object OffsetRequest {
- val SMALLEST_TIME_STRING = "smallest"
- val LARGEST_TIME_STRING = "largest"
- val LATEST_TIME = -1L
- val EARLIEST_TIME = -2L
+ val SmallestTimeString = "smallest"
+ val LargestTimeString = "largest"
+ val LatestTime = -1L
+ val EarliestTime = -2L
def readFrom(buffer: ByteBuffer): OffsetRequest = {
val topic = Utils.readShortString(buffer, "UTF-8")
@@ -78,7 +78,7 @@ private[kafka] class OffsetArraySend(offsets: Array[Long]) extends Send {
private var size: Long = offsets.foldLeft(4)((sum, _) => sum + 8)
private val header = ByteBuffer.allocate(6)
header.putInt(size.asInstanceOf[Int] + 2)
- header.putShort(ErrorMapping.NO_ERROR.asInstanceOf[Short])
+ header.putShort(ErrorMapping.NoError.asInstanceOf[Short])
header.rewind()
private val contentBuffer = OffsetRequest.serializeOffsetArray(offsets)
6 core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -42,9 +42,9 @@ class ProducerRequest(val topic: String,
def writeTo(buffer: ByteBuffer) {
Utils.writeShortString(buffer, topic, "UTF-8")
buffer.putInt(partition)
- buffer.putInt(messages.buffer.limit)
- buffer.put(messages.buffer)
- messages.buffer.rewind
+ buffer.putInt(messages.serialized.limit)
+ buffer.put(messages.serialized)
+ messages.serialized.rewind
}
def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
24 core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -25,22 +25,22 @@ import java.lang.Throwable
* A bi-directional mapping between error codes and exceptions x
*/
object ErrorMapping {
- val EMPTY_BYTEBUFFER = ByteBuffer.allocate(0)
+ val EmptyByteBuffer = ByteBuffer.allocate(0)
- val UNKNOWN_CODE = -1
- val NO_ERROR = 0
- val OFFSET_OUT_OF_RANGE_CODE = 1
- val INVALID_MESSAGE_CODE = 2
- val WRONG_PARTITION_CODE = 3
- val INVALID_RETCH_SIZE_CODE = 4
+ val UnknownCode = -1
+ val NoError = 0
+ val OffsetOutOfRangeCode = 1
+ val InvalidMessageCode = 2
+ val WrongPartitionCode = 3
+ val InvalidFetchSizeCode = 4
private val exceptionToCode =
Map[Class[Throwable], Int](
- classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OFFSET_OUT_OF_RANGE_CODE,
- classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> INVALID_MESSAGE_CODE,
- classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WRONG_PARTITION_CODE,
- classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> INVALID_RETCH_SIZE_CODE
- ).withDefaultValue(UNKNOWN_CODE)
+ classOf[OffsetOutOfRangeException].asInstanceOf[Class[Throwable]] -> OffsetOutOfRangeCode,
+ classOf[InvalidMessageException].asInstanceOf[Class[Throwable]] -> InvalidMessageCode,
+ classOf[InvalidPartitionException].asInstanceOf[Class[Throwable]] -> WrongPartitionCode,
+ classOf[InvalidMessageSizeException].asInstanceOf[Class[Throwable]] -> InvalidFetchSizeCode
+ ).withDefaultValue(UnknownCode)
/* invert the mapping */
private val codeToException =
2 core/src/main/scala/kafka/common/InvalidConfigException.scala
@@ -21,4 +21,4 @@ package kafka.common
*/
class InvalidConfigException(message: String) extends RuntimeException(message) {
def this() = this(null)
-}
+}
2 core/src/main/scala/kafka/common/InvalidPartitionException.scala
@@ -20,4 +20,4 @@ package kafka.common
*/
class InvalidPartitionException(message: String) extends RuntimeException(message) {
def this() = this(null)
-}
+}
2 core/src/main/scala/kafka/common/UnavailableProducerException.scala
@@ -20,4 +20,4 @@ package kafka.common
*/
class UnavailableProducerException(message: String) extends RuntimeException(message) {
def this() = this(null)
-}
+}
25 core/src/main/scala/kafka/common/UnknownCodecException.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the client has requested a range no longer available on the server
+ */
+class UnknownCodecException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
+
25 core/src/main/scala/kafka/common/UnknownMagicByteException.scala
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the client has requested a range no longer available on the server
+ */
+class UnknownMagicByteException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
+
6 core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -119,7 +119,7 @@ object ConsoleConsumer {
})
var stream: KafkaMessageStream = connector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0)
- val iter =
+ val iter =
if(maxMessages >= 0)
stream.slice(0, maxMessages)
else
@@ -144,7 +144,7 @@ object ConsoleConsumer {
case e => logger.error("error processing message, stop consuming: " + e)
}
- System.out.flush()
+ System.out.flush()
formatter.close()
connector.shutdown()
}
@@ -163,7 +163,7 @@ object ConsoleConsumer {
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {
- System.err.println("Missing required argument \"" + arg + "\"")
+ logger.error("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
42 core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -21,17 +21,17 @@ import kafka.utils.{ZKConfig, Utils}
import kafka.api.OffsetRequest
object ConsumerConfig {
- val SOCKET_TIMEOUT = 30 * 1000
- val SOCKET_BUFFER_SIZE = 64*1024
- val FETCH_SIZE = 300 * 1024
- val MAX_FETCH_SIZE = 10*FETCH_SIZE
- val BACKOFF_INCREMENT_MS = 1000
- val AUTO_COMMIT = true
- val AUTO_COMMIT_INTERVAL = 10 * 1000
- val MAX_QUEUED_CHUNKS = 100
- val AUTO_OFFSET_RESET = OffsetRequest.SMALLEST_TIME_STRING
- val CONSUMER_TIMEOUT_MS = -1
- val EMBEDDED_CONSUMER_TOPICS = ""
+ val SocketTimeout = 30 * 1000
+ val SocketBufferSize = 64*1024
+ val FetchSize = 300 * 1024
+ val MaxFetchSize = 10*FetchSize
+ val BackoffIncrementMs = 1000
+ val AutoCommit = true
+ val AutoCommitInterval = 10 * 1000
+ val MaxQueuedChunks = 100
+ val AutoOffsetReset = OffsetRequest.SmallestTimeString
+ val ConsumerTimeoutMs = -1
+ val EmbeddedConsumerTopics = ""
}
class ConsumerConfig(props: Properties) extends ZKConfig(props) {
@@ -46,40 +46,40 @@ class ConsumerConfig(props: Properties) extends ZKConfig(props) {
if (Utils.getString(props, "consumerid", null) != null) Some(Utils.getString(props, "consumerid")) else None
/** the socket timeout for network requests */
- val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SOCKET_TIMEOUT)
+ val socketTimeoutMs = Utils.getInt(props, "socket.timeout.ms", SocketTimeout)
/** the socket receive buffer for network requests */
- val socketBufferSize = Utils.getInt(props, "socket.buffersize", SOCKET_BUFFER_SIZE)
+ val socketBufferSize = Utils.getInt(props, "socket.buffersize", SocketBufferSize)
/** the number of byes of messages to attempt to fetch */
- val fetchSize = Utils.getInt(props, "fetch.size", FETCH_SIZE)
+ val fetchSize = Utils.getInt(props, "fetch.size", FetchSize)
/** the maximum allowable fetch size for a very large message */
val maxFetchSize: Int = fetchSize * 10
/** to avoid repeatedly polling a broker node which has no new data
we will backoff every time we get an empty set from the broker*/
- val backoffIncrementMs: Long = Utils.getInt(props, "backoff.increment.ms", BACKOFF_INCREMENT_MS)
+ val backoffIncrementMs: Long = Utils.getInt(props, "backoff.increment.ms", BackoffIncrementMs)
/** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
- val autoCommit = Utils.getBoolean(props, "autocommit.enable", AUTO_COMMIT)
+ val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
/** the frequency in ms that the consumer offsets are committed to zookeeper */
- val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AUTO_COMMIT_INTERVAL)
+ val autoCommitIntervalMs = Utils.getInt(props, "autocommit.interval.ms", AutoCommitInterval)
/** max number of messages buffered for consumption */
- val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MAX_QUEUED_CHUNKS)
+ val maxQueuedChunks = Utils.getInt(props, "queuedchunks.max", MaxQueuedChunks)
/* what to do if an offset is out of range.
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset
anything else: throw exception to the consumer */
- val autoOffsetReset = Utils.getString(props, "autooffset.reset", AUTO_OFFSET_RESET)
+ val autoOffsetReset = Utils.getString(props, "autooffset.reset", AutoOffsetReset)
/** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */
- val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", CONSUMER_TIMEOUT_MS)
+ val consumerTimeoutMs = Utils.getInt(props, "consumer.timeout.ms", ConsumerTimeoutMs)
/* embed a consumer in the broker. e.g., topic1:1,topic2:1 */
val embeddedConsumerTopicMap = Utils.getConsumerTopicMap(Utils.getString(props, "embeddedconsumer.topics",
- EMBEDDED_CONSUMER_TOPICS))
+ EmbeddedConsumerTopics))
}
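The consumer settings documented above can be exercised like this (a minimal sketch; the property values are illustrative and mirror config/consumer.properties):

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    val props = new Properties
    props.put("zk.connect", "127.0.0.1:2181")
    props.put("groupid", "test-consumer-group")
    props.put("autooffset.reset", "smallest")     // reset to the smallest offset when out of range
    props.put("consumer.timeout.ms", "5000")      // time out the consumer after 5s with no messages
    val config = new ConsumerConfig(props)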
2 core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -71,4 +71,4 @@ object Consumer {
Utils.swallow(logger.warn, Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName))
consumerConnect
}
-}
+}
43 core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -19,8 +19,8 @@ package kafka.consumer
import kafka.utils.IteratorTemplate
import org.apache.log4j.Logger
import java.util.concurrent.{TimeUnit, BlockingQueue}
-import kafka.message.{MessageSet, Message}
import kafka.cluster.Partition
+import kafka.message.{MessageAndOffset, MessageSet, Message}
/**
* An iterator that blocks until a value can be read from the supplied queue.
@@ -31,43 +31,50 @@ class ConsumerIterator(private val channel: BlockingQueue[FetchedDataChunk], con
extends IteratorTemplate[Message] {
private val logger = Logger.getLogger(classOf[ConsumerIterator])
- private var current: Iterator[Message] = null
+ private var current: Iterator[MessageAndOffset] = null
+ private var currentDataChunk: FetchedDataChunk = null
private var currentTopicInfo: PartitionTopicInfo = null
+ private var consumedOffset: Long = -1L
override def next(): Message = {
val message = super.next
- currentTopicInfo.consumed(MessageSet.entrySize(message))
+ if(consumedOffset < 0)
+ throw new IllegalStateException("Offset returned by the message set is invalid %d".format(consumedOffset))
+ currentTopicInfo.resetConsumeOffset(consumedOffset)
+ if(logger.isTraceEnabled)
+ logger.trace("Setting consumed offset to %d".format(consumedOffset))
message
}
protected def makeNext(): Message = {
// if we don't have an iterator, get one
if(current == null || !current.hasNext) {
- var found: FetchedDataChunk = null
if (consumerTimeoutMs < 0)
- found = channel.take
+ currentDataChunk = channel.take
else {
- found = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
- if (found == null) {
- logger.debug("Consumer iterator timing out..")
+ currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS)
+ if (currentDataChunk == null) {
throw new ConsumerTimeoutException
}
}
- if(found eq ZookeeperConsumerConnector.shutdownCommand) {
- logger.debug("Received the shutdown command")
- channel.offer(found)
+ if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
+ if(logger.isDebugEnabled)
+ logger.debug("Received the shutdown command")
+ channel.offer(currentDataChunk)
return allDone
} else {
- currentTopicInfo = found.topicInfo
- if (currentTopicInfo.getConsumeOffset != found.fetchOffset) {
- logger.error("consumed offset: " + currentTopicInfo.getConsumeOffset + " doesn't match fetch offset: " +
- found.fetchOffset + " for " + currentTopicInfo + "; consumer may lose data")
- currentTopicInfo.resetConsumeOffset(found.fetchOffset)
+ currentTopicInfo = currentDataChunk.topicInfo
+ if (currentTopicInfo.getConsumeOffset != currentDataChunk.fetchOffset) {
+ logger.error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data"
+ .format(currentTopicInfo.getConsumeOffset, currentDataChunk.fetchOffset, currentTopicInfo))
+ currentTopicInfo.resetConsumeOffset(currentDataChunk.fetchOffset)
}
- current = found.messages.iterator
+ current = currentDataChunk.messages.iterator
}
}
- current.next
+ val item = current.next
+ consumedOffset = item.offset
+ item.message
}
}
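A minimal consumer loop for orientation (stream setup copied from the ConsoleConsumer change below; Consumer.create and the property values are assumptions): the iterator driving the loop is the ConsumerIterator patched here, so by the time each Message is returned the consumed offset has already been reset to the offset carried with it.

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream}

    val props = new Properties
    props.put("zk.connect", "127.0.0.1:2181")
    props.put("groupid", "test-consumer-group")
    val connector = Consumer.create(new ConsumerConfig(props))        // assumed factory method
    val stream: KafkaMessageStream =
      connector.createMessageStreams(Map("test" -> 1)).get("test").get.get(0)
    for (message <- stream) {                                         // ConsumerIterator underneath
      val payload = message.payload
      val bytes = new Array[Byte](payload.remaining)
      payload.get(bytes)
      println(new String(bytes, "UTF-8"))
    }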
5 core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
@@ -16,9 +16,8 @@
package kafka.consumer
-import java.util.concurrent.atomic._
-import kafka.message._
+import kafka.message.ByteBufferMessageSet
-private[consumer] class FetchedDataChunk(val messages: MessageSet,
+private[consumer] class FetchedDataChunk(val messages: ByteBufferMessageSet,
val topicInfo: PartitionTopicInfo,
val fetchOffset: Long)
16 core/src/main/scala/kafka/consumer/FetcherRunnable.scala
@@ -55,18 +55,19 @@ class FetcherRunnable(val name: String,
try {
while (!stopped) {
val fetches = partitionTopicInfos.map(info =>
- new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
+ new FetchRequest(info.topic, info.partition.partId, info.getFetchOffset, config.fetchSize))
if (logger.isTraceEnabled)
logger.trace("fetch request: " + fetches.toString)
val response = simpleConsumer.multifetch(fetches : _*)
- var read = 0
+ var read = 0L
+
for((messages, info) <- response.zip(partitionTopicInfos)) {
try {
var done = false
- if(messages.errorCOde == ErrorMapping.OFFSET_OUT_OF_RANGE_CODE) {
+ if(messages.getErrorCode == ErrorMapping.OffsetOutOfRangeCode) {
logger.info("offset " + info.getFetchOffset + " out of range")
// see if we can fix this error
val resetOffset = resetConsumerOffsets(info.topic, info.partition)
@@ -93,6 +94,7 @@ class FetcherRunnable(val name: String,
throw e2
}
}
+
if (logger.isTraceEnabled)
logger.trace("fetched bytes: " + read)
if(read == 0) {
@@ -123,17 +125,17 @@ class FetcherRunnable(val name: String,
partition: Partition) : Long = {
var offset : Long = 0
config.autoOffsetReset match {
- case OffsetRequest.SMALLEST_TIME_STRING => offset = OffsetRequest.EARLIEST_TIME
- case OffsetRequest.LARGEST_TIME_STRING => offset = OffsetRequest.LATEST_TIME
+ case OffsetRequest.SmallestTimeString => offset = OffsetRequest.EarliestTime
+ case OffsetRequest.LargestTimeString => offset = OffsetRequest.LatestTime
case _ => return -1
}
// get mentioned offset from the broker
val offsets = simpleConsumer.getOffsetsBefore(topic, partition.partId, offset, 1)
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-
+
// reset manually in zookeeper
- logger.info("updating partition " + partition.name + " with " + (if(offset == OffsetRequest.EARLIEST_TIME) "earliest " else " latest ") + "offset " + offsets(0))
+ logger.info("updating partition " + partition.name + " with " + (if(offset == OffsetRequest.EarliestTime) "earliest " else " latest ") + "offset " + offsets(0))
ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition.name, offsets(0).toString)
offsets(0)
25 core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
@@ -50,28 +50,22 @@ private[consumer] class PartitionTopicInfo(val topic: String,
def resetFetchOffset(newFetchOffset: Long) = {
fetchedOffset.set(newFetchOffset)
if (logger.isDebugEnabled)
- logger.debug("reset fetch offset of " + this + " to " + newFetchOffset)
- }
-
- /**
- * Record the given number of bytes as having been consumed
- */
- def consumed(messageSize: Int): Unit = {
- val newOffset = consumedOffset.addAndGet(messageSize)
- if (logger.isDebugEnabled)
- logger.debug("updated consume offset of " + this + " to " + newOffset)
+ logger.debug("reset fetch offset of ( %s ) to %d".format(this, newFetchOffset))
}
/**
* Enqueue a message set for processing
* @return the number of valid bytes
*/
- def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Int = {
- val size = messages.validBytes
+ def enqueue(messages: ByteBufferMessageSet, fetchOffset: Long): Long = {
+ val size = messages.shallowValidBytes
if(size > 0) {
+ // update fetched offset to the compressed data chunk size, not the decompressed message set size
+ if(logger.isTraceEnabled)
+ logger.trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
val newOffset = fetchedOffset.addAndGet(size)
if (logger.isDebugEnabled)
- logger.debug("updated fetch offset of " + this + " to " + newOffset)
+ logger.debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
}
size
@@ -81,9 +75,10 @@ private[consumer] class PartitionTopicInfo(val topic: String,
* add an empty message with the exception to the queue so that client can see the error
*/
def enqueueError(e: Throwable, fetchOffset: Long) = {
- val messages = new ByteBufferMessageSet(ErrorMapping.EMPTY_BYTEBUFFER, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+ val messages = new ByteBufferMessageSet(ErrorMapping.EmptyByteBuffer, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
chunkQueue.put(new FetchedDataChunk(messages, this, fetchOffset))
}
- override def toString(): String = topic + ":" + partition.toString
+ override def toString(): String = topic + ":" + partition.toString + ": fetched offset = " + fetchedOffset.get +
+ ": consumed offset = " + consumedOffset.get
}
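The comment above is the crux of the offset bookkeeping: with compression, the fetch offset advances by the size of the serialized chunk that came over the wire, not by the decompressed messages inside it. A sketch (assuming the byte-array Message constructor):

    import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

    // The fetcher advances its fetch offset by shallowValidBytes, i.e. the size of
    // the (possibly compressed) wrapper chunk, while consumed offsets come from the
    // per-message offsets seen by the ConsumerIterator.
    val chunk = new ByteBufferMessageSet(GZIPCompressionCodec,
                                         new Message("a".getBytes), new Message("b".getBytes))
    val advanceBy: Long = chunk.shallowValidBytes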
4 core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -103,7 +103,7 @@ class SimpleConsumer(val host: String,
val endTime = SystemTime.nanoseconds
SimpleConsumerStats.recordFetchRequest(endTime - startTime)
SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
- new ByteBufferMessageSet(response._1.buffer, response._2)
+ new ByteBufferMessageSet(response._1.buffer, request.offset, response._2)
}
}
@@ -139,7 +139,7 @@ class SimpleConsumer(val host: String,
SimpleConsumerStats.recordConsumptionThroughput(response._1.buffer.limit)
// error code will be set on individual messageset inside MultiFetchResponse
- new MultiFetchResponse(response._1.buffer, fetches.length)
+ new MultiFetchResponse(response._1.buffer, fetches.length, fetches.toArray.map(f => f.offset))
}
}
2 core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -86,4 +86,4 @@ private[consumer] class TopicCount(val consumerIdString: String, val topicCountM
builder.append(" }")
builder.toString
}
-}
+}
8 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -176,7 +176,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
logger.debug("adding topic " + topic + " and stream to map..")
// register on broker partition path changes
- val partitionPath = ZkUtils.brokerTopicsPath + "/" + topic
+ val partitionPath = ZkUtils.BrokerTopicsPath + "/" + topic
ZkUtils.makeSurePersistentPathExists(zkClient, partitionPath)
zkClient.subscribeChildChanges(partitionPath, loadBalancerListener)
}
@@ -296,10 +296,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
try {
val cluster = ZkUtils.getCluster(zkClient)
val broker = cluster.getBroker(brokerId)
- simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SOCKET_TIMEOUT,
- ConsumerConfig.SOCKET_BUFFER_SIZE)
+ simpleConsumer = new SimpleConsumer(broker.host, broker.port, ConsumerConfig.SocketTimeout,
+ ConsumerConfig.SocketBufferSize)
val latestOffset = simpleConsumer.getOffsetsBefore(topic, partitionId,
- OffsetRequest.LATEST_TIME, 1)
+ OffsetRequest.LatestTime, 1)
producedOffset = latestOffset(0)
}
catch {
15 core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -15,11 +15,11 @@
*/
package kafka.javaapi
-import kafka.producer.ProducerPool
-import kafka.producer.async.QueueItem
import java.nio.ByteBuffer
import org.apache.log4j.Logger
import kafka.serializer.Encoder
+import kafka.producer.{ProducerConfig, ProducerPool}
+import kafka.producer.async.{AsyncProducerConfig, QueueItem}
private[javaapi] object Implicits {
private val logger = Logger.getLogger(getClass())
@@ -29,16 +29,19 @@ private[javaapi] object Implicits {
implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
kafka.javaapi.message.ByteBufferMessageSet = {
- new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer, messageSet.errorCOde)
+ new kafka.javaapi.message.ByteBufferMessageSet(messageSet.getBuffer, messageSet.getInitialOffset,
+ messageSet.getErrorCode)
}
implicit def toJavaSyncProducer(producer: kafka.producer.SyncProducer): kafka.javaapi.producer.SyncProducer = {
- logger.debug("Implicit instantiation of Java Sync Producer")
+ if(logger.isDebugEnabled)
+ logger.debug("Implicit instantiation of Java Sync Producer")
new kafka.javaapi.producer.SyncProducer(producer)
}
implicit def toSyncProducer(producer: kafka.javaapi.producer.SyncProducer): kafka.producer.SyncProducer = {
- logger.debug("Implicit instantiation of Sync Producer")
+ if(logger.isDebugEnabled)
+ logger.debug("Implicit instantiation of Sync Producer")
producer.underlying
}
@@ -121,5 +124,5 @@ private[javaapi] object Implicits {
response.underlying
implicit def toJavaMultiFetchResponse(response: kafka.api.MultiFetchResponse): kafka.javaapi.MultiFetchResponse =
- new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets)
+ new kafka.javaapi.MultiFetchResponse(response.buffer, response.numSets, response.offsets)
}
6 core/src/main/scala/kafka/javaapi/MultiFetchResponse.scala
@@ -20,13 +20,13 @@ import kafka.utils.IteratorTemplate
import java.nio.ByteBuffer
import message.ByteBufferMessageSet
-class MultiFetchResponse(buffer: ByteBuffer, numSets: Int) extends java.lang.Iterable[ByteBufferMessageSet] {
+class MultiFetchResponse(buffer: ByteBuffer, numSets: Int, offsets: Array[Long]) extends java.lang.Iterable[ByteBufferMessageSet] {
val underlyingBuffer = ByteBuffer.wrap(buffer.array)
// this has the side effect of setting the initial position of buffer correctly
val errorCode = underlyingBuffer.getShort
import Implicits._
- val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets)
+ val underlying = new kafka.api.MultiFetchResponse(underlyingBuffer, numSets, offsets)
override def toString() = underlying.toString
@@ -41,4 +41,4 @@ class MultiFetchResponse(buffer: ByteBuffer, numSets: Int) extends java.lang.Ite
}
}
}
-}
+}
2 core/src/main/scala/kafka/javaapi/ProducerRequest.scala
@@ -48,4 +48,4 @@ class ProducerRequest(val topic: String,
override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
-}
+}
2 core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -67,4 +67,4 @@ class SimpleConsumer(val host: String,
def close() {
underlying.close
}
-}
+}
2 core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -85,4 +85,4 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def shutdown() {
underlying.shutdown
}
-}
+}
65 core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
@@ -17,35 +17,60 @@ package kafka.javaapi.message
import java.nio.ByteBuffer
import kafka.common.ErrorMapping
-import kafka.message.Message
-import java.nio.channels.WritableByteChannel
+import org.apache.log4j.Logger
+import kafka.message._
-class ByteBufferMessageSet(val buffer: ByteBuffer, val errorCode: Int) extends MessageSet {
- val underlying = new kafka.message.ByteBufferMessageSet(buffer, errorCode)
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+ private val initialOffset: Long = 0L,
+ private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
+ private val logger = Logger.getLogger(getClass())
+ val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer,
+ initialOffset,
+ errorCode)
+ def this(buffer: ByteBuffer) = this(buffer, 0L, ErrorMapping.NoError)
- def this(buffer: ByteBuffer) = this(buffer,ErrorMapping.NO_ERROR)
+ def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
+ this(compressionCodec match {
+ case NoCompressionCodec =>
+ val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ val messageIterator = messages.iterator
+ while(messageIterator.hasNext) {
+ val message = messageIterator.next
+ message.serializeTo(buffer)
+ }
+ buffer.rewind
+ buffer
+ case _ =>
+ import scala.collection.JavaConversions._
+ val message = CompressionUtils.compress(asBuffer(messages), compressionCodec)
+ val buffer = ByteBuffer.allocate(message.serializedSize)
+ message.serializeTo(buffer)
+ buffer.rewind
+ buffer
+ }, 0L, ErrorMapping.NoError)
+ }
def this(messages: java.util.List[Message]) {
- this(ByteBuffer.allocate(kafka.message.MessageSet.messageSetSize(messages)))
- val iter = messages.iterator
- while(iter.hasNext) {
- val message = iter.next
- buffer.putInt(message.size)
- buffer.put(message.buffer)
- message.buffer.rewind()
- }
- buffer.rewind()
+ this(NoCompressionCodec, messages)
}
- def validBytes: Int = underlying.validBytes
+ def validBytes: Long = underlying.validBytes
+
+ def serialized():ByteBuffer = underlying.serialized
+
+ def getInitialOffset = initialOffset
+
+ def getBuffer = buffer
+
+ def getErrorCode = errorCode
- override def iterator: java.util.Iterator[Message] = new java.util.Iterator[Message] {
+ override def iterator: java.util.Iterator[MessageAndOffset] = new java.util.Iterator[MessageAndOffset] {
val underlyingIterator = underlying.iterator
override def hasNext(): Boolean = {
underlyingIterator.hasNext
}
- override def next(): Message = {
+ override def next(): MessageAndOffset = {
underlyingIterator.next
}
@@ -59,13 +84,13 @@ class ByteBufferMessageSet(val buffer: ByteBuffer, val errorCode: Int) extends M
override def equals(other: Any): Boolean = {
other match {
case that: ByteBufferMessageSet =>
- (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer)
+ (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
case _ => false
}
}
def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
- override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode
+ override def hashCode: Int = 31 * (17 + errorCode) + buffer.hashCode + initialOffset.hashCode
-}
+}
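A sketch of the reworked Java-facing API, written here in Scala with illustrative values; Java callers reach the Scala codec objects as e.g. NoCompressionCodec$.MODULE$, as the hadoop contrib changes above do:

    import java.util.Arrays
    import kafka.javaapi.message.ByteBufferMessageSet
    import kafka.message.{GZIPCompressionCodec, Message, MessageAndOffset}

    val msgs = Arrays.asList(new Message("hello".getBytes), new Message("world".getBytes))
    val compressed = new ByteBufferMessageSet(GZIPCompressionCodec, msgs)

    val it = compressed.iterator                 // java.util.Iterator[MessageAndOffset]
    while (it.hasNext) {
      val mo: MessageAndOffset = it.next
      // mo.message is the (decompressed) Message, mo.offset the offset bookkeeping value
    }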
12 core/src/main/scala/kafka/javaapi/message/MessageSet.scala
@@ -16,8 +16,8 @@
package kafka.javaapi.message
-import kafka.message.{InvalidMessageException, Message}
import java.nio.channels.WritableByteChannel
+import kafka.message.{MessageAndOffset, InvalidMessageException, Message}
/**
* A set of messages. A message set has a fixed serialized form, though the container
@@ -26,12 +26,12 @@ import java.nio.channels.WritableByteChannel
* 4 byte size containing an integer N
* N message bytes as described in the message class
*/
-abstract class MessageSet extends java.lang.Iterable[Message] {
+abstract class MessageSet extends java.lang.Iterable[MessageAndOffset] {
/**
* Provides an iterator over the messages in this set
*/
- def iterator: java.util.Iterator[Message]
+ def iterator: java.util.Iterator[MessageAndOffset]
/**
* Gives the total size of this message set in bytes
@@ -45,9 +45,9 @@ abstract class MessageSet extends java.lang.Iterable[Message] {
def validate(): Unit = {
val thisIterator = this.iterator
while(thisIterator.hasNext) {
- val message = thisIterator.next
- if(!message.isValid)
+ val messageAndOffset = thisIterator.next
+ if(!messageAndOffset.message.isValid)
throw new InvalidMessageException
}
}
-}
+}
2 core/src/main/scala/kafka/javaapi/producer/Producer.scala
@@ -119,4 +119,4 @@ class Producer[K,V](config: ProducerConfig,
* the zookeeper client connection if one exists
*/
def close = underlying.close
-}
+}
2 core/src/main/scala/kafka/javaapi/producer/ProducerData.scala
@@ -30,4 +30,4 @@ class ProducerData[K, V](private val topic: String,
def getKey: K = key
def getData: java.util.List[V] = data
-}
+}
2 core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
@@ -44,4 +44,4 @@ class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
def close() {
underlying.close
}
-}
+}
19 core/src/main/scala/kafka/log/Log.scala
@@ -29,7 +29,7 @@ import kafka.api.OffsetRequest
import java.util._
private[log] object Log {
- val FILE_SUFFIX = ".kafka"
+ val FileSuffix = ".kafka"
/**
* Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
@@ -77,7 +77,7 @@ private[log] object Log {
nf.setMinimumIntegerDigits(20)
nf.setMaximumFractionDigits(0)
nf.setGroupingUsed(false)
- nf.format(offset) + Log.FILE_SUFFIX
+ nf.format(offset) + Log.FileSuffix
}
}
@@ -124,11 +124,11 @@ private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int,
val accum = new ArrayList[LogSegment]
val ls = dir.listFiles()
if(ls != null) {
- for(file <- ls if file.isFile && file.toString.endsWith(Log.FILE_SUFFIX)) {
+ for(file <- ls if file.isFile && file.toString.endsWith(Log.FileSuffix)) {
if(!file.canRead)
throw new IOException("Could not read file " + file)
val filename = file.getName()
- val start = filename.substring(0, filename.length - Log.FILE_SUFFIX.length).toLong
+ val start = filename.substring(0, filename.length - Log.FileSuffix.length).toLong
val messageSet = new FileMessageSet(file, false)
accum.add(new LogSegment(file, messageSet, start))
}
@@ -198,8 +198,8 @@ private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int,
def append(messages: MessageSet): Unit = {
// validate the messages
var numberOfMessages = 0
- for(message <- messages) {
- if(!message.isValid)
+ for(messageAndOffset <- messages) {
+ if(!messageAndOffset.message.isValid)
throw new InvalidMessageException()
numberOfMessages += 1;
}
@@ -323,12 +323,15 @@ private[log] class Log(val dir: File, val maxSize: Long, val flushInterval: Int,
var startIndex = -1
request.time match {
- case OffsetRequest.LATEST_TIME =>
+ case OffsetRequest.LatestTime =>
startIndex = offsetTimeArray.length - 1
- case OffsetRequest.EARLIEST_TIME =>
+ case OffsetRequest.EarliestTime =>
startIndex = 0
case _ =>
var isFound = false
+ if(logger.isDebugEnabled) {
+ logger.debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
+ }
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
if (offsetTimeArray(startIndex)._2 <= request.time)
26 core/src/main/scala/kafka/message/ByteBufferBackedInputStream.scala
@@ -0,0 +1,26 @@
+package kafka.message
+
+import java.io.InputStream
+import java.nio.ByteBuffer
+import scala.Math
+
+class ByteBufferBackedInputStream(buffer:ByteBuffer) extends InputStream {
+ override def read():Int = {
+ buffer.hasRemaining match {
+ case true =>
+ (buffer.get() & 0xFF)
+ case false => -1
+ }
+ }
+
+ override def read(bytes:Array[Byte], off:Int, len:Int):Int = {
+ buffer.hasRemaining match {
+ case true =>
+ // Read only what's left
+ val realLen = math.min(len, buffer.remaining())
+ buffer.get(bytes, off, realLen)
+ realLen
+ case false => -1
+ }
+ }
+}
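A sketch of this adapter's intended role (pairing it with GZIPInputStream is an assumption consistent with CompressionUtils below): it lets java.util.zip stream straight out of a message's ByteBuffer payload.

    import java.nio.ByteBuffer
    import java.util.zip.GZIPInputStream
    import kafka.message.ByteBufferBackedInputStream

    // Wrap a payload buffer so that standard stream-based decompressors can read it.
    def gzipStreamOver(payload: ByteBuffer): GZIPInputStream =
      new GZIPInputStream(new ByteBufferBackedInputStream(payload))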
162 core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -16,71 +16,151 @@
package kafka.message
-import java.nio._
-import java.nio.channels._
import scala.collection.mutable
-import kafka.message._
-import kafka.utils._
import org.apache.log4j.Logger
import kafka.common.{InvalidMessageSizeException, ErrorMapping}
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+import kafka.utils.IteratorTemplate
/**
* A sequence of messages stored in a byte buffer
+ *
+ * There are two ways to create a ByteBufferMessageSet
+ *
+ * Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
+ *
+ * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
+ *
*/
-class ByteBufferMessageSet(val buffer: ByteBuffer, val errorCOde: Int) extends MessageSet {
+class ByteBufferMessageSet(private val buffer: ByteBuffer,
+ private val initialOffset: Long = 0L,
+ private val errorCode: Int = ErrorMapping.NoError) extends MessageSet {
private val logger = Logger.getLogger(getClass())
- private var validByteCount = -1
+ private var validByteCount = -1L
+ private var shallowValidByteCount = -1L
+ private var deepValidByteCount = -1L
+
+ def this(compressionCodec: CompressionCodec, messages: Message*) {
+ this(
+ compressionCodec match {
+ case NoCompressionCodec =>
+ val buffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ for (message <- messages) {
+ message.serializeTo(buffer)
+ }
+ buffer.rewind
+ buffer
+ case _ =>
+ val message = CompressionUtils.compress(messages, compressionCodec)
+ val buffer = ByteBuffer.allocate(message.serializedSize)
+ message.serializeTo(buffer)
+ buffer.rewind
+ buffer
+ }, 0L, ErrorMapping.NoError)
+ }
- def this(buffer: ByteBuffer) = this(buffer,ErrorMapping.NO_ERROR)
-
def this(messages: Message*) {
- this(ByteBuffer.allocate(MessageSet.messageSetSize(messages)))
- for(message <- messages) {
- buffer.putInt(message.size)
- buffer.put(message.buffer)
- message.buffer.rewind()
+ this(NoCompressionCodec, messages: _*)
+ }
+
+ def getInitialOffset = initialOffset
+
+ def getBuffer = buffer
+
+ def getErrorCode = errorCode
+
+ def serialized(): ByteBuffer = buffer
+
+ def validBytes: Long = deepValidBytes
+
+ def shallowValidBytes: Long = {
+ if(shallowValidByteCount < 0) {
+ val iter = deepIterator
+ while(iter.hasNext) {
+ val messageAndOffset = iter.next
+ shallowValidByteCount = messageAndOffset.offset
+ }
}
- buffer.rewind()
+ shallowValidByteCount - initialOffset
}
- def validBytes: Int = {
- if(validByteCount < 0) {
- val iter = iterator
- while(iter.hasNext)
- iter.next()
+ def deepValidBytes: Long = {
+ if (deepValidByteCount < 0) {
+ val iter = deepIterator
+ while (iter.hasNext)
+ iter.next
}
- validByteCount
+ deepValidByteCount
}
/** Write the messages in this set to the given channel */
def writeTo(channel: WritableByteChannel, offset: Long, size: Long): Long =
channel.write(buffer.duplicate)
- override def iterator: Iterator[Message] = {
- ErrorMapping.maybeThrowException(errorCOde)
- new IteratorTemplate[Message] {
- var iter = buffer.slice()
- var currValidBytes = 0
-
- override def makeNext(): Message = {
- // read the size of the item
- if(iter.remaining < 4) {
- validByteCount = currValidBytes
+ override def iterator: Iterator[MessageAndOffset] = deepIterator
+
+ private def deepIterator(): Iterator[MessageAndOffset] = {
+ ErrorMapping.maybeThrowException(errorCode)
+ new IteratorTemplate[MessageAndOffset] {
+ var topIter = buffer.slice()
+ var currValidBytes = initialOffset
+ var innerIter:Iterator[MessageAndOffset] = null
+ var lastMessageSize = 0L
+
+ def innerDone():Boolean = (innerIter==null || !innerIter.hasNext)
+
+ def makeNextOuter: MessageAndOffset = {
+ if (topIter.remaining < 4) {
+ deepValidByteCount = currValidBytes
return allDone()
}
- val size = iter.getInt()
- if(size < 0 || iter.remaining < size) {
- validByteCount = currValidBytes
+ val size = topIter.getInt()
+ lastMessageSize = size
+
+ if(logger.isTraceEnabled) {
+ logger.trace("Remaining bytes in iterator = " + topIter.remaining)
+ logger.trace("size of data = " + size)
+ }
+ if(size < 0 || topIter.remaining < size) {
+ deepValidByteCount = currValidBytes
if (currValidBytes == 0 || size < 0)
- throw new InvalidMessageSizeException("invalid message size:" + size + " only received bytes:" + iter.remaining
- + " at " + currValidBytes + " possible causes (1) a single message larger than the fetch size; (2) log corruption")
+ throw new InvalidMessageSizeException("invalid message size: %d only received bytes: %d " +
+ " at %d possible causes (1) a single message larger than the fetch size; (2) log corruption "
+ .format(size, topIter.remaining, currValidBytes))
return allDone()
}
- currValidBytes += 4 + size
- val message = iter.slice()
+ val message = topIter.slice()
message.limit(size)
- iter.position(iter.position + size)
- new Message(message)
+ topIter.position(topIter.position + size)
+ val newMessage = new Message(message)
+ newMessage.compressionCodec match {
+ case NoCompressionCodec =>
+ if(logger.isDebugEnabled)
+ logger.debug("Message is uncompressed. Valid byte count = %d".format(currValidBytes))
+ innerIter = null
+ currValidBytes += 4 + size
+ new MessageAndOffset(newMessage, currValidBytes)
+ case _ =>
+ if(logger.isDebugEnabled)
+ logger.debug("Message is compressed. Valid byte count = %d".format(currValidBytes))
+ innerIter = CompressionUtils.decompress(newMessage).deepIterator
+ makeNext()
+ }
+ }
+
+ override def makeNext(): MessageAndOffset = {
+ if(logger.isDebugEnabled)
+ logger.debug("makeNext() in deepIterator: innerDone = " + innerDone)
+ innerDone match {
+ case true => makeNextOuter
+ case false => {
+ val messageAndOffset = innerIter.next
+ if(!innerIter.hasNext)
+ currValidBytes += 4 + lastMessageSize
+ new MessageAndOffset(messageAndOffset.message, currValidBytes)
+ }
+ }
}
}
}
@@ -101,12 +181,12 @@ class ByteBufferMessageSet(val buffer: ByteBuffer, val errorCOde: Int) extends M
override def equals(other: Any): Boolean = {
other match {
case that: ByteBufferMessageSet =>
- (that canEqual this) && errorCOde == that.errorCOde && buffer.equals(that.buffer)
+ (that canEqual this) && errorCode == that.errorCode && buffer.equals(that.buffer) && initialOffset == that.initialOffset
case _ => false
}
}
override def canEqual(other: Any): Boolean = other.isInstanceOf[ByteBufferMessageSet]
- override def hashCode: Int = 31 * (17 + errorCOde) + buffer.hashCode
+ override def hashCode: Int = 31 + (17 * errorCode) + buffer.hashCode + initialOffset.hashCode
}
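
A minimal consumer-side sketch of the new iteration contract: ByteBufferMessageSet now iterates over MessageAndOffset, and the deep iterator above transparently expands a GZIP-compressed wrapper message into the messages it contains, so the caller always sees one logical message at a time. The helper below assumes a message set already obtained from a fetch, and that MessageAndOffset exposes the message and offset fields introduced elsewhere in this commit.

    import kafka.message.ByteBufferMessageSet

    // Print every logical message in a fetched set; the loop body does not need
    // to know whether the producer compressed the batch.
    def dumpMessages(messageSet: ByteBufferMessageSet): Unit = {
      for (messageAndOffset <- messageSet) {
        val payload = messageAndOffset.message.payload        // ByteBuffer view of the body
        val bytes = new Array[Byte](payload.remaining)
        payload.get(bytes)
        // offset is the position just past this message in the original buffer
        println("offset=" + messageAndOffset.offset + " payload=" + new String(bytes, "UTF-8"))
      }
    }
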
35 core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.message
+
+object CompressionCodec {
+ def getCompressionCodec(codec: Int): CompressionCodec = {
+ codec match {
+ case 0 => NoCompressionCodec
+ case 1 => GZIPCompressionCodec
+ case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec))
+ }
+ }
+}
+
+sealed trait CompressionCodec { def codec: Int }
+
+case object DefaultCompressionCodec extends CompressionCodec { val codec = 1 }
+
+case object GZIPCompressionCodec extends CompressionCodec { val codec = 1 }
+
+case object NoCompressionCodec extends CompressionCodec { val codec = 0 }
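
A short sketch of how the wire-level codec id maps back to these objects; it uses only what is defined above, so any id other than 0 or 1 raises UnknownCodecException.

    import kafka.message.{CompressionCodec, NoCompressionCodec, GZIPCompressionCodec}

    val none = CompressionCodec.getCompressionCodec(0)   // NoCompressionCodec
    val gzip = CompressionCodec.getCompressionCodec(1)   // GZIPCompressionCodec
    assert(none == NoCompressionCodec)
    assert(gzip == GZIPCompressionCodec)
    // DefaultCompressionCodec also carries codec id 1, so "default" and GZIP are
    // indistinguishable on the wire.
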
143 core/src/main/scala/kafka/message/CompressionUtils.scala
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.io.ByteArrayOutputStream
+import java.io.IOException
+import java.io.InputStream
+import java.util.zip.GZIPInputStream
+import java.util.zip.GZIPOutputStream
+import java.nio.ByteBuffer
+import org.apache.log4j.Logger
+
+object CompressionUtils {
+ private val logger = Logger.getLogger(getClass)
+
+ def compress(messages: Iterable[Message]): Message = compress(messages, DefaultCompressionCodec)
+
+ def compress(messages: Iterable[Message], compressionCodec: CompressionCodec):Message = compressionCodec match {
+ case DefaultCompressionCodec =>
+ val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
+ val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream)
+ if(logger.isDebugEnabled)
+ logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
+
+ val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ messages.foreach(m => m.serializeTo(messageByteBuffer))
+ messageByteBuffer.rewind
+
+ try {
+ gzipOutput.write(messageByteBuffer.array)
+ } catch {
+ case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
+ if(gzipOutput != null) gzipOutput.close();
+ if(outputStream != null) outputStream.close()
+ throw e
+ } finally {
+ if(gzipOutput != null) gzipOutput.close()
+ if(outputStream != null) outputStream.close()
+ }
+
+ val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
+ oneCompressedMessage
+ case GZIPCompressionCodec =>
+ val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream()
+ val gzipOutput:GZIPOutputStream = new GZIPOutputStream(outputStream)
+ if(logger.isDebugEnabled)
+ logger.debug("Allocating message byte buffer of size = " + MessageSet.messageSetSize(messages))
+
+ val messageByteBuffer = ByteBuffer.allocate(MessageSet.messageSetSize(messages))
+ messages.foreach(m => m.serializeTo(messageByteBuffer))
+ messageByteBuffer.rewind
+
+ try {
+ gzipOutput.write(messageByteBuffer.array)
+ } catch {
+ case e: IOException => logger.error("Error while writing to the GZIP output stream", e)
+ if(gzipOutput != null)
+ gzipOutput.close()
+ if(outputStream != null)
+ outputStream.close()
+ throw e
+ } finally {
+ if(gzipOutput != null)
+ gzipOutput.close()
+ if(outputStream != null)
+ outputStream.close()
+ }
+
+ val oneCompressedMessage:Message = new Message(outputStream.toByteArray, compressionCodec)
+ oneCompressedMessage
+ case _ =>
+ throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
+ }
+
+ def decompress(message: Message): ByteBufferMessageSet = message.compressionCodec match {
+ case DefaultCompressionCodec =>
+ val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream
+ val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
+ val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
+ val intermediateBuffer = new Array[Byte](1024)
+
+ try {
+ Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+ outputStream.write(intermediateBuffer, 0, dataRead)
+ }
+ }catch {
+ case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
+ throw e
+ } finally {
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
+ }
+
+ val outputBuffer = ByteBuffer.allocate(outputStream.size)
+ outputBuffer.put(outputStream.toByteArray)
+ outputBuffer.rewind
+ val outputByteArray = outputStream.toByteArray
+ new ByteBufferMessageSet(outputBuffer)
+ case GZIPCompressionCodec =>
+ val outputStream:ByteArrayOutputStream = new ByteArrayOutputStream
+ val inputStream:InputStream = new ByteBufferBackedInputStream(message.payload)
+ val gzipIn:GZIPInputStream = new GZIPInputStream(inputStream)
+ val intermediateBuffer = new Array[Byte](1024)
+
+ try {
+ Stream.continually(gzipIn.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
+ outputStream.write(intermediateBuffer, 0, dataRead)
+ }
+ }catch {
+ case e: IOException => logger.error("Error while reading from the GZIP input stream", e)
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
+ throw e
+ } finally {
+ if(gzipIn != null) gzipIn.close
+ if(outputStream != null) outputStream.close
+ }
+
+ val outputBuffer = ByteBuffer.allocate(outputStream.size)
+ outputBuffer.put(outputStream.toByteArray)
+ outputBuffer.rewind
+ val outputByteArray = outputStream.toByteArray
+ new ByteBufferMessageSet(outputBuffer)
+ case _ =>
+ throw new kafka.common.UnknownCodecException("Unknown Codec: " + message.compressionCodec)
+ }
+}
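
A round-trip sketch of the two helpers above: compress wraps a whole batch in a single GZIP-compressed message, and decompress turns that message back into a ByteBufferMessageSet. It assumes the byte-array Message constructor and the compressionCodec accessor used elsewhere in this commit.

    import kafka.message.{CompressionUtils, Message, GZIPCompressionCodec}

    val original = List(new Message("hello".getBytes("UTF-8")),
                        new Message("world".getBytes("UTF-8")))
    val wrapper  = CompressionUtils.compress(original, GZIPCompressionCodec)
    println(wrapper.compressionCodec)                    // GZIPCompressionCodec
    val restored = CompressionUtils.decompress(wrapper)
    for (messageAndOffset <- restored) {
      val payload = messageAndOffset.message.payload
      val bytes = new Array[Byte](payload.remaining)
      payload.get(bytes)
      println(new String(bytes, "UTF-8"))                // "hello", then "world"
    }
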
23 core/src/main/scala/kafka/message/FileMessageSet.scala
@@ -34,10 +34,10 @@ import kafka.utils._
*/
@nonthreadsafe
class FileMessageSet private[kafka](private[message] val channel: FileChannel,
- private[message] val offset: Long,
- private[message] val limit: Long,
- val mutable: Boolean,
- val needRecover: AtomicBoolean) extends MessageSet {
+ private[message] val offset: Long,
+ private[message] val limit: Long,
+ val mutable: Boolean,
+ val needRecover: AtomicBoolean) extends MessageSet {
private val setSize = new AtomicLong()
private val setHighWaterMark = new AtomicLong()
@@ -95,7 +95,8 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
* Return a message set which is a view into this set starting from the given offset and with the given size limit.
*/
def read(readOffset: Long, size: Long): MessageSet = {
- new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark), false, new AtomicBoolean(false))
+ new FileMessageSet(channel, this.offset + readOffset, scala.math.min(this.offset + readOffset + size, highWaterMark),
+ false, new AtomicBoolean(false))
}
/**
@@ -107,11 +108,11 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
/**
* Get an iterator over the messages in the set
*/
- override def iterator: Iterator[Message] = {
- new IteratorTemplate[Message] {
+ override def iterator: Iterator[MessageAndOffset] = {
+ new IteratorTemplate[MessageAndOffset] {
var location = offset
- override def makeNext(): Message = {
+ override def makeNext(): MessageAndOffset = {
// read the size of the item
val sizeBuffer = ByteBuffer.allocate(4)
channel.read(sizeBuffer, location)
@@ -120,7 +121,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
sizeBuffer.rewind()
val size: Int = sizeBuffer.getInt()
- if (size < Message.HeaderSize)
+ if (size < Message.MinHeaderSize)
return allDone()
// read the item itself
@@ -132,7 +133,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
// increment the location and return the item
location += size + 4
- new Message(buffer)
+ new MessageAndOffset(new Message(buffer), location)
}
}
}
@@ -225,7 +226,7 @@ class FileMessageSet private[kafka](private[message] val channel: FileChannel,
// check that we have sufficient bytes left in the file
val size = buffer.getInt(0)
- if (size < Message.HeaderSize)
+ if (size < Message.MinHeaderSize)
return -1
val next = start + 4 + size
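
A sketch of scanning an on-disk segment with the revised iterator; the file path is hypothetical and the (file, mutable) convenience constructor is assumed to exist on FileMessageSet. Each MessageAndOffset carries the file position just past that message.

    import java.io.File
    import kafka.message.FileMessageSet

    val segment = new FileMessageSet(new File("/tmp/example-topic-0/kafka.log"), false)
    var nextOffset = 0L
    for (messageAndOffset <- segment) {
      nextOffset = messageAndOffset.offset               // position just past this message
      println("message of " + messageAndOffset.message.size + " bytes, ends at " + nextOffset)
    }
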
129 core/src/main/scala/kafka/message/Message.scala
@@ -21,69 +21,158 @@ import java.nio.channels._
import java.util.zip.CRC32
import java.util.UUID
import kafka.utils._
+import kafka.common.UnknownMagicByteException
/**
* Message byte offsets
*/
object Message {
- val CurrentMagicValue: Byte = 0
+ val MagicVersion1: Byte = 0
+ val MagicVersion2: Byte = 1
+ val CurrentMagicValue: Byte = 1
val MagicOffset = 0
val MagicLength = 1
- val CrcOffset = MagicOffset + MagicLength
+ val AttributeOffset = MagicOffset + MagicLength
+ val AttributeLength = 1
+ /**
+ * Specifies the mask for the compression code. 2 bits to hold the compression codec.
+ * 0 is reserved to indicate no compression
+ */
+ val CompressionCodeMask: Int = 0x03 //
+
+
+ val NoCompression:Int = 0
+
+ /**
+ * Computes the CRC value based on the magic byte
+ * @param magic Specifies the magic byte value. Possible values are 0 and 1
+ * 0 for no compression
+ * 1 for compression
+ */
+ def crcOffset(magic: Byte): Int = magic match {
+ case MagicVersion1 => MagicOffset + MagicLength
+ case MagicVersion2 => AttributeOffset + AttributeLength
+ case _ => throw new UnknownMagicByteException("Magic byte value of %d is unknown".format(magic))
+ }
+
val CrcLength = 4
- val PayloadOffset = CrcOffset + CrcLength
- val HeaderSize = PayloadOffset
+
+ /**
+ * Computes the offset to the message payload based on the magic byte
+ * @param magic Specifies the magic byte value. Possible values are 0 and 1
+ * 0 for no compression
+ * 1 for compression
+ */
+ def payloadOffset(magic: Byte): Int = crcOffset(magic) + CrcLength
+
+ /**
+ * Computes the size of the message header based on the magic byte
+ * @param magic Specifies the magic byte value. Possible values are 0 and 1
+ * 0 for no compression
+ * 1 for compression
+ */
+ def headerSize(magic: Byte): Int = payloadOffset(magic)
+
+ /**
+ * Size of the header for magic byte 0. This is the minimum size of any message header
+ */
+ val MinHeaderSize = headerSize(0);
}
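
For reference, the offsets these helpers yield for the two magic values (a sketch derived directly from the definitions above):

    import kafka.message.Message

    // magic 0: [magic(1)][crc(4)][payload]                -> header = 5 bytes
    // magic 1: [magic(1)][attributes(1)][crc(4)][payload] -> header = 6 bytes
    assert(Message.crcOffset(0) == 1 && Message.payloadOffset(0) == 5)
    assert(Message.crcOffset(1) == 2 && Message.payloadOffset(1) == 6)
    assert(Message.MinHeaderSize == 5)   // smallest header any valid message can have
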
/**
* A message. The format of an N byte message is the following:
- * 1 byte "magic" identifier to allow format changes
- * 4 byte CRC32 of the payload
- * N - 5 byte payload
+ *
+ * If magic byte is 0
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 4 byte CRC32 of the payload
+ *
+ * 3. N - 5 byte payload
+ *
+ * If magic byte is 1
+ *
+ * 1. 1 byte "magic" identifier to allow format changes
+ *
+ * 2. 1 byte "attributes" identifier to allow annotations on the message independent of the version (e.g. compression enabled, type of codec used)
+ *
+ * 3. 4 byte CRC32 of the payload
+ *
+ * 4. N - 6 byte payload
*
*/
class Message(val buffer: ByteBuffer) {