From 077874eecca4da8ed0d9fa45da0a234ea44cfc13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaime=20Pi=C3=B1a?= Date: Thu, 10 Dec 2020 16:54:30 -0800 Subject: [PATCH] Add more Stream and Consumer management APIs This adds the follow APIs for Streams. * UpdateStream * DeleteStream * SnapshotStream * RestoreStream * PurgeStream * StreamLister This adds the follow APIs for Consumers. * ConsumerInfo * ConsumerLister * DeleteConsumer --- js.go | 422 +++++++++++++++++++++++++++++++++++++++++++++++- test/js_test.go | 123 +++++++++++++- 2 files changed, 535 insertions(+), 10 deletions(-) diff --git a/js.go b/js.go index 942e1c78d..9f040749c 100644 --- a/js.go +++ b/js.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "strconv" "strings" @@ -45,10 +46,29 @@ type JetStream interface { type JetStreamManager interface { // Create a stream. AddStream(cfg *StreamConfig) (*StreamInfo, error) - // Create a consumer. - AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + // Update a stream. + UpdateStream(cfg *StreamConfig) (*StreamInfo, error) + // Delete a stream. + DeleteStream(name string) error // Stream information. StreamInfo(stream string) (*StreamInfo, error) + // Snapshot stream. + SnapshotStream(name string, cfg *StreamSnapshotConfig) (io.Reader, error) + // Restore stream. + RestoreStream(name string, snapshot io.Reader) error + // Purge stream messages. + PurgeStream(name string) error + // NewStreamLister is used to return pages of StreamInfo objects. + NewStreamLister() *StreamLister + + // Create a consumer. + AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + // Delete a consumer. + DeleteConsumer(stream, consumer string) error + // Consumer information. + ConsumerInfo(stream, name string) (*ConsumerInfo, error) + // NewConsumerLister is used to return pages of ConsumerInfo objects. + NewConsumerLister(stream string) *ConsumerLister } // JetStream is the public interface for the JetStream context. @@ -106,8 +126,7 @@ const ( JSDefaultAPIPrefix = "$JS.API." // JSApiAccountInfo is for obtaining general information about JetStream. JSApiAccountInfo = "INFO" - // JSApiStreams can lookup a stream by subject. - JSApiStreams = "STREAM.NAMES" + // JSApiConsumerCreateT is used to create consumers. JSApiConsumerCreateT = "CONSUMER.CREATE.%s" // JSApiDurableCreateT is used to create durable consumers. @@ -116,10 +135,29 @@ const ( JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s" // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" + // JSApiDeleteConsumerT is used to delete consumers. + JSApiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" + // JSApiConsumerListT is used to return all detailed consumer information + JSApiConsumerListT = "CONSUMER.LIST.%s" + + // JSApiStreams can lookup a stream by subject. + JSApiStreams = "STREAM.NAMES" // JSApiStreamCreateT is the endpoint to create new streams. JSApiStreamCreateT = "STREAM.CREATE.%s" // JSApiStreamInfoT is the endpoint to get information on a stream. JSApiStreamInfoT = "STREAM.INFO.%s" + // JSApiStreamUpdate is the endpoint to update existing streams. + JSApiStreamUpdateT = "STREAM.UPDATE.%s" + // JSApiStreamDeleteT is the endpoint to delete streams. + JSApiStreamDeleteT = "STREAM.DELETE.%s" + // JSApiStreamSnapshotT is the endpoint to snapshot streams. + JSApiStreamSnapshotT = "STREAM.SNAPSHOT.%s" + // JSApiStreamRestoreT is the endpoint to restore a stream from a snapshot. + JSApiStreamRestoreT = "STREAM.RESTORE.%s" + // JSApiPurgeStreamT is the endpoint to purge streams. + JSApiStreamPurgeT = "STREAM.PURGE.%s" + // JSApiStreamListT is the endpoint that will return all detailed stream information + JSApiStreamList = "STREAM.LIST" ) // JetStream returns a JetStream context for pub/sub interactions. @@ -1089,9 +1127,6 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { } } -// Management for JetStream -// TODO(dlc) - Fill this out. - // AddConsumer will add a JetStream consumer. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) { if stream == _EMPTY_ { @@ -1103,7 +1138,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er } var ccSubj string - if cfg.Durable != _EMPTY_ { + if cfg != nil && cfg.Durable != _EMPTY_ { ccSubj = fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable) } else { ccSubj = fmt.Sprintf(JSApiConsumerCreateT, stream) @@ -1127,6 +1162,121 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er return info.ConsumerInfo, nil } +// JSApiConsumerDeleteResponse. +type JSApiConsumerDeleteResponse struct { + APIResponse + Success bool `json:"success,omitempty"` +} + +func (js *js) DeleteConsumer(stream, durable string) error { + if stream == _EMPTY_ { + return ErrStreamNameRequired + } + + dcSubj := js.apiSubj(fmt.Sprintf(JSApiConsumerDeleteT, stream, durable)) + r, err := js.nc.Request(dcSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSApiConsumerDeleteResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) { + return js.getConsumerInfo(stream, durable) +} + +// APIPagedRequest includes parameters allowing specific pages to be requests from APIs responding with APIPaged +type APIPagedRequest struct { + Offset int `json:"offset"` +} + +type JSApiConsumersRequest struct { + APIPagedRequest +} + +// JSApiConsumerListResponse. +type JSApiConsumerListResponse struct { + APIResponse + APIPaged + Consumers []*ConsumerInfo `json:"consumers"` +} + +// ConsumerLister fetches pages of ConsumerInfo objects. +type ConsumerLister struct { + stream string + js *js + + err error + offset int + page []*ConsumerInfo + pageInfo *APIPaged +} + +// Next fetches the next ConsumerInfo page. +func (c *ConsumerLister) Next() bool { + if c.err != nil { + return false + } + if c.pageInfo != nil && c.offset >= c.pageInfo.Total { + return false + } + + if c.stream == _EMPTY_ { + c.err = ErrStreamNameRequired + return false + } + + req, err := json.Marshal(JSApiConsumersRequest{ + APIPagedRequest: APIPagedRequest{Offset: c.offset}, + }) + if err != nil { + c.err = err + return false + } + clSubj := c.js.apiSubj(fmt.Sprintf(JSApiConsumerListT, c.stream)) + r, err := c.js.nc.Request(clSubj, req, c.js.wait) + if err != nil { + c.err = err + return false + } + var resp JSApiConsumerListResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + c.err = err + return false + } + if resp.Error != nil { + c.err = errors.New(resp.Error.Description) + return false + } + + c.pageInfo = &resp.APIPaged + c.page = resp.Consumers + c.offset += len(c.page) + return true +} + +// Page returns the current ConsumerInfo page. +func (c *ConsumerLister) Page() []*ConsumerInfo { + return c.page +} + +// Err returns any errors found while fetching pages. +func (c *ConsumerLister) Err() error { + return c.err +} + +// NewConsumerLister is used to return pages of ConsumerInfo objects. +func (js *js) NewConsumerLister(stream string) *ConsumerLister { + return &ConsumerLister{stream: stream, js: js} +} + // StreamConfig will determine the properties for a stream. // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. @@ -1214,6 +1364,262 @@ type StreamState struct { Consumers int `json:"consumer_count"` } +func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) { + if cfg == nil || cfg.Name == _EMPTY_ { + return nil, ErrStreamNameRequired + } + + req, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + + usSubj := js.apiSubj(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name)) + r, err := js.nc.Request(usSubj, req, js.wait) + if err != nil { + return nil, err + } + var resp JSApiStreamInfoResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return nil, err + } + if resp.Error != nil { + return nil, errors.New(resp.Error.Description) + } + return resp.StreamInfo, nil +} + +// JSApiStreamDeleteResponse stream removal. +type JSApiStreamDeleteResponse struct { + APIResponse + Success bool `json:"success,omitempty"` +} + +const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response" + +func (js *js) DeleteStream(name string) error { + if name == _EMPTY_ { + return ErrStreamNameRequired + } + + dsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamDeleteT, name)) + r, err := js.nc.Request(dsSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSApiStreamDeleteResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +type StreamSnapshotConfig struct { + // Subject to deliver the chunks to for the snapshot. + DeliverSubject string `json:"deliver_subject"` + // Do not include consumers in the snapshot. + NoConsumers bool `json:"no_consumers,omitempty"` + // Optional chunk size preference. + // Best to just let server select. + ChunkSize int `json:"chunk_size,omitempty"` + // Check all message's checksums prior to snapshot. + CheckMsgs bool `json:"jsck,omitempty"` +} + +// JSApiStreamSnapshotResponse is the direct response to the snapshot request. +type JSApiStreamSnapshotResponse struct { + APIResponse + // Estimate of number of blocks for the messages. + NumBlks int `json:"num_blks"` + // Block size limit as specified by the stream. + BlkSize int `json:"blk_size"` +} + +func (js *js) SnapshotStream(name string, cfg *StreamSnapshotConfig) (io.Reader, error) { + req, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + + ssSubj := js.apiSubj(fmt.Sprintf(JSApiStreamSnapshotT, name)) + r, err := js.nc.Request(ssSubj, req, js.wait) + if err != nil { + return nil, err + } + var resp JSApiStreamSnapshotResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return nil, err + } + if resp.Error != nil { + return nil, errors.New(resp.Error.Description) + } + + // Setup to process snapshot chunks. + var snapshot []byte + done := make(chan bool) + sub, err := js.nc.Subscribe(cfg.DeliverSubject, func(m *Msg) { + // EOF + if len(m.Data) == 0 { + done <- true + return + } + // Could be writing to a file here too. + snapshot = append(snapshot, m.Data...) + // Flow ack + m.Respond(nil) + }) + if err != nil { + return nil, err + } + defer sub.Unsubscribe() + // Wait to receive the snapshot. + select { + case <-done: + case <-time.After(5 * time.Second): + return nil, errors.New("nats: snapshot timeout exceeded") + } + + return bytes.NewReader(snapshot), nil +} + +// JSApiStreamRestoreResponse is the direct response to the restore request. +type JSApiStreamRestoreResponse struct { + APIResponse + // Subject to deliver the chunks to for the snapshot restore. + DeliverSubject string `json:"deliver_subject"` +} + +func (js *js) RestoreStream(name string, snapshot io.Reader) error { + rsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamRestoreT, name)) + r, err := js.nc.Request(rsSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSApiStreamRestoreResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + + // Can be any size message. + var chunk [512]byte + for r := snapshot; ; { + n, err := r.Read(chunk[:]) + if err != nil { + break + } + js.nc.Request(resp.DeliverSubject, chunk[:n], time.Second) + } + js.nc.Request(resp.DeliverSubject, nil, time.Second) + return nil +} + +// JSApiStreamPurgeResponse. +type JSApiStreamPurgeResponse struct { + APIResponse + Success bool `json:"success,omitempty"` + Purged uint64 `json:"purged"` +} + +func (js *js) PurgeStream(name string) error { + psSubj := js.apiSubj(fmt.Sprintf(JSApiStreamPurgeT, name)) + r, err := js.nc.Request(psSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSApiStreamPurgeResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +type JSApiStreamNamesRequest struct { + APIPagedRequest + // These are filters that can be applied to the list. + Subject string `json:"subject,omitempty"` +} + +// JSApiStreamListResponse list of detailed stream information. +// A nil request is valid and means all streams. +type JSApiStreamListResponse struct { + APIResponse + APIPaged + Streams []*StreamInfo `json:"streams"` +} + +// StreamLister fetches pages of StreamInfo objects. +type StreamLister struct { + js *js + page []*StreamInfo + err error + + offset int + pageInfo *APIPaged +} + +// Next fetches the next StreamInfo page. +func (s *StreamLister) Next() bool { + if s.err != nil { + return false + } + if s.pageInfo != nil && s.offset >= s.pageInfo.Total { + return false + } + + req, err := json.Marshal(JSApiStreamNamesRequest{ + APIPagedRequest: APIPagedRequest{Offset: s.offset}, + }) + if err != nil { + s.err = err + return false + } + + slSubj := s.js.apiSubj(JSApiStreamList) + r, err := s.js.nc.Request(slSubj, req, s.js.wait) + if err != nil { + s.err = err + return false + } + var resp JSApiStreamListResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + s.err = err + return false + } + if resp.Error != nil { + s.err = errors.New(resp.Error.Description) + return false + } + + s.pageInfo = &resp.APIPaged + s.page = resp.Streams + s.offset += len(s.page) + return true +} + +// Page returns the current StreamInfo page. +func (s *StreamLister) Page() []*StreamInfo { + return s.page +} + +// Err returns any errors found while fetching pages. +func (s *StreamLister) Err() error { + return s.err +} + +// NewStreamLister is used to return pages of StreamInfo objects. +func (js *js) NewStreamLister() *StreamLister { + return &StreamLister{js: js} +} + // RetentionPolicy determines how messages in a set are retained. type RetentionPolicy int diff --git a/test/js_test.go b/test/js_test.go index 13147f8e0..3d957bbb3 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -498,7 +498,6 @@ func TestAckForNonJetStream(t *testing.T) { } } -// TODO(dlc) - fill out with more stuff. func TestJetStreamManagement(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -519,6 +518,9 @@ func TestJetStreamManagement(t *testing.T) { } // Create the stream using our client API. + if _, err := js.AddStream(nil); err == nil { + t.Fatalf("Unexpected success") + } si, err := js.AddStream(&nats.StreamConfig{Name: "foo"}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -527,6 +529,10 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } + for i := 0; i < 25; i++ { + js.Publish("foo", []byte("hi")) + } + // Check info calls. si, err = js.StreamInfo("foo") if err != nil { @@ -536,14 +542,127 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } + // Update the stream using our client API. + if _, err := js.UpdateStream(nil); err == nil { + t.Fatal("Unexpected success") + } + prevMaxMsgs := si.Config.MaxMsgs + si, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", MaxMsgs: prevMaxMsgs + 100}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si == nil || si.Config.Name != "foo" || si.Config.MaxMsgs == prevMaxMsgs { + t.Fatalf("StreamInfo is not correct %+v", si) + } + // Create a consumer using our client API. - ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicit}) + if _, err := js.AddConsumer("", nil); err == nil { + t.Fatalf("Unexpected success") + } + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc", + AckPolicy: nats.AckExplicit, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", ci) } + + // Check info calls. + ci, err = js.ConsumerInfo("foo", "dlc") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci == nil || ci.Config.Durable != "dlc" { + t.Fatalf("ConsumerInfo is not correct %+v", si) + } + + // Create a snapshot using our client API. + inboxSubj := nats.NewInbox() + snapshot, err := js.SnapshotStream("foo", &nats.StreamSnapshotConfig{ + DeliverSubject: inboxSubj, + ChunkSize: 1024, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sl := js.NewStreamLister() + if !sl.Next() { + if err := sl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + t.Fatalf("Unexpected stream lister next") + } + if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" { + t.Fatalf("StreamInfo is not correct %+v", p) + } + if err := sl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if cl := js.NewConsumerLister(""); cl.Next() { + t.Fatalf("Unexpected next ok") + } else if err := cl.Err(); err == nil { + if cl.Next() { + t.Fatalf("Unexpected next ok") + } + t.Fatalf("Unexpected nil error") + } + cl := js.NewConsumerLister("foo") + if !cl.Next() { + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + t.Fatalf("Unexpected consumer lister next") + } + if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" { + t.Fatalf("ConsumerInfo is not correct %+v", p) + } + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Delete a consumer using our client API. + if err := js.DeleteConsumer("", ""); err == nil { + t.Fatalf("Unexpected success") + } + if err := js.DeleteConsumer("foo", "dlc"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Delete a stream using our client API. + if err := js.DeleteStream(""); err == nil { + t.Fatal("Unexpected success") + } + if err := js.DeleteStream("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.StreamInfo("foo"); err == nil { + t.Fatalf("Unexpected success") + } + + // Restore a stream using our client API. + if err := js.RestoreStream("foo", snapshot); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si, err := js.StreamInfo("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if si.Config.Name != "foo" && si.State.Msgs != 25 { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + // Purge a stream using our client API. + if err := js.PurgeStream("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si, err := js.StreamInfo("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if si.State.Msgs != 0 { + t.Fatalf("StreamInfo.Msgs is not correct") + } } func TestJetStreamImport(t *testing.T) {