Skip to content

Commit

Permalink
Merge pull request #683 from nats-io/js-pull-acks
Browse files Browse the repository at this point in the history
Add test for PullSubscribe and types of acks
  • Loading branch information
wallyqs committed Mar 24, 2021
2 parents 2f2e16b + ae7d960 commit 20b5230
Showing 1 changed file with 216 additions and 0 deletions.
216 changes: 216 additions & 0 deletions test/js_test.go
Expand Up @@ -2371,6 +2371,222 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) {
})
}

func TestJetStreamPullSubscribe_AckPending(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

const totalMsgs = 10
for i := 0; i < totalMsgs; i++ {
payload := fmt.Sprintf("i:%d", i)
js.Publish("foo", []byte(payload))
}

sub, err := js.PullSubscribe("foo",
nats.Durable("wq"),
nats.AckWait(200*time.Millisecond),
nats.MaxAckPending(5),
)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

nextMsg := func() *nats.Msg {
msgs, err := sub.Fetch(1)
if err != nil {
t.Fatal(err)
}
return msgs[0]
}

getPending := func() (int, int) {
info, err := sub.ConsumerInfo()
if err != nil {
t.Fatal(err)
}
return info.NumAckPending, int(info.NumPending)
}

getMetaData := func(msg *nats.Msg) *nats.MsgMetaData {
meta, err := msg.MetaData()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return meta
}

expectedPending := func(inflight int, pending int) {
i, p := getPending()
if i != inflight || p != pending {
t.Errorf("Unexpected inflight/pending msgs: %v/%v", i, p)
}
}

inflight, pending := getPending()
if inflight != 0 || pending != totalMsgs {
t.Errorf("Unexpected inflight/pending msgs: %v/%v", inflight, pending)
}

// Normal Ack should decrease pending
msg := nextMsg()
err = msg.Ack()
if err != nil {
t.Fatal(err)
}

expectedPending(0, 9)
meta := getMetaData(msg)
if meta.Consumer != 1 || meta.Stream != 1 || meta.Delivered != 1 {
t.Errorf("Unexpected metadata: %+v", meta)
}

// AckSync
msg = nextMsg()
err = msg.AckSync()
if err != nil {
t.Fatal(err)
}
expectedPending(0, 8)
meta = getMetaData(msg)
if meta.Consumer != 2 || meta.Stream != 2 || meta.Delivered != 1 {
t.Errorf("Unexpected metadata: %+v", meta)
}

// Nak the message so that it is redelivered.
msg = nextMsg()
err = msg.Nak()
if err != nil {
t.Fatal(err)
}
expectedPending(1, 7)
meta = getMetaData(msg)
if meta.Consumer != 3 || meta.Stream != 3 || meta.Delivered != 1 {
t.Errorf("Unexpected metadata: %+v", meta)
}
prevSeq := meta.Stream
prevPayload := string(msg.Data)

// Nak same sequence again, sequence number should not change.
msg = nextMsg()
err = msg.Nak()
if err != nil {
t.Fatal(err)
}
expectedPending(1, 7)
meta = getMetaData(msg)
if meta.Stream != prevSeq {
t.Errorf("Expected to get message at seq=%v, got seq=%v", prevSeq, meta.Stream)
}
if string(msg.Data) != prevPayload {
t.Errorf("Expected: %q, got: %q", string(prevPayload), string(msg.Data))
}
if meta.Consumer != 4 || meta.Delivered != 2 {
t.Errorf("Unexpected metadata: %+v", meta)
}

// Terminate message so it is no longer pending.
msg = nextMsg()
err = msg.Term()
if err != nil {
t.Fatal(err)
}
expectedPending(0, 7)
meta = getMetaData(msg)
if meta.Stream != prevSeq {
t.Errorf("Expected to get message at seq=%v, got seq=%v", prevSeq, meta.Stream)
}
if string(msg.Data) != prevPayload {
t.Errorf("Expected: %q, got: %q", string(prevPayload), string(msg.Data))
}
if meta.Consumer != 5 || meta.Stream != 3 || meta.Delivered != 3 {
t.Errorf("Unexpected metadata: %+v", meta)
}

// Get next message and ack in progress a few times
msg = nextMsg()
expected := "i:3"
if string(msg.Data) != expected {
t.Errorf("Expected: %q, got: %q", string(msg.Data), expected)
}
err = msg.InProgress()
if err != nil {
t.Fatal(err)
}
err = msg.InProgress()
if err != nil {
t.Fatal(err)
}
expectedPending(1, 6)
meta = getMetaData(msg)
if meta.Consumer != 6 || meta.Stream != 4 || meta.Delivered != 1 {
t.Errorf("Unexpected metadata: %+v", meta)
}

// Now ack the message to mark it as done.
err = msg.AckSync()
if err != nil {
t.Fatal(err)
}
expectedPending(0, 6)

// Fetch next message, but do not ack and wait for redelivery.
msg = nextMsg()
expectedPending(1, 5)
meta = getMetaData(msg)
if meta.Consumer != 7 || meta.Stream != 5 || meta.Delivered != 1 {
t.Errorf("Unexpected metadata: %+v", meta)
}
prevSeq = meta.Stream
time.Sleep(500 * time.Millisecond)
expectedPending(1, 5)

// Next message should be a redelivery.
msg = nextMsg()
expectedPending(1, 5)
meta = getMetaData(msg)
if meta.Consumer != 8 || meta.Stream != prevSeq || meta.Delivered != 2 {
t.Errorf("Unexpected metadata: %+v", meta)
}
err = msg.AckSync()
if err != nil {
t.Fatal(err)
}

// Get rest of messages.
msgs, err := sub.Fetch(5)
if err != nil {
t.Fatal(err)
}
for _, msg := range msgs {
getMetaData(msg)
msg.Ack()
}
expectedPending(0, 0)
}

func TestJetStreamSubscribe_AckDup(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down

0 comments on commit 20b5230

Please sign in to comment.