Skip to content

Commit

Permalink
Add ConsumerInfo and UpdateStream
Browse files Browse the repository at this point in the history
  • Loading branch information
nsurfer committed Dec 11, 2020
1 parent 5bbcdc9 commit 85d65b4
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
43 changes: 40 additions & 3 deletions js.go
Expand Up @@ -44,10 +44,15 @@ type JetStream interface {
type JetStreamManager interface {
// Create a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)
// Update a stream.
UpdateStream(cfg *StreamConfig) (*StreamInfo, error)
// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)

// Stream information.
StreamInfo(stream string) (*StreamInfo, error)
// Consumer information.
ConsumerInfo(stream, durable string) (*ConsumerInfo, error)
}

// JetStream is the public interface for the JetStream context.
Expand Down Expand Up @@ -105,8 +110,7 @@ const (
JSDefaultAPIPrefix = "$JS.API."
// JSApiAccountInfo is for obtaining general information about JetStream.
JSApiAccountInfo = "INFO"
// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"

// JSApiConsumerCreateT is used to create consumers.
JSApiConsumerCreateT = "CONSUMER.CREATE.%s"
// JSApiDurableCreateT is used to create durable consumers.
Expand All @@ -115,10 +119,14 @@ const (
JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s"
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"

// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"
// JSApiStreamCreateT is the endpoint to create new streams.
JSApiStreamCreateT = "STREAM.CREATE.%s"
// JSApiStreamInfoT is the endpoint to get information on a stream.
JSApiStreamInfoT = "STREAM.INFO.%s"
JSApiStreamInfoT = "STREAM.INFO.%s"
JSApiStreamUpdateT = "STREAM.UPDATE.%s"
)

// JetStream returns a JetStream context for pub/sub interactions.
Expand Down Expand Up @@ -1036,6 +1044,10 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
return info.ConsumerInfo, nil
}

func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, durable)
}

// StreamConfig will determine the properties for a stream.
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
Expand Down Expand Up @@ -1136,6 +1148,31 @@ const (
WorkQueuePolicy
)

func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}

req, err := json.Marshal(cfg)
if err != nil {
return nil, err
}

usSubj := js.apiSubj(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name))
r, err := js.nc.Request(usSubj, req, js.wait)
if err != nil {
return nil, err
}
var resp JSApiStreamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}
return resp.StreamInfo, nil
}

// Discard Policy determines how we proceed when limits of messages or bytes are hit. The default, DicscardOld will
// remove older messages. DiscardNew will fail to store the new message.
type DiscardPolicy int
Expand Down
19 changes: 19 additions & 0 deletions test/js_test.go
Expand Up @@ -536,6 +536,16 @@ func TestJetStreamManagement(t *testing.T) {
t.Fatalf("StreamInfo is not correct %+v", si)
}

// Update the stream using our client API.
prevMaxMsgs := si.Config.MaxMsgs
si, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", MaxMsgs: prevMaxMsgs + 100})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if si == nil || si.Config.Name != "foo" || si.Config.MaxMsgs != prevMaxMsgs+100 {
t.Fatalf("StreamInfo is not correct %+v", si)
}

// Create a consumer using our client API.
ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicit})
if err != nil {
Expand All @@ -544,6 +554,15 @@ func TestJetStreamManagement(t *testing.T) {
if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" {
t.Fatalf("ConsumerInfo is not correct %+v", ci)
}

// Check info calls.
ci, err = js.ConsumerInfo("foo", "dlc")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci == nil || ci.Config.Durable != "dlc" {
t.Fatalf("ConsumerInfo is not correct %+v", si)
}
}

func TestJetStreamImport(t *testing.T) {
Expand Down

0 comments on commit 85d65b4

Please sign in to comment.