diff --git a/js.go b/js.go index 2e1ef2f7f..942e1c78d 100644 --- a/js.go +++ b/js.go @@ -34,6 +34,7 @@ type JetStream interface { // Subscribing to messages in JetStream. Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) + // Channel versions. ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) // QueueSubscribe. @@ -701,6 +702,53 @@ func ManualAck() SubOpt { }) } +// DeliverAll will configure a Consumer to receive all the +// messages from a Stream. +func DeliverAll() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverPolicy = DeliverAllPolicy + return nil + }) +} + +// DeliverLast configures a Consumer to receive messages +// starting with the latest one. +func DeliverLast() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverPolicy = DeliverLastPolicy + return nil + }) +} + +// DeliverNew configures a Consumer to receive messages +// published after the subscription. +func DeliverNew() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverPolicy = DeliverNewPolicy + return nil + }) +} + +// StartSequence configures a Consumer to receive +// messages from a start sequence. +func StartSequence(seq uint64) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy + opts.cfg.OptStartSeq = seq + return nil + }) +} + +// DeliverFromTime configures a Consumer to receive +// messages from a start time. +func StartTime(startTime time.Time) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.DeliverPolicy = DeliverByStartTimePolicy + opts.cfg.OptStartTime = &startTime + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -986,28 +1034,39 @@ var ( AckTerm = []byte("+TERM") ) +// DeliverPolicy determines how the consumer should select the first message to deliver. type DeliverPolicy int const ( - DeliverAll DeliverPolicy = iota - DeliverLast - DeliverNew - DeliverByStartSequence - DeliverByStartTime + // DeliverAllPolicy will be the default so can be omitted from the request. + DeliverAllPolicy DeliverPolicy = iota + + // DeliverLastPolicy will start the consumer with the last sequence received. + DeliverLastPolicy + + // DeliverNewPolicy will only deliver new messages that are sent + // after the consumer is created. + DeliverNewPolicy + + // DeliverByStartSequencePolicy will look for a defined starting sequence to start. + DeliverByStartSequencePolicy + + // StartTime will select the first messsage with a timestamp >= to StartTime. + DeliverByStartTimePolicy ) func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("all"), jsonString("undefined"): - *p = DeliverAll + *p = DeliverAllPolicy case jsonString("last"): - *p = DeliverLast + *p = DeliverLastPolicy case jsonString("new"): - *p = DeliverNew + *p = DeliverNewPolicy case jsonString("by_start_sequence"): - *p = DeliverByStartSequence + *p = DeliverByStartSequencePolicy case jsonString("by_start_time"): - *p = DeliverByStartTime + *p = DeliverByStartTimePolicy } return nil @@ -1015,15 +1074,15 @@ func (p *DeliverPolicy) UnmarshalJSON(data []byte) error { func (p DeliverPolicy) MarshalJSON() ([]byte, error) { switch p { - case DeliverAll: + case DeliverAllPolicy: return json.Marshal("all") - case DeliverLast: + case DeliverLastPolicy: return json.Marshal("last") - case DeliverNew: + case DeliverNewPolicy: return json.Marshal("new") - case DeliverByStartSequence: + case DeliverByStartSequencePolicy: return json.Marshal("by_start_sequence") - case DeliverByStartTime: + case DeliverByStartTimePolicy: return json.Marshal("by_start_time") default: return nil, fmt.Errorf("unknown deliver policy %v", p) diff --git a/test/js_test.go b/test/js_test.go index 277bf7d2f..13147f8e0 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -960,3 +960,89 @@ func TestJetStreamPullBasedStall(t *testing.T) { } } } + +func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var publishTime time.Time + + for i := 0; i < 10; i++ { + payload := fmt.Sprintf("i:%d", i) + if i == 5 { + publishTime = time.Now() + } + js.Publish("foo", []byte(payload)) + } + + for _, test := range []struct { + name string + subopt nats.SubOpt + expected int + }{ + { + "deliver.all", nats.DeliverAll(), 10, + }, + { + "deliver.last", nats.DeliverLast(), 1, + }, + { + "deliver.new", nats.DeliverNew(), 0, + }, + { + "deliver.starttime", nats.StartTime(publishTime), 5, + }, + { + "deliver.startseq", nats.StartSequence(6), 5, + }, + } { + test := test + t.Run(test.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + got := 0 + sub, err := js.Subscribe("foo", func(m *nats.Msg) { + got++ + if got == test.expected { + cancel() + } + }, test.subopt) + + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + <-ctx.Done() + sub.Drain() + + if got != test.expected { + t.Fatalf("Expected %d, got %d", test.expected, got) + } + }) + } +}