Skip to content

Commit

Permalink
Merge a0077b9 into 58bf69a
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Jan 27, 2021
2 parents 58bf69a + a0077b9 commit 442060c
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 101 deletions.
3 changes: 3 additions & 0 deletions js.go
Expand Up @@ -52,6 +52,9 @@ const (
// apiConsumerListT is used to return all detailed consumer information
apiConsumerListT = "CONSUMER.LIST.%s"

// apiConsumerNamesT is used to return a list with all consumer names for the stream.
apiConsumerNamesT = "CONSUMER.NAMES.%s"

// apiStreams can lookup a stream by subject.
apiStreams = "STREAM.NAMES"

Expand Down
70 changes: 70 additions & 0 deletions jsm.go
Expand Up @@ -56,6 +56,9 @@ type JetStreamManager interface {
// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister

// NewConsumerNamesLister is used to return pages of Consumer names from a stream.
NewConsumerNamesLister(stream string) *ConsumerNamesLister

// AccountInfo retrieves info about the JetStream usage from an account.
AccountInfo() (*AccountInfo, error)
}
Expand Down Expand Up @@ -304,6 +307,73 @@ func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
}

type ConsumerNamesLister struct {
stream string
js *js

err error
offset int
page []string
pageInfo *apiPaged
}

// consumerNamesListResponse is the response for a Consumers Names List request.
type consumerNamesListResponse struct {
apiResponse
apiPaged
Consumers []string `json:"consumers"`
}

// Next fetches the next ConsumerInfo page.
func (c *ConsumerNamesLister) Next() bool {
if c.err != nil {
return false
}
if c.stream == _EMPTY_ {
c.err = ErrStreamNameRequired
return false
}
if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
return false
}

clSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
r, err := c.js.nc.Request(clSubj, nil, c.js.wait)
if err != nil {
c.err = err
return false
}
var resp consumerNamesListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
c.err = err
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
return false
}

c.pageInfo = &resp.apiPaged
c.page = resp.Consumers
c.offset += len(c.page)
return true
}

// Page returns the current ConsumerInfo page.
func (c *ConsumerNamesLister) Page() []string {
return c.page
}

// Err returns any errors found while fetching pages.
func (c *ConsumerNamesLister) Err() error {
return c.err
}

// NewConsumerNamesLister is used to return pages of Consumer names.
func (js *js) NewConsumerNamesLister(stream string) *ConsumerNamesLister {
return &ConsumerNamesLister{stream: stream, js: js}
}

// streamCreateResponse stream creation.
type streamCreateResponse struct {
apiResponse
Expand Down
242 changes: 141 additions & 101 deletions test/js_test.go
Expand Up @@ -523,129 +523,169 @@ func TestJetStreamManagement(t *testing.T) {
}

// Create the stream using our client API.
if _, err := js.AddStream(nil); err == nil {
t.Fatalf("Unexpected success")
}
si, err := js.AddStream(&nats.StreamConfig{Name: "foo"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
var si *nats.StreamInfo
t.Run("create stream", func(t *testing.T) {
if _, err := js.AddStream(nil); err == nil {
t.Fatalf("Unexpected success")
}
si, err := js.AddStream(&nats.StreamConfig{Name: "foo"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
})

for i := 0; i < 25; i++ {
js.Publish("foo", []byte("hi"))
}

// Check info calls.
si, err = js.StreamInfo("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
t.Run("stream info", func(t *testing.T) {
si, err = js.StreamInfo("foo")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", si)
}
})

// Update the stream using our client API.
if _, err := js.UpdateStream(nil); err == nil {
t.Fatal("Unexpected success")
}
prevMaxMsgs := si.Config.MaxMsgs
si, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", MaxMsgs: prevMaxMsgs + 100})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" || si.Config.MaxMsgs == prevMaxMsgs {
t.Fatalf("StreamInfo is not correct %+v", si)
}
t.Run("stream update", func(t *testing.T) {
if _, err := js.UpdateStream(nil); err == nil {
t.Fatal("Unexpected success")
}
prevMaxMsgs := si.Config.MaxMsgs
si, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", MaxMsgs: prevMaxMsgs + 100})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" || si.Config.MaxMsgs == prevMaxMsgs {
t.Fatalf("StreamInfo is not correct %+v", si)
}
})

// Create a consumer using our client API.
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
t.Run("create consumer", func(t *testing.T) {
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicitPolicy})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}

if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidDurableName {
t.Fatalf("Expected invalid durable name error")
}
if _, err = js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "test.durable"}); err != nats.ErrInvalidDurableName {
t.Fatalf("Expected invalid durable name error")
}
})

// Check info calls.
ci, err = js.ConsumerInfo("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci == nil || ci.Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", si)
}
t.Run("consumer info", func(t *testing.T) {
ci, err := js.ConsumerInfo("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci == nil || ci.Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", si)
}
})

sl := js.NewStreamLister()
if !sl.Next() {
t.Run("list streams", func(t *testing.T) {
sl := js.NewStreamLister()
if !sl.Next() {
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected stream lister next")
}
if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", p)
}
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected stream lister next")
}
if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", p)
}
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
})

if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Run("list consumers", func(t *testing.T) {
if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Fatalf("Unexpected next ok")
}
t.Fatalf("Unexpected nil error")
}

cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", p)
}
t.Fatalf("Unexpected nil error")
}
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", p)
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
})

// Delete a consumer using our client API.
if err := js.DeleteConsumer("", ""); err == nil {
t.Fatalf("Unexpected success")
}
if err := js.DeleteConsumer("foo", "dlc"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
t.Run("list consumers", func(t *testing.T) {
if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Fatalf("Unexpected next ok")
}
t.Fatalf("Unexpected nil error")
}

// Purge a stream using our client API.
if err := js.PurgeStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si, err := js.StreamInfo("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if si.State.Msgs != 0 {
t.Fatalf("StreamInfo.Msgs is not correct")
}
cl := js.NewConsumerNamesLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
if p := cl.Page(); len(p) != 1 || p[0] != "dlc" {
t.Fatalf("Consumer name is not correct %+v", p)
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
})

// Delete a stream using our client API.
if err := js.DeleteStream(""); err == nil {
t.Fatal("Unexpected success")
}
if err := js.DeleteStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.StreamInfo("foo"); err == nil {
t.Fatalf("Unexpected success")
}
t.Run("delete consumers", func(t *testing.T) {
if err := js.DeleteConsumer("", ""); err == nil {
t.Fatalf("Unexpected success")
}
if err := js.DeleteConsumer("foo", "dlc"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
})

t.Run("purge stream", func(t *testing.T) {
if err := js.PurgeStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si, err := js.StreamInfo("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
} else if si.State.Msgs != 0 {
t.Fatalf("StreamInfo.Msgs is not correct")
}
})

t.Run("delete stream", func(t *testing.T) {
if err := js.DeleteStream(""); err == nil {
t.Fatal("Unexpected success")
}
if err := js.DeleteStream("foo"); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.StreamInfo("foo"); err == nil {
t.Fatalf("Unexpected success")
}
})

t.Run("fetch account info", func(t *testing.T) {
info, err := js.AccountInfo()
Expand Down

0 comments on commit 442060c

Please sign in to comment.