Skip to content

Commit

Permalink
Merge pull request #726 from nats-io/fix_sub_race
Browse files Browse the repository at this point in the history
[FIXED] Possible data race on auto-unsubscribe
  • Loading branch information
kozlovic committed May 3, 2021
2 parents aa61ea3 + 049c0b5 commit aa78bbf
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion nats.go
Expand Up @@ -3648,10 +3648,11 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,

// If we have an async callback, start up a sub specific
// Go routine to deliver the messages.
var sr bool
if cb != nil {
sub.typ = AsyncSubscription
sub.pCond = sync.NewCond(&sub.mu)
go nc.waitForMsgs(sub)
sr = true
} else if !isSync {
sub.typ = ChanSubscription
sub.mch = ch
Expand All @@ -3666,6 +3667,11 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
nc.subs[sub.sid] = sub
nc.subsMu.Unlock()

// Let's start the go routine now that it is fully setup and registered.
if sr {
go nc.waitForMsgs(sub)
}

// We will send these for all subs when we reconnect
// so that we can suppress here if reconnecting.
if !nc.isReconnecting() {
Expand Down Expand Up @@ -3862,7 +3868,9 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {

maxStr := _EMPTY_
if max > 0 {
s.mu.Lock()
s.max = uint64(max)
s.mu.Unlock()
maxStr = strconv.Itoa(max)
} else if !drainMode {
nc.removeSub(s)
Expand Down

0 comments on commit aa78bbf

Please sign in to comment.