Skip to content

Commit

Permalink
Validate ack policy for pull consumers
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
nsurfer authored and wallyqs committed Feb 28, 2021
1 parent 924bd70 commit 171c566
Show file tree
Hide file tree
Showing 2 changed files with 346 additions and 35 deletions.
40 changes: 5 additions & 35 deletions js.go
Expand Up @@ -495,6 +495,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
return nil, ErrPullModeNotAllowed
}

badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
if isPullMode && badPullAck {
return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}

var (
err error
shouldCreate bool
Expand Down Expand Up @@ -813,41 +818,6 @@ func MaxDeliver(n int) SubOpt {
})
}

func PlaybackInstant() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayInstant
return nil
})
}

func PlaybackOriginal() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayOriginal
return nil
})
}

func RateLimit(n uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.RateLimit = n
return nil
})
}

func SampleFrequency(s string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.SampleFrequency = s
return nil
})
}

func PullMaxWaiting(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxWaiting = n
return nil
})
}

func MaxAckPending(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxAckPending = n
Expand Down

0 comments on commit 171c566

Please sign in to comment.