diff --git a/nats.go b/nats.go index 5f4ddf5e9..9ae73b92f 100644 --- a/nats.go +++ b/nats.go @@ -3911,7 +3911,14 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { if s == nil { return nil, ErrBadSubscription } - + // In case of Pull Subscription, alias NextMsg to Fetch + if s.typ == PullSubscription { + msgs, err := s.Fetch(1, MaxWait(timeout)) + if err != nil { + return nil, err + } + return msgs[0], nil + } s.mu.Lock() err := s.validateNextMsgState() if err != nil { diff --git a/test/js_test.go b/test/js_test.go index a9367f698..5b88babe5 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -526,6 +526,20 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + // Test the NextMsg Alias for Fetch + nmsg, err := sub.NextMsg(time.Second * 1) + if err != nil { + t.Fatal(err) + } + if info, _ := sub.ConsumerInfo(); info.NumAckPending != 1 || info.NumPending != uint64(toSend-1) { + t.Fatalf("Expected %d pending ack, and %d still waiting to be delivered, got %d and %d", 1, 1, info.NumAckPending, info.NumPending) + } + + // do nat ack the message to not affect rest of the test by alias test + if err := nmsg.Nak(); err != nil { + t.Fatal(err) + } + // The first batch if available should be delivered and queued up. bmsgs, err := sub.Fetch(batch) if err != nil { @@ -544,7 +558,9 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatal(err) } } - if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) { + + // we expect AckFloor to be equal batch size + 1 from NextMsg alias test + if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch+1) { t.Fatalf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer) } waitForPending(t, 0)