Skip to content

Commit

Permalink
[CHANGED] Generate consumer name in Subscribe() when name is not prov…
Browse files Browse the repository at this point in the history
…ided (#1261)

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed May 11, 2023
1 parent c3cae07 commit d313991
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 26 deletions.
36 changes: 11 additions & 25 deletions js.go
Expand Up @@ -1660,7 +1660,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// If we are creating or updating let's process that request.
if shouldCreate {
info, err := js.upsertConsumer(stream, cfg.Durable, ccreq.Config)
consName := cfg.Durable
if consName == "" {
consName = nuid.Next()
}
info, err := js.upsertConsumer(stream, consName, ccreq.Config)
if err != nil {
var apiErr *APIError
if ok := errors.As(err, &apiErr); !ok {
Expand Down Expand Up @@ -1963,40 +1967,22 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
cfg.DeliverPolicy = DeliverByStartSequencePolicy
cfg.OptStartSeq = sseq

ccSubj := fmt.Sprintf(apiLegacyConsumerCreateT, jsi.stream)
j, err := json.Marshal(jsi.ccreq)
js := jsi.js
sub.mu.Unlock()

consName := nuid.Next()
cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
if err != nil {
pushErr(err)
return
}

resp, err := nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
if errors.Is(err, ErrNoResponders) || errors.Is(err, ErrTimeout) {
var apiErr *APIError
if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) {
// if creating consumer failed, retry
return
}
pushErr(err)
return
}

var cinfo consumerResponse
err = json.Unmarshal(resp.Data, &cinfo)
if err != nil {
pushErr(err)
return
}

if cinfo.Error != nil {
if cinfo.Error.ErrorCode == JSErrCodeInsufficientResourcesErr {
} else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeInsufficientResourcesErr {
// retry for insufficient resources, as it may mean that client is connected to a running
// server in cluster while the server hosting R1 JetStream resources is restarting
return
}
pushErr(cinfo.Error)
pushErr(err)
return
}

Expand Down
2 changes: 1 addition & 1 deletion js_test.go
Expand Up @@ -369,7 +369,7 @@ func TestJetStreamOrderedConsumerDeleteAssets(t *testing.T) {
t.Run("remove consumer, expect it to be recreated", func(t *testing.T) {
createStream()

createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.OBJECT")
createConsSub, err := nc.SubscribeSync("$JS.API.CONSUMER.CREATE.OBJECT.*.a")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit d313991

Please sign in to comment.