Skip to content

Commit

Permalink
Create jsOpts
Browse files Browse the repository at this point in the history
  • Loading branch information
nsurfer committed Mar 11, 2021
1 parent 455bf4a commit 127790f
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 46 deletions.
56 changes: 33 additions & 23 deletions js.go
Expand Up @@ -115,8 +115,12 @@ type JetStreamContext interface {

// js is an internal struct from a JetStreamContext.
type js struct {
nc *Conn
opts *jsOpts
}

type jsOpts struct {
ctx context.Context
nc *Conn
// For importing JetStream from other accounts.
pre string
// Amount of time to wait for API requests.
Expand All @@ -129,15 +133,21 @@ const defaultRequestWait = 5 * time.Second

// JetStream returns a JetStream context for pub/sub interactions.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{nc: nc, pre: defaultAPIPrefix, wait: defaultRequestWait}
js := &js{
nc: nc,
opts: &jsOpts{
pre: defaultAPIPrefix,
wait: defaultRequestWait,
},
}

for _, opt := range opts {
if err := opt.configureJSContext(js); err != nil {
if err := opt.configureJSContext(js.opts); err != nil {
return nil, err
}
}

if js.direct {
if js.opts.direct {
return js, nil
}

Expand All @@ -153,19 +163,19 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {

// JSOpt configures a JetStream context.
type JSOpt interface {
configureJSContext(opts *js) error
configureJSContext(opts *jsOpts) error
}

// jsOptFn configures an option for the JetStream context.
type jsOptFn func(opts *js) error
type jsOptFn func(opts *jsOpts) error

func (opt jsOptFn) configureJSContext(opts *js) error {
func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *js) error {
return jsOptFn(func(js *jsOpts) error {
js.pre = pre
if !strings.HasSuffix(js.pre, ".") {
js.pre = js.pre + "."
Expand All @@ -176,18 +186,18 @@ func APIPrefix(pre string) JSOpt {

// DirectOnly makes a JetStream context avoid using the JetStream API altogether.
func DirectOnly() JSOpt {
return jsOptFn(func(js *js) error {
return jsOptFn(func(js *jsOpts) error {
js.direct = true
return nil
})
}

func (js *js) apiSubj(subj string) string {
if js.pre == _EMPTY_ {
if js.opts.pre == _EMPTY_ {
return subj
}
var b strings.Builder
b.WriteString(js.pre)
b.WriteString(js.opts.pre)
b.WriteString(subj)
return b.String()
}
Expand Down Expand Up @@ -252,7 +262,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
return nil, ErrContextAndTimeout
}
if o.ttl == 0 && o.ctx == nil {
o.ttl = js.wait
o.ttl = js.opts.wait
}

if o.id != _EMPTY_ {
Expand Down Expand Up @@ -336,7 +346,7 @@ func ExpectLastMsgId(id string) PubOpt {
// MaxWait sets the maximum amount of time we will wait for a response.
type MaxWait time.Duration

func (ttl MaxWait) configureJSContext(js *js) error {
func (ttl MaxWait) configureJSContext(js *jsOpts) error {
js.wait = time.Duration(ttl)
return nil
}
Expand Down Expand Up @@ -364,7 +374,7 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
return nil
}

func (ctx ContextOpt) configureJSContext(opts *js) error {
func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
opts.ctx = ctx
return nil
}
Expand Down Expand Up @@ -442,7 +452,7 @@ func (jsi *jsSub) unsubscribe(drainMode bool) error {

// Skip if in direct mode as well.
js := jsi.js
if js.direct {
if js.opts.direct {
return nil
}

Expand Down Expand Up @@ -520,11 +530,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
requiresAPI = (stream == _EMPTY_ && consumer == _EMPTY_) && o.cfg.DeliverSubject == _EMPTY_
)

if js.direct && requiresAPI {
if js.opts.direct && requiresAPI {
return nil, ErrDirectModeRequired
}

if js.direct {
if js.opts.direct {
if o.cfg.DeliverSubject != _EMPTY_ {
deliver = o.cfg.DeliverSubject
} else {
Expand Down Expand Up @@ -619,7 +629,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
}

resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.wait)
resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -647,7 +657,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
} else {
sub.jsi.stream = stream
sub.jsi.consumer = consumer
if js.direct {
if js.opts.direct {
sub.jsi.deliver = o.cfg.DeliverSubject
} else {
sub.jsi.deliver = ccfg.DeliverSubject
Expand Down Expand Up @@ -681,7 +691,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {
if err != nil {
return _EMPTY_, err
}
resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.wait)
resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -860,7 +870,7 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {

// Consumer info lookup should fail if in direct mode.
js := sub.jsi.js
if js.direct {
if js.opts.direct {
sub.mu.Unlock()
return nil, ErrDirectModeRequired
}
Expand Down Expand Up @@ -890,7 +900,7 @@ func (sub *Subscription) Poll() error {

func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.wait)
resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -959,7 +969,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
if o.ttl > 0 {
wait = o.ttl
} else if js != nil {
wait = js.wait
wait = js.opts.wait
}

if isPullMode {
Expand Down

0 comments on commit 127790f

Please sign in to comment.