Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more Stream and Consumer management APIs #626

Merged
merged 2 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
322 changes: 314 additions & 8 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,25 @@ type JetStream interface {
type JetStreamManager interface {
// Create a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)
// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// Update a stream.
UpdateStream(cfg *StreamConfig) (*StreamInfo, error)
// Delete a stream.
DeleteStream(name string) error
// Stream information.
StreamInfo(stream string) (*StreamInfo, error)
// Purge stream messages.
PurgeStream(name string) error
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister

// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// Delete a consumer.
DeleteConsumer(stream, consumer string) error
// Consumer information.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)
// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
}

// JetStream is the public interface for the JetStream context.
Expand Down Expand Up @@ -106,8 +121,7 @@ const (
JSDefaultAPIPrefix = "$JS.API."
// JSApiAccountInfo is for obtaining general information about JetStream.
JSApiAccountInfo = "INFO"
// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"

// JSApiConsumerCreateT is used to create consumers.
JSApiConsumerCreateT = "CONSUMER.CREATE.%s"
// JSApiDurableCreateT is used to create durable consumers.
Expand All @@ -116,10 +130,29 @@ const (
JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s"
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
// JSApiDeleteConsumerT is used to delete consumers.
JSApiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
// JSApiConsumerListT is used to return all detailed consumer information
JSApiConsumerListT = "CONSUMER.LIST.%s"

// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"
// JSApiStreamCreateT is the endpoint to create new streams.
JSApiStreamCreateT = "STREAM.CREATE.%s"
// JSApiStreamInfoT is the endpoint to get information on a stream.
JSApiStreamInfoT = "STREAM.INFO.%s"
// JSApiStreamUpdate is the endpoint to update existing streams.
JSApiStreamUpdateT = "STREAM.UPDATE.%s"
// JSApiStreamDeleteT is the endpoint to delete streams.
JSApiStreamDeleteT = "STREAM.DELETE.%s"
// JSApiStreamSnapshotT is the endpoint to snapshot streams.
JSApiStreamSnapshotT = "STREAM.SNAPSHOT.%s"
// JSApiStreamRestoreT is the endpoint to restore a stream from a snapshot.
JSApiStreamRestoreT = "STREAM.RESTORE.%s"
// JSApiPurgeStreamT is the endpoint to purge streams.
JSApiStreamPurgeT = "STREAM.PURGE.%s"
// JSApiStreamListT is the endpoint that will return all detailed stream information
JSApiStreamList = "STREAM.LIST"
)

// JetStream returns a JetStream context for pub/sub interactions.
Expand Down Expand Up @@ -1089,9 +1122,6 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
}
}

// Management for JetStream
// TODO(dlc) - Fill this out.

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) {
if stream == _EMPTY_ {
Expand All @@ -1103,7 +1133,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
}

var ccSubj string
if cfg.Durable != _EMPTY_ {
if cfg != nil && cfg.Durable != _EMPTY_ {
ccSubj = fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable)
} else {
ccSubj = fmt.Sprintf(JSApiConsumerCreateT, stream)
Expand All @@ -1127,6 +1157,125 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
return info.ConsumerInfo, nil
}

// JSAPIConsumerDeleteResponse is the response for a Consumer delete request.
type JSAPIConsumerDeleteResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
}

// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, durable string) error {
variadico marked this conversation as resolved.
Show resolved Hide resolved
if stream == _EMPTY_ {
return ErrStreamNameRequired
}

dcSubj := js.apiSubj(fmt.Sprintf(JSApiConsumerDeleteT, stream, durable))
r, err := js.nc.Request(dcSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSAPIConsumerDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

// ConsumerInfo returns information about a Consumer.
func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, durable)
}

// APIPagedRequest includes parameters allowing specific pages to be requested
// from APIs responding with APIPaged.
type APIPagedRequest struct {
Offset int `json:"offset"`
}

// JSAPIConsumersRequest is the type used for Consumers requests.
type JSAPIConsumersRequest struct {
APIPagedRequest
}

// JSAPIConsumerListResponse is the response for a Consumers List request.
type JSAPIConsumerListResponse struct {
APIResponse
APIPaged
Consumers []*ConsumerInfo `json:"consumers"`
}

// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
variadico marked this conversation as resolved.
Show resolved Hide resolved
stream string
js *js

err error
offset int
page []*ConsumerInfo
pageInfo *APIPaged
}

// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
if c.err != nil {
return false
}
if c.stream == _EMPTY_ {
variadico marked this conversation as resolved.
Show resolved Hide resolved
c.err = ErrStreamNameRequired
return false
}
if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
return false
}

req, err := json.Marshal(JSAPIConsumersRequest{
APIPagedRequest: APIPagedRequest{Offset: c.offset},
})
if err != nil {
c.err = err
return false
}
clSubj := c.js.apiSubj(fmt.Sprintf(JSApiConsumerListT, c.stream))
r, err := c.js.nc.Request(clSubj, req, c.js.wait)
if err != nil {
c.err = err
return false
}
var resp JSAPIConsumerListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
c.err = err
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
return false
}

c.pageInfo = &resp.APIPaged
c.page = resp.Consumers
c.offset += len(c.page)
return true
}

// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
return c.page
}

// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
return c.err
}

// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
}

// StreamConfig will determine the properties for a stream.
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
Expand Down Expand Up @@ -1214,6 +1363,163 @@ type StreamState struct {
Consumers int `json:"consumer_count"`
}

// UpdateStream updates a Stream.
func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}

req, err := json.Marshal(cfg)
if err != nil {
return nil, err
}

usSubj := js.apiSubj(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name))
r, err := js.nc.Request(usSubj, req, js.wait)
if err != nil {
return nil, err
}
var resp JSApiStreamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}
return resp.StreamInfo, nil
}

// JSAPIStreamDeleteResponse is the response for a Stream delete request.
type JSAPIStreamDeleteResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
}

// DeleteStream deletes a Stream.
func (js *js) DeleteStream(name string) error {
if name == _EMPTY_ {
return ErrStreamNameRequired
}

dsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamDeleteT, name))
r, err := js.nc.Request(dsSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSAPIStreamDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

// JSAPIStreamPurgeResponse.
type JSAPIStreamPurgeResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
Purged uint64 `json:"purged"`
}

// PurgeStream purges messages on a Stream.
func (js *js) PurgeStream(name string) error {
psSubj := js.apiSubj(fmt.Sprintf(JSApiStreamPurgeT, name))
r, err := js.nc.Request(psSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSAPIStreamPurgeResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

// JSAPIStreamNamesRequest is used for Stream Name requests.
type JSAPIStreamNamesRequest struct {
APIPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}

// JSApiStreamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type JSApiStreamListResponse struct {
APIResponse
APIPaged
Streams []*StreamInfo `json:"streams"`
}

// StreamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
variadico marked this conversation as resolved.
Show resolved Hide resolved
js *js
page []*StreamInfo
err error

offset int
pageInfo *APIPaged
}

// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
if s.err != nil {
return false
}
if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
return false
}

req, err := json.Marshal(JSAPIStreamNamesRequest{
APIPagedRequest: APIPagedRequest{Offset: s.offset},
})
if err != nil {
s.err = err
return false
}

slSubj := s.js.apiSubj(JSApiStreamList)
r, err := s.js.nc.Request(slSubj, req, s.js.wait)
if err != nil {
s.err = err
return false
}
var resp JSApiStreamListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
s.err = err
return false
}
if resp.Error != nil {
s.err = errors.New(resp.Error.Description)
return false
}

s.pageInfo = &resp.APIPaged
s.page = resp.Streams
s.offset += len(s.page)
return true
}

// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
return s.page
}

// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
return s.err
}

// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
}

// RetentionPolicy determines how messages in a set are retained.
type RetentionPolicy int

Expand Down
Loading