forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
listgroups.go
78 lines (61 loc) · 1.82 KB
/
listgroups.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package listgroups
import (
	"fmt"

	"github.com/leedavis81/kafka-go/protocol"
)
// init registers the ListGroups request/response pair with the protocol
// package.
func init() {
	protocol.Register(new(Request), new(Response))
}
// Request is the ListGroups request message. The kafka tag on the blank field
// carries the version range v0 through v2 for the message.
//
// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListGroups
type Request struct {
_ struct{} `kafka:"min=v0,max=v2"`
// brokerID is the ID of the broker this request is addressed to. Split sets
// it from each broker's ID and Broker uses it to index cluster.Brokers.
// NOTE(review): unexported and untagged, so presumably never encoded on the
// wire — confirm against the protocol encoder.
brokerID int32
}
// ApiKey returns the API key identifying this message, protocol.ListGroups.
func (r *Request) ApiKey() protocol.ApiKey {
	return protocol.ListGroups
}
// Broker returns the entry of cluster.Brokers indexed by the request's
// brokerID. The returned error is always nil.
//
// NOTE(review): if cluster.Brokers is a map, an unknown brokerID yields the
// zero Broker together with a nil error — confirm callers tolerate that.
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
	target := cluster.Brokers[r.brokerID]
	return target, nil
}
// Split fans the request out into one Request per entry of cluster.Brokers,
// each carrying that broker's ID, and returns a fresh Response to act as the
// merger for the per-broker results. The returned error is always nil.
func (r *Request) Split(cluster protocol.Cluster) (
	[]protocol.Message,
	protocol.Merger,
	error,
) {
	// One request per broker, so the final length is known up front.
	perBroker := make([]protocol.Message, 0, len(cluster.Brokers))
	for _, b := range cluster.Brokers {
		perBroker = append(perBroker, &Request{brokerID: b.ID})
	}

	return perBroker, &Response{}, nil
}
// Response is the ListGroups response message. Request.Split also returns a
// *Response as the merger that combines per-broker responses into one.
type Response struct {
// ThrottleTimeMs is tagged for versions v1 through v2 only, unlike the other
// fields, which are tagged from v0.
// NOTE(review): presumably the broker-imposed throttle duration in
// milliseconds — confirm against the Kafka protocol documentation.
ThrottleTimeMs int32 `kafka:"min=v1,max=v2"`
// ErrorCode is the error code field of the response (v0 through v2).
ErrorCode int16 `kafka:"min=v0,max=v2"`
// Groups is the list of groups carried by the response (v0 through v2).
Groups []ResponseGroup `kafka:"min=v0,max=v2"`
}
// ResponseGroup is one group entry in a ListGroups Response.
type ResponseGroup struct {
// GroupID is the group's identifier (v0 through v2).
GroupID string `kafka:"min=v0,max=v2"`
// ProtocolType is the group's protocol type (v0 through v2).
ProtocolType string `kafka:"min=v0,max=v2"`
// Use this to store which broker returned the response.
// NOTE(review): the `kafka:"-"` tag presumably excludes this field from
// wire encoding/decoding — confirm against the protocol package.
BrokerID int32 `kafka:"-"`
}
// ApiKey returns the API key identifying this message, protocol.ListGroups.
func (r *Response) ApiKey() protocol.ApiKey {
	return protocol.ListGroups
}
// Merge combines the per-broker ListGroups results into a single Response.
//
// requests and results are matched by index: results[i] is the outcome of
// requests[i]. Every group from every broker response is copied into the
// merged Groups slice with BrokerID set to the brokerID of the request that
// produced it, so callers can tell which broker reported each group.
//
// Only Groups is populated on the merged response; the ThrottleTimeMs and
// ErrorCode of the individual broker responses are not carried over.
//
// Merge returns an error, and a nil message, when a result is itself an
// error (that error is returned as-is), when a result or request has an
// unexpected type, or when there are more results than requests.
func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
	protocol.Message,
	error,
) {
	if len(results) > len(requests) {
		return nil, fmt.Errorf(
			"listgroups: %d results for only %d requests",
			len(results), len(requests),
		)
	}

	merged := &Response{}

	for i, result := range results {
		var brokerResp *Response
		switch v := result.(type) {
		case *Response:
			brokerResp = v
		case error:
			// A failed per-broker request surfaces as an error value in the
			// results slice; propagate it instead of panicking on the cast.
			return nil, v
		default:
			return nil, fmt.Errorf("listgroups: unexpected result type %T", result)
		}

		req, ok := requests[i].(*Request)
		if !ok {
			return nil, fmt.Errorf("listgroups: unexpected request type %T", requests[i])
		}

		// Append the annotated copies, not brokerResp.Groups itself, so the
		// originating broker ID is preserved in the merged output.
		for _, group := range brokerResp.Groups {
			merged.Groups = append(merged.Groups, ResponseGroup{
				GroupID:      group.GroupID,
				ProtocolType: group.ProtocolType,
				BrokerID:     req.brokerID,
			})
		}
	}

	return merged, nil
}