Skip to content

Commit

Permalink
Add context to StreamsInfo and ConsumersInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
nsurfer committed Feb 22, 2021
1 parent 4f14c26 commit f2a2830
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
27 changes: 17 additions & 10 deletions jsm.go
Expand Up @@ -14,6 +14,7 @@
package nats

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -40,7 +41,7 @@ type JetStreamManager interface {
PurgeStream(name string) error

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
StreamsInfo() <-chan *StreamInfo
StreamsInfo(ctx context.Context) <-chan *StreamInfo

// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
GetMsg(name string, seq uint64) (*RawStreamMsg, error)
Expand All @@ -58,7 +59,7 @@ type JetStreamManager interface {
ConsumerInfo(stream, name string) (*ConsumerInfo, error)

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
ConsumersInfo(stream string) <-chan *ConsumerInfo
ConsumersInfo(ctx context.Context, stream string) <-chan *ConsumerInfo

// AccountInfo retrieves info about the JetStream usage from an account.
AccountInfo() (*AccountInfo, error)
Expand Down Expand Up @@ -261,17 +262,20 @@ type consumerLister struct {
}

// ConsumersInfo returns a receive only channel to iterate on the consumers info.
func (js *js) ConsumersInfo(stream string) <-chan *ConsumerInfo {
func (js *js) ConsumersInfo(ctx context.Context, stream string) <-chan *ConsumerInfo {
ach := make(chan *ConsumerInfo)
cl := &consumerLister{stream: stream, js: js}
go func() {
defer close(ach)
for cl.Next() {
for _, info := range cl.Page() {
ach <- info
select {
case ach <- info:
case <-ctx.Done():
return
}
}
}

close(ach)
}()

return ach
Expand Down Expand Up @@ -624,17 +628,20 @@ type streamLister struct {
}

// StreamsInfo returns a receive only channel to iterate on the streams.
func (js *js) StreamsInfo() <-chan *StreamInfo {
func (js *js) StreamsInfo(ctx context.Context) <-chan *StreamInfo {
ach := make(chan *StreamInfo)
sl := &streamLister{js: js}
go func() {
defer close(ach)
for sl.Next() {
for _, info := range sl.Page() {
ach <- info
select {
case ach <- info:
case <-ctx.Done():
return
}
}
}

close(ach)
}()

return ach
Expand Down
24 changes: 18 additions & 6 deletions test/js_test.go
Expand Up @@ -249,7 +249,9 @@ func TestJetStreamSubscribe(t *testing.T) {
expectConsumers := func(t *testing.T, expected int) {
t.Helper()
var count int
for range js.ConsumersInfo("TEST") {
ctx, cancel := context.WithTimeout(context.Background(), 3 * time.Second)
defer cancel()
for range js.ConsumersInfo(ctx, "TEST") {
count++
}
if count != expected {
Expand Down Expand Up @@ -749,31 +751,37 @@ func TestJetStreamManagement(t *testing.T) {

var i int
expected := "foo"
for stream := range js.StreamsInfo() {
ctx, cancel := context.WithTimeout(context.Background(), 3 * time.Second)
for stream := range js.StreamsInfo(ctx) {
i++

got := stream.Config.Name
if got != expected {
t.Fatalf("Expected stream to be %v, got: %v", expected, got)
}
}
cancel()
if i != 1 {
t.Errorf("Expected single stream: %v", err)
}

called := false
for range js.ConsumersInfo("") {
ctx, cancel = context.WithTimeout(context.Background(), 3 * time.Second)
for range js.ConsumersInfo(ctx, "") {
called = true
}
cancel()
if called {
t.Error("Expected not not receive entries")
}

for ci := range js.ConsumersInfo("foo") {
ctx, cancel = context.WithTimeout(context.Background(), 3 * time.Second)
for ci := range js.ConsumersInfo(ctx, "foo") {
if ci.Stream != "foo" || ci.Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}
}
cancel()

// Delete a consumer using our client API.
if err := js.DeleteConsumer("", ""); err == nil {
Expand Down Expand Up @@ -2043,7 +2051,9 @@ func TestJetStream_Unsubscribe(t *testing.T) {
t.Helper()

var i int
for range js.ConsumersInfo("foo") {
ctx, cancel := context.WithTimeout(context.Background(), 3 * time.Second)
defer cancel()
for range js.ConsumersInfo(ctx, "foo") {
i++
}
if i != expected {
Expand Down Expand Up @@ -2173,7 +2183,9 @@ func TestJetStream_UnsubscribeCloseDrain(t *testing.T) {
t.Helper()

var i int
for range jsm.ConsumersInfo("foo") {
ctx, cancel := context.WithTimeout(context.Background(), 3 * time.Second)
defer cancel()
for range jsm.ConsumersInfo(ctx, "foo") {
i++
}
if i != expected {
Expand Down

0 comments on commit f2a2830

Please sign in to comment.