From b7b8bf7f5d59769a341f73a912e82d0a3329b4ea Mon Sep 17 00:00:00 2001 From: Boris Granveaud Date: Mon, 9 Oct 2023 15:02:46 +0200 Subject: [PATCH] fix requests to change broker config. --- protocol/alterconfigs/alterconfigs.go | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/protocol/alterconfigs/alterconfigs.go b/protocol/alterconfigs/alterconfigs.go index 6c7d0d5db..568785394 100644 --- a/protocol/alterconfigs/alterconfigs.go +++ b/protocol/alterconfigs/alterconfigs.go @@ -1,6 +1,14 @@ package alterconfigs -import "github.com/segmentio/kafka-go/protocol" +import ( + "strconv" + + "github.com/segmentio/kafka-go/protocol" +) + +const ( + resourceTypeBroker int8 = 4 +) func init() { protocol.Register(&Request{}, &Response{}) @@ -15,6 +23,18 @@ type Request struct { func (r *Request) ApiKey() protocol.ApiKey { return protocol.AlterConfigs } func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + // Broker alter config requests must be sent to the associated broker + for _, resource := range r.Resources { + if resource.ResourceType == resourceTypeBroker { + brokerID, err := strconv.Atoi(resource.ResourceName) + if err != nil { + return protocol.Broker{}, err + } + + return cluster.Brokers[int32(brokerID)], nil + } + } + return cluster.Brokers[cluster.Controller], nil }