Skip to content

Commit

Permalink
js: Fix issue with concurrent queue subscribers creating consumer
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Apr 17, 2021
1 parent 1a653a7 commit 8e9d0b5
Show file tree
Hide file tree
Showing 2 changed files with 309 additions and 39 deletions.
59 changes: 43 additions & 16 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,10 +916,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
err error
shouldCreate bool
ccfg *ConsumerConfig
info *ConsumerInfo
deliver string
attached bool
stream = o.stream
consumer = o.consumer
isDurable = o.cfg.Durable != _EMPTY_
)

// Find the stream mapped to the subject if not bound to a stream already.
Expand All @@ -934,7 +936,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

// With an explicit durable name, then can lookup
// the consumer to which it should be attaching to.
var info *ConsumerInfo
consumer = o.cfg.Durable
if consumer != _EMPTY_ {
// Only create in case there is no consumer already.
Expand Down Expand Up @@ -1011,7 +1012,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}

var ccSubj string
isDurable := cfg.Durable != _EMPTY_
if isDurable {
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, cfg.Durable)
} else {
Expand All @@ -1027,28 +1027,55 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, err
}

var info consumerResponse
err = json.Unmarshal(resp.Data, &info)
var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
sub.Unsubscribe()
return nil, err
}
if info.Error != nil {
info = cinfo.ConsumerInfo
if cinfo.Error != nil {
// Remove interest from previous subscribe since it
// may have an incorrect delivery subject.
sub.Unsubscribe()
return nil, fmt.Errorf("nats: %s", info.Error.Description)
}

// Hold onto these for later.
sub.jsi.stream = info.Stream
sub.jsi.consumer = info.Name
sub.jsi.deliver = info.Config.DeliverSubject
sub.jsi.durable = isDurable
} else {
sub.jsi.stream = stream
sub.jsi.consumer = consumer
sub.jsi.deliver = ccfg.DeliverSubject
// Multiple subscribers could compete in creating the first consumer
// that will be shared using the same durable name. If this happens, then
// do a lookup of the consumer info and resubscribe using the latest info.
if consumer != _EMPTY_ && strings.Contains(cinfo.Error.Description, `consumer already exists`) {
info, err = js.ConsumerInfo(stream, consumer)
if err != nil && err.Error() != "nats: consumer not found" {
return nil, err
}
ccfg = &info.Config

// Validate that the original subject does still match.
if ccfg.FilterSubject != _EMPTY_ && subj != ccfg.FilterSubject {
return nil, ErrSubjectMismatch
}

// Use the deliver subject from latest consumer config to attach.
if ccfg.DeliverSubject != _EMPTY_ {
sub, err = js.nc.subscribe(ccfg.DeliverSubject, queue, cb, ch, isSync,
&jsSub{js: js, hbs: hasHeartbeats, fc: hasFC})
if err != nil {
return nil, err
}
}
attached = true
} else {
return nil, fmt.Errorf("nats: %s", cinfo.Error.Description)
}
}
stream = info.Stream
consumer = info.Name
deliver = info.Config.DeliverSubject
}
sub.jsi.stream = stream
sub.jsi.consumer = consumer
sub.jsi.durable = isDurable
sub.jsi.attached = attached
sub.jsi.deliver = deliver

return sub, nil
}
Expand Down
Loading

0 comments on commit 8e9d0b5

Please sign in to comment.