Skip to content

Commit

Permalink
Fix to support reading batched compressed messages
Browse files Browse the repository at this point in the history
These messages span offsets, so "next" may sometimes be zero.
KafkaContext changed to pass the actual message offset.
  • Loading branch information
Cliff Resnick committed Apr 16, 2013
1 parent e65cfbf commit 05129a7
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/main/java/kafka/consumer/KafkaContext.java
Expand Up @@ -125,9 +125,10 @@ public long getNext(LongWritable key, BytesWritable value) throws IOException {

MessageAndOffset messageOffset = iterator.next();
Message message = messageOffset.message();

key.set(curOffset);
curOffset = messageOffset.offset();

key.set(curOffset - message.size() - 4);
//byte[] bytes = new byte[message.payloadSize()];
//message.payload().get(bytes);
//value.set(bytes, 0, message.payloadSize());
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/kafka/consumer/KafkaInputFormat.java
Expand Up @@ -217,7 +217,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
if (limit < 0 || count < limit) {

long next = kcontext.getNext(key, value);
if (next > 0) {
if (next >= 0) {
pos = next;
count++;
return true;
Expand Down

0 comments on commit 05129a7

Please sign in to comment.