From c957508cebf56a173fa3b41fa3871614cf05b07d Mon Sep 17 00:00:00 2001 From: Daniel Schierbeck Date: Tue, 24 Oct 2017 14:32:57 +0200 Subject: [PATCH] Decode 0.9-format messages Even if a cluster has been upgraded to Kafka protocol version 0.10, we may still get messages in the 0.9 format. --- lib/kafka/protocol/message.rb | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/kafka/protocol/message.rb b/lib/kafka/protocol/message.rb index 87477ec5f..06e91d4b5 100644 --- a/lib/kafka/protocol/message.rb +++ b/lib/kafka/protocol/message.rb @@ -66,13 +66,21 @@ def self.decode(decoder) crc = message_decoder.int32 magic_byte = message_decoder.int8 + attributes = message_decoder.int8 - unless magic_byte == MAGIC_BYTE + # The magic byte indicates the message format version. There are situations + # where an old message format can be returned from a newer version of Kafka, + # because old messages are not necessarily rewritten on upgrades. + case magic_byte + when 0 + # No timestamp in the pre-0.10 message format. + timestamp = nil + when 1 + timestamp = message_decoder.int64 + else raise Kafka::Error, "Invalid magic byte: #{magic_byte}" end - attributes = message_decoder.int8 - timestamp = message_decoder.int64 key = message_decoder.bytes value = message_decoder.bytes