diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 3f79b7cc..c6a5e44d 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -677,6 +677,10 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string } func (c *Client) BrokerLeader(stream string) (*Broker, error) { + return c.BrokerLeaderWithResolver(stream, nil) +} + +func (c *Client) BrokerLeaderWithResolver(stream string, resolver *AddressResolver) (*Broker, error) { streamsMetadata := c.metaData(stream) if streamsMetadata == nil { return nil, fmt.Errorf("leader error for stream for stream: %s", stream) @@ -693,6 +697,13 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) { streamMetadata.Leader.advPort = streamMetadata.Leader.Port streamMetadata.Leader.advHost = streamMetadata.Leader.Host + // If AddressResolver is configured, use it directly and skip DNS lookup + if resolver != nil { + streamMetadata.Leader.Host = resolver.Host + streamMetadata.Leader.Port = strconv.Itoa(resolver.Port) + return streamMetadata.Leader, nil + } + res := net.Resolver{} // see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -723,6 +734,10 @@ func (c *Client) StreamExists(stream string) bool { return streamMetadata.responseCode == responseCodeOk } func (c *Client) BrokerForConsumer(stream string) (*Broker, error) { + return c.BrokerForConsumerWithResolver(stream, nil) +} + +func (c *Client) BrokerForConsumerWithResolver(stream string, resolver *AddressResolver) (*Broker, error) { streamsMetadata := c.metaData(stream) if streamsMetadata == nil { return nil, fmt.Errorf("leader error for stream: %s", stream) @@ -737,6 +752,35 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) { return nil, LeaderNotReady } + // If AddressResolver is configured, use it for all brokers and skip DNS lookup + if resolver != nil { + brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas)) + + // Add leader with resolver host/port + leaderBroker := &Broker{ + Host: resolver.Host, + Port: strconv.Itoa(resolver.Port), + } + brokers = append(brokers, leaderBroker) + + // Add replicas with resolver host/port + for idx, replica := range streamMetadata.Replicas { + if replica == nil { + logs.LogWarn("Stream %s replica not ready: %d", stream, idx) + continue + } + replicaBroker := &Broker{ + Host: resolver.Host, + Port: strconv.Itoa(resolver.Port), + } + brokers = append(brokers, replicaBroker) + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + n := r.Intn(len(brokers)) + return brokers[n], nil + } + brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas)) brokers = append(brokers, streamMetadata.Leader) for idx, replica := range streamMetadata.Replicas { diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index ed39469f..dba458b5 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -664,7 +664,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) { ps.mutex.Lock() defer ps.mutex.Unlock() - leader, err := clientLocator.BrokerLeader(streamName) + leader, err := clientLocator.BrokerLeaderWithResolver(streamName, resolver) if err != nil { return nil, err } @@ -728,7 +728,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName consumerOptions *ConsumerOptions, resolver *AddressResolver, rpcTimeout time.Duration) (*Consumer, error) { ps.mutex.Lock() defer ps.mutex.Unlock() - consumerBroker, err := clientLocator.BrokerForConsumer(streamName) + consumerBroker, err := clientLocator.BrokerForConsumerWithResolver(streamName, resolver) if err != nil { return nil, err }