diff --git a/js.go b/js.go index 429959d92..d6d4b5463 100644 --- a/js.go +++ b/js.go @@ -332,13 +332,21 @@ func ExpectLastMsgId(id string) PubOpt { // MaxWait sets the maximum amount of time we will wait for a response. type MaxWait time.Duration -func (ttl MaxWait) configurePublish(opts *pubOpts) error { +func (ttl MaxWait) configureJSContext(js *js) error { + js.wait = time.Duration(ttl) + return nil +} + +// AckWait sets the maximum amount of time we will wait for an ack. +type AckWait time.Duration + +func (ttl AckWait) configurePublish(opts *pubOpts) error { opts.ttl = time.Duration(ttl) return nil } -func (ttl MaxWait) configureJSContext(js *js) error { - js.wait = time.Duration(ttl) +func (ttl AckWait) configureSubscribe(opts *subOpts) error { + opts.cfg.AckWait = time.Duration(ttl) return nil } @@ -487,6 +495,11 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] return nil, ErrPullModeNotAllowed } + badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + if isPullMode && badPullAck { + return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) + } + var ( err error shouldCreate bool @@ -686,18 +699,6 @@ type subOpts struct { cfg *ConsumerConfig } -// Durable defines the consumer name for JetStream durable subscribers. -func Durable(name string) SubOpt { - return subOptFn(func(opts *subOpts) error { - if strings.Contains(name, ".") { - return ErrInvalidDurableName - } - - opts.cfg.Durable = name - return nil - }) -} - // Pull defines the batch size of messages that will be received // when using pull based JetStream consumers. func Pull(batchSize int) SubOpt { @@ -730,6 +731,18 @@ func ManualAck() SubOpt { }) } +// Durable defines the consumer name for JetStream durable subscribers. +func Durable(name string) SubOpt { + return subOptFn(func(opts *subOpts) error { + if strings.Contains(name, ".") { + return ErrInvalidDurableName + } + + opts.cfg.Durable = name + return nil + }) +} + // DeliverAll will configure a Consumer to receive all the // messages from a Stream. func DeliverAll() SubOpt { @@ -798,6 +811,55 @@ func AckExplicit() SubOpt { }) } +func MaxDeliver(n int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxDeliver = n + return nil + }) +} + +func PlaybackInstant() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.ReplayPolicy = ReplayInstant + return nil + }) +} + +func PlaybackOriginal() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.ReplayPolicy = ReplayOriginal + return nil + }) +} + +func RateLimit(n uint64) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.RateLimit = n + return nil + }) +} + +func SampleFrequency(s string) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.SampleFrequency = s + return nil + }) +} + +func PullMaxWaiting(n int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxWaiting = n + return nil + }) +} + +func MaxAckPending(n int) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.MaxAckPending = n + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. diff --git a/test/js_test.go b/test/js_test.go index f52466b94..e87e7d5b5 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -211,7 +211,7 @@ func TestJetStreamPublish(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - _, err = js.Publish("foo", msg, nats.MaxWait(time.Second), nats.Context(ctx)) + _, err = js.Publish("foo", msg, nats.AckWait(time.Second), nats.Context(ctx)) if err != nats.ErrContextAndTimeout { t.Fatalf("Expected %q, got %q", nats.ErrContextAndTimeout, err) } @@ -220,7 +220,7 @@ func TestJetStreamPublish(t *testing.T) { sub, _ := nc.SubscribeSync("baz") defer sub.Unsubscribe() - _, err = js.Publish("baz", msg, nats.MaxWait(time.Nanosecond)) + _, err = js.Publish("baz", msg, nats.AckWait(time.Nanosecond)) if err != nats.ErrTimeout { t.Fatalf("Expected %q, got %q", nats.ErrTimeout, err) } @@ -564,6 +564,66 @@ func TestJetStreamSubscribe(t *testing.T) { } } +func TestAckPendingGoesDown(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo", "bar", "baz", "foo.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + const totalMsgs = 3 + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("msg %d", i))); err != nil { + t.Fatal(err) + } + } + + sub, err := js.SubscribeSync("foo", nats.Durable("dname"), nats.Pull(totalMsgs), nats.AckExplicit()) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + for i := totalMsgs; i > 0; i-- { + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if got, want := info.NumAckPending, i; got != want { + t.Fatalf("unexpected num ack pending: got=%d, want=%d", got, want) + } + + m, err := sub.NextMsg(100 * time.Millisecond) + if err != nil { + t.Fatal("NextMsg:", err) + } + + if err := m.Ack(); err != nil { + t.Fatal("Ack:", err) + } + } +} + func TestJetStream_Drain(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -1776,7 +1836,7 @@ func TestJetStreamSubscribe_AckPolicy(t *testing.T) { t.Errorf("Unexpected error: %v", err) } - err = msg.AckSync(nats.MaxWait(2 * time.Second)) + err = msg.AckSync(nats.AckWait(2 * time.Second)) if err != nats.ErrInvalidJSAck { t.Errorf("Unexpected error: %v", err) }