Skip to content

Commit

Permalink
Create jsOpts
Browse files Browse the repository at this point in the history
  • Loading branch information
nsurfer committed Mar 10, 2021
1 parent 455bf4a commit 3c90329
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 46 deletions.
56 changes: 33 additions & 23 deletions js.go
Expand Up @@ -115,8 +115,12 @@ type JetStreamContext interface {

// js is an internal struct from a JetStreamContext.
type js struct {
nc *Conn
opts *jsOpts
}

type jsOpts struct {
ctx context.Context
nc *Conn
// For importing JetStream from other accounts.
pre string
// Amount of time to wait for API requests.
Expand All @@ -129,15 +133,21 @@ const defaultRequestWait = 5 * time.Second

// JetStream returns a JetStream context for pub/sub interactions.
func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
js := &js{nc: nc, pre: defaultAPIPrefix, wait: defaultRequestWait}
js := &js{
nc: nc,
opts: &jsOpts{
pre: defaultAPIPrefix,
wait: defaultRequestWait,
},
}

for _, opt := range opts {
if err := opt.configureJSContext(js); err != nil {
if err := opt.configureJSContext(js.opts); err != nil {
return nil, err
}
}

if js.direct {
if js.opts.direct {
return js, nil
}

Expand All @@ -153,19 +163,19 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {

// JSOpt configures a JetStream context.
type JSOpt interface {
configureJSContext(opts *js) error
configureJSContext(opts *jsOpts) error
}

// jsOptFn configures an option for the JetStream context.
type jsOptFn func(opts *js) error
type jsOptFn func(opts *jsOpts) error

func (opt jsOptFn) configureJSContext(opts *js) error {
func (opt jsOptFn) configureJSContext(opts *jsOpts) error {
return opt(opts)
}

// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *js) error {
return jsOptFn(func(js *jsOpts) error {
js.pre = pre
if !strings.HasSuffix(js.pre, ".") {
js.pre = js.pre + "."
Expand All @@ -176,18 +186,18 @@ func APIPrefix(pre string) JSOpt {

// DirectOnly makes a JetStream context avoid using the JetStream API altogether.
func DirectOnly() JSOpt {
return jsOptFn(func(js *js) error {
return jsOptFn(func(js *jsOpts) error {
js.direct = true
return nil
})
}

func (js *js) apiSubj(subj string) string {
if js.pre == _EMPTY_ {
if js.opts.pre == _EMPTY_ {
return subj
}
var b strings.Builder
b.WriteString(js.pre)
b.WriteString(js.opts.pre)
b.WriteString(subj)
return b.String()
}
Expand Down Expand Up @@ -252,7 +262,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
return nil, ErrContextAndTimeout
}
if o.ttl == 0 && o.ctx == nil {
o.ttl = js.wait
o.ttl = js.opts.wait
}

if o.id != _EMPTY_ {
Expand Down Expand Up @@ -336,7 +346,7 @@ func ExpectLastMsgId(id string) PubOpt {
// MaxWait sets the maximum amount of time we will wait for a response.
type MaxWait time.Duration

func (ttl MaxWait) configureJSContext(js *js) error {
func (ttl MaxWait) configureJSContext(js *jsOpts) error {
js.wait = time.Duration(ttl)
return nil
}
Expand Down Expand Up @@ -364,7 +374,7 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
return nil
}

func (ctx ContextOpt) configureJSContext(opts *js) error {
func (ctx ContextOpt) configureJSContext(opts *jsOpts) error {
opts.ctx = ctx
return nil
}
Expand Down Expand Up @@ -442,7 +452,7 @@ func (jsi *jsSub) unsubscribe(drainMode bool) error {

// Skip if in direct mode as well.
js := jsi.js
if js.direct {
if js.opts.direct {
return nil
}

Expand Down Expand Up @@ -520,11 +530,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
requiresAPI = (stream == _EMPTY_ && consumer == _EMPTY_) && o.cfg.DeliverSubject == _EMPTY_
)

if js.direct && requiresAPI {
if js.opts.direct && requiresAPI {
return nil, ErrDirectModeRequired
}

if js.direct {
if js.opts.direct {
if o.cfg.DeliverSubject != _EMPTY_ {
deliver = o.cfg.DeliverSubject
} else {
Expand Down Expand Up @@ -619,7 +629,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
}

resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.wait)
resp, err := js.nc.Request(js.apiSubj(ccSubj), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -647,7 +657,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
} else {
sub.jsi.stream = stream
sub.jsi.consumer = consumer
if js.direct {
if js.opts.direct {
sub.jsi.deliver = o.cfg.DeliverSubject
} else {
sub.jsi.deliver = ccfg.DeliverSubject
Expand Down Expand Up @@ -681,7 +691,7 @@ func (js *js) lookupStreamBySubject(subj string) (string, error) {
if err != nil {
return _EMPTY_, err
}
resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.wait)
resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -860,7 +870,7 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {

// Consumer info lookup should fail if in direct mode.
js := sub.jsi.js
if js.direct {
if js.opts.direct {
sub.mu.Unlock()
return nil, ErrDirectModeRequired
}
Expand Down Expand Up @@ -890,7 +900,7 @@ func (sub *Subscription) Poll() error {

func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.wait)
resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -959,7 +969,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
if o.ttl > 0 {
wait = o.ttl
} else if js != nil {
wait = js.wait
wait = js.opts.wait
}

if isPullMode {
Expand Down
45 changes: 22 additions & 23 deletions jsm.go
Expand Up @@ -172,7 +172,7 @@ type accountInfoResponse struct {

// AccountInfo retrieves info about the JetStream usage from the current account.
func (js *js) AccountInfo() (*AccountInfo, error) {
resp, err := js.nc.Request(js.apiSubj(apiAccountInfo), nil, js.wait)
resp, err := js.nc.Request(js.apiSubj(apiAccountInfo), nil, js.opts.wait)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream)
}

resp, err := js.nc.Request(js.apiSubj(ccSubj), req, js.wait)
resp, err := js.nc.Request(js.apiSubj(ccSubj), req, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down Expand Up @@ -254,7 +254,7 @@ func (js *js) DeleteConsumer(stream, consumer string) error {
}

dcSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
r, err := js.nc.Request(dcSubj, nil, js.wait)
r, err := js.nc.Request(dcSubj, nil, js.opts.wait)
if err != nil {
return err
}
Expand Down Expand Up @@ -318,7 +318,7 @@ func (c *consumerLister) Next() bool {
return false
}
clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
r, err := c.js.nc.Request(clSubj, req, c.js.wait)
r, err := c.js.nc.Request(clSubj, req, c.js.opts.wait)
if err != nil {
c.err = err
return false
Expand Down Expand Up @@ -357,7 +357,7 @@ func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo
}

ch := make(chan *ConsumerInfo)
l := &consumerLister{js: o, stream: stream}
l := &consumerLister{js: &js{nc: jsc.nc, opts: o}, stream: stream}
go func() {
defer func() {
if cancel != nil {
Expand Down Expand Up @@ -410,7 +410,7 @@ func (c *consumerNamesLister) Next() bool {
}

clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
r, err := c.js.nc.Request(clSubj, nil, c.js.wait)
r, err := c.js.nc.Request(clSubj, nil, c.js.opts.wait)
if err != nil {
c.err = err
return false
Expand Down Expand Up @@ -445,7 +445,7 @@ func (c *consumerNamesLister) Err() error {
func (js *js) ConsumerNames(ctx context.Context, stream string) <-chan string {
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), js.wait)
ctx, cancel = context.WithTimeout(context.Background(), js.opts.wait)
}

ch := make(chan string)
Expand Down Expand Up @@ -488,7 +488,7 @@ func (js *js) AddStream(cfg *StreamConfig) (*StreamInfo, error) {
}

csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
r, err := js.nc.Request(csSubj, req, js.wait)
r, err := js.nc.Request(csSubj, req, js.opts.wait)
if err != nil {
return nil, err
}
Expand All @@ -506,7 +506,7 @@ type streamInfoResponse = streamCreateResponse

func (js *js) StreamInfo(stream string) (*StreamInfo, error) {
csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
r, err := js.nc.Request(csSubj, nil, js.wait)
r, err := js.nc.Request(csSubj, nil, js.opts.wait)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -578,7 +578,7 @@ func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
}

usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
r, err := js.nc.Request(usSubj, req, js.wait)
r, err := js.nc.Request(usSubj, req, js.opts.wait)
if err != nil {
return nil, err
}
Expand All @@ -605,7 +605,7 @@ func (js *js) DeleteStream(name string) error {
}

dsSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
r, err := js.nc.Request(dsSubj, nil, js.wait)
r, err := js.nc.Request(dsSubj, nil, js.opts.wait)
if err != nil {
return err
}
Expand Down Expand Up @@ -660,7 +660,7 @@ func (js *js) GetMsg(name string, seq uint64) (*RawStreamMsg, error) {
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
r, err := js.nc.Request(dsSubj, req, js.opts.wait)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -714,7 +714,7 @@ func (js *js) DeleteMsg(name string, seq uint64) error {
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
r, err := js.nc.Request(dsSubj, req, js.opts.wait)
if err != nil {
return err
}
Expand All @@ -737,7 +737,7 @@ type streamPurgeResponse struct {
// PurgeStream purges messages on a Stream.
func (js *js) PurgeStream(name string) error {
psSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, name))
r, err := js.nc.Request(psSubj, nil, js.wait)
r, err := js.nc.Request(psSubj, nil, js.opts.wait)
if err != nil {
return err
}
Expand Down Expand Up @@ -795,7 +795,7 @@ func (s *streamLister) Next() bool {
}

slSubj := s.js.apiSubj(apiStreamList)
r, err := s.js.nc.Request(slSubj, req, s.js.wait)
r, err := s.js.nc.Request(slSubj, req, s.js.opts.wait)
if err != nil {
s.err = err
return false
Expand Down Expand Up @@ -834,7 +834,7 @@ func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
}

ch := make(chan *StreamInfo)
l := &streamLister{js: o}
l := &streamLister{js: &js{nc: jsc.nc, opts: o}}
go func() {
defer func() {
if cancel != nil {
Expand Down Expand Up @@ -874,7 +874,7 @@ func (l *streamNamesLister) Next() bool {
return false
}

r, err := l.js.nc.Request(l.js.apiSubj(apiStreams), nil, l.js.wait)
r, err := l.js.nc.Request(l.js.apiSubj(apiStreams), nil, l.js.opts.wait)
if err != nil {
l.err = err
return false
Expand Down Expand Up @@ -909,7 +909,7 @@ func (l *streamNamesLister) Err() error {
func (js *js) StreamNames(ctx context.Context) <-chan string {
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), js.wait)
ctx, cancel = context.WithTimeout(context.Background(), js.opts.wait)
}

ch := make(chan string)
Expand All @@ -935,8 +935,8 @@ func (js *js) StreamNames(ctx context.Context) <-chan string {
return ch
}

func getJSContextOpts(defs *js, opts ...JSOpt) (*js, context.CancelFunc, error) {
var o js
func getJSContextOpts(defs *js, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) {
var o jsOpts
for _, opt := range opts {
if err := opt.configureJSContext(&o); err != nil {
return nil, nil, err
Expand All @@ -948,16 +948,15 @@ func getJSContextOpts(defs *js, opts ...JSOpt) (*js, context.CancelFunc, error)
return nil, nil, ErrContextAndTimeout
}
if o.wait == 0 && o.ctx == nil {
o.wait = defs.wait
o.wait = defs.opts.wait
}
var cancel context.CancelFunc
if o.ctx == nil && o.wait > 0 {
o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
}
if o.pre == "" {
o.pre = defs.pre
o.pre = defs.opts.pre
}
o.nc = defs.nc

return &o, cancel, nil
}

0 comments on commit 3c90329

Please sign in to comment.