diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb index ed950e38a..9b3f459a5 100644 --- a/lib/kafka/protocol/message.rb +++ b/lib/kafka/protocol/message.rb @@ -89,6 +89,11 @@ def self.decode(decoder) timestamp = nil when 1 timestamp = message_decoder.int64 + + # If the timestamp is set to zero, it's because the message has been upgraded + # from the Kafka 0.9 disk format to the Kafka 0.10 format. The former didn't + # have a timestamp attribute, so we'll just set the timestamp to nil. + timestamp = nil if timestamp.zero? else raise Kafka::Error, "Invalid magic byte: #{magic_byte}" end