diff --git a/js.go b/js.go index e232805e3..ff4235910 100644 --- a/js.go +++ b/js.go @@ -115,7 +115,8 @@ type JetStreamContext interface { // js is an internal struct from a JetStreamContext. type js struct { - nc *Conn + ctx context.Context + nc *Conn // For importing JetStream from other accounts. pre string // Amount of time to wait for API requests. @@ -363,6 +364,11 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error { return nil } +func (ctx ContextOpt) configureJSContext(opts *js) error { + opts.ctx = ctx + return nil +} + // Context returns an option that can be used to configure a context. func Context(ctx context.Context) ContextOpt { return ContextOpt{ctx} diff --git a/jsm.go b/jsm.go index a7e8521f2..250c62dbd 100644 --- a/jsm.go +++ b/jsm.go @@ -37,11 +37,11 @@ type JetStreamManager interface { // StreamInfo retrieves information from a stream. StreamInfo(stream string) (*StreamInfo, error) - // Purge stream messages. + // PurgeStream purges a stream messages. PurgeStream(name string) error - // NewStreamLister is used to return pages of StreamInfo objects. - NewStreamLister() *StreamLister + // StreamsInfo can be used to retrieve a list of StreamInfo objects. + StreamsInfo(opts ...JSOpt) <-chan *StreamInfo // StreamNames is used to retrieve a list of Stream names. StreamNames(ctx context.Context) <-chan string @@ -58,11 +58,11 @@ type JetStreamManager interface { // DeleteConsumer deletes a consumer. DeleteConsumer(stream, consumer string) error - // ConsumerInfo retrieves consumer information. + // ConsumerInfo retrieves information of a consumer from a stream. ConsumerInfo(stream, name string) (*ConsumerInfo, error) - // NewConsumerLister is used to return pages of ConsumerInfo objects. - NewConsumerLister(stream string) *ConsumerLister + // ConsumersInfo is used to retrieve a list of ConsumerInfo objects. + ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo // ConsumerNames is used to retrieve a list of Consumer names. ConsumerNames(ctx context.Context, stream string) <-chan string @@ -273,9 +273,9 @@ func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) { return js.getConsumerInfo(stream, consumer) } -// ConsumerLister fetches pages of ConsumerInfo objects. This object is not +// consumerLister fetches pages of ConsumerInfo objects. This object is not // safe to use for multiple threads. -type ConsumerLister struct { +type consumerLister struct { stream string js *js @@ -298,7 +298,7 @@ type consumerListResponse struct { } // Next fetches the next ConsumerInfo page. -func (c *ConsumerLister) Next() bool { +func (c *consumerLister) Next() bool { if c.err != nil { return false } @@ -340,18 +340,43 @@ func (c *ConsumerLister) Next() bool { } // Page returns the current ConsumerInfo page. -func (c *ConsumerLister) Page() []*ConsumerInfo { +func (c *consumerLister) Page() []*ConsumerInfo { return c.page } // Err returns any errors found while fetching pages. -func (c *ConsumerLister) Err() error { +func (c *consumerLister) Err() error { return c.err } -// NewConsumerLister is used to return pages of ConsumerInfo objects. -func (js *js) NewConsumerLister(stream string) *ConsumerLister { - return &ConsumerLister{stream: stream, js: js} +// ConsumersInfo is used to retrieve a list of ConsumerInfo objects. +func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo { + o, cancel, err := getJSContextOpts(jsc, opts...) + if err != nil { + return nil + } + + ch := make(chan *ConsumerInfo) + l := &consumerLister{js: o, stream: stream} + go func() { + defer func() { + if cancel != nil { + cancel() + } + }() + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + select { + case ch <- info: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch } type consumerNamesLister struct { @@ -726,9 +751,9 @@ func (js *js) PurgeStream(name string) error { return nil } -// StreamLister fetches pages of StreamInfo objects. This object is not safe +// streamLister fetches pages of StreamInfo objects. This object is not safe // to use for multiple threads. -type StreamLister struct { +type streamLister struct { js *js page []*StreamInfo err error @@ -753,7 +778,7 @@ type streamNamesRequest struct { } // Next fetches the next StreamInfo page. -func (s *StreamLister) Next() bool { +func (s *streamLister) Next() bool { if s.err != nil { return false } @@ -792,18 +817,43 @@ func (s *StreamLister) Next() bool { } // Page returns the current StreamInfo page. -func (s *StreamLister) Page() []*StreamInfo { +func (s *streamLister) Page() []*StreamInfo { return s.page } // Err returns any errors found while fetching pages. -func (s *StreamLister) Err() error { +func (s *streamLister) Err() error { return s.err } -// NewStreamLister is used to return pages of StreamInfo objects. -func (js *js) NewStreamLister() *StreamLister { - return &StreamLister{js: js} +// StreamsInfo can be used to retrieve a list of StreamInfo objects. +func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo { + o, cancel, err := getJSContextOpts(jsc, opts...) + if err != nil { + return nil + } + + ch := make(chan *StreamInfo) + l := &streamLister{js: o} + go func() { + defer func() { + if cancel != nil { + cancel() + } + }() + defer close(ch) + for l.Next() { + for _, info := range l.Page() { + select { + case ch <- info: + case <-o.ctx.Done(): + return + } + } + } + }() + + return ch } type streamNamesLister struct { @@ -884,3 +934,30 @@ func (js *js) StreamNames(ctx context.Context) <-chan string { return ch } + +func getJSContextOpts(defs *js, opts ...JSOpt) (*js, context.CancelFunc, error) { + var o js + for _, opt := range opts { + if err := opt.configureJSContext(&o); err != nil { + return nil, nil, err + } + } + + // Check for option collisions. Right now just timeout and context. + if o.ctx != nil && o.wait != 0 { + return nil, nil, ErrContextAndTimeout + } + if o.wait == 0 && o.ctx == nil { + o.wait = defs.wait + } + var cancel context.CancelFunc + if o.ctx == nil && o.wait > 0 { + o.ctx, cancel = context.WithTimeout(context.Background(), o.wait) + } + if o.pre == "" { + o.pre = defs.pre + } + o.nc = defs.nc + + return &o, cancel, nil +} diff --git a/test/js_test.go b/test/js_test.go index 06e21d8b7..c7d2b8644 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -253,22 +253,15 @@ func TestJetStreamSubscribe(t *testing.T) { expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { t.Helper() - cl := js.NewConsumerLister("TEST") - if !cl.Next() { - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) - } - t.Fatalf("Unexpected consumer lister next") - } - p := cl.Page() - if len(p) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(p)) + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("TEST") { + infos = append(infos, info) } - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) + if len(infos) != expected { + t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) } - return p + return infos } // Create the stream using our client API. @@ -1106,43 +1099,30 @@ func TestJetStreamManagement(t *testing.T) { }) t.Run("list streams", func(t *testing.T) { - sl := js.NewStreamLister() - if !sl.Next() { - if err := sl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) - } - t.Fatalf("Unexpected stream lister next") + var infos []*nats.StreamInfo + for info := range js.StreamsInfo() { + infos = append(infos, info) } - if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" { - t.Fatalf("StreamInfo is not correct %+v", p) - } - if err := sl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) + if len(infos) != 1 || infos[0].Config.Name != "foo" { + t.Fatalf("StreamInfo is not correct %+v", infos) } }) t.Run("list consumers", func(t *testing.T) { - if cl := js.NewConsumerLister(""); cl.Next() { - t.Fatalf("Unexpected next ok") - } else if err := cl.Err(); err == nil { - if cl.Next() { - t.Fatalf("Unexpected next ok") - } - t.Fatalf("Unexpected nil error") + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("") { + infos = append(infos, info) } - - cl := js.NewConsumerLister("foo") - if !cl.Next() { - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) - } - t.Fatalf("Unexpected consumer lister next") + if len(infos) != 0 { + t.Fatalf("ConsumerInfo is not correct %+v", infos) } - if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" { - t.Fatalf("ConsumerInfo is not correct %+v", p) + + infos = infos[:0] + for info := range js.ConsumersInfo("foo") { + infos = append(infos, info) } - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) + if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" { + t.Fatalf("ConsumerInfo is not correct %+v", infos) } }) @@ -2570,22 +2550,15 @@ func TestJetStream_Unsubscribe(t *testing.T) { fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { t.Helper() - cl := js.NewConsumerLister("foo") - if !cl.Next() { - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) - } - t.Fatalf("Unexpected consumer lister next") + var infos []*nats.ConsumerInfo + for info := range js.ConsumersInfo("foo") { + infos = append(infos, info) } - p := cl.Page() - if len(p) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(p)) - } - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) + if len(infos) != expected { + t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) } - return p + return infos } js.Publish("foo.A", []byte("A")) @@ -2708,22 +2681,15 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) { fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo { t.Helper() - cl := jsm.NewConsumerLister("foo") - if !cl.Next() { - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) - } - t.Fatalf("Unexpected consumer lister next") + var infos []*nats.ConsumerInfo + for info := range jsm.ConsumersInfo("foo") { + infos = append(infos, info) } - p := cl.Page() - if len(p) != expected { - t.Fatalf("Expected %d consumers, got: %d", expected, len(p)) - } - if err := cl.Err(); err != nil { - t.Errorf("Unexpected error: %v", err) + if len(infos) != expected { + t.Fatalf("Expected %d consumers, got: %d", expected, len(infos)) } - return p + return infos } t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) {