Skip to content

Commit

Permalink
Merge ee948d9 into c50dffa
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Feb 25, 2021
2 parents c50dffa + ee948d9 commit 510b27d
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 101 deletions.
103 changes: 77 additions & 26 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package nats

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -36,11 +37,11 @@ type JetStreamManager interface {
// StreamInfo retrieves information from a stream.
StreamInfo(stream string) (*StreamInfo, error)

// Purge stream messages.
// PurgeStream purges a stream messages.
PurgeStream(name string) error

// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister
// StreamsInfo can be used to retrieve a list of StreamInfo objects.
StreamsInfo(ctx context.Context) <-chan *StreamInfo

// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
GetMsg(name string, seq uint64) (*RawStreamMsg, error)
Expand All @@ -54,11 +55,11 @@ type JetStreamManager interface {
// DeleteConsumer deletes a consumer.
DeleteConsumer(stream, consumer string) error

// ConsumerInfo retrieves consumer information.
// ConsumerInfo retrieves information of a consumer from a stream.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)

// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
ConsumersInfo(ctx context.Context, stream string) <-chan *ConsumerInfo

// AccountInfo retrieves info about the JetStream usage from an account.
AccountInfo() (*AccountInfo, error)
Expand Down Expand Up @@ -248,9 +249,9 @@ func (js *js) ConsumerInfo(stream, consumer string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, consumer)
}

// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// consumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
type consumerLister struct {
stream string
js *js

Expand All @@ -260,6 +261,36 @@ type ConsumerLister struct {
pageInfo *apiPaged
}

// ConsumersInfo returns a receive only channel to iterate on the consumers info.
func (js *js) ConsumersInfo(ctx context.Context, stream string) <-chan *ConsumerInfo {
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), js.wait)
}

ach := make(chan *ConsumerInfo)
cl := &consumerLister{stream: stream, js: js}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ach)
for cl.Next() {
for _, info := range cl.Page() {
select {
case ach <- info:
case <-ctx.Done():
return
}
}
}
}()

return ach
}

// consumersRequest is the type used for Consumers requests.
type consumersRequest struct {
apiPagedRequest
Expand All @@ -273,7 +304,7 @@ type consumerListResponse struct {
}

// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
func (c *consumerLister) Next() bool {
if c.err != nil {
return false
}
Expand Down Expand Up @@ -315,20 +346,15 @@ func (c *ConsumerLister) Next() bool {
}

// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
func (c *consumerLister) Page() []*ConsumerInfo {
return c.page
}

// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
func (c *consumerLister) Err() error {
return c.err
}

// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
}

// streamCreateResponse stream creation.
type streamCreateResponse struct {
apiResponse
Expand Down Expand Up @@ -600,9 +626,9 @@ func (js *js) PurgeStream(name string) error {
return nil
}

// StreamLister fetches pages of StreamInfo objects. This object is not safe
// streamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
type streamLister struct {
js *js
page []*StreamInfo
err error
Expand All @@ -611,6 +637,36 @@ type StreamLister struct {
pageInfo *apiPaged
}

// StreamsInfo returns a receive only channel to iterate on the streams.
func (js *js) StreamsInfo(ctx context.Context) <-chan *StreamInfo {
var cancel context.CancelFunc
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), js.wait)
}

ach := make(chan *StreamInfo)
sl := &streamLister{js: js}
go func() {
defer func() {
if cancel != nil {
cancel()
}
}()
defer close(ach)
for sl.Next() {
for _, info := range sl.Page() {
select {
case ach <- info:
case <-ctx.Done():
return
}
}
}
}()

return ach
}

// streamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type streamListResponse struct {
Expand All @@ -627,7 +683,7 @@ type streamNamesRequest struct {
}

// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
func (s *streamLister) Next() bool {
if s.err != nil {
return false
}
Expand Down Expand Up @@ -666,16 +722,11 @@ func (s *StreamLister) Next() bool {
}

// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
func (s *streamLister) Page() []*StreamInfo {
return s.page
}

// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
func (s *streamLister) Err() error {
return s.err
}

// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
}
132 changes: 57 additions & 75 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,24 +251,17 @@ func TestJetStreamSubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

expectConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
expectConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := js.NewConsumerLister("TEST")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))
var count int
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for range js.ConsumersInfo(ctx, "TEST") {
count++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if count != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, count)
}

return p
}

// Create the stream using our client API.
Expand Down Expand Up @@ -757,41 +750,39 @@ func TestJetStreamManagement(t *testing.T) {
t.Fatalf("ConsumerInfo is not correct %+v", si)
}

sl := js.NewStreamLister()
if !sl.Next() {
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
var i int
expected := "foo"
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
for stream := range js.StreamsInfo(ctx) {
i++

got := stream.Config.Name
if got != expected {
t.Fatalf("Expected stream to be %v, got: %v", expected, got)
}
t.Fatalf("Unexpected stream lister next")
}
if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" {
t.Fatalf("StreamInfo is not correct %+v", p)
}
if err := sl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
cancel()
if i != 1 {
t.Errorf("Expected single stream: %v", err)
}

if cl := js.NewConsumerLister(""); cl.Next() {
t.Fatalf("Unexpected next ok")
} else if err := cl.Err(); err == nil {
if cl.Next() {
t.Fatalf("Unexpected next ok")
}
t.Fatalf("Unexpected nil error")
}
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
called := false
ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
for range js.ConsumersInfo(ctx, "") {
called = true
}
if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", p)
cancel()
if called {
t.Error("Expected not not receive entries")
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)

ctx, cancel = context.WithTimeout(context.Background(), 3*time.Second)
for ci := range js.ConsumersInfo(ctx, "foo") {
if ci.Stream != "foo" || ci.Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
}
cancel()

// Delete a consumer using our client API.
if err := js.DeleteConsumer("", ""); err == nil {
Expand Down Expand Up @@ -994,7 +985,7 @@ func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) {
t.Errorf("Expected %v, got: %v", 4, streamMsg.Sequence)
}
expectedMap := map[string][]string{
"X-Nats-Test-Data": []string{"A:1"},
"X-Nats-Test-Data": {"A:1"},
}
if !reflect.DeepEqual(streamMsg.Header, http.Header(expectedMap)) {
t.Errorf("Expected %v, got: %v", expectedMap, streamMsg.Header)
Expand Down Expand Up @@ -2059,24 +2050,18 @@ func TestJetStream_Unsubscribe(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
fetchConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := js.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))

var i int
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for range js.ConsumersInfo(ctx, "foo") {
i++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if i != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, i)
}

return p
}

js.Publish("foo.A", []byte("A"))
Expand Down Expand Up @@ -2197,24 +2182,18 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

fetchConsumers := func(t *testing.T, expected int) []*nats.ConsumerInfo {
fetchConsumers := func(t *testing.T, expected int) {
t.Helper()
cl := jsm.NewConsumerLister("foo")
if !cl.Next() {
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
}
t.Fatalf("Unexpected consumer lister next")
}
p := cl.Page()
if len(p) != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, len(p))

var i int
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
for range jsm.ConsumersInfo(ctx, "foo") {
i++
}
if err := cl.Err(); err != nil {
t.Errorf("Unexpected error: %v", err)
if i != expected {
t.Fatalf("Expected %d consumers, got: %d", expected, i)
}

return p
}

t.Run("conn drain deletes ephemeral consumers", func(t *testing.T) {
Expand Down Expand Up @@ -2565,7 +2544,10 @@ func withJSClusterAndStream(t *testing.T, clusterName string, size int, stream *
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
_, err = jsm.AddStream(stream)
if err != nil {
if err != nil && err.Error() == "stream name already in use" {
// Great, the stream was created.
break
} else if err != nil {
t.Logf("WARN: Got error while trying to create stream: %v", err)
// Backoff for a bit until cluster and resources ready.
time.Sleep(500 * time.Millisecond)
Expand Down

0 comments on commit 510b27d

Please sign in to comment.