Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,10 @@ func (c *Client) queryPublisherSequence(publisherReference string, stream string
}

func (c *Client) BrokerLeader(stream string) (*Broker, error) {
return c.BrokerLeaderWithResolver(stream, nil)
}

func (c *Client) BrokerLeaderWithResolver(stream string, resolver *AddressResolver) (*Broker, error) {
streamsMetadata := c.metaData(stream)
if streamsMetadata == nil {
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
Expand All @@ -693,6 +697,13 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
streamMetadata.Leader.advHost = streamMetadata.Leader.Host

// If AddressResolver is configured, use it directly and skip DNS lookup
if resolver != nil {
streamMetadata.Leader.Host = resolver.Host
streamMetadata.Leader.Port = strconv.Itoa(resolver.Port)
return streamMetadata.Leader, nil
}

res := net.Resolver{}
// see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand Down Expand Up @@ -723,6 +734,10 @@ func (c *Client) StreamExists(stream string) bool {
return streamMetadata.responseCode == responseCodeOk
}
func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
return c.BrokerForConsumerWithResolver(stream, nil)
}

func (c *Client) BrokerForConsumerWithResolver(stream string, resolver *AddressResolver) (*Broker, error) {
streamsMetadata := c.metaData(stream)
if streamsMetadata == nil {
return nil, fmt.Errorf("leader error for stream: %s", stream)
Expand All @@ -737,6 +752,35 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
return nil, LeaderNotReady
}

// If AddressResolver is configured, use it for all brokers and skip DNS lookup
if resolver != nil {
brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))

// Add leader with resolver host/port
leaderBroker := &Broker{
Host: resolver.Host,
Port: strconv.Itoa(resolver.Port),
}
brokers = append(brokers, leaderBroker)

// Add replicas with resolver host/port
for idx, replica := range streamMetadata.Replicas {
if replica == nil {
logs.LogWarn("Stream %s replica not ready: %d", stream, idx)
continue
}
replicaBroker := &Broker{
Host: resolver.Host,
Port: strconv.Itoa(resolver.Port),
}
brokers = append(brokers, replicaBroker)
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
n := r.Intn(len(brokers))
return brokers[n], nil
}

brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))
brokers = append(brokers, streamMetadata.Leader)
for idx, replica := range streamMetadata.Replicas {
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ func (ps *producersEnvironment) newProducer(clientLocator *Client, streamName st
options *ProducerOptions, resolver *AddressResolver, rpcTimeOut time.Duration) (*Producer, error) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
leader, err := clientLocator.BrokerLeader(streamName)
leader, err := clientLocator.BrokerLeaderWithResolver(streamName, resolver)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -728,7 +728,7 @@ func (ps *consumersEnvironment) NewSubscriber(clientLocator *Client, streamName
consumerOptions *ConsumerOptions, resolver *AddressResolver, rpcTimeout time.Duration) (*Consumer, error) {
ps.mutex.Lock()
defer ps.mutex.Unlock()
consumerBroker, err := clientLocator.BrokerForConsumer(streamName)
consumerBroker, err := clientLocator.BrokerForConsumerWithResolver(streamName, resolver)
if err != nil {
return nil, err
}
Expand Down
Loading