Skip to content

Commit

Permalink
js: Replace JetStreamManager listers with iterators
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs authored and nsurfer committed Mar 4, 2021
1 parent 3678d91 commit bf1abcf
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 110 deletions.
102 changes: 76 additions & 26 deletions jsm.go
Expand Up @@ -37,11 +37,11 @@ type JetStreamManager interface {
// StreamInfo retrieves information from a stream.
StreamInfo(stream string) (*StreamInfo, error)

// Purge stream messages.
// PurgeStream purges a stream messages.
PurgeStream(name string) error

// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
StreamsInfo(ctx context.Context) <-chan *StreamInfo

// StreamNames is used to retrieve a list of Stream names.
StreamNames(ctx context.Context) <-chan string
Expand All @@ -58,11 +58,11 @@ type JetStreamManager interface {
// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string) error

// ConsumerInfo retrieves consumer information.
// ConsumerInfo retrieves information of a consumer from a stream.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)

// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
ConsumersInfo(ctx context.Context, stream string) <-chan *ConsumerInfo

// ConsumerNames is used to retrieve a list of Consumer names.
ConsumerNames(ctx context.Context, stream string) <-chan string
Expand Down Expand Up @@ -273,9 +273,9 @@ func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}

// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// consumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
type consumerLister struct {
stream string
js *js

Expand All @@ -285,6 +285,36 @@ type ConsumerLister struct {
pageInfo *apiPaged
}

// ConsumersInfo returns a receive only channel to iterate on the consumers info.
func (js *js) ConsumersInfo(ctx context.Context, stream string) <-chan *ConsumerInfo {
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), js.wait)
}

ach := make(chan *ConsumerInfo)
cl := &consumerLister{stream: stream, js: js}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ach)
for cl.Next() {
for _, info := range cl.Page() {
select {
case ach <- info:
case <-ctx.Done():
return
}
}
}
}()

return ach
}

// consumersRequest is the type used for Consumers requests.
type consumersRequest struct {
apiPagedRequest
Expand All @@ -298,7 +328,7 @@ type consumerListResponse struct {
}

// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
func (c *consumerLister) Next() bool {
if c.err != nil {
return false
}
Expand Down Expand Up @@ -340,20 +370,15 @@ func (c *ConsumerLister) Next() bool {
}

// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
func (c *consumerLister) Page() []*ConsumerInfo {
return c.page
}

// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
func (c *consumerLister) Err() error {
return c.err
}

// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
}

type consumerNamesLister struct {
stream string
js *js
Expand Down Expand Up @@ -726,9 +751,9 @@ func (js *js) PurgeStream(name string) error {
return nil
}

// StreamLister fetches pages of StreamInfo objects. This object is not safe
// streamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
type streamLister struct {
js *js
page []*StreamInfo
err error
Expand All @@ -737,6 +762,36 @@ type StreamLister struct {
pageInfo *apiPaged
}

// StreamsInfo returns a receive only channel to iterate on the streams.
func (js *js) StreamsInfo(ctx context.Context) <-chan *StreamInfo {
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), js.wait)
}

ach := make(chan *StreamInfo)
sl := &streamLister{js: js}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ach)
for sl.Next() {
for _, info := range sl.Page() {
select {
case ach <- info:
case <-ctx.Done():
return
}
}
}
}()

return ach
}

// streamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type streamListResponse struct {
Expand All @@ -753,7 +808,7 @@ type streamNamesRequest struct {
}

// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
func (s *streamLister) Next() bool {
if s.err != nil {
return false
}
Expand Down Expand Up @@ -792,20 +847,15 @@ func (s *StreamLister) Next() bool {
}

// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
func (s *streamLister) Page() []*StreamInfo {
return s.page
}

// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
func (s *streamLister) Err() error {
return s.err
}

// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
}

type streamNamesLister struct {
js *js

Expand Down
149 changes: 65 additions & 84 deletions test/js_test.go
Expand Up @@ -251,24 +251,17 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
expectConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
var count int
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for range js.ConsumersInfo(ctx, "TEST") {
count++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if count != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, count)
}

return p
}

// Create the stream using our client API.
Expand Down Expand Up @@ -1105,57 +1098,54 @@ func TestJetStreamManagement(t *testing.T) {
}
})

t.Run("list streams", func(t *testing.T) {
sl := js.NewStreamLister()
if !sl.Next() {
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected stream lister next")
}
if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", p)
t.Run("list consumer names", func(t *testing.T) {
var names []string
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for name := range js.ConsumerNames(ctx, "foo") {
names = append(names, name)
}
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if got, want := len(names), 1; got != want {
t.Fatalf("Unexpected names, got=%d, want=%d", got, want)
}
})

t.Run("list consumers", func(t *testing.T) {
if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Fatalf("Unexpected next ok")
}
t.Fatalf("Unexpected nil error")
}
t.Run("streams info", func(t *testing.T) {
var i int
expected := "foo"
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
for stream := range js.StreamsInfo(ctx) {
i++

cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
got := stream.Config.Name
if got != expected {
t.Fatalf("Expected stream to be %v, got: %v", expected, got)
}
t.Fatalf("Unexpected consumer lister next")
}
if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", p)
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
cancel()
if i != 1 {
t.Errorf("Expected single stream: %v", err)
}
})

t.Run("list consumer names", func(t *testing.T) {
var names []string
t.Run("consumers info", func(t *testing.T) {
var called bool
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for name := range js.ConsumerNames(ctx, "foo") {
names = append(names, name)
for range js.ConsumersInfo(ctx, "") {
called = true
}
if got, want := len(names), 1; got != want {
t.Fatalf("Unexpected names, got=%d, want=%d", got, want)
cancel()
if called {
t.Error("Expected not not receive entries")
}

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
for ci := range js.ConsumersInfo(ctx, "foo") {
if ci.Stream != "foo" || ci.Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
}
cancel()
})

t.Run("delete consumers", func(t *testing.T) {
Expand Down Expand Up @@ -2568,24 +2558,18 @@ func TestJetStream_Unsubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
fetchConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))

var i int
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for range js.ConsumersInfo(ctx, "foo") {
i++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if i != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, i)
}

return p
}

js.Publish("foo.A", []byte("A"))
Expand Down Expand Up @@ -2706,24 +2690,18 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
fetchConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := jsm.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))

var i int
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for range jsm.ConsumersInfo(ctx, "foo") {
i++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if i != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, i)
}

return p
}

t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) {
Expand Down Expand Up @@ -3074,7 +3052,10 @@ func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
_, err = jsm.AddStream(stream)
if err != nil {
if err != nil && err.Error() == "stream name already in use" {
// Great, the stream was created.
break
} else if err != nil {
t.Logf("WARN: Got error while trying to create stream: %v", err)
// Backoff for a bit until cluster and resources ready.
time.Sleep(500 * time.Millisecond)
Expand Down

0 comments on commit bf1abcf

Please sign in to comment.