diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb index 87477ec5f..06e91d4b5 100644 --- a/lib/kafka/protocol/message.rb +++ b/lib/kafka/protocol/message.rb @@ -66,13 +66,21 @@ def self.decode(decoder) crc = message_decoder.int32 magic_byte = message_decoder.int8 + attributes = message_decoder.int8 - unless magic_byte == MAGIC_BYTE + # The magic byte indicates the message format version. There are situations + # where an old message format can be returned from a newer version of Kafka, + # because old messages are not necessarily rewritten on upgrades. + case magic_byte + when 0 + # No timestamp in the pre-0.10 message format. + timestamp = nil + when 1 + timestamp = message_decoder.int64 + else raise Kafka::Error, "Invalid magic byte: #{magic_byte}" end - attributes = message_decoder.int8 - timestamp = message_decoder.int64 key = message_decoder.bytes value = message_decoder.bytes