/
34_alter_replica_log_dirs.go
53 lines (46 loc) · 1.38 KB
/
34_alter_replica_log_dirs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package kfake
import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
)
// init registers API key 34 (AlterReplicaLogDirs) with the fake cluster at
// package load time. The trailing 0 and 2 are presumably the minimum and
// maximum request versions supported -- confirm against regKey.
func init() {
	regKey(34, 0, 2)
}
// handleAlterReplicaLogDirs serves an AlterReplicaLogDirs request.
//
// For every partition named in the request, the handler looks the partition
// up in the cluster's data. A known partition has its dir field set to the
// requested directory and is answered with error code 0; an unknown partition
// is answered with kerr.UnknownTopicOrPartition. Partition results are grouped
// by topic in the response, with topics appearing in the order in which they
// are first encountered.
//
// A non-nil error is returned only when checkReqVersion rejects the request
// version; the response is nil in that case.
func (c *Cluster) handleAlterReplicaLogDirs(b *broker, kreq kmsg.Request) (kmsg.Response, error) {
	req := kreq.(*kmsg.AlterReplicaLogDirsRequest)
	resp := req.ResponseKind().(*kmsg.AlterReplicaLogDirsResponse)

	if err := checkReqVersion(req.Key(), req.Version); err != nil {
		return nil, err
	}

	// topicPos maps a topic name to its index in resp.Topics, so a topic that
	// is listed under several directories still yields one response entry.
	topicPos := make(map[string]int)

	// record appends one partition result under its topic, creating the
	// topic's response entry the first time the topic is seen.
	record := func(topic string, partition int32, code int16) {
		pos, seen := topicPos[topic]
		if !seen {
			pos = len(resp.Topics)
			topicPos[topic] = pos
			entry := kmsg.NewAlterReplicaLogDirsResponseTopic()
			entry.Topic = topic
			resp.Topics = append(resp.Topics, entry)
		}
		result := kmsg.NewAlterReplicaLogDirsResponseTopicPartition()
		result.Partition = partition
		result.ErrorCode = code
		resp.Topics[pos].Partitions = append(resp.Topics[pos].Partitions, result)
	}

	for _, dir := range req.Dirs {
		for _, topic := range dir.Topics {
			for _, partition := range topic.Partitions {
				pd, found := c.data.tps.getp(topic.Topic, partition)
				if found {
					// Move the partition to the requested log directory.
					pd.dir = dir.Dir
					record(topic.Topic, partition, 0)
				} else {
					record(topic.Topic, partition, kerr.UnknownTopicOrPartition.Code)
				}
			}
		}
	}
	return resp, nil
}