Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alias NextMsg to Fetch for Pull Subscribers #754

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion nats.go
Expand Up @@ -3911,7 +3911,14 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
if s == nil {
return nil, ErrBadSubscription
}

// In case of Pull Subscription, alias NextMsg to Fetch
if s.typ == PullSubscription {
msgs, err := s.Fetch(1, MaxWait(timeout))
if err != nil {
return nil, err
}
return msgs[0], nil
}
s.mu.Lock()
err := s.validateNextMsgState()
if err != nil {
Expand Down
18 changes: 17 additions & 1 deletion test/js_test.go
Expand Up @@ -526,6 +526,20 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

// Test the NextMsg Alias for Fetch
nmsg, err := sub.NextMsg(time.Second * 1)
if err != nil {
t.Fatal(err)
}
if info, _ := sub.ConsumerInfo(); info.NumAckPending != 1 || info.NumPending != uint64(toSend-1) {
t.Fatalf("Expected %d pending ack, and %d still waiting to be delivered, got %d and %d", 1, 1, info.NumAckPending, info.NumPending)
}

// do nat ack the message to not affect rest of the test by alias test
if err := nmsg.Nak(); err != nil {
t.Fatal(err)
}

// The first batch if available should be delivered and queued up.
bmsgs, err := sub.Fetch(batch)
if err != nil {
Expand All @@ -544,7 +558,9 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatal(err)
}
}
if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch) {

// we expect AckFloor to be equal batch size + 1 from NextMsg alias test
if info, _ := sub.ConsumerInfo(); info.AckFloor.Consumer != uint64(batch+1) {
t.Fatalf("Expected ack floor to be %d, got %d", batch, info.AckFloor.Consumer)
}
waitForPending(t, 0)
Expand Down