Skip to content

Commit

Permalink
Merge de01c81 into bda66e5
Browse files Browse the repository at this point in the history
  • Loading branch information
chrnola committed Sep 5, 2019
2 parents bda66e5 + de01c81 commit 842e446
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
18 changes: 15 additions & 3 deletions core/internal/consumer/kafka_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type metadataHeader struct {
}
type metadataMember struct {
MemberID string
GroupInstanceID string
ClientID string
ClientHost string
RebalanceTimeout int32
Expand Down Expand Up @@ -511,7 +512,7 @@ func (module *KafkaClient) decodeGroupMetadata(keyBuffer *bytes.Buffer, value []
}

switch valueVersion {
case 0, 1, 2:
case 0, 1, 2, 3:
module.decodeAndSendGroupMetadata(valueVersion, group, valueBuffer, logger.With(
zap.String("message_type", "metadata"),
zap.String("group", group),
Expand All @@ -530,7 +531,7 @@ func (module *KafkaClient) decodeAndSendGroupMetadata(valueVersion int16, group
var metadataHeader metadataHeader
var errorAt string
switch valueVersion {
case 2:
case 2, 3:
metadataHeader, errorAt = decodeMetadataValueHeaderV2(valueBuffer)
default:
metadataHeader, errorAt = decodeMetadataValueHeader(valueBuffer)
Expand Down Expand Up @@ -650,6 +651,13 @@ func decodeMetadataValueHeaderV2(buf *bytes.Buffer) (metadataHeader, string) {
return metadataHeader, ""
}

func decodeGroupInstanceID(buf *bytes.Buffer, memberVersion int16) (string, error) {
if memberVersion == 3 {
return readString(buf)
}
return "", nil
}

func decodeMetadataMember(buf *bytes.Buffer, memberVersion int16) (metadataMember, string) {
var err error
memberMetadata := metadataMember{}
Expand All @@ -658,6 +666,10 @@ func decodeMetadataMember(buf *bytes.Buffer, memberVersion int16) (metadataMembe
if err != nil {
return memberMetadata, "member_id"
}
memberMetadata.GroupInstanceID, err = decodeGroupInstanceID(buf, memberVersion)
if err != nil {
return memberMetadata, "group_instance_id"
}
memberMetadata.ClientID, err = readString(buf)
if err != nil {
return memberMetadata, "client_id"
Expand All @@ -666,7 +678,7 @@ func decodeMetadataMember(buf *bytes.Buffer, memberVersion int16) (metadataMembe
if err != nil {
return memberMetadata, "client_host"
}
if memberVersion == 1 || memberVersion == 2 {
if memberVersion >= 1 {
err = binary.Read(buf, binary.BigEndian, &memberMetadata.RebalanceTimeout)
if err != nil {
return memberMetadata, "rebalance_timeout"
Expand Down
50 changes: 49 additions & 1 deletion core/internal/consumer/kafka_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,31 @@ func TestKafkaClient_decodeMetadataValueHeaderV2(t *testing.T) {
assert.Equalf(t, "range", result.Protocol, "Expected Protocol to be range, not %v", result.Protocol)
assert.Equalf(t, "tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2", result.Leader, "Expected Leader to be tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2, not %v", result.Leader)
assert.Equalf(t, int64(1552078883402), result.CurrentStateTimestamp, "Expected CurrentStateTimestamp to be 1552078883402, not %v", result.CurrentStateTimestamp)
assert.Equalf(t, "", errorAt, "Expected decodeMetadataValueHeader to return empty errorAt, not %v", errorAt)
assert.Equalf(t, "", errorAt, "Expected decodeMetadataValueHeaderV2 to return empty errorAt, not %v", errorAt)
}

func TestKafkaClient_decodeMetadataValueHeaderV3(t *testing.T) {
var valueVersion int16
metadata := "\x00\x03\x00" // Header Version 3
metadata += "\x08consumer" // Protocol Type
metadata += "\x00\x00\x00\x01\x00\x12RoundRobinAssigner\x00,tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2" // Generation, Protocol, Leader
metadata += "\x00\x00\x01l\xDF5\x07k\x00\x00\x00\x01" // Timestamp
metadata += "\x00\x06member\xFF\xFF\x00\x06client\x00\x0D/10.10.10.100\xFF\xFF\xFF\xFF\x00\x00\x75\x30" // Member Metadata
metadata += "\x00\x00\x00\x12\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x00" // Member Metadata
metadata += "\x00\x00\x00\x1A\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" // Member Metadata
value := []byte(metadata)

valueBuffer := bytes.NewBuffer(value)
binary.Read(valueBuffer, binary.BigEndian, &valueVersion)
assert.Equalf(t, int16(3), valueVersion, "Expected valueVersion to be 3, not %v", valueVersion)

result, errorAt := decodeMetadataValueHeaderV2(valueBuffer)
assert.Equalf(t, "consumer", result.ProtocolType, "Expected ProtocolType to be consumer, not %v", result.ProtocolType)
assert.Equalf(t, int32(1), result.Generation, "Expected Generation to be 1, not %v", result.Generation)
assert.Equalf(t, "RoundRobinAssigner", result.Protocol, "Expected protocol to be RoundRobinAssigner, not %v", result.Protocol)
assert.Equalf(t, "tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2", result.Leader, "Expected Leader to be tLeader-a42d2baa-bfaa-4b96-9ea2-dee5f42b2ab2, not %v", result.Leader)
assert.Equalf(t, int64(1567112890219), result.CurrentStateTimestamp, "Expected CurrentStateTimestamp to be 1567112890219, not %v", result.CurrentStateTimestamp)
assert.Equalf(t, "", errorAt, "Expected decodeMetadataValueHeaderV2 to return empty errorAt, not %v", errorAt)
}

var decodeMetadataValueHeaderErrors = []errorTestSetBytesWithString{
Expand Down Expand Up @@ -348,6 +372,30 @@ func TestKafkaClient_decodeMetadataMember(t *testing.T) {
assert.Equalf(t, int32(8), result.SessionTimeout, "Expected SessionTimeout to be 8, not %v", result.SessionTimeout)
}

func TestKafkaClient_decodeMetadataMemberV3(t *testing.T) {
metadata := "\x00\x06member\xFF" // memberID
metadata += "\xFF" // groupInstanceID
metadata += "\x00\x06client\x00" // clientID
metadata += "\x0D/10.10.10.100" // clientHost
metadata += "\xFF\xFF\xFF\xFF" // rebalanceTimeout
metadata += "\x00\x00\x75\x30" // seesionTimeout
metadata += "\x00\x00\x00\x12\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x00" // subscription
metadata += "\x00\x00\x00\x1A\x00\x01\x00\x00\x00\x01\x00\x06topic1\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00" // assignment
value := []byte(metadata)
valueBuffer := bytes.NewBuffer(value)

result, errorAt := decodeMetadataMember(valueBuffer, 3)

assert.Equalf(t, "", errorAt, "Expected decodeMetadataMember to return empty errorAt, not %v", errorAt)
assert.Equalf(t, "member", result.MemberID, "Expected MemberID to be member, not %v", result.MemberID)
assert.Equalf(t, "", result.GroupInstanceID, "Expected GroupInstanceID to be \"\" not %v", result.GroupInstanceID)
assert.Equalf(t, "client", result.ClientID, "Expected ClientID to be client, not %v", result.ClientID)
assert.Equalf(t, "/10.10.10.100", result.ClientHost, "Expected ClientHost to be /10.10.10.100, not %v", result.ClientHost)
assert.Equalf(t, int32(-1), result.RebalanceTimeout, "Expected RebalanceTimeout to be -1, not %v", result.RebalanceTimeout)
assert.Equalf(t, int32(30000), result.SessionTimeout, "Expected SessionTimeout to be 30000, not %v", result.SessionTimeout)
assert.Equalf(t, []int32{0}, result.Assignment["topic1"], "Expected topic1 assignment to be {0}, not %v", result.Assignment["topic1"])
}

var decodeMetadataMemberErrors = []errorTestSetBytesWithString{
{[]byte("\x00\x0ctestmemb"), "member_id"},
{[]byte("\x00\x0ctestmemberid\x00\x0ctestclie"), "client_id"},
Expand Down

0 comments on commit 842e446

Please sign in to comment.