Skip to content

Commit

Permalink
Merge pull request #694 from nats-io/js-ack-opts
Browse files Browse the repository at this point in the history
js: Add AckOpts for msg.Ack()
  • Loading branch information
wallyqs committed Mar 30, 2021
2 parents c0e12e4 + c433636 commit 7013a58
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 19 deletions.
51 changes: 37 additions & 14 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,15 @@ func ExpectLastMsgId(id string) PubOpt {
})
}

type ackOpts struct {
ttl time.Duration
ctx context.Context
}

type AckOpt interface {
configureAck(opts *ackOpts) error
}

// MaxWait sets the maximum amount of time we will wait for a response.
type MaxWait time.Duration

Expand All @@ -703,6 +712,11 @@ func (ttl AckWait) configureSubscribe(opts *subOpts) error {
return nil
}

func (ttl AckWait) configureAck(opts *ackOpts) error {
opts.ttl = time.Duration(ttl)
return nil
}

// ContextOpt is an option used to set a context.Context.
type ContextOpt struct {
context.Context
Expand All @@ -723,6 +737,11 @@ func (ctx ContextOpt) configurePull(opts *pullOpts) error {
return nil
}

func (ctx ContextOpt) configureAck(opts *ackOpts) error {
opts.ctx = ctx
return nil
}

// Context returns an option that can be used to configure a context.
func Context(ctx context.Context) ContextOpt {
return ContextOpt{ctx}
Expand Down Expand Up @@ -1549,13 +1568,14 @@ func (m *Msg) checkReply() (*js, bool, error) {
// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
var o pubOpts
func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
var o ackOpts
for _, opt := range opts {
if err := opt.configurePublish(&o); err != nil {
if err := opt.configureAck(&o); err != nil {
return err
}
}

js, _, err := m.checkReply()
if err != nil {
return err
Expand All @@ -1570,16 +1590,19 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {
nc := m.Sub.conn
m.Sub.mu.Unlock()

usesCtx := o.ctx != nil
usesWait := o.ttl > 0
sync = sync || usesCtx || usesWait
ctx := o.ctx
wait := defaultRequestWait
if o.ttl > 0 {
if usesWait {
wait = o.ttl
} else if js != nil {
wait = js.opts.wait
}

if sync {
if ctx != nil {
if usesCtx {
_, err = nc.RequestWithContext(ctx, m.Reply, ackType)
} else {
_, err = nc.Request(m.Reply, ackType, wait)
Expand All @@ -1599,33 +1622,33 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...PubOpt) error {

// Ack acknowledges a message. This tells the server that the message was
// successfully processed and it can move on to the next message.
func (m *Msg) Ack() error {
return m.ackReply(ackAck, false)
func (m *Msg) Ack(opts ...AckOpt) error {
return m.ackReply(ackAck, false, opts...)
}

// Ack is the synchronous version of Ack. This indicates successful message
// processing.
func (m *Msg) AckSync(opts ...PubOpt) error {
func (m *Msg) AckSync(opts ...AckOpt) error {
return m.ackReply(ackAck, true, opts...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message. You can configure the number of redeliveries by passing
// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
func (m *Msg) Nak() error {
return m.ackReply(ackNak, false)
func (m *Msg) Nak(opts ...AckOpt) error {
return m.ackReply(ackNak, false, opts...)
}

// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
func (m *Msg) Term() error {
return m.ackReply(ackTerm, false)
func (m *Msg) Term(opts ...AckOpt) error {
return m.ackReply(ackTerm, false, opts...)
}

// InProgress tells the server that this message is being worked on. It resets
// the redelivery timer on the server.
func (m *Msg) InProgress() error {
return m.ackReply(ackProgress, false)
func (m *Msg) InProgress(opts ...AckOpt) error {
return m.ackReply(ackProgress, false, opts...)
}

// MsgMetadata is the JetStream metadata associated with received messages.
Expand Down
29 changes: 24 additions & 5 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,7 +2312,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
if got != expected {
t.Errorf("Expected %v, got %v", expected, got)
}
err = msg.Nak()
err = msg.Nak(nats.AckWait(2 * time.Second))
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
Expand All @@ -2331,7 +2331,12 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
t.Errorf("Unexpected error: %v", err)
}

msg, err = sub.NextMsg(2 * time.Second)
ctx, done := context.WithTimeout(context.Background(), 2*time.Second)
defer done()

// Convert context into nats option.
nctx := nats.Context(ctx)
msg, err = sub.NextMsgWithContext(nctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
Expand All @@ -2340,15 +2345,29 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
if got != expected {
t.Errorf("Expected %v, got %v", expected, got)
}
err = msg.InProgress()
err = msg.Term(nctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
err = msg.InProgress()

msg, err = sub.NextMsgWithContext(nctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
err = msg.Ack()
got = string(msg.Data)
expected = "i:7"
if got != expected {
t.Errorf("Expected %v, got %v", expected, got)
}
err = msg.InProgress(nctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
err = msg.InProgress(nctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
err = msg.Ack(nctx)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
Expand Down

0 comments on commit 7013a58

Please sign in to comment.