Skip to content

Commit

Permalink
Merge pull request #673 from nats-io/js-rate-and-playback
Browse files Browse the repository at this point in the history
js: Add ReplayOriginal policy and RateLimit for push consumers
  • Loading branch information
wallyqs committed Mar 9, 2021
2 parents 3678d91 + e61aa9e commit d636dd9
Show file tree
Hide file tree
Showing 2 changed files with 191 additions and 6 deletions.
32 changes: 26 additions & 6 deletions js.go
Expand Up @@ -828,6 +828,22 @@ func MaxAckPending(n int) SubOpt {
})
}

// ReplayOriginal replays the messages at the original speed.
func ReplayOriginal() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.ReplayPolicy = ReplayOriginalPolicy
return nil
})
}

// RateLimit is the Bits per sec rate limit applied to a push consumer.
func RateLimit(n uint64) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.RateLimit = n
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -1128,19 +1144,23 @@ func (p AckPolicy) String() string {
}
}

// ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
type ReplayPolicy int

const (
ReplayInstant ReplayPolicy = iota
ReplayOriginal
// ReplayInstant will replay messages as fast as possible.
ReplayInstantPolicy ReplayPolicy = iota

// ReplayOriginalPolicy will maintain the same timing as the messages were received.
ReplayOriginalPolicy
)

func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {
switch string(data) {
case jsonString("instant"):
*p = ReplayInstant
*p = ReplayInstantPolicy
case jsonString("original"):
*p = ReplayOriginal
*p = ReplayOriginalPolicy
default:
return fmt.Errorf("can not unmarshal %q", data)
}
Expand All @@ -1150,9 +1170,9 @@ func (p *ReplayPolicy) UnmarshalJSON(data []byte) error {

func (p ReplayPolicy) MarshalJSON() ([]byte, error) {
switch p {
case ReplayOriginal:
case ReplayOriginalPolicy:
return json.Marshal("original")
case ReplayInstant:
case ReplayInstantPolicy:
return json.Marshal("instant")
default:
return nil, fmt.Errorf("unknown replay policy %v", p)
Expand Down
165 changes: 165 additions & 0 deletions test/js_test.go
Expand Up @@ -2945,6 +2945,171 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) {
}
}

func TestJetStreamSubscribe_ReplayPolicy(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

i := 0
totalMsgs := 10
for range time.NewTicker(100 * time.Millisecond).C {
payload := fmt.Sprintf("i:%d", i)
js.Publish("foo", []byte(payload))
i++

if i == totalMsgs {
break
}
}

// By default it is ReplayInstant playback policy.
isub, err := js.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ci, err := isub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.Config.ReplayPolicy != nats.ReplayInstantPolicy {
t.Fatalf("Expected original replay policy, got: %v", ci.Config.ReplayPolicy)
}

// Change into original playback.
sub, err := js.SubscribeSync("foo", nats.ReplayOriginal())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ci, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.Config.ReplayPolicy != nats.ReplayOriginalPolicy {
t.Fatalf("Expected original replay policy, got: %v", ci.Config.ReplayPolicy)
}

// There should already be a message delivered.
_, err = sub.NextMsg(10 * time.Millisecond)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// We should timeout faster since too soon for the original playback.
_, err = sub.NextMsg(10 * time.Millisecond)
if err != nats.ErrTimeout {
t.Fatalf("Expected timeout error replaying the stream, got: %v", err)
}

// Enough time to get the next message according to the original playback.
_, err = sub.NextMsg(110 * time.Millisecond)
if err != nil {

t.Fatalf("Unexpected error: %v", err)
}
}

func TestJetStreamSubscribe_RateLimit(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()

if config := s.JetStreamConfig(); config != nil {
defer os.RemoveAll(config.StoreDir)
}

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// Create the stream using our client API.
_, err = js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

totalMsgs := 2048
for i := 0; i < totalMsgs; i++ {
payload := strings.Repeat("A", 1024)
js.Publish("foo", []byte(payload))
}

// By default there is no RateLimit
isub, err := js.SubscribeSync("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ci, err := isub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.Config.RateLimit != 0 {
t.Fatalf("Expected no rate limit, got: %v", ci.Config.RateLimit)
}

// Change rate limit.
recvd := make(chan *nats.Msg)
duration := 2 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

var rl uint64 = 1024
sub, err := js.Subscribe("foo", func(m *nats.Msg) {
recvd <- m

if len(recvd) == totalMsgs {
cancel()
}

}, nats.RateLimit(rl))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ci, err = sub.ConsumerInfo()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.Config.RateLimit != rl {
t.Fatalf("Expected %v, got: %v", rl, ci.Config.RateLimit)
}
<-ctx.Done()

if len(recvd) >= int(rl) {
t.Errorf("Expected applied rate limit to push consumer, got %v msgs in %v", recvd, duration)
}
}

type jsServer struct {
*server.Server
myopts *server.Options
Expand Down

0 comments on commit d636dd9

Please sign in to comment.