Skip to content

Commit

Permalink
js: Add option to skip consumer lookup on subscribe
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed May 11, 2023
1 parent d313991 commit 936d863
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions js.go
Expand Up @@ -1468,6 +1468,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
isDurable = o.cfg.Durable != _EMPTY_
consumerBound = o.bound
ctx = o.ctx
skipCInfo = o.skipCInfo
notFoundErr bool
lookupErr bool
nc = js.nc
Expand Down Expand Up @@ -1542,7 +1543,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// With an explicit durable name, we can lookup the consumer first
// to which it should be attaching to.
// If bind to ordered consumer is true, skip the lookup.
if consumer != _EMPTY_ {
if consumer != _EMPTY_ && !o.skipCInfo {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
Expand All @@ -1563,6 +1564,19 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if !(isPullMode && lookupErr && consumerBound) {
return nil, err
}
case skipCInfo:
// When skipping consumer info, need to rely on the manually passed sub options
// to match the expected behavior from the subscription.
hasFC, hbi = o.cfg.FlowControl, o.cfg.Heartbeat
hasHeartbeats = hbi > 0
maxap = o.cfg.MaxAckPending
deliver = o.cfg.DeliverSubject
if consumerBound {
break
}

// When not bound to a consumer already, proceeed to create.
fallthrough
default:
// Attempt to create consumer if not found nor using Bind.
shouldCreate = true
Expand All @@ -1572,7 +1586,6 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
deliver = nc.NewInbox()
cfg.DeliverSubject = deliver
}

// Do filtering always, server will clear as needed.
cfg.FilterSubject = subj

Expand Down Expand Up @@ -2151,6 +2164,16 @@ type subOpts struct {
// For an ordered consumer.
ordered bool
ctx context.Context

// To disable calling ConsumerInfo
skipCInfo bool
}

func SkipConsumerInfo() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.skipCInfo = true
return nil
})
}

// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
Expand Down

0 comments on commit 936d863

Please sign in to comment.