From 728bc1e37bfb13c62fdd6cf872c047b205c41e14 Mon Sep 17 00:00:00 2001 From: Jarema Date: Sat, 19 Jun 2021 19:19:14 +0200 Subject: [PATCH 1/3] Alias NextMsg to Fetch for Pull Subscribers --- nats.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/nats.go b/nats.go index 5f4ddf5e9..9ae73b92f 100644 --- a/nats.go +++ b/nats.go @@ -3911,7 +3911,14 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { if s == nil { return nil, ErrBadSubscription } - + // In case of Pull Subscription, alias NextMsg to Fetch + if s.typ == PullSubscription { + msgs, err := s.Fetch(1, MaxWait(timeout)) + if err != nil { + return nil, err + } + return msgs[0], nil + } s.mu.Lock() err := s.validateNextMsgState() if err != nil { From f452241c065fec7505d5b7380f6402a063312cb4 Mon Sep 17 00:00:00 2001 From: Jarema Date: Sat, 19 Jun 2021 21:31:32 +0200 Subject: [PATCH 2/3] Add test for NextMsg as Fetch alias in Pull Subscribe --- test/js_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/test/js_test.go b/test/js_test.go index a9367f698..7335af3f9 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -526,6 +526,19 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + // Test the NextMsg Alias for Fetch + nmsg, err := sub.NextMsg(time.Second * 1) + if err != nil { + t.Fatal(err) + } + if info, _ := sub.ConsumerInfo(); info.NumAckPending != 1 || info.NumPending != uint64(toSend-1) { + t.Fatalf("Expected %d pending ack, and %d still waiting to be delivered, got %d and %d", 1, 1, info.NumAckPending, info.NumPending) + } + + if err := nmsg.Nak(); err != nil { + t.Fatal(err) + } + // The first batch if available should be delivered and queued up. bmsgs, err := sub.Fetch(batch) if err != nil { @@ -544,7 +557,9 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatal(err) } } - if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) { + + // we expect AckFloor to be equal batch size + 1 from NextMsg alias test + if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch+1) { t.Fatalf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer) } waitForPending(t, 0) From dad4664f20262854043f0da096ec5b1e9a2d6e8d Mon Sep 17 00:00:00 2001 From: Jarema Date: Sat, 19 Jun 2021 21:44:23 +0200 Subject: [PATCH 3/3] Add more comments to tests --- test/js_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/js_test.go b/test/js_test.go index 7335af3f9..5b88babe5 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -535,6 +535,7 @@ func TestJetStreamSubscribe(t *testing.T) { t.Fatalf("Expected %d pending ack, and %d still waiting to be delivered, got %d and %d", 1, 1, info.NumAckPending, info.NumPending) } + // do nat ack the message to not affect rest of the test by alias test if err := nmsg.Nak(); err != nil { t.Fatal(err) }