diff --git a/js.go b/js.go index abd71c7ff..e232805e3 100644 --- a/js.go +++ b/js.go @@ -828,6 +828,22 @@ func MaxAckPending(n int) SubOpt { }) } +// ReplayOriginal replays the messages at the original speed. +func ReplayOriginal() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.ReplayPolicy = ReplayOriginalPolicy + return nil + }) +} + +// RateLimit is the Bits per sec rate limit applied to a push consumer. +func RateLimit(n uint64) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.RateLimit = n + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -1128,19 +1144,23 @@ func (p AckPolicy) String() string { } } +// ReplayPolicy determines how the consumer should replay messages it already has queued in the stream. type ReplayPolicy int const ( - ReplayInstant ReplayPolicy = iota - ReplayOriginal + // ReplayInstant will replay messages as fast as possible. + ReplayInstantPolicy ReplayPolicy = iota + + // ReplayOriginalPolicy will maintain the same timing as the messages were received. + ReplayOriginalPolicy ) func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { switch string(data) { case jsonString("instant"): - *p = ReplayInstant + *p = ReplayInstantPolicy case jsonString("original"): - *p = ReplayOriginal + *p = ReplayOriginalPolicy default: return fmt.Errorf("can not unmarshal %q", data) } @@ -1150,9 +1170,9 @@ func (p *ReplayPolicy) UnmarshalJSON(data []byte) error { func (p ReplayPolicy) MarshalJSON() ([]byte, error) { switch p { - case ReplayOriginal: + case ReplayOriginalPolicy: return json.Marshal("original") - case ReplayInstant: + case ReplayInstantPolicy: return json.Marshal("instant") default: return nil, fmt.Errorf("unknown replay policy %v", p) diff --git a/test/js_test.go b/test/js_test.go index 7349302ba..06e21d8b7 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -2945,6 +2945,171 @@ func TestJetStream_UnsubscribeDeleteNoPermissions(t *testing.T) { } } +func TestJetStreamSubscribe_ReplayPolicy(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + i := 0 + totalMsgs := 10 + for range time.NewTicker(100 * time.Millisecond).C { + payload := fmt.Sprintf("i:%d", i) + js.Publish("foo", []byte(payload)) + i++ + + if i == totalMsgs { + break + } + } + + // By default it is ReplayInstant playback policy. + isub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ci, err := isub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.Config.ReplayPolicy != nats.ReplayInstantPolicy { + t.Fatalf("Expected original replay policy, got: %v", ci.Config.ReplayPolicy) + } + + // Change into original playback. + sub, err := js.SubscribeSync("foo", nats.ReplayOriginal()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ci, err = sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.Config.ReplayPolicy != nats.ReplayOriginalPolicy { + t.Fatalf("Expected original replay policy, got: %v", ci.Config.ReplayPolicy) + } + + // There should already be a message delivered. + _, err = sub.NextMsg(10 * time.Millisecond) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // We should timeout faster since too soon for the original playback. + _, err = sub.NextMsg(10 * time.Millisecond) + if err != nats.ErrTimeout { + t.Fatalf("Expected timeout error replaying the stream, got: %v", err) + } + + // Enough time to get the next message according to the original playback. + _, err = sub.NextMsg(110 * time.Millisecond) + if err != nil { + + t.Fatalf("Unexpected error: %v", err) + } +} + +func TestJetStreamSubscribe_RateLimit(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create the stream using our client API. + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + totalMsgs := 2048 + for i := 0; i < totalMsgs; i++ { + payload := strings.Repeat("A", 1024) + js.Publish("foo", []byte(payload)) + } + + // By default there is no RateLimit + isub, err := js.SubscribeSync("foo") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ci, err := isub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.Config.RateLimit != 0 { + t.Fatalf("Expected no rate limit, got: %v", ci.Config.RateLimit) + } + + // Change rate limit. + recvd := make(chan *nats.Msg) + duration := 2 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + + var rl uint64 = 1024 + sub, err := js.Subscribe("foo", func(m *nats.Msg) { + recvd <- m + + if len(recvd) == totalMsgs { + cancel() + } + + }, nats.RateLimit(rl)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + ci, err = sub.ConsumerInfo() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci.Config.RateLimit != rl { + t.Fatalf("Expected %v, got: %v", rl, ci.Config.RateLimit) + } + <-ctx.Done() + + if len(recvd) >= int(rl) { + t.Errorf("Expected applied rate limit to push consumer, got %v msgs in %v", recvd, duration) + } +} + type jsServer struct { *server.Server myopts *server.Options