Skip to content

Commit

Permalink
js: Add ConsumersInfo to JetStreamManager
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Feb 11, 2021
1 parent b29d5d3 commit 4f14c26
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 100 deletions.
70 changes: 33 additions & 37 deletions jsm.go
Expand Up @@ -24,19 +24,19 @@ import (

// JetStreamManager is the public interface for managing JetStream streams & consumers.
type JetStreamManager interface {
// Create a stream.
// AddStream adds a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)

// Update a stream.
// UpdateStream updates a stream.
UpdateStream(cfg *StreamConfig) (*StreamInfo, error)

// Delete a stream.
// DeleteStream deletes a stream.
DeleteStream(name string) error

// Stream information.
// StreamInfo retrieves a information from a Stream.
StreamInfo(stream string) (*StreamInfo, error)

// Purge stream messages.
// PurgeStream purges a stream messages.
PurgeStream(name string) error

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
Expand All @@ -48,17 +48,17 @@ type JetStreamManager interface {
// DeleteMsg erases a message from a Stream.
DeleteMsg(name string, seq uint64) error

// Create a consumer.
// AddConsumer creates a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)

// Delete a consumer.
// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string) error

// Consumer information.
// ConsumerInfo retrieves information of a consumer from a stream.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)

// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
ConsumersInfo(stream string) <-chan *ConsumerInfo

// AccountInfo retrieves info about the JetStream usage from an account.
AccountInfo() (*AccountInfo, error)
Expand Down Expand Up @@ -248,9 +248,9 @@ func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}

// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// consumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
type consumerLister struct {
stream string
js *js

Expand All @@ -260,6 +260,23 @@ type ConsumerLister struct {
pageInfo *apiPaged
}

// ConsumersInfo returns a receive only channel to iterate on the consumers info.
func (js *js) ConsumersInfo(stream string) <-chan *ConsumerInfo {
ach := make(chan *ConsumerInfo)
cl := &consumerLister{stream: stream, js: js}
go func() {
for cl.Next() {
for _, info := range cl.Page() {
ach <- info
}
}

close(ach)
}()

return ach
}

// consumersRequest is the type used for Consumers requests.
type consumersRequest struct {
apiPagedRequest
Expand All @@ -273,7 +290,7 @@ type consumerListResponse struct {
}

// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
func (c *consumerLister) Next() bool {
if c.err != nil {
return false
}
Expand Down Expand Up @@ -315,20 +332,15 @@ func (c *ConsumerLister) Next() bool {
}

// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
func (c *consumerLister) Page() []*ConsumerInfo {
return c.page
}

// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
func (c *consumerLister) Err() error {
return c.err
}

// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
}

// streamCreateResponse stream creation.
type streamCreateResponse struct {
apiResponse
Expand Down Expand Up @@ -600,22 +612,6 @@ func (js *js) PurgeStream(name string) error {
return nil
}

// StreamLister fetches StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister interface {
// Next fetches the next page of streams.
Next() bool

// Page() is a collection of streams.
Page() []*StreamInfo

// Err() is the sticky error like Scanner.
Err() error

// Streams is a receive only channel to iterate over streams.
Streams() <-chan *StreamInfo
}

// streamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type streamLister struct {
Expand All @@ -627,7 +623,7 @@ type streamLister struct {
pageInfo *apiPaged
}

// Streams returns a receive only channel to iterate on the streams.
// StreamsInfo returns a receive only channel to iterate on the streams.
func (js *js) StreamsInfo() <-chan *StreamInfo {
ach := make(chan *StreamInfo)
sl := &streamLister{js: js}
Expand Down
93 changes: 30 additions & 63 deletions test/js_test.go
Expand Up @@ -246,24 +246,15 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
expectConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
var count int
for range js.ConsumersInfo("TEST") {
count++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if count != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, count)
}

return p
}

// Create the stream using our client API.
Expand Down Expand Up @@ -770,26 +761,18 @@ func TestJetStreamManagement(t *testing.T) {
t.Errorf("Expected single stream: %v", err)
}

if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Fatalf("Unexpected next ok")
}
t.Fatalf("Unexpected nil error")
called := false
for range js.ConsumersInfo("") {
called = true
}
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", p)
if called {
t.Error("Expected not not receive entries")
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)

for ci := range js.ConsumersInfo("foo") {
if ci.Stream != "foo" || ci.Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
}

// Delete a consumer using our client API.
Expand Down Expand Up @@ -2056,24 +2039,16 @@ func TestJetStream_Unsubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
fetchConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))

var i int
for range js.ConsumersInfo("foo") {
i++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if i != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, i)
}

return p
}

js.Publish("foo.A", []byte("A"))
Expand Down Expand Up @@ -2194,24 +2169,16 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
fetchConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := jsm.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))

var i int
for range jsm.ConsumersInfo("foo") {
i++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if i != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, i)
}

return p
}

t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) {
Expand Down

0 comments on commit 4f14c26

Please sign in to comment.