Skip to content

Commit

Permalink
Merge pull request #627 from wallyqs/js-deliver-opt
Browse files Browse the repository at this point in the history
js: Add option to configure deliver policy from consumer
  • Loading branch information
wallyqs committed Dec 18, 2020
2 parents a3519e1 + 86a16bb commit b4450fb
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 15 deletions.
89 changes: 74 additions & 15 deletions js.go
Expand Up @@ -34,6 +34,7 @@ type JetStream interface {
// Subscribing to messages in JetStream.
Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

// Channel versions.
ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
// QueueSubscribe.
Expand Down Expand Up @@ -701,6 +702,53 @@ func ManualAck() SubOpt {
})
}

// DeliverAll will configure a Consumer to receive all the
// messages from a Stream.
func DeliverAll() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverAllPolicy
return nil
})
}

// DeliverLast configures a Consumer to receive messages
// starting with the latest one.
func DeliverLast() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverLastPolicy
return nil
})
}

// DeliverNew configures a Consumer to receive messages
// published after the subscription.
func DeliverNew() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverNewPolicy
return nil
})
}

// StartSequence configures a Consumer to receive
// messages from a start sequence.
func StartSequence(seq uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverByStartSequencePolicy
opts.cfg.OptStartSeq = seq
return nil
})
}

// DeliverFromTime configures a Consumer to receive
// messages from a start time.
func StartTime(startTime time.Time) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.DeliverPolicy = DeliverByStartTimePolicy
opts.cfg.OptStartTime = &startTime
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -986,44 +1034,55 @@ var (
AckTerm = []byte("+TERM")
)

// DeliverPolicy determines how the consumer should select the first message to deliver.
type DeliverPolicy int

const (
DeliverAll DeliverPolicy = iota
DeliverLast
DeliverNew
DeliverByStartSequence
DeliverByStartTime
// DeliverAllPolicy will be the default so can be omitted from the request.
DeliverAllPolicy DeliverPolicy = iota

// DeliverLastPolicy will start the consumer with the last sequence received.
DeliverLastPolicy

// DeliverNewPolicy will only deliver new messages that are sent
// after the consumer is created.
DeliverNewPolicy

// DeliverByStartSequencePolicy will look for a defined starting sequence to start.
DeliverByStartSequencePolicy

// StartTime will select the first messsage with a timestamp >= to StartTime.
DeliverByStartTimePolicy
)

func (p *DeliverPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("all"), jsonString("undefined"):
*p = DeliverAll
*p = DeliverAllPolicy
case jsonString("last"):
*p = DeliverLast
*p = DeliverLastPolicy
case jsonString("new"):
*p = DeliverNew
*p = DeliverNewPolicy
case jsonString("by_start_sequence"):
*p = DeliverByStartSequence
*p = DeliverByStartSequencePolicy
case jsonString("by_start_time"):
*p = DeliverByStartTime
*p = DeliverByStartTimePolicy
}

return nil
}

func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
switch p {
case DeliverAll:
case DeliverAllPolicy:
return json.Marshal("all")
case DeliverLast:
case DeliverLastPolicy:
return json.Marshal("last")
case DeliverNew:
case DeliverNewPolicy:
return json.Marshal("new")
case DeliverByStartSequence:
case DeliverByStartSequencePolicy:
return json.Marshal("by_start_sequence")
case DeliverByStartTime:
case DeliverByStartTimePolicy:
return json.Marshal("by_start_time")
default:
return nil, fmt.Errorf("unknown deliver policy %v", p)
Expand Down
86 changes: 86 additions & 0 deletions test/js_test.go
Expand Up @@ -960,3 +960,89 @@ func TestJetStreamPullBasedStall(t *testing.T) {
}
}
}

func TestJetStreamSubscribe_DeliverPolicy(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

var publishTime time.Time

for i := 0; i < 10; i++ {
payload := fmt.Sprintf("i:%d", i)
if i == 5 {
publishTime = time.Now()
}
js.Publish("foo", []byte(payload))
}

for _, test := range []struct {
name string
subopt nats.SubOpt
expected int
}{
{
"deliver.all", nats.DeliverAll(), 10,
},
{
"deliver.last", nats.DeliverLast(), 1,
},
{
"deliver.new", nats.DeliverNew(), 0,
},
{
"deliver.starttime", nats.StartTime(publishTime), 5,
},
{
"deliver.startseq", nats.StartSequence(6), 5,
},
} {
test := test
t.Run(test.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

got := 0
sub, err := js.Subscribe("foo", func(m *nats.Msg) {
got++
if got == test.expected {
cancel()
}
}, test.subopt)

if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

<-ctx.Done()
sub.Drain()

if got != test.expected {
t.Fatalf("Expected %d, got %d", test.expected, got)
}
})
}
}

0 comments on commit b4450fb

Please sign in to comment.