From 05129a72a03590fdc7fabf3c6599c0e4f4b64bc4 Mon Sep 17 00:00:00 2001 From: Cliff Resnick Date: Tue, 16 Apr 2013 16:02:06 -0400 Subject: [PATCH] Fix to support reading batched compressed messages These messages span offsets, so "next" may sometimes be zero. KafkaContext changed to pass the actual message offset. --- src/main/java/kafka/consumer/KafkaContext.java | 3 ++- src/main/java/kafka/consumer/KafkaInputFormat.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/java/kafka/consumer/KafkaContext.java b/src/main/java/kafka/consumer/KafkaContext.java index ba362b3..32698cd 100644 --- a/src/main/java/kafka/consumer/KafkaContext.java +++ b/src/main/java/kafka/consumer/KafkaContext.java @@ -125,9 +125,10 @@ public long getNext(LongWritable key, BytesWritable value) throws IOException { MessageAndOffset messageOffset = iterator.next(); Message message = messageOffset.message(); + + key.set(curOffset); curOffset = messageOffset.offset(); - key.set(curOffset - message.size() - 4); //byte[] bytes = new byte[message.payloadSize()]; //message.payload().get(bytes); //value.set(bytes, 0, message.payloadSize()); diff --git a/src/main/java/kafka/consumer/KafkaInputFormat.java b/src/main/java/kafka/consumer/KafkaInputFormat.java index 599bd47..16721d7 100644 --- a/src/main/java/kafka/consumer/KafkaInputFormat.java +++ b/src/main/java/kafka/consumer/KafkaInputFormat.java @@ -217,7 +217,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException { if (limit < 0 || count < limit) { long next = kcontext.getNext(key, value); - if (next > 0) { + if (next >= 0) { pos = next; count++; return true;