Queue group subscribe/close race-condition creates multiple shadow subscriptions and possible undelivered messages #950

Closed
will-yip opened this issue Oct 10, 2019 · 5 comments · Fixed by #951

Comments

@will-yip

I believe there is a race-condition when a lone durable queue subscriber closes its subscription at the same time that a client subscribes to the same queue. The closing thread sees no other subscribers, and converts the active subscription into a shadow subscription. The subscribing thread does not see the shadow subscription, and so creates a new active one. The result is that the queue group has an active subscription as well as a shadow subscription. If the active subscription is closed, we get two shadow subscriptions. If the race is repeated, we can get more than two shadow subscriptions for the same queue group.

If the subscription was closed while a message was being delivered, the message is unacknowledged and in a pending state bound to the shadow subscription. Since the active and shadow subscriptions are unaware of each other, the message is not adopted by the active subscription and remains undelivered until the shadow subscription is adopted in a subsequent 'subscribe' operation. Depending on the pattern of activity and number of shadow subscriptions, it is possible that the shadow subscription will be orphaned forever, and the message is never successfully delivered.

There are barrier operations that appear to prevent a client-side 'close subscription' request from being processed while a 'subscribe' request is being processed. However, if the 'close subscription' request gets past the barrier first, we can get the two requests to race. The race can also occur if a client previously crashed, and the heartbeat-check times out and closes the client's subscription while another client subscribes.
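For illustration only, here is a minimal client-side sketch of the kind of interleaving described above, assuming the stan.go client; the cluster ID, client IDs, subject, queue group, and durable name are made up, and whether the race actually fires depends on server timing:

// Hypothetical reproduction sketch (names are illustrative). One client
// closes its durable queue subscription while another client subscribes
// to the same durable queue group at roughly the same time.
package main

import (
    "log"
    "sync"

    stan "github.com/nats-io/stan.go"
)

func main() {
    c1, err := stan.Connect("test-cluster", "client-1")
    if err != nil {
        log.Fatal(err)
    }
    defer c1.Close()
    c2, err := stan.Connect("test-cluster", "client-2")
    if err != nil {
        log.Fatal(err)
    }
    defer c2.Close()

    // client-1 is the lone member of the durable queue group.
    sub1, err := c1.QueueSubscribe("foo", "group", func(m *stan.Msg) {},
        stan.DurableName("dur"))
    if err != nil {
        log.Fatal(err)
    }

    var wg sync.WaitGroup
    wg.Add(2)
    // Close window: the server sees the last member leave and converts
    // its subscription into the group's shadow subscription.
    go func() {
        defer wg.Done()
        if err := sub1.Close(); err != nil {
            log.Println(err)
        }
    }()
    // Subscribe window: a concurrent subscribe may not see that shadow
    // subscription and create a new active one for the same group.
    go func() {
        defer wg.Done()
        if _, err := c2.QueueSubscribe("foo", "group", func(m *stan.Msg) {},
            stan.DurableName("dur")); err != nil {
            log.Println(err)
        }
    }()
    wg.Wait()
}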

The race-condition window in the 'subscribe' operation appears to be between the first and last lines of this excerpt:

        qs.Unlock()
        setStartPos = false
    }
    ss.RUnlock()
} else if sr.DurableName != "" {
    // Check for DurableSubscriber status
    if sub = ss.LookupByDurable(durableKey(sr)); sub != nil {
        sub.RLock()
        clientID := sub.ClientID
        sub.RUnlock()
        if clientID != "" {
            s.log.Errorf("[Client:%s] Duplicate durable subscription registration", sr.ClientID)
            return nil, ErrDupDurable
        }
        setStartPos = false
    }
    isDurable = true
}
var (
    subStartTrace string
    subIsNew      bool
)
if sub != nil {
    // ok we have a remembered subscription
    sub.Lock()
    // Set ClientID and new AckInbox but leave LastSent to the
    // remembered value.
    sub.AckInbox = ackInbox
    sub.ClientID = sr.ClientID
    sub.Inbox = sr.Inbox
    sub.IsDurable = true
    // Use some of the new options, but ignore the ones regarding start position
    sub.MaxInFlight = sr.MaxInFlight
    sub.AckWaitInSecs = sr.AckWaitInSecs
    sub.ackWait = computeAckWait(sr.AckWaitInSecs)
    sub.stalled = false
    if len(sub.acksPending) > 0 {
        // We have a durable with pending messages, set newOnHold
        // until we have performed the initial redelivery.
        sub.newOnHold = true
        if !s.isClustered || s.isLeader() {
            s.setupAckTimer(sub, sub.ackWait)
        }
    }
    // Clear the IsClosed flags that were set during a Close()
    sub.IsClosed = false
    // In cluster mode, need to reset this flag.
    if s.isClustered {
        sub.norepl = false
    }
    // Reset the hasFailedHB boolean since it may have been set
    // if the client previously crashed and server set this
    // flag to its subs.
    sub.hasFailedHB = false
    // Reset initialized for restarting durable subscription.
    // Without this (and if there is no message to be redelivered,
    // which would set newOnHold), we could be sending avail
    // messages before the response has been sent to client library.
    sub.initialized = false
    sub.Unlock()
    // Case of restarted durable subscriber, or first durable queue
    // subscriber re-joining a group that was left with pending messages.
    err = s.updateDurable(ss, sub, sr.ClientID)
} else {
    subIsNew = true
    // Create sub here (can be plain, durable or queue subscriber)
    sub = &subState{
        SubState: spb.SubState{
            ClientID:      sr.ClientID,
            QGroup:        sr.QGroup,
            Inbox:         sr.Inbox,
            AckInbox:      ackInbox,
            MaxInFlight:   sr.MaxInFlight,
            AckWaitInSecs: sr.AckWaitInSecs,
            DurableName:   sr.DurableName,
            IsDurable:     isDurable,
        },
        subject:     sr.Subject,
        ackWait:     computeAckWait(sr.AckWaitInSecs),
        acksPending: make(map[uint64]int64),
        store:       c.store.Subs,
    }
    if setStartPos {
        // set the start sequence of the subscriber.
        var lastSent uint64
        subStartTrace, lastSent, err = s.setSubStartSequence(c, sr)
        if err == nil {
            sub.LastSent = lastSent
        }
    }
    if err == nil {
        // add the subscription to stan.
        // In cluster mode, the server decides of the subscription ID
        // (so that subscriptions have the same ID on replay). So
        // set it prior to this call.
        sub.ID = subID
        err = s.addSubscription(ss, sub)

The race-condition window in the 'close subscription' operation appears to be between the first and last lines of this excerpt:

if len(qs.subs) == 0 {
    queueGroupIsEmpty = true
    // If it was the last being removed, also remove the
    // queue group from the subStore map, but only if
    // non durable or explicit unsubscribe.
    if !isDurable || unsubscribe {
        delete(ss.qsubs, qgroup)
        // Delete from storage too.
        if err := store.DeleteSub(subid); err != nil {
            reportError(err)
        }
    } else {
        // Group is durable and last member just left the group,
        // but didn't call Unsubscribe(). Need to keep a reference
        // to this sub to maintain the state.
        qs.shadow = sub

I believe that if the operations are running in both windows at the same time for the same durable queue group subscription, we will get the problem described above.

Thanks!

@kozlovic
Member

Thank you for the detailed report. Will have a look asap. Just curious, did you experience the race, or is this just from code inspection?

@kozlovic
Member

That probably explains #322 which at the time I could not find the explanation for. I have just written a test and can confirm the race. Working on a fix right now. Thank you again for this great report!

@will-yip
Author

That's great, thanks!

We encountered the delayed message delivery, and the message was delivered after we restarted NATS. When I investigated, I noticed that the queue group had multiple shadow subscriptions. Our use-case is a bit unusual in that we subscribe, wait to receive a message, and then close the subscription. We do this to control the rate of incoming messages. When we called 'close', our code didn't wait for the operation to complete, and so when we subscribed again, there were cases where the 'close' and 'subscribe' would run in parallel. Basically, our code was written in a way that exercised the race often. I've updated our code to wait for the 'close' operation to complete, but it could still be a problem if we scale up to multiple subscribers.
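For reference, a minimal sketch of the subscribe/receive/close pattern described above, assuming the stan.go client; the subject, group, and durable names are illustrative. The point of the adjusted version is that Close() is allowed to return (i.e. the server has processed the close request) before QueueSubscribe is called again, so the 'close subscription' and 'subscribe' requests are never in flight at the same time:

// Illustrative sketch of the rate-limiting pattern: subscribe, take one
// message, then close the durable queue subscription before subscribing
// again. Names are made up for the example.
package main

import (
    "log"

    stan "github.com/nats-io/stan.go"
)

func process(m *stan.Msg) {
    log.Printf("processed sequence %d", m.Sequence)
}

func main() {
    sc, err := stan.Connect("test-cluster", "worker-1")
    if err != nil {
        log.Fatal(err)
    }
    defer sc.Close()

    for {
        msgCh := make(chan *stan.Msg, 1)
        sub, err := sc.QueueSubscribe("work", "group",
            func(m *stan.Msg) { msgCh <- m },
            stan.DurableName("dur"),
            stan.MaxInflight(1),
            stan.SetManualAckMode())
        if err != nil {
            log.Fatal(err)
        }
        msg := <-msgCh
        process(msg)
        if err := msg.Ack(); err != nil {
            log.Println(err)
        }
        // Wait for the close request to be fully processed by the
        // server before subscribing again.
        if err := sub.Close(); err != nil {
            log.Fatal(err)
        }
    }
}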

I really appreciate your efforts. Thanks again!

kozlovic added a commit that referenced this issue Oct 11, 2019
A race between a durable queue subscriber close and create could
cause the server to store multiple shadow subscriptions.

Resolves #950

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
@kozlovic
Member

I have a branch up with a possible fix (https://github.com/nats-io/nats-streaming-server/tree/fix_950). If there is any chance you can test it, that would be great!

@will-yip
Author

I can confirm that your fix resolves the issue. Thanks for the super-fast turnaround!
