diff --git a/js.go b/js.go index 94eb69c86..4310512f6 100644 --- a/js.go +++ b/js.go @@ -495,6 +495,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] return nil, ErrPullModeNotAllowed } + badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + if isPullMode && badPullAck { + return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + } + var ( err error shouldCreate bool @@ -813,41 +818,6 @@ func MaxDeliver(n int) SubOpt { }) } -func PlaybackInstant() SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.cfg.ReplayPolicy = ReplayInstant - return nil - }) -} - -func PlaybackOriginal() SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.cfg.ReplayPolicy = ReplayOriginal - return nil - }) -} - -func RateLimit(n uint64) SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.cfg.RateLimit = n - return nil - }) -} - -func SampleFrequency(s string) SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.cfg.SampleFrequency = s - return nil - }) -} - -func PullMaxWaiting(n int) SubOpt { - return subOptFn(func(opts *subOpts) error { - opts.cfg.MaxWaiting = n - return nil - }) -} - func MaxAckPending(n int) SubOpt { return subOptFn(func(opts *subOpts) error { opts.cfg.MaxAckPending = n diff --git a/test/js_test.go b/test/js_test.go index b4a7bad0c..5e2bb1881 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -562,6 +562,347 @@ func TestJetStreamSubscribe(t *testing.T) { if _, err := js.SubscribeSync("baz", nats.Durable("test.durable")); err != nats.ErrInvalidDurableName { t.Fatalf("Expected invalid durable name error") } + + ackWait := 1 * time.Millisecond + sub, err = js.SubscribeSync("bar", nats.Durable("ack-wait"), nats.AckWait(ackWait)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + info, err = sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if info.Config.AckWait != ackWait { + t.Errorf("Expected %v, got %v", ackWait, info.Config.AckWait) + } +} + +func TestJetStreamAckPending_Pull(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + const totalMsgs = 3 + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { + t.Fatal(err) + } + } + + sub, err := js.SubscribeSync("foo", + nats.Durable("dname-pull-ack-wait"), + nats.AckWait(100*time.Millisecond), + nats.MaxDeliver(5), + nats.MaxAckPending(3), + nats.Pull(15), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + // 3 messages delivered 5 times. + expected := 15 + timeout := time.Now().Add(2 * time.Second) + pending := 0 + for time.Now().Before(timeout) { + if pending, _, _ = sub.Pending(); pending >= expected { + break + } + time.Sleep(10 * time.Millisecond) + } + if pending < expected { + t.Errorf("Expected %v, got %v", expected, pending) + } + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + + got := info.NumRedelivered + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = info.NumAckPending + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = info.NumWaiting + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = int(info.NumPending) + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = info.Config.MaxAckPending + expected = 3 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) + } + + got = info.Config.MaxDeliver + expected = 5 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) + } + + acks := map[int]int{} + + ackPending := 3 + timeout = time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if got, want := info.NumAckPending, ackPending; got > 0 && got != want { + t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) + } + + // Continue to ack all messages until no more pending. + pending, _, _ = sub.Pending() + if pending == 0 { + break + } + + m, err := sub.NextMsg(100 * time.Millisecond) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + + if err := m.AckSync(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + + meta, err := m.MetaData() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + acks[int(meta.Stream)]++ + + if ackPending != 0 { + ackPending-- + } + if int(meta.Pending) != ackPending { + t.Errorf("Expected %v, got %v", ackPending, meta.Pending) + } + } + + got = len(acks) + expected = 3 + if got != expected { + t.Errorf("Expected %v, got %v", expected, got) + } + + expected = 5 + for _, got := range acks { + if got != expected { + t.Errorf("Expected %v, got %v", expected, got) + } + } + + _, err = sub.NextMsg(100 * time.Millisecond) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout, got: %v", err) + } +} + +func TestJetStreamAckPending_Push(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + const totalMsgs = 3 + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { + t.Fatal(err) + } + } + + sub, err := js.SubscribeSync("foo", + nats.Durable("dname-wait"), + nats.AckWait(100*time.Millisecond), + nats.MaxDeliver(5), + nats.MaxAckPending(3), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + // 3 messages delivered 5 times. + expected := 15 + timeout := time.Now().Add(2 * time.Second) + pending := 0 + for time.Now().Before(timeout) { + if pending, _, _ = sub.Pending(); pending >= expected { + break + } + time.Sleep(10 * time.Millisecond) + } + if pending < expected { + t.Errorf("Expected %v, got %v", expected, pending) + } + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + + got := info.NumRedelivered + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = info.NumAckPending + expected = 3 + if got < expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = info.NumWaiting + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = int(info.NumPending) + expected = 0 + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + got = info.Config.MaxAckPending + expected = 3 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) + } + + got = info.Config.MaxDeliver + expected = 5 + if got != expected { + t.Errorf("Expected %v, got %v", expected, pending) + } + + acks := map[int]int{} + + ackPending := 3 + timeout = time.Now().Add(2 * time.Second) + for time.Now().Before(timeout) { + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if got, want := info.NumAckPending, ackPending; got > 0 && got != want { + t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) + } + + // Continue to ack all messages until no more pending. + pending, _, _ = sub.Pending() + if pending == 0 { + break + } + + m, err := sub.NextMsg(100 * time.Millisecond) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + + if err := m.AckSync(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + + meta, err := m.MetaData() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + acks[int(meta.Stream)]++ + + if ackPending != 0 { + ackPending-- + } + if int(meta.Pending) != ackPending { + t.Errorf("Expected %v, got %v", ackPending, meta.Pending) + } + } + + got = len(acks) + expected = 3 + if got != expected { + t.Errorf("Expected %v, got %v", expected, got) + } + + expected = 5 + for _, got := range acks { + if got != expected { + t.Errorf("Expected %v, got %v", expected, got) + } + } + + _, err = sub.NextMsg(100 * time.Millisecond) + if err != nats.ErrTimeout { + t.Errorf("Expected timeout, got: %v", err) + } } func TestJetStream_Drain(t *testing.T) {