Skip to content

Commit

Permalink
Merge ec57987 into ed74374
Browse files Browse the repository at this point in the history
  • Loading branch information
variadico committed Feb 28, 2021
2 parents ed74374 + ec57987 commit 215b14f
Show file tree
Hide file tree
Showing 2 changed files with 423 additions and 18 deletions.
94 changes: 79 additions & 15 deletions js.go
Expand Up @@ -332,13 +332,21 @@ func ExpectLastMsgId(id string) PubOpt {
// MaxWait sets the maximum amount of time we will wait for a response.
type MaxWait time.Duration

func (ttl MaxWait) configurePublish(opts *pubOpts) error {
func (ttl MaxWait) configureJSContext(js *js) error {
js.wait = time.Duration(ttl)
return nil
}

// AckWait sets the maximum amount of time we will wait for an ack.
type AckWait time.Duration

func (ttl AckWait) configurePublish(opts *pubOpts) error {
opts.ttl = time.Duration(ttl)
return nil
}

func (ttl MaxWait) configureJSContext(js *js) error {
js.wait = time.Duration(ttl)
func (ttl AckWait) configureSubscribe(opts *subOpts) error {
opts.cfg.AckWait = time.Duration(ttl)
return nil
}

Expand Down Expand Up @@ -487,6 +495,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
return nil, ErrPullModeNotAllowed
}

badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
if isPullMode && badPullAck {
return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}

var (
err error
shouldCreate bool
Expand Down Expand Up @@ -686,18 +699,6 @@ type subOpts struct {
cfg *ConsumerConfig
}

// Durable defines the consumer name for JetStream durable subscribers.
func Durable(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if strings.Contains(name, ".") {
return ErrInvalidDurableName
}

opts.cfg.Durable = name
return nil
})
}

// Pull defines the batch size of messages that will be received
// when using pull based JetStream consumers.
func Pull(batchSize int) SubOpt {
Expand Down Expand Up @@ -730,6 +731,18 @@ func ManualAck() SubOpt {
})
}

// Durable defines the consumer name for JetStream durable subscribers.
func Durable(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if strings.Contains(name, ".") {
return ErrInvalidDurableName
}

opts.cfg.Durable = name
return nil
})
}

// DeliverAll will configure a Consumer to receive all the
// messages from a Stream.
func DeliverAll() SubOpt {
Expand Down Expand Up @@ -798,6 +811,55 @@ func AckExplicit() SubOpt {
})
}

func MaxDeliver(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxDeliver = n
return nil
})
}

func PlaybackInstant() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayInstant
return nil
})
}

func PlaybackOriginal() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayOriginal
return nil
})
}

func RateLimit(n uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.RateLimit = n
return nil
})
}

func SampleFrequency(s string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.SampleFrequency = s
return nil
})
}

func PullMaxWaiting(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxWaiting = n
return nil
})
}

func MaxAckPending(n int) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.MaxAckPending = n
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -924,6 +986,8 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
}
}
} else if sync {
// The response is just an empty payload which servers as a signal,
// to continue so ok to omit the response.
if ctx != nil {
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
} else {
Expand Down

0 comments on commit 215b14f

Please sign in to comment.