Skip to content

fix(amqp091): handle NewConsumer failure and atomic-load shared counters#84

Open
miotte wants to merge 3 commits into
mainfrom
miotte-pr4
Open

fix(amqp091): handle NewConsumer failure and atomic-load shared counters#84
miotte wants to merge 3 commits into
mainfrom
miotte-pr4

Conversation

@miotte
Copy link
Copy Markdown
Contributor

@miotte miotte commented May 18, 2026

Fixes #90

  • streamSubscribe ignored the error from StreamConnection.NewConsumer and then unconditionally called consumer.Close(); a failed consumer returned (nil, err) and panicked the goroutine. Surface the error as *pb.Error and skip the increment/Close path.
  • Stats() and connectionCleaner read produced/consumed/ActiveStreams as plain int64s while writers use atomic.AddInt64. Switch the readers to atomic.LoadInt64 to remove the data race.

Comment thread internal/provider/connectors/amqp091/amqp091.go Outdated
}

consumer, _ := bd.StreamConnection.NewConsumer(source.GetName(), consumerName, offset, handleMessages, source.GetSingleActiveConsumer())
consumer, consErr := bd.StreamConnection.NewConsumer(source.GetName(), consumerName, offset, handleMessages, source.GetSingleActiveConsumer())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can look at the error from NewConsumer because we get an 'Invalid Offset' error whenever we use 0(the default). We should check consumer for nil though.

miotte added 3 commits May 24, 2026 09:46
- streamSubscribe ignored the error from StreamConnection.NewConsumer and
  then unconditionally called consumer.Close(); a failed consumer returned
  (nil, err) and panicked the goroutine. Surface the error as *pb.Error
  and skip the increment/Close path.
- Stats() and connectionCleaner read produced/consumed/ActiveStreams as
  plain int64s while writers use atomic.AddInt64. Switch the readers to
  atomic.LoadInt64 to remove the data race.

Signed-off-by: Michael Otteni <MichaelGOtteni@gmail.com>
updateLastPubSubEvent wrote bd.lastPubSubEvent as a plain time.Time
from multiple goroutines (publish, subscribe, stream count changes)
while connectionCleaner read it without synchronization. time.Time is
a multi-word struct, so this risked torn reads and triggered the race
detector alongside the ActiveStreams/produced/consumed fields hardened
in the prior commit.

Store the timestamp as an int64 UnixNano and use atomic.StoreInt64 /
atomic.LoadInt64 to match the pattern already used for the sibling
counters on BrokerDetails.

Signed-off-by: Michael Otteni <MichaelGOtteni@gmail.com>
…rror

NewConsumer can return a non-nil error (e.g. "Invalid Offset" with the
default offset) while still handing back a usable consumer, so checking
the error rejected valid subscribes. Gate on consumer == nil instead and
still surface the error string when one is present.

Signed-off-by: Michael Otteni <MichaelGOtteni@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

streamSubscribe panics on NewConsumer failure; unsynced stats counters

3 participants