Skip to content

Commit

Permalink
Merge 994c933 into d636dd9
Browse files Browse the repository at this point in the history
  • Loading branch information
variadico committed Mar 9, 2021
2 parents d636dd9 + 994c933 commit 5bd9132
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 91 deletions.
8 changes: 7 additions & 1 deletion js.go
Expand Up @@ -115,7 +115,8 @@ type JetStreamContext interface {

// js is an internal struct from a JetStreamContext.
type js struct {
nc *Conn
ctx context.Context
nc *Conn
// For importing JetStream from other accounts.
pre string
// Amount of time to wait for API requests.
Expand Down Expand Up @@ -363,6 +364,11 @@ func (ctx ContextOpt) configurePublish(opts *pubOpts) error {
return nil
}

func (ctx ContextOpt) configureJSContext(opts *js) error {
opts.ctx = ctx
return nil
}

// Context returns an option that can be used to configure a context.
func Context(ctx context.Context) ContextOpt {
return ContextOpt{ctx}
Expand Down
121 changes: 99 additions & 22 deletions jsm.go
Expand Up @@ -37,11 +37,11 @@ type JetStreamManager interface {
// StreamInfo retrieves information from a stream.
StreamInfo(stream string) (*StreamInfo, error)

// Purge stream messages.
// PurgeStream purges a stream messages.
PurgeStream(name string) error

// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
StreamsInfo(opts ...JSOpt) <-chan *StreamInfo

// StreamNames is used to retrieve a list of Stream names.
StreamNames(ctx context.Context) <-chan string
Expand All @@ -58,11 +58,11 @@ type JetStreamManager interface {
// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string) error

// ConsumerInfo retrieves consumer information.
// ConsumerInfo retrieves information of a consumer from a stream.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)

// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo

// ConsumerNames is used to retrieve a list of Consumer names.
ConsumerNames(ctx context.Context, stream string) <-chan string
Expand Down Expand Up @@ -273,9 +273,9 @@ func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}

// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// consumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
type consumerLister struct {
stream string
js *js

Expand All @@ -298,7 +298,7 @@ type consumerListResponse struct {
}

// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
func (c *consumerLister) Next() bool {
if c.err != nil {
return false
}
Expand Down Expand Up @@ -340,18 +340,43 @@ func (c *ConsumerLister) Next() bool {
}

// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
func (c *consumerLister) Page() []*ConsumerInfo {
return c.page
}

// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
func (c *consumerLister) Err() error {
return c.err
}

// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
o, cancel, err := getJSContextOpts(jsc, opts...)
if err != nil {
return nil
}

ch := make(chan *ConsumerInfo)
l := &consumerLister{js: o, stream: stream}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}

type consumerNamesLister struct {
Expand Down Expand Up @@ -726,9 +751,9 @@ func (js *js) PurgeStream(name string) error {
return nil
}

// StreamLister fetches pages of StreamInfo objects. This object is not safe
// streamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
type streamLister struct {
js *js
page []*StreamInfo
err error
Expand All @@ -753,7 +778,7 @@ type streamNamesRequest struct {
}

// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
func (s *streamLister) Next() bool {
if s.err != nil {
return false
}
Expand Down Expand Up @@ -792,18 +817,43 @@ func (s *StreamLister) Next() bool {
}

// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
func (s *streamLister) Page() []*StreamInfo {
return s.page
}

// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
func (s *streamLister) Err() error {
return s.err
}

// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
o, cancel, err := getJSContextOpts(jsc, opts...)
if err != nil {
return nil
}

ch := make(chan *StreamInfo)
l := &streamLister{js: o}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
select {
case ch <- info:
case <-o.ctx.Done():
return
}
}
}
}()

return ch
}

type streamNamesLister struct {
Expand Down Expand Up @@ -884,3 +934,30 @@ func (js *js) StreamNames(ctx context.Context) <-chan string {

return ch
}

func getJSContextOpts(defs *js, opts ...JSOpt) (*js, context.CancelFunc, error) {
var o js
for _, opt := range opts {
if err := opt.configureJSContext(&o); err != nil {
return nil, nil, err
}
}

// Check for option collisions. Right now just timeout and context.
if o.ctx != nil && o.wait != 0 {
return nil, nil, ErrContextAndTimeout
}
if o.wait == 0 && o.ctx == nil {
o.wait = defs.wait
}
var cancel context.CancelFunc
if o.ctx == nil && o.wait > 0 {
o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
}
if o.pre == "" {
o.pre = defs.pre
}
o.nc = defs.nc

return &o, cancel, nil
}
102 changes: 34 additions & 68 deletions test/js_test.go
Expand Up @@ -253,22 +253,15 @@ func TestJetStreamSubscribe(t *testing.T) {

expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("TEST") {
infos = append(infos, info)
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(infos))
}

return p
return infos
}

// Create the stream using our client API.
Expand Down Expand Up @@ -1106,43 +1099,30 @@ func TestJetStreamManagement(t *testing.T) {
})

t.Run("list streams", func(t *testing.T) {
sl := js.NewStreamLister()
if !sl.Next() {
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected stream lister next")
var infos []*nats.StreamInfo
for info := range js.StreamsInfo() {
infos = append(infos, info)
}
if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", p)
}
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != 1 || infos[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", infos)
}
})

t.Run("list consumers", func(t *testing.T) {
if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Fatalf("Unexpected next ok")
}
t.Fatalf("Unexpected nil error")
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("") {
infos = append(infos, info)
}

cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
if len(infos) != 0 {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", p)

infos = infos[:0]
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != 1 || infos[0].Stream != "foo" || infos[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", infos)
}
})

Expand Down Expand Up @@ -2570,22 +2550,15 @@ func TestJetStream_Unsubscribe(t *testing.T) {

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
var infos []*nats.ConsumerInfo
for info := range js.ConsumersInfo("foo") {
infos = append(infos, info)
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(infos))
}

return p
return infos
}

js.Publish("foo.A", []byte("A"))
Expand Down Expand Up @@ -2708,22 +2681,15 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
t.Helper()
cl := jsm.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
var infos []*nats.ConsumerInfo
for info := range jsm.ConsumersInfo("foo") {
infos = append(infos, info)
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if len(infos) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(infos))
}

return p
return infos
}

t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) {
Expand Down

0 comments on commit 5bd9132

Please sign in to comment.