diff --git a/core/internal/consumer/kafka_client.go b/core/internal/consumer/kafka_client.go index 88e0e4a9..a78e5012 100644 --- a/core/internal/consumer/kafka_client.go +++ b/core/internal/consumer/kafka_client.go @@ -73,6 +73,7 @@ type metadataHeader struct { } type metadataMember struct { MemberID string + GroupInstanceID string ClientID string ClientHost string RebalanceTimeout int32 @@ -511,7 +512,7 @@ func (module *KafkaClient) decodeGroupMetadata(keyBuffer *bytes.Buffer, value [] } switch valueVersion { - case 0, 1, 2: + case 0, 1, 2, 3: module.decodeAndSendGroupMetadata(valueVersion, group, valueBuffer, logger.With( zap.String("message_type", "metadata"), zap.String("group", group), @@ -530,7 +531,7 @@ func (module *KafkaClient) decodeAndSendGroupMetadata(valueVersion int16, group var metadataHeader metadataHeader var errorAt string switch valueVersion { - case 2: + case 2, 3: metadataHeader, errorAt = decodeMetadataValueHeaderV2(valueBuffer) default: metadataHeader, errorAt = decodeMetadataValueHeader(valueBuffer) @@ -650,6 +651,13 @@ func decodeMetadataValueHeaderV2(buf *bytes.Buffer) (metadataHeader, string) { return metadataHeader, "" } +func decodeGroupInstanceID(buf *bytes.Buffer, memberVersion int16) (string, error) { + if memberVersion == 3 { + return readString(buf) + } + return "", nil +} + func decodeMetadataMember(buf *bytes.Buffer, memberVersion int16) (metadataMember, string) { var err error memberMetadata := metadataMember{} @@ -658,6 +666,10 @@ func decodeMetadataMember(buf *bytes.Buffer, memberVersion int16) (metadataMembe if err != nil { return memberMetadata, "member_id" } + memberMetadata.GroupInstanceID, err = decodeGroupInstanceID(buf, memberVersion) + if err != nil { + return memberMetadata, "group_instance_id" + } memberMetadata.ClientID, err = readString(buf) if err != nil { return memberMetadata, "client_id" @@ -666,7 +678,7 @@ func decodeMetadataMember(buf *bytes.Buffer, memberVersion int16) (metadataMembe if err != nil { return memberMetadata, "client_host" } - if memberVersion == 1 || memberVersion == 2 { + if memberVersion >= 1 { err = binary.Read(buf, binary.BigEndian, &memberMetadata.RebalanceTimeout) if err != nil { return memberMetadata, "rebalance_timeout" diff --git a/core/internal/consumer/kafka_client_test.go b/core/internal/consumer/kafka_client_test.go index 77bf5312..da18c515 100644 --- a/core/internal/consumer/kafka_client_test.go +++ b/core/internal/consumer/kafka_client_test.go @@ -319,7 +319,31 @@ func TestKafkaClient_decodeMetadataValueHeaderV2(t *testing.T) { assert.Equalf(t, "range", result.Protocol, "Expected Protocol to be range, not %v", result.Protocol) assert.Equalf(t, "tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2", result.Leader, "Expected Leader to be tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2, not %v", result.Leader) assert.Equalf(t, int64(1552078883402), result.CurrentStateTimestamp, "Expected CurrentStateTimestamp to be 1552078883402, not %v", result.CurrentStateTimestamp) - assert.Equalf(t, "", errorAt, "Expected decodeMetadataValueHeader to return empty errorAt, not %v", errorAt) + assert.Equalf(t, "", errorAt, "Expected decodeMetadataValueHeaderV2 to return empty errorAt, not %v", errorAt) +} + +func TestKafkaClient_decodeMetadataValueHeaderV3(t *testing.T) { + var valueVersion int16 + metadata := "\x00\x03\x00" // Header Version 3 + metadata += "\x08consumer" // Protocol Type + metadata += "\x00\x00\x00\x01\x00\x12RoundRobinAssigner\x00,tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2" // Generation, Protocol, Leader + metadata += "\x00\x00\x01l\xDF5\x07k\x00\x00\x00\x01" // Timestamp + metadata += "\x00\x06member\xFF\xFF\x00\x06client\x00\x0D/10.10.10.100\xFF\xFF\xFF\xFF\x00\x00\x75\x30" // Member Metadata + metadata += "\x00\x00\x00\x12\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x00" // Member Metadata + metadata += "\x00\x00\x00\x1A\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" // Member Metadata + value := []byte(metadata) + + valueBuffer := bytes.NewBuffer(value) + binary.Read(valueBuffer, binary.BigEndian, &valueVersion) + assert.Equalf(t, int16(3), valueVersion, "Expected valueVersion to be 3, not %v", valueVersion) + + result, errorAt := decodeMetadataValueHeaderV2(valueBuffer) + assert.Equalf(t, "consumer", result.ProtocolType, "Expected ProtocolType to be consumer, not %v", result.ProtocolType) + assert.Equalf(t, int32(1), result.Generation, "Expected Generation to be 1, not %v", result.Generation) + assert.Equalf(t, "RoundRobinAssigner", result.Protocol, "Expected protocol to be RoundRobinAssigner, not %v", result.Protocol) + assert.Equalf(t, "tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2", result.Leader, "Expected Leader to be tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2, not %v", result.Leader) + assert.Equalf(t, int64(1567112890219), result.CurrentStateTimestamp, "Expected CurrentStateTimestamp to be 1567112890219, not %v", result.CurrentStateTimestamp) + assert.Equalf(t, "", errorAt, "Expected decodeMetadataValueHeaderV2 to return empty errorAt, not %v", errorAt) } var decodeMetadataValueHeaderErrors = []errorTestSetBytesWithString{ @@ -348,6 +372,30 @@ func TestKafkaClient_decodeMetadataMember(t *testing.T) { assert.Equalf(t, int32(8), result.SessionTimeout, "Expected SessionTimeout to be 8, not %v", result.SessionTimeout) } +func TestKafkaClient_decodeMetadataMemberV3(t *testing.T) { + metadata := "\x00\x06member\xFF" // memberID + metadata += "\xFF" // groupInstanceID + metadata += "\x00\x06client\x00" // clientID + metadata += "\x0D/10.10.10.100" // clientHost + metadata += "\xFF\xFF\xFF\xFF" // rebalanceTimeout + metadata += "\x00\x00\x75\x30" // seesionTimeout + metadata += "\x00\x00\x00\x12\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x00" // subscription + metadata += "\x00\x00\x00\x1A\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" // assignment + value := []byte(metadata) + valueBuffer := bytes.NewBuffer(value) + + result, errorAt := decodeMetadataMember(valueBuffer, 3) + + assert.Equalf(t, "", errorAt, "Expected decodeMetadataMember to return empty errorAt, not %v", errorAt) + assert.Equalf(t, "member", result.MemberID, "Expected MemberID to be member, not %v", result.MemberID) + assert.Equalf(t, "", result.GroupInstanceID, "Expected GroupInstanceID to be \"\" not %v", result.GroupInstanceID) + assert.Equalf(t, "client", result.ClientID, "Expected ClientID to be client, not %v", result.ClientID) + assert.Equalf(t, "/10.10.10.100", result.ClientHost, "Expected ClientHost to be /10.10.10.100, not %v", result.ClientHost) + assert.Equalf(t, int32(-1), result.RebalanceTimeout, "Expected RebalanceTimeout to be -1, not %v", result.RebalanceTimeout) + assert.Equalf(t, int32(30000), result.SessionTimeout, "Expected SessionTimeout to be 30000, not %v", result.SessionTimeout) + assert.Equalf(t, []int32{0}, result.Assignment["topic1"], "Expected topic1 assignment to be {0}, not %v", result.Assignment["topic1"]) +} + var decodeMetadataMemberErrors = []errorTestSetBytesWithString{ {[]byte("\x00\x0ctestmemb"), "member_id"}, {[]byte("\x00\x0ctestmemberid\x00\x0ctestclie"), "client_id"},