From 1a29e1be4ae2ffe21fd44c4783ff8e4b271ae3b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaime=20Pi=C3=B1a?= Date: Thu, 10 Dec 2020 16:54:30 -0800 Subject: [PATCH 1/2] Add more Stream and Consumer management APIs This adds the follow APIs for Streams. * UpdateStream * DeleteStream * SnapshotStream * RestoreStream * PurgeStream * StreamLister This adds the follow APIs for Consumers. * ConsumerInfo * ConsumerLister * DeleteConsumer --- js.go | 440 +++++++++++++++++++++++++++++++++++++++++++++++- nats.go | 122 +++++++------- test/js_test.go | 123 +++++++++++++- 3 files changed, 615 insertions(+), 70 deletions(-) diff --git a/js.go b/js.go index 942e1c78d..992fb7de1 100644 --- a/js.go +++ b/js.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "strconv" "strings" @@ -45,10 +46,29 @@ type JetStream interface { type JetStreamManager interface { // Create a stream. AddStream(cfg *StreamConfig) (*StreamInfo, error) - // Create a consumer. - AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + // Update a stream. + UpdateStream(cfg *StreamConfig) (*StreamInfo, error) + // Delete a stream. + DeleteStream(name string) error // Stream information. StreamInfo(stream string) (*StreamInfo, error) + // Snapshot stream. + SnapshotStream(name string, timeout time.Duration, cfg *StreamSnapshotConfig) (io.Reader, error) + // Restore stream. + RestoreStream(name string, snapshot io.Reader) error + // Purge stream messages. + PurgeStream(name string) error + // NewStreamLister is used to return pages of StreamInfo objects. + NewStreamLister() *StreamLister + + // Create a consumer. + AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + // Delete a consumer. + DeleteConsumer(stream, consumer string) error + // Consumer information. + ConsumerInfo(stream, name string) (*ConsumerInfo, error) + // NewConsumerLister is used to return pages of ConsumerInfo objects. + NewConsumerLister(stream string) *ConsumerLister } // JetStream is the public interface for the JetStream context. @@ -106,8 +126,7 @@ const ( JSDefaultAPIPrefix = "$JS.API." // JSApiAccountInfo is for obtaining general information about JetStream. JSApiAccountInfo = "INFO" - // JSApiStreams can lookup a stream by subject. - JSApiStreams = "STREAM.NAMES" + // JSApiConsumerCreateT is used to create consumers. JSApiConsumerCreateT = "CONSUMER.CREATE.%s" // JSApiDurableCreateT is used to create durable consumers. @@ -116,10 +135,29 @@ const ( JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s" // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" + // JSApiDeleteConsumerT is used to delete consumers. + JSApiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" + // JSApiConsumerListT is used to return all detailed consumer information + JSApiConsumerListT = "CONSUMER.LIST.%s" + + // JSApiStreams can lookup a stream by subject. + JSApiStreams = "STREAM.NAMES" // JSApiStreamCreateT is the endpoint to create new streams. JSApiStreamCreateT = "STREAM.CREATE.%s" // JSApiStreamInfoT is the endpoint to get information on a stream. JSApiStreamInfoT = "STREAM.INFO.%s" + // JSApiStreamUpdate is the endpoint to update existing streams. + JSApiStreamUpdateT = "STREAM.UPDATE.%s" + // JSApiStreamDeleteT is the endpoint to delete streams. + JSApiStreamDeleteT = "STREAM.DELETE.%s" + // JSApiStreamSnapshotT is the endpoint to snapshot streams. + JSApiStreamSnapshotT = "STREAM.SNAPSHOT.%s" + // JSApiStreamRestoreT is the endpoint to restore a stream from a snapshot. + JSApiStreamRestoreT = "STREAM.RESTORE.%s" + // JSApiPurgeStreamT is the endpoint to purge streams. + JSApiStreamPurgeT = "STREAM.PURGE.%s" + // JSApiStreamListT is the endpoint that will return all detailed stream information + JSApiStreamList = "STREAM.LIST" ) // JetStream returns a JetStream context for pub/sub interactions. @@ -1089,9 +1127,6 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { } } -// Management for JetStream -// TODO(dlc) - Fill this out. - // AddConsumer will add a JetStream consumer. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) { if stream == _EMPTY_ { @@ -1103,7 +1138,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er } var ccSubj string - if cfg.Durable != _EMPTY_ { + if cfg != nil && cfg.Durable != _EMPTY_ { ccSubj = fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable) } else { ccSubj = fmt.Sprintf(JSApiConsumerCreateT, stream) @@ -1127,6 +1162,125 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er return info.ConsumerInfo, nil } +// JSAPIConsumerDeleteResponse is the response for a Consumer delete request. +type JSAPIConsumerDeleteResponse struct { + APIResponse + Success bool `json:"success,omitempty"` +} + +// DeleteConsumer deletes a Consumer. +func (js *js) DeleteConsumer(stream, durable string) error { + if stream == _EMPTY_ { + return ErrStreamNameRequired + } + + dcSubj := js.apiSubj(fmt.Sprintf(JSApiConsumerDeleteT, stream, durable)) + r, err := js.nc.Request(dcSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSAPIConsumerDeleteResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +// ConsumerInfo returns information about a Consumer. +func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) { + return js.getConsumerInfo(stream, durable) +} + +// APIPagedRequest includes parameters allowing specific pages to be requested +// from APIs responding with APIPaged. +type APIPagedRequest struct { + Offset int `json:"offset"` +} + +// JSAPIConsumersRequest is the type used for Consumers requests. +type JSAPIConsumersRequest struct { + APIPagedRequest +} + +// JSAPIConsumerListResponse is the response for a Consumers List request. +type JSAPIConsumerListResponse struct { + APIResponse + APIPaged + Consumers []*ConsumerInfo `json:"consumers"` +} + +// ConsumerLister fetches pages of ConsumerInfo objects. This object is not +// safe to use for multiple threads. +type ConsumerLister struct { + stream string + js *js + + err error + offset int + page []*ConsumerInfo + pageInfo *APIPaged +} + +// Next fetches the next ConsumerInfo page. +func (c *ConsumerLister) Next() bool { + if c.err != nil { + return false + } + if c.stream == _EMPTY_ { + c.err = ErrStreamNameRequired + return false + } + if c.pageInfo != nil && c.offset >= c.pageInfo.Total { + return false + } + + req, err := json.Marshal(JSAPIConsumersRequest{ + APIPagedRequest: APIPagedRequest{Offset: c.offset}, + }) + if err != nil { + c.err = err + return false + } + clSubj := c.js.apiSubj(fmt.Sprintf(JSApiConsumerListT, c.stream)) + r, err := c.js.nc.Request(clSubj, req, c.js.wait) + if err != nil { + c.err = err + return false + } + var resp JSAPIConsumerListResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + c.err = err + return false + } + if resp.Error != nil { + c.err = errors.New(resp.Error.Description) + return false + } + + c.pageInfo = &resp.APIPaged + c.page = resp.Consumers + c.offset += len(c.page) + return true +} + +// Page returns the current ConsumerInfo page. +func (c *ConsumerLister) Page() []*ConsumerInfo { + return c.page +} + +// Err returns any errors found while fetching pages. +func (c *ConsumerLister) Err() error { + return c.err +} + +// NewConsumerLister is used to return pages of ConsumerInfo objects. +func (js *js) NewConsumerLister(stream string) *ConsumerLister { + return &ConsumerLister{stream: stream, js: js} +} + // StreamConfig will determine the properties for a stream. // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. @@ -1214,6 +1368,276 @@ type StreamState struct { Consumers int `json:"consumer_count"` } +// UpdateStream updates a Stream. +func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) { + if cfg == nil || cfg.Name == _EMPTY_ { + return nil, ErrStreamNameRequired + } + + req, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + + usSubj := js.apiSubj(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name)) + r, err := js.nc.Request(usSubj, req, js.wait) + if err != nil { + return nil, err + } + var resp JSApiStreamInfoResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return nil, err + } + if resp.Error != nil { + return nil, errors.New(resp.Error.Description) + } + return resp.StreamInfo, nil +} + +// JSAPIStreamDeleteResponse is the response for a Stream delete request. +type JSAPIStreamDeleteResponse struct { + APIResponse + Success bool `json:"success,omitempty"` +} + +// DeleteStream deletes a Stream. +func (js *js) DeleteStream(name string) error { + if name == _EMPTY_ { + return ErrStreamNameRequired + } + + dsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamDeleteT, name)) + r, err := js.nc.Request(dsSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSAPIStreamDeleteResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +// StreamSnapshotConfig contains options for snapshotting a Stream. +type StreamSnapshotConfig struct { + // Subject to deliver the chunks to for the snapshot. + DeliverSubject string `json:"deliver_subject"` + // Do not include consumers in the snapshot. + NoConsumers bool `json:"no_consumers,omitempty"` + // Optional chunk size preference. + // Best to just let server select. + ChunkSize int `json:"chunk_size,omitempty"` + // Check all message's checksums prior to snapshot. + CheckMsgs bool `json:"jsck,omitempty"` +} + +// JSAPIStreamSnapshotResponse is the response for a snapshot request. +type JSAPIStreamSnapshotResponse struct { + APIResponse + // Estimate of number of blocks for the messages. + NumBlks int `json:"num_blks"` + // Block size limit as specified by the stream. + BlkSize int `json:"blk_size"` +} + +// SnapshotStream creates a snapshot of a Stream. It returns a copy of the +// snapshot data. +func (js *js) SnapshotStream(name string, timeout time.Duration, cfg *StreamSnapshotConfig) (io.Reader, error) { + if cfg == nil { + return nil, ErrStreamSnapshotConfigRequired + } + if cfg.DeliverSubject == "" { + return nil, ErrDeliverSubjectRequired + } + + req, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + + ssSubj := js.apiSubj(fmt.Sprintf(JSApiStreamSnapshotT, name)) + r, err := js.nc.Request(ssSubj, req, js.wait) + if err != nil { + return nil, err + } + var resp JSAPIStreamSnapshotResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return nil, err + } + if resp.Error != nil { + return nil, errors.New(resp.Error.Description) + } + + // Setup to process snapshot chunks. + var snapshot []byte + done := make(chan struct{}, 1) + sub, err := js.nc.Subscribe(cfg.DeliverSubject, func(m *Msg) { + // EOF + if len(m.Data) == 0 { + done <- struct{}{} + return + } + // Could be writing to a file here too. + snapshot = append(snapshot, m.Data...) + // Flow ack + m.Respond(nil) + }) + if err != nil { + return nil, err + } + defer sub.Unsubscribe() + // Wait to receive the snapshot. + select { + case <-done: + case <-time.After(timeout): + return nil, errors.New("nats: snapshot timeout exceeded") + } + + return bytes.NewReader(snapshot), nil +} + +// JSAPIStreamRestoreResponse is the direct response to the restore request. +type JSAPIStreamRestoreResponse struct { + APIResponse + // Subject to deliver the chunks to for the snapshot restore. + DeliverSubject string `json:"deliver_subject"` +} + +// RestoreStream restores a Stream from a snapshot. +func (js *js) RestoreStream(name string, snapshot io.Reader) error { + rsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamRestoreT, name)) + r, err := js.nc.Request(rsSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSAPIStreamRestoreResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + + // Can be any size message. + var chunk [512]byte + for r := snapshot; ; { + n, err := r.Read(chunk[:]) + if err != nil { + break + } + js.nc.Request(resp.DeliverSubject, chunk[:n], js.wait) + } + js.nc.Request(resp.DeliverSubject, nil, js.wait) + return nil +} + +// JSAPIStreamPurgeResponse. +type JSAPIStreamPurgeResponse struct { + APIResponse + Success bool `json:"success,omitempty"` + Purged uint64 `json:"purged"` +} + +// PurgeStream purges messages on a Stream. +func (js *js) PurgeStream(name string) error { + psSubj := js.apiSubj(fmt.Sprintf(JSApiStreamPurgeT, name)) + r, err := js.nc.Request(psSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSAPIStreamPurgeResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +// JSAPIStreamNamesRequest is used for Stream Name requests. +type JSAPIStreamNamesRequest struct { + APIPagedRequest + // These are filters that can be applied to the list. + Subject string `json:"subject,omitempty"` +} + +// JSApiStreamListResponse list of detailed stream information. +// A nil request is valid and means all streams. +type JSApiStreamListResponse struct { + APIResponse + APIPaged + Streams []*StreamInfo `json:"streams"` +} + +// StreamLister fetches pages of StreamInfo objects. This object is not safe +// to use for multiple threads. +type StreamLister struct { + js *js + page []*StreamInfo + err error + + offset int + pageInfo *APIPaged +} + +// Next fetches the next StreamInfo page. +func (s *StreamLister) Next() bool { + if s.err != nil { + return false + } + if s.pageInfo != nil && s.offset >= s.pageInfo.Total { + return false + } + + req, err := json.Marshal(JSAPIStreamNamesRequest{ + APIPagedRequest: APIPagedRequest{Offset: s.offset}, + }) + if err != nil { + s.err = err + return false + } + + slSubj := s.js.apiSubj(JSApiStreamList) + r, err := s.js.nc.Request(slSubj, req, s.js.wait) + if err != nil { + s.err = err + return false + } + var resp JSApiStreamListResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + s.err = err + return false + } + if resp.Error != nil { + s.err = errors.New(resp.Error.Description) + return false + } + + s.pageInfo = &resp.APIPaged + s.page = resp.Streams + s.offset += len(s.page) + return true +} + +// Page returns the current StreamInfo page. +func (s *StreamLister) Page() []*StreamInfo { + return s.page +} + +// Err returns any errors found while fetching pages. +func (s *StreamLister) Err() error { + return s.err +} + +// NewStreamLister is used to return pages of StreamInfo objects. +func (js *js) NewStreamLister() *StreamLister { + return &StreamLister{js: js} +} + // RetentionPolicy determines how messages in a set are retained. type RetentionPolicy int diff --git a/nats.go b/nats.go index caba0547b..2163036db 100644 --- a/nats.go +++ b/nats.go @@ -80,66 +80,68 @@ const ( // Errors var ( - ErrConnectionClosed = errors.New("nats: connection closed") - ErrConnectionDraining = errors.New("nats: connection draining") - ErrDrainTimeout = errors.New("nats: draining connection timed out") - ErrConnectionReconnecting = errors.New("nats: connection reconnecting") - ErrSecureConnRequired = errors.New("nats: secure connection required") - ErrSecureConnWanted = errors.New("nats: secure connection not available") - ErrBadSubscription = errors.New("nats: invalid subscription") - ErrTypeSubscription = errors.New("nats: invalid subscription type") - ErrBadSubject = errors.New("nats: invalid subject") - ErrBadQueueName = errors.New("nats: invalid queue name") - ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") - ErrTimeout = errors.New("nats: timeout") - ErrBadTimeout = errors.New("nats: timeout invalid") - ErrAuthorization = errors.New("nats: authorization violation") - ErrAuthExpired = errors.New("nats: authentication expired") - ErrNoServers = errors.New("nats: no servers available for connection") - ErrJsonParse = errors.New("nats: connect message, json parse error") - ErrChanArg = errors.New("nats: argument needs to be a channel type") - ErrMaxPayload = errors.New("nats: maximum payload exceeded") - ErrMaxMessages = errors.New("nats: maximum messages delivered") - ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") - ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") - ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") - ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") - ErrInvalidConnection = errors.New("nats: invalid connection") - ErrInvalidMsg = errors.New("nats: invalid message or message nil") - ErrInvalidArg = errors.New("nats: invalid argument") - ErrInvalidContext = errors.New("nats: invalid context") - ErrNoDeadlineContext = errors.New("nats: context requires a deadline") - ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") - ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") - ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") - ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") - ErrNoUserCB = errors.New("nats: user callback not defined") - ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") - ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") - ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) - ErrTokenAlreadySet = errors.New("nats: token and token handler both set") - ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") - ErrMsgNoReply = errors.New("nats: message does not have a reply") - ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") - ErrDisconnected = errors.New("nats: server is disconnected") - ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") - ErrBadHeaderMsg = errors.New("nats: message could not decode headers") - ErrNoResponders = errors.New("nats: no responders available for request") - ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") - ErrDirectModeRequired = errors.New("nats: direct access requires direct pull or push") - ErrPullModeNotAllowed = errors.New("nats: pull based not supported") - ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") - ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") - ErrNoStreamResponse = errors.New("nats: no response from stream") - ErrNotJSMessage = errors.New("nats: not a jetstream message") - ErrInvalidStreamName = errors.New("nats: invalid stream name") - ErrNoMatchingStream = errors.New("nats: no stream matches subject") - ErrSubjectMismatch = errors.New("nats: subject does not match consumer") - ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") - ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") - ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") - ErrStreamNameRequired = errors.New("nats: stream name is required") - ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") + ErrConnectionClosed = errors.New("nats: connection closed") + ErrConnectionDraining = errors.New("nats: connection draining") + ErrDrainTimeout = errors.New("nats: draining connection timed out") + ErrConnectionReconnecting = errors.New("nats: connection reconnecting") + ErrSecureConnRequired = errors.New("nats: secure connection required") + ErrSecureConnWanted = errors.New("nats: secure connection not available") + ErrBadSubscription = errors.New("nats: invalid subscription") + ErrTypeSubscription = errors.New("nats: invalid subscription type") + ErrBadSubject = errors.New("nats: invalid subject") + ErrBadQueueName = errors.New("nats: invalid queue name") + ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") + ErrTimeout = errors.New("nats: timeout") + ErrBadTimeout = errors.New("nats: timeout invalid") + ErrAuthorization = errors.New("nats: authorization violation") + ErrAuthExpired = errors.New("nats: authentication expired") + ErrNoServers = errors.New("nats: no servers available for connection") + ErrJsonParse = errors.New("nats: connect message, json parse error") + ErrChanArg = errors.New("nats: argument needs to be a channel type") + ErrMaxPayload = errors.New("nats: maximum payload exceeded") + ErrMaxMessages = errors.New("nats: maximum messages delivered") + ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") + ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") + ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") + ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") + ErrInvalidConnection = errors.New("nats: invalid connection") + ErrInvalidMsg = errors.New("nats: invalid message or message nil") + ErrInvalidArg = errors.New("nats: invalid argument") + ErrInvalidContext = errors.New("nats: invalid context") + ErrNoDeadlineContext = errors.New("nats: context requires a deadline") + ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") + ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") + ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") + ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") + ErrNoUserCB = errors.New("nats: user callback not defined") + ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") + ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") + ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) + ErrTokenAlreadySet = errors.New("nats: token and token handler both set") + ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") + ErrMsgNoReply = errors.New("nats: message does not have a reply") + ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") + ErrDisconnected = errors.New("nats: server is disconnected") + ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") + ErrBadHeaderMsg = errors.New("nats: message could not decode headers") + ErrNoResponders = errors.New("nats: no responders available for request") + ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") + ErrDirectModeRequired = errors.New("nats: direct access requires direct pull or push") + ErrPullModeNotAllowed = errors.New("nats: pull based not supported") + ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") + ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") + ErrNoStreamResponse = errors.New("nats: no response from stream") + ErrNotJSMessage = errors.New("nats: not a jetstream message") + ErrInvalidStreamName = errors.New("nats: invalid stream name") + ErrNoMatchingStream = errors.New("nats: no stream matches subject") + ErrSubjectMismatch = errors.New("nats: subject does not match consumer") + ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") + ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") + ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") + ErrStreamNameRequired = errors.New("nats: stream name is required") + ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") + ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") + ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required") ) func init() { diff --git a/test/js_test.go b/test/js_test.go index 13147f8e0..72b6929ac 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -498,7 +498,6 @@ func TestAckForNonJetStream(t *testing.T) { } } -// TODO(dlc) - fill out with more stuff. func TestJetStreamManagement(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -519,6 +518,9 @@ func TestJetStreamManagement(t *testing.T) { } // Create the stream using our client API. + if _, err := js.AddStream(nil); err == nil { + t.Fatalf("Unexpected success") + } si, err := js.AddStream(&nats.StreamConfig{Name: "foo"}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -527,6 +529,10 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } + for i := 0; i < 25; i++ { + js.Publish("foo", []byte("hi")) + } + // Check info calls. si, err = js.StreamInfo("foo") if err != nil { @@ -536,14 +542,127 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } + // Update the stream using our client API. + if _, err := js.UpdateStream(nil); err == nil { + t.Fatal("Unexpected success") + } + prevMaxMsgs := si.Config.MaxMsgs + si, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", MaxMsgs: prevMaxMsgs + 100}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si == nil || si.Config.Name != "foo" || si.Config.MaxMsgs == prevMaxMsgs { + t.Fatalf("StreamInfo is not correct %+v", si) + } + // Create a consumer using our client API. - ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicit}) + if _, err := js.AddConsumer("", nil); err == nil { + t.Fatalf("Unexpected success") + } + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc", + AckPolicy: nats.AckExplicit, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", ci) } + + // Check info calls. + ci, err = js.ConsumerInfo("foo", "dlc") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci == nil || ci.Config.Durable != "dlc" { + t.Fatalf("ConsumerInfo is not correct %+v", si) + } + + // Create a snapshot using our client API. + inboxSubj := nats.NewInbox() + snapshot, err := js.SnapshotStream("foo", 5*time.Second, &nats.StreamSnapshotConfig{ + DeliverSubject: inboxSubj, + ChunkSize: 1024, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + sl := js.NewStreamLister() + if !sl.Next() { + if err := sl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + t.Fatalf("Unexpected stream lister next") + } + if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" { + t.Fatalf("StreamInfo is not correct %+v", p) + } + if err := sl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if cl := js.NewConsumerLister(""); cl.Next() { + t.Fatalf("Unexpected next ok") + } else if err := cl.Err(); err == nil { + if cl.Next() { + t.Fatalf("Unexpected next ok") + } + t.Fatalf("Unexpected nil error") + } + cl := js.NewConsumerLister("foo") + if !cl.Next() { + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + t.Fatalf("Unexpected consumer lister next") + } + if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" { + t.Fatalf("ConsumerInfo is not correct %+v", p) + } + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Delete a consumer using our client API. + if err := js.DeleteConsumer("", ""); err == nil { + t.Fatalf("Unexpected success") + } + if err := js.DeleteConsumer("foo", "dlc"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Delete a stream using our client API. + if err := js.DeleteStream(""); err == nil { + t.Fatal("Unexpected success") + } + if err := js.DeleteStream("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.StreamInfo("foo"); err == nil { + t.Fatalf("Unexpected success") + } + + // Restore a stream using our client API. + if err := js.RestoreStream("foo", snapshot); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si, err := js.StreamInfo("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if si.Config.Name != "foo" && si.State.Msgs != 25 { + t.Fatalf("StreamInfo is not correct %+v", si) + } + + // Purge a stream using our client API. + if err := js.PurgeStream("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si, err := js.StreamInfo("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if si.State.Msgs != 0 { + t.Fatalf("StreamInfo.Msgs is not correct") + } } func TestJetStreamImport(t *testing.T) { From 3e0aed6da769bd5b572edcfe263b9f136e672f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaime=20Pi=C3=B1a?= Date: Wed, 6 Jan 2021 13:35:35 -0800 Subject: [PATCH 2/2] Remove snapshot logic --- js.go | 118 ------------------------------------------------ test/js_test.go | 40 ++++------------ 2 files changed, 10 insertions(+), 148 deletions(-) diff --git a/js.go b/js.go index 992fb7de1..6ffe4021f 100644 --- a/js.go +++ b/js.go @@ -19,7 +19,6 @@ import ( "encoding/json" "errors" "fmt" - "io" "net/http" "strconv" "strings" @@ -52,10 +51,6 @@ type JetStreamManager interface { DeleteStream(name string) error // Stream information. StreamInfo(stream string) (*StreamInfo, error) - // Snapshot stream. - SnapshotStream(name string, timeout time.Duration, cfg *StreamSnapshotConfig) (io.Reader, error) - // Restore stream. - RestoreStream(name string, snapshot io.Reader) error // Purge stream messages. PurgeStream(name string) error // NewStreamLister is used to return pages of StreamInfo objects. @@ -1421,119 +1416,6 @@ func (js *js) DeleteStream(name string) error { return nil } -// StreamSnapshotConfig contains options for snapshotting a Stream. -type StreamSnapshotConfig struct { - // Subject to deliver the chunks to for the snapshot. - DeliverSubject string `json:"deliver_subject"` - // Do not include consumers in the snapshot. - NoConsumers bool `json:"no_consumers,omitempty"` - // Optional chunk size preference. - // Best to just let server select. - ChunkSize int `json:"chunk_size,omitempty"` - // Check all message's checksums prior to snapshot. - CheckMsgs bool `json:"jsck,omitempty"` -} - -// JSAPIStreamSnapshotResponse is the response for a snapshot request. -type JSAPIStreamSnapshotResponse struct { - APIResponse - // Estimate of number of blocks for the messages. - NumBlks int `json:"num_blks"` - // Block size limit as specified by the stream. - BlkSize int `json:"blk_size"` -} - -// SnapshotStream creates a snapshot of a Stream. It returns a copy of the -// snapshot data. -func (js *js) SnapshotStream(name string, timeout time.Duration, cfg *StreamSnapshotConfig) (io.Reader, error) { - if cfg == nil { - return nil, ErrStreamSnapshotConfigRequired - } - if cfg.DeliverSubject == "" { - return nil, ErrDeliverSubjectRequired - } - - req, err := json.Marshal(cfg) - if err != nil { - return nil, err - } - - ssSubj := js.apiSubj(fmt.Sprintf(JSApiStreamSnapshotT, name)) - r, err := js.nc.Request(ssSubj, req, js.wait) - if err != nil { - return nil, err - } - var resp JSAPIStreamSnapshotResponse - if err := json.Unmarshal(r.Data, &resp); err != nil { - return nil, err - } - if resp.Error != nil { - return nil, errors.New(resp.Error.Description) - } - - // Setup to process snapshot chunks. - var snapshot []byte - done := make(chan struct{}, 1) - sub, err := js.nc.Subscribe(cfg.DeliverSubject, func(m *Msg) { - // EOF - if len(m.Data) == 0 { - done <- struct{}{} - return - } - // Could be writing to a file here too. - snapshot = append(snapshot, m.Data...) - // Flow ack - m.Respond(nil) - }) - if err != nil { - return nil, err - } - defer sub.Unsubscribe() - // Wait to receive the snapshot. - select { - case <-done: - case <-time.After(timeout): - return nil, errors.New("nats: snapshot timeout exceeded") - } - - return bytes.NewReader(snapshot), nil -} - -// JSAPIStreamRestoreResponse is the direct response to the restore request. -type JSAPIStreamRestoreResponse struct { - APIResponse - // Subject to deliver the chunks to for the snapshot restore. - DeliverSubject string `json:"deliver_subject"` -} - -// RestoreStream restores a Stream from a snapshot. -func (js *js) RestoreStream(name string, snapshot io.Reader) error { - rsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamRestoreT, name)) - r, err := js.nc.Request(rsSubj, nil, js.wait) - if err != nil { - return err - } - var resp JSAPIStreamRestoreResponse - if err := json.Unmarshal(r.Data, &resp); err != nil { - return err - } - if resp.Error != nil { - return errors.New(resp.Error.Description) - } - - // Can be any size message. - var chunk [512]byte - for r := snapshot; ; { - n, err := r.Read(chunk[:]) - if err != nil { - break - } - js.nc.Request(resp.DeliverSubject, chunk[:n], js.wait) - } - js.nc.Request(resp.DeliverSubject, nil, js.wait) - return nil -} - // JSAPIStreamPurgeResponse. type JSAPIStreamPurgeResponse struct { APIResponse diff --git a/test/js_test.go b/test/js_test.go index 72b6929ac..f7b5a9fe4 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -579,16 +579,6 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("ConsumerInfo is not correct %+v", si) } - // Create a snapshot using our client API. - inboxSubj := nats.NewInbox() - snapshot, err := js.SnapshotStream("foo", 5*time.Second, &nats.StreamSnapshotConfig{ - DeliverSubject: inboxSubj, - ChunkSize: 1024, - }) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - sl := js.NewStreamLister() if !sl.Next() { if err := sl.Err(); err != nil { @@ -633,6 +623,16 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + // Purge a stream using our client API. + if err := js.PurgeStream("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si, err := js.StreamInfo("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if si.State.Msgs != 0 { + t.Fatalf("StreamInfo.Msgs is not correct") + } + // Delete a stream using our client API. if err := js.DeleteStream(""); err == nil { t.Fatal("Unexpected success") @@ -643,26 +643,6 @@ func TestJetStreamManagement(t *testing.T) { if _, err := js.StreamInfo("foo"); err == nil { t.Fatalf("Unexpected success") } - - // Restore a stream using our client API. - if err := js.RestoreStream("foo", snapshot); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if si, err := js.StreamInfo("foo"); err != nil { - t.Fatalf("Unexpected error: %v", err) - } else if si.Config.Name != "foo" && si.State.Msgs != 25 { - t.Fatalf("StreamInfo is not correct %+v", si) - } - - // Purge a stream using our client API. - if err := js.PurgeStream("foo"); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if si, err := js.StreamInfo("foo"); err != nil { - t.Fatalf("Unexpected error: %v", err) - } else if si.State.Msgs != 0 { - t.Fatalf("StreamInfo.Msgs is not correct") - } } func TestJetStreamImport(t *testing.T) {