Skip to content

Commit

Permalink
Merge c75e584 into f44358c
Browse files Browse the repository at this point in the history
  • Loading branch information
variadico committed Mar 12, 2021
2 parents f44358c + c75e584 commit efe59fe
Show file tree
Hide file tree
Showing 3 changed files with 350 additions and 181 deletions.
27 changes: 26 additions & 1 deletion js.go
Expand Up @@ -127,6 +127,9 @@ type jsOpts struct {
wait time.Duration
// Signals only direct access and no API access.
direct bool

maxTries int
retryBackoff time.Duration
}

const defaultRequestWait = 5 * time.Second
Expand Down Expand Up @@ -192,6 +195,22 @@ func DirectOnly() JSOpt {
})
}

// MaxRetries indicates the number of times to retry a failed operation.
type MaxRetries int

func (v MaxRetries) configureJSContext(opts *jsOpts) error {
opts.maxTries = int(v)
return nil
}

// RetryBackoff indicates how long to wait between retries.
type RetryBackoff time.Duration

func (v RetryBackoff) configureJSContext(opts *jsOpts) error {
opts.retryBackoff = time.Duration(v)
return nil
}

func (js *js) apiSubj(subj string) string {
if js.opts.pre == _EMPTY_ {
return subj
Expand Down Expand Up @@ -899,8 +918,14 @@ func (sub *Subscription) Poll() error {
}

func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), js.opts.wait)
defer cancel()
return js.getConsumerInfoContext(ctx, stream, consumer)
}

func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer string) (*ConsumerInfo, error) {
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.nc.Request(js.apiSubj(ccInfoSubj), nil, js.opts.wait)
resp, err := js.nc.RequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
Expand Down

0 comments on commit efe59fe

Please sign in to comment.