diff --git a/js.go b/js.go index 942e1c78d..6ffe4021f 100644 --- a/js.go +++ b/js.go @@ -45,10 +45,25 @@ type JetStream interface { type JetStreamManager interface { // Create a stream. AddStream(cfg *StreamConfig) (*StreamInfo, error) - // Create a consumer. - AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + // Update a stream. + UpdateStream(cfg *StreamConfig) (*StreamInfo, error) + // Delete a stream. + DeleteStream(name string) error // Stream information. StreamInfo(stream string) (*StreamInfo, error) + // Purge stream messages. + PurgeStream(name string) error + // NewStreamLister is used to return pages of StreamInfo objects. + NewStreamLister() *StreamLister + + // Create a consumer. + AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) + // Delete a consumer. + DeleteConsumer(stream, consumer string) error + // Consumer information. + ConsumerInfo(stream, name string) (*ConsumerInfo, error) + // NewConsumerLister is used to return pages of ConsumerInfo objects. + NewConsumerLister(stream string) *ConsumerLister } // JetStream is the public interface for the JetStream context. @@ -106,8 +121,7 @@ const ( JSDefaultAPIPrefix = "$JS.API." // JSApiAccountInfo is for obtaining general information about JetStream. JSApiAccountInfo = "INFO" - // JSApiStreams can lookup a stream by subject. - JSApiStreams = "STREAM.NAMES" + // JSApiConsumerCreateT is used to create consumers. JSApiConsumerCreateT = "CONSUMER.CREATE.%s" // JSApiDurableCreateT is used to create durable consumers. @@ -116,10 +130,29 @@ const ( JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s" // JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode. JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s" + // JSApiDeleteConsumerT is used to delete consumers. + JSApiConsumerDeleteT = "CONSUMER.DELETE.%s.%s" + // JSApiConsumerListT is used to return all detailed consumer information + JSApiConsumerListT = "CONSUMER.LIST.%s" + + // JSApiStreams can lookup a stream by subject. + JSApiStreams = "STREAM.NAMES" // JSApiStreamCreateT is the endpoint to create new streams. JSApiStreamCreateT = "STREAM.CREATE.%s" // JSApiStreamInfoT is the endpoint to get information on a stream. JSApiStreamInfoT = "STREAM.INFO.%s" + // JSApiStreamUpdate is the endpoint to update existing streams. + JSApiStreamUpdateT = "STREAM.UPDATE.%s" + // JSApiStreamDeleteT is the endpoint to delete streams. + JSApiStreamDeleteT = "STREAM.DELETE.%s" + // JSApiStreamSnapshotT is the endpoint to snapshot streams. + JSApiStreamSnapshotT = "STREAM.SNAPSHOT.%s" + // JSApiStreamRestoreT is the endpoint to restore a stream from a snapshot. + JSApiStreamRestoreT = "STREAM.RESTORE.%s" + // JSApiPurgeStreamT is the endpoint to purge streams. + JSApiStreamPurgeT = "STREAM.PURGE.%s" + // JSApiStreamListT is the endpoint that will return all detailed stream information + JSApiStreamList = "STREAM.LIST" ) // JetStream returns a JetStream context for pub/sub interactions. @@ -1089,9 +1122,6 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) { } } -// Management for JetStream -// TODO(dlc) - Fill this out. - // AddConsumer will add a JetStream consumer. func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) { if stream == _EMPTY_ { @@ -1103,7 +1133,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er } var ccSubj string - if cfg.Durable != _EMPTY_ { + if cfg != nil && cfg.Durable != _EMPTY_ { ccSubj = fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable) } else { ccSubj = fmt.Sprintf(JSApiConsumerCreateT, stream) @@ -1127,6 +1157,125 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er return info.ConsumerInfo, nil } +// JSAPIConsumerDeleteResponse is the response for a Consumer delete request. +type JSAPIConsumerDeleteResponse struct { + APIResponse + Success bool `json:"success,omitempty"` +} + +// DeleteConsumer deletes a Consumer. +func (js *js) DeleteConsumer(stream, durable string) error { + if stream == _EMPTY_ { + return ErrStreamNameRequired + } + + dcSubj := js.apiSubj(fmt.Sprintf(JSApiConsumerDeleteT, stream, durable)) + r, err := js.nc.Request(dcSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSAPIConsumerDeleteResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +// ConsumerInfo returns information about a Consumer. +func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) { + return js.getConsumerInfo(stream, durable) +} + +// APIPagedRequest includes parameters allowing specific pages to be requested +// from APIs responding with APIPaged. +type APIPagedRequest struct { + Offset int `json:"offset"` +} + +// JSAPIConsumersRequest is the type used for Consumers requests. +type JSAPIConsumersRequest struct { + APIPagedRequest +} + +// JSAPIConsumerListResponse is the response for a Consumers List request. +type JSAPIConsumerListResponse struct { + APIResponse + APIPaged + Consumers []*ConsumerInfo `json:"consumers"` +} + +// ConsumerLister fetches pages of ConsumerInfo objects. This object is not +// safe to use for multiple threads. +type ConsumerLister struct { + stream string + js *js + + err error + offset int + page []*ConsumerInfo + pageInfo *APIPaged +} + +// Next fetches the next ConsumerInfo page. +func (c *ConsumerLister) Next() bool { + if c.err != nil { + return false + } + if c.stream == _EMPTY_ { + c.err = ErrStreamNameRequired + return false + } + if c.pageInfo != nil && c.offset >= c.pageInfo.Total { + return false + } + + req, err := json.Marshal(JSAPIConsumersRequest{ + APIPagedRequest: APIPagedRequest{Offset: c.offset}, + }) + if err != nil { + c.err = err + return false + } + clSubj := c.js.apiSubj(fmt.Sprintf(JSApiConsumerListT, c.stream)) + r, err := c.js.nc.Request(clSubj, req, c.js.wait) + if err != nil { + c.err = err + return false + } + var resp JSAPIConsumerListResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + c.err = err + return false + } + if resp.Error != nil { + c.err = errors.New(resp.Error.Description) + return false + } + + c.pageInfo = &resp.APIPaged + c.page = resp.Consumers + c.offset += len(c.page) + return true +} + +// Page returns the current ConsumerInfo page. +func (c *ConsumerLister) Page() []*ConsumerInfo { + return c.page +} + +// Err returns any errors found while fetching pages. +func (c *ConsumerLister) Err() error { + return c.err +} + +// NewConsumerLister is used to return pages of ConsumerInfo objects. +func (js *js) NewConsumerLister(stream string) *ConsumerLister { + return &ConsumerLister{stream: stream, js: js} +} + // StreamConfig will determine the properties for a stream. // There are sensible defaults for most. If no subjects are // given the name will be used as the only subject. @@ -1214,6 +1363,163 @@ type StreamState struct { Consumers int `json:"consumer_count"` } +// UpdateStream updates a Stream. +func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) { + if cfg == nil || cfg.Name == _EMPTY_ { + return nil, ErrStreamNameRequired + } + + req, err := json.Marshal(cfg) + if err != nil { + return nil, err + } + + usSubj := js.apiSubj(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name)) + r, err := js.nc.Request(usSubj, req, js.wait) + if err != nil { + return nil, err + } + var resp JSApiStreamInfoResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return nil, err + } + if resp.Error != nil { + return nil, errors.New(resp.Error.Description) + } + return resp.StreamInfo, nil +} + +// JSAPIStreamDeleteResponse is the response for a Stream delete request. +type JSAPIStreamDeleteResponse struct { + APIResponse + Success bool `json:"success,omitempty"` +} + +// DeleteStream deletes a Stream. +func (js *js) DeleteStream(name string) error { + if name == _EMPTY_ { + return ErrStreamNameRequired + } + + dsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamDeleteT, name)) + r, err := js.nc.Request(dsSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSAPIStreamDeleteResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +// JSAPIStreamPurgeResponse. +type JSAPIStreamPurgeResponse struct { + APIResponse + Success bool `json:"success,omitempty"` + Purged uint64 `json:"purged"` +} + +// PurgeStream purges messages on a Stream. +func (js *js) PurgeStream(name string) error { + psSubj := js.apiSubj(fmt.Sprintf(JSApiStreamPurgeT, name)) + r, err := js.nc.Request(psSubj, nil, js.wait) + if err != nil { + return err + } + var resp JSAPIStreamPurgeResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return errors.New(resp.Error.Description) + } + return nil +} + +// JSAPIStreamNamesRequest is used for Stream Name requests. +type JSAPIStreamNamesRequest struct { + APIPagedRequest + // These are filters that can be applied to the list. + Subject string `json:"subject,omitempty"` +} + +// JSApiStreamListResponse list of detailed stream information. +// A nil request is valid and means all streams. +type JSApiStreamListResponse struct { + APIResponse + APIPaged + Streams []*StreamInfo `json:"streams"` +} + +// StreamLister fetches pages of StreamInfo objects. This object is not safe +// to use for multiple threads. +type StreamLister struct { + js *js + page []*StreamInfo + err error + + offset int + pageInfo *APIPaged +} + +// Next fetches the next StreamInfo page. +func (s *StreamLister) Next() bool { + if s.err != nil { + return false + } + if s.pageInfo != nil && s.offset >= s.pageInfo.Total { + return false + } + + req, err := json.Marshal(JSAPIStreamNamesRequest{ + APIPagedRequest: APIPagedRequest{Offset: s.offset}, + }) + if err != nil { + s.err = err + return false + } + + slSubj := s.js.apiSubj(JSApiStreamList) + r, err := s.js.nc.Request(slSubj, req, s.js.wait) + if err != nil { + s.err = err + return false + } + var resp JSApiStreamListResponse + if err := json.Unmarshal(r.Data, &resp); err != nil { + s.err = err + return false + } + if resp.Error != nil { + s.err = errors.New(resp.Error.Description) + return false + } + + s.pageInfo = &resp.APIPaged + s.page = resp.Streams + s.offset += len(s.page) + return true +} + +// Page returns the current StreamInfo page. +func (s *StreamLister) Page() []*StreamInfo { + return s.page +} + +// Err returns any errors found while fetching pages. +func (s *StreamLister) Err() error { + return s.err +} + +// NewStreamLister is used to return pages of StreamInfo objects. +func (js *js) NewStreamLister() *StreamLister { + return &StreamLister{js: js} +} + // RetentionPolicy determines how messages in a set are retained. type RetentionPolicy int diff --git a/nats.go b/nats.go index caba0547b..2163036db 100644 --- a/nats.go +++ b/nats.go @@ -80,66 +80,68 @@ const ( // Errors var ( - ErrConnectionClosed = errors.New("nats: connection closed") - ErrConnectionDraining = errors.New("nats: connection draining") - ErrDrainTimeout = errors.New("nats: draining connection timed out") - ErrConnectionReconnecting = errors.New("nats: connection reconnecting") - ErrSecureConnRequired = errors.New("nats: secure connection required") - ErrSecureConnWanted = errors.New("nats: secure connection not available") - ErrBadSubscription = errors.New("nats: invalid subscription") - ErrTypeSubscription = errors.New("nats: invalid subscription type") - ErrBadSubject = errors.New("nats: invalid subject") - ErrBadQueueName = errors.New("nats: invalid queue name") - ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") - ErrTimeout = errors.New("nats: timeout") - ErrBadTimeout = errors.New("nats: timeout invalid") - ErrAuthorization = errors.New("nats: authorization violation") - ErrAuthExpired = errors.New("nats: authentication expired") - ErrNoServers = errors.New("nats: no servers available for connection") - ErrJsonParse = errors.New("nats: connect message, json parse error") - ErrChanArg = errors.New("nats: argument needs to be a channel type") - ErrMaxPayload = errors.New("nats: maximum payload exceeded") - ErrMaxMessages = errors.New("nats: maximum messages delivered") - ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") - ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") - ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") - ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") - ErrInvalidConnection = errors.New("nats: invalid connection") - ErrInvalidMsg = errors.New("nats: invalid message or message nil") - ErrInvalidArg = errors.New("nats: invalid argument") - ErrInvalidContext = errors.New("nats: invalid context") - ErrNoDeadlineContext = errors.New("nats: context requires a deadline") - ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") - ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") - ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") - ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") - ErrNoUserCB = errors.New("nats: user callback not defined") - ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") - ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") - ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) - ErrTokenAlreadySet = errors.New("nats: token and token handler both set") - ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") - ErrMsgNoReply = errors.New("nats: message does not have a reply") - ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") - ErrDisconnected = errors.New("nats: server is disconnected") - ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") - ErrBadHeaderMsg = errors.New("nats: message could not decode headers") - ErrNoResponders = errors.New("nats: no responders available for request") - ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") - ErrDirectModeRequired = errors.New("nats: direct access requires direct pull or push") - ErrPullModeNotAllowed = errors.New("nats: pull based not supported") - ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") - ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") - ErrNoStreamResponse = errors.New("nats: no response from stream") - ErrNotJSMessage = errors.New("nats: not a jetstream message") - ErrInvalidStreamName = errors.New("nats: invalid stream name") - ErrNoMatchingStream = errors.New("nats: no stream matches subject") - ErrSubjectMismatch = errors.New("nats: subject does not match consumer") - ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") - ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") - ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") - ErrStreamNameRequired = errors.New("nats: stream name is required") - ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") + ErrConnectionClosed = errors.New("nats: connection closed") + ErrConnectionDraining = errors.New("nats: connection draining") + ErrDrainTimeout = errors.New("nats: draining connection timed out") + ErrConnectionReconnecting = errors.New("nats: connection reconnecting") + ErrSecureConnRequired = errors.New("nats: secure connection required") + ErrSecureConnWanted = errors.New("nats: secure connection not available") + ErrBadSubscription = errors.New("nats: invalid subscription") + ErrTypeSubscription = errors.New("nats: invalid subscription type") + ErrBadSubject = errors.New("nats: invalid subject") + ErrBadQueueName = errors.New("nats: invalid queue name") + ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") + ErrTimeout = errors.New("nats: timeout") + ErrBadTimeout = errors.New("nats: timeout invalid") + ErrAuthorization = errors.New("nats: authorization violation") + ErrAuthExpired = errors.New("nats: authentication expired") + ErrNoServers = errors.New("nats: no servers available for connection") + ErrJsonParse = errors.New("nats: connect message, json parse error") + ErrChanArg = errors.New("nats: argument needs to be a channel type") + ErrMaxPayload = errors.New("nats: maximum payload exceeded") + ErrMaxMessages = errors.New("nats: maximum messages delivered") + ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") + ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") + ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") + ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") + ErrInvalidConnection = errors.New("nats: invalid connection") + ErrInvalidMsg = errors.New("nats: invalid message or message nil") + ErrInvalidArg = errors.New("nats: invalid argument") + ErrInvalidContext = errors.New("nats: invalid context") + ErrNoDeadlineContext = errors.New("nats: context requires a deadline") + ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") + ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") + ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") + ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") + ErrNoUserCB = errors.New("nats: user callback not defined") + ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") + ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") + ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) + ErrTokenAlreadySet = errors.New("nats: token and token handler both set") + ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") + ErrMsgNoReply = errors.New("nats: message does not have a reply") + ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") + ErrDisconnected = errors.New("nats: server is disconnected") + ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") + ErrBadHeaderMsg = errors.New("nats: message could not decode headers") + ErrNoResponders = errors.New("nats: no responders available for request") + ErrNoContextOrTimeout = errors.New("nats: no context or timeout given") + ErrDirectModeRequired = errors.New("nats: direct access requires direct pull or push") + ErrPullModeNotAllowed = errors.New("nats: pull based not supported") + ErrJetStreamNotEnabled = errors.New("nats: jetstream not enabled") + ErrJetStreamBadPre = errors.New("nats: jetstream api prefix not valid") + ErrNoStreamResponse = errors.New("nats: no response from stream") + ErrNotJSMessage = errors.New("nats: not a jetstream message") + ErrInvalidStreamName = errors.New("nats: invalid stream name") + ErrNoMatchingStream = errors.New("nats: no stream matches subject") + ErrSubjectMismatch = errors.New("nats: subject does not match consumer") + ErrContextAndTimeout = errors.New("nats: context and timeout can not both be set") + ErrInvalidJSAck = errors.New("nats: invalid jetstream publish response") + ErrMultiStreamUnsupported = errors.New("nats: multiple streams are not supported") + ErrStreamNameRequired = errors.New("nats: stream name is required") + ErrConsumerConfigRequired = errors.New("nats: consumer configuration is required") + ErrStreamSnapshotConfigRequired = errors.New("nats: stream snapshot configuration is required") + ErrDeliverSubjectRequired = errors.New("nats: deliver subject is required") ) func init() { diff --git a/test/js_test.go b/test/js_test.go index 13147f8e0..f7b5a9fe4 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -498,7 +498,6 @@ func TestAckForNonJetStream(t *testing.T) { } } -// TODO(dlc) - fill out with more stuff. func TestJetStreamManagement(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -519,6 +518,9 @@ func TestJetStreamManagement(t *testing.T) { } // Create the stream using our client API. + if _, err := js.AddStream(nil); err == nil { + t.Fatalf("Unexpected success") + } si, err := js.AddStream(&nats.StreamConfig{Name: "foo"}) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -527,6 +529,10 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } + for i := 0; i < 25; i++ { + js.Publish("foo", []byte("hi")) + } + // Check info calls. si, err = js.StreamInfo("foo") if err != nil { @@ -536,14 +542,107 @@ func TestJetStreamManagement(t *testing.T) { t.Fatalf("StreamInfo is not correct %+v", si) } + // Update the stream using our client API. + if _, err := js.UpdateStream(nil); err == nil { + t.Fatal("Unexpected success") + } + prevMaxMsgs := si.Config.MaxMsgs + si, err = js.UpdateStream(&nats.StreamConfig{Name: "foo", MaxMsgs: prevMaxMsgs + 100}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si == nil || si.Config.Name != "foo" || si.Config.MaxMsgs == prevMaxMsgs { + t.Fatalf("StreamInfo is not correct %+v", si) + } + // Create a consumer using our client API. - ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{Durable: "dlc", AckPolicy: nats.AckExplicit}) + if _, err := js.AddConsumer("", nil); err == nil { + t.Fatalf("Unexpected success") + } + ci, err := js.AddConsumer("foo", &nats.ConsumerConfig{ + Durable: "dlc", + AckPolicy: nats.AckExplicit, + }) if err != nil { t.Fatalf("Unexpected error: %v", err) } if ci == nil || ci.Name != "dlc" || ci.Stream != "foo" { t.Fatalf("ConsumerInfo is not correct %+v", ci) } + + // Check info calls. + ci, err = js.ConsumerInfo("foo", "dlc") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if ci == nil || ci.Config.Durable != "dlc" { + t.Fatalf("ConsumerInfo is not correct %+v", si) + } + + sl := js.NewStreamLister() + if !sl.Next() { + if err := sl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + t.Fatalf("Unexpected stream lister next") + } + if p := sl.Page(); len(p) != 1 || p[0].Config.Name != "foo" { + t.Fatalf("StreamInfo is not correct %+v", p) + } + if err := sl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if cl := js.NewConsumerLister(""); cl.Next() { + t.Fatalf("Unexpected next ok") + } else if err := cl.Err(); err == nil { + if cl.Next() { + t.Fatalf("Unexpected next ok") + } + t.Fatalf("Unexpected nil error") + } + cl := js.NewConsumerLister("foo") + if !cl.Next() { + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + t.Fatalf("Unexpected consumer lister next") + } + if p := cl.Page(); len(p) != 1 || p[0].Stream != "foo" || p[0].Config.Durable != "dlc" { + t.Fatalf("ConsumerInfo is not correct %+v", p) + } + if err := cl.Err(); err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Delete a consumer using our client API. + if err := js.DeleteConsumer("", ""); err == nil { + t.Fatalf("Unexpected success") + } + if err := js.DeleteConsumer("foo", "dlc"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Purge a stream using our client API. + if err := js.PurgeStream("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if si, err := js.StreamInfo("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } else if si.State.Msgs != 0 { + t.Fatalf("StreamInfo.Msgs is not correct") + } + + // Delete a stream using our client API. + if err := js.DeleteStream(""); err == nil { + t.Fatal("Unexpected success") + } + if err := js.DeleteStream("foo"); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.StreamInfo("foo"); err == nil { + t.Fatalf("Unexpected success") + } } func TestJetStreamImport(t *testing.T) {