-
Notifications
You must be signed in to change notification settings - Fork 696
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FIXED] Deadlock when accessing subscriptions map on consumer #1671
Conversation
This fixes an issue where a deadlock could occur when calling `Stop()` or `Drain()` on `ConsumeContext` or `MessagesContext` and then calling `Consume` or `Messages` immediately. Switched to using a type-safe implementation of `sync.Map` for subscriptions map instead of locking the whole consumer state. Additionally, changed the type of atomic flags from `uint32` to `atomic.UInt32` to avoid acciental non-atomic reads/writes. Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good.
Just one comment on the type safe sync map.
internal/syncx/map.go
Outdated
return previous.(V), loaded | ||
} | ||
|
||
func (m *Map[K, V]) Len() int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this is not concurrent safe.
Maybe add additional field, like
count atomic.Int64
and increment/decrement it every time we change the map?
That would also make Len()
more performant, I think for a very low price elsewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having a counter would not work since e.g. when using Store()
we don't know whether the value is added to the map or updated. I agree that it may be racy, I'll think of another solution to not have to use Len()
at all.
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
This fixes an issue where a deadlock could occur when calling `Stop()` or `Drain()` on `ConsumeContext` or `MessagesContext` and then calling `Consume` or `Messages` immediately. Switched to using a type-safe implementation of `sync.Map` for subscriptions map instead of locking the whole consumer state. Additionally, changed the type of atomic flags from `uint32` to `atomic.UInt32` to avoid accidental non-atomic reads/writes. Signed-off-by: Piotr Piotrowski <piotr@synadia.com> --------- Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This fixes an issue where a deadlock could occur when calling `Stop()` or `Drain()` on `ConsumeContext` or `MessagesContext` and then calling `Consume` or `Messages` immediately. Switched to using a type-safe implementation of `sync.Map` for subscriptions map instead of locking the whole consumer state. Additionally, changed the type of atomic flags from `uint32` to `atomic.UInt32` to avoid accidental non-atomic reads/writes. Signed-off-by: Piotr Piotrowski <piotr@synadia.com> --------- Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
This fixes an issue where a deadlock could occur when calling
Stop()
orDrain()
onConsumeContext
orMessagesContext
and then callingConsume
orMessages
immediately.Switched to using a type-safe implementation of
sync.Map
for subscriptions mapinstead of locking the whole consumer state.
Additionally, changed the type of atomic flags from
uint32
toatomic.UInt32
to avoid acciental non-atomic reads/writes.
Signed-off-by: Piotr Piotrowski piotr@synadia.com