/
35_describe_log_dirs.go
70 lines (61 loc) · 1.55 KB
/
35_describe_log_dirs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package kfake
import (
"github.com/twmb/franz-go/pkg/kmsg"
)
// init registers Kafka API key 35 (DescribeLogDirs) with the package's key
// registry. The second and third arguments are presumably the lowest and
// highest request versions this fake cluster accepts -- confirm against
// regKey.
func init() {
	const (
		key        = 35
		minVersion = 0
		maxVersion = 4
	)
	regKey(key, minVersion, maxVersion)
}
// handleDescribeLogDirs answers a DescribeLogDirs request by grouping
// partition sizes under the log directory each partition lives in.
//
// A nil req.Topics reports every partition in c.data.tps. Otherwise only the
// requested partitions that exist are reported; unknown topics or partitions
// are silently skipped. A partition is counted exactly once even if the
// request names it more than once.
//
// For each directory, TotalBytes is the sum of the sizes of the partitions
// reported for that directory and UsableBytes is a fixed 32 GiB. Directories,
// topics, and partitions are appended in map iteration order, so their order
// in the response is unspecified.
//
// It returns an error only when checkReqVersion rejects the request version.
func (c *Cluster) handleDescribeLogDirs(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
	req := kreq.(*kmsg.DescribeLogDirsRequest)
	resp := req.ResponseKind().(*kmsg.DescribeLogDirsResponse)

	if err := checkReqVersion(req.Key(), req.Version); err != nil {
		return nil, err
	}

	// totalSpace maps dir => summed bytes of the reported partitions.
	// individual maps dir => topic => partition => bytes.
	totalSpace := make(map[string]int64)
	individual := make(map[string]map[string]map[int32]int64)

	add := func(d, t string, p int32, s int64) {
		ts, ok := individual[d]
		if !ok {
			ts = make(map[string]map[int32]int64)
			individual[d] = ts
		}
		ps, ok := ts[t]
		if !ok {
			ps = make(map[int32]int64)
			ts[t] = ps
		}
		// A request may repeat a topic or partition. Record each partition
		// once so its size is not double-counted in either the partition
		// entry or the directory total.
		if _, seen := ps[p]; seen {
			return
		}
		ps[p] = s
		totalSpace[d] += s
	}

	if req.Topics == nil {
		// Nil topics means "describe everything".
		c.data.tps.each(func(t string, p int32, d *partData) {
			add(d.dir, t, p, d.nbytes)
		})
	} else {
		for _, t := range req.Topics {
			for _, p := range t.Partitions {
				d, ok := c.data.tps.getp(t.Topic, p)
				if ok {
					add(d.dir, t.Topic, p, d.nbytes)
				}
			}
		}
	}

	for dir, ts := range individual {
		rd := kmsg.NewDescribeLogDirsResponseDir()
		rd.Dir = dir
		rd.TotalBytes = totalSpace[dir]
		rd.UsableBytes = 32 << 30 // fixed 32 GiB
		for t, ps := range ts {
			rt := kmsg.NewDescribeLogDirsResponseDirTopic()
			rt.Topic = t
			for p, s := range ps {
				rp := kmsg.NewDescribeLogDirsResponseDirTopicPartition()
				rp.Partition = p
				rp.Size = s
				rt.Partitions = append(rt.Partitions, rp)
			}
			rd.Topics = append(rd.Topics, rt)
		}
		resp.Dirs = append(resp.Dirs, rd)
	}

	return resp, nil
}