Skip to content

Commit

Permalink
Merge b97867d into 0a6b5f6
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Jan 14, 2021
2 parents 0a6b5f6 + b97867d commit c8c1d6c
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 10 deletions.
32 changes: 22 additions & 10 deletions js.go
Expand Up @@ -570,7 +570,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
// Check if we are manual ack.
if cb != nil && !o.mack {
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
cb = func(m *Msg) {
m.Sub.jsi.autoAck = true
ocb(m)
m.Sub.jsi.autoAck = false
m.Ack()
}
}

sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
Expand Down Expand Up @@ -863,32 +868,37 @@ func (js *js) getConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return info.ConsumerInfo, nil
}

func (m *Msg) checkReply() (*js, bool, error) {
func (m *Msg) checkReply() (*js, bool, bool, error) {
if m.Reply == "" {
return nil, false, ErrMsgNoReply
return nil, false, false, ErrMsgNoReply
}
if m == nil || m.Sub == nil {
return nil, false, ErrMsgNotBound
return nil, false, false, ErrMsgNotBound
}
sub := m.Sub
sub.mu.Lock()
if sub.jsi == nil {
sub.mu.Unlock()
return nil, false, ErrNotJSMessage
return nil, false, false, ErrNotJSMessage
}
js := sub.jsi.js
isPullMode := sub.jsi.pull > 0
autoAck := sub.jsi.autoAck
sub.mu.Unlock()

return js, isPullMode, nil
return js, isPullMode, autoAck, nil
}

// ackReply handles all acks. Will do the right thing for pull and sync mode.
func (m *Msg) ackReply(ackType []byte, sync bool) error {
js, isPullMode, err := m.checkReply()
js, isPullMode, autoAck, err := m.checkReply()
if err != nil {
return err
}
// Cannot ack while autoAck is enabled.
if autoAck {
return ErrInvalidJSAck
}
if isPullMode {
if bytes.Equal(ackType, AckAck) {
err = js.nc.PublishRequest(m.Reply, m.Sub.Subject, AckNext)
Expand Down Expand Up @@ -928,12 +938,13 @@ func (m *Msg) Term() error {
return m.ackReply(AckTerm, false)
}

// Indicate that this message is being worked on and reset redelkivery timer in the server.
// InProgress indicates that this message is being worked on
// and reset the redelivery timer in the server.
func (m *Msg) InProgress() error {
return m.ackReply(AckProgress, false)
}

// JetStream metadata associated with received messages.
// MsgMetaData is metadata associated with received messages.
type MsgMetaData struct {
Consumer uint64
Stream uint64
Expand All @@ -942,8 +953,9 @@ type MsgMetaData struct {
Timestamp time.Time
}

// MetaData returns the metadata from a JetStream message.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
if _, _, _, err := m.checkReply(); err != nil {
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -542,6 +542,7 @@ type jsSub struct {
stream string
deliver string
pull int
autoAck bool
}

// Msg is a structure used by Subscribers and PublishMsg().
Expand Down
48 changes: 48 additions & 0 deletions test/js_test.go
Expand Up @@ -1349,3 +1349,51 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
})
}
}

func TestJetStreamSubscribe_AutoAck(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js.Publish("foo", []byte("hello"))

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

var msgErr error
_, err = js.Subscribe("foo", func(m *nats.Msg) {
// Should fail and let auto ack handle the ack.
msgErr = m.Ack()
cancel()
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
<-ctx.Done()
if msgErr != nats.ErrInvalidJSAck {
t.Errorf("Expected invalid ack error, got: %v", msgErr)
}
}

0 comments on commit c8c1d6c

Please sign in to comment.