Skip to content

Commit

Permalink
Merge 3e0aed6 into b4450fb
Browse files Browse the repository at this point in the history
  • Loading branch information
variadico committed Jan 6, 2021
2 parents b4450fb + 3e0aed6 commit 694bccf
Show file tree
Hide file tree
Showing 3 changed files with 477 additions and 70 deletions.
322 changes: 314 additions & 8 deletions js.go
Expand Up @@ -45,10 +45,25 @@ type JetStream interface {
type JetStreamManager interface {
// Create a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)
// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// Update a stream.
UpdateStream(cfg *StreamConfig) (*StreamInfo, error)
// Delete a stream.
DeleteStream(name string) error
// Stream information.
StreamInfo(stream string) (*StreamInfo, error)
// Purge stream messages.
PurgeStream(name string) error
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister

// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// Delete a consumer.
DeleteConsumer(stream, consumer string) error
// Consumer information.
ConsumerInfo(stream, name string) (*ConsumerInfo, error)
// NewConsumerLister is used to return pages of ConsumerInfo objects.
NewConsumerLister(stream string) *ConsumerLister
}

// JetStream is the public interface for the JetStream context.
Expand Down Expand Up @@ -106,8 +121,7 @@ const (
JSDefaultAPIPrefix = "$JS.API."
// JSApiAccountInfo is for obtaining general information about JetStream.
JSApiAccountInfo = "INFO"
// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"

// JSApiConsumerCreateT is used to create consumers.
JSApiConsumerCreateT = "CONSUMER.CREATE.%s"
// JSApiDurableCreateT is used to create durable consumers.
Expand All @@ -116,10 +130,29 @@ const (
JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s"
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
// JSApiDeleteConsumerT is used to delete consumers.
JSApiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
// JSApiConsumerListT is used to return all detailed consumer information
JSApiConsumerListT = "CONSUMER.LIST.%s"

// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"
// JSApiStreamCreateT is the endpoint to create new streams.
JSApiStreamCreateT = "STREAM.CREATE.%s"
// JSApiStreamInfoT is the endpoint to get information on a stream.
JSApiStreamInfoT = "STREAM.INFO.%s"
// JSApiStreamUpdate is the endpoint to update existing streams.
JSApiStreamUpdateT = "STREAM.UPDATE.%s"
// JSApiStreamDeleteT is the endpoint to delete streams.
JSApiStreamDeleteT = "STREAM.DELETE.%s"
// JSApiStreamSnapshotT is the endpoint to snapshot streams.
JSApiStreamSnapshotT = "STREAM.SNAPSHOT.%s"
// JSApiStreamRestoreT is the endpoint to restore a stream from a snapshot.
JSApiStreamRestoreT = "STREAM.RESTORE.%s"
// JSApiPurgeStreamT is the endpoint to purge streams.
JSApiStreamPurgeT = "STREAM.PURGE.%s"
// JSApiStreamListT is the endpoint that will return all detailed stream information
JSApiStreamList = "STREAM.LIST"
)

// JetStream returns a JetStream context for pub/sub interactions.
Expand Down Expand Up @@ -1089,9 +1122,6 @@ func (p DeliverPolicy) MarshalJSON() ([]byte, error) {
}
}

// Management for JetStream
// TODO(dlc) - Fill this out.

// AddConsumer will add a JetStream consumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error) {
if stream == _EMPTY_ {
Expand All @@ -1103,7 +1133,7 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
}

var ccSubj string
if cfg.Durable != _EMPTY_ {
if cfg != nil && cfg.Durable != _EMPTY_ {
ccSubj = fmt.Sprintf(JSApiDurableCreateT, stream, cfg.Durable)
} else {
ccSubj = fmt.Sprintf(JSApiConsumerCreateT, stream)
Expand All @@ -1127,6 +1157,125 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
return info.ConsumerInfo, nil
}

// JSAPIConsumerDeleteResponse is the response for a Consumer delete request.
type JSAPIConsumerDeleteResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
}

// DeleteConsumer deletes a Consumer.
func (js *js) DeleteConsumer(stream, durable string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}

dcSubj := js.apiSubj(fmt.Sprintf(JSApiConsumerDeleteT, stream, durable))
r, err := js.nc.Request(dcSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSAPIConsumerDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

// ConsumerInfo returns information about a Consumer.
func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, durable)
}

// APIPagedRequest includes parameters allowing specific pages to be requested
// from APIs responding with APIPaged.
type APIPagedRequest struct {
Offset int `json:"offset"`
}

// JSAPIConsumersRequest is the type used for Consumers requests.
type JSAPIConsumersRequest struct {
APIPagedRequest
}

// JSAPIConsumerListResponse is the response for a Consumers List request.
type JSAPIConsumerListResponse struct {
APIResponse
APIPaged
Consumers []*ConsumerInfo `json:"consumers"`
}

// ConsumerLister fetches pages of ConsumerInfo objects. This object is not
// safe to use for multiple threads.
type ConsumerLister struct {
stream string
js *js

err error
offset int
page []*ConsumerInfo
pageInfo *APIPaged
}

// Next fetches the next ConsumerInfo page.
func (c *ConsumerLister) Next() bool {
if c.err != nil {
return false
}
if c.stream == _EMPTY_ {
c.err = ErrStreamNameRequired
return false
}
if c.pageInfo != nil && c.offset >= c.pageInfo.Total {
return false
}

req, err := json.Marshal(JSAPIConsumersRequest{
APIPagedRequest: APIPagedRequest{Offset: c.offset},
})
if err != nil {
c.err = err
return false
}
clSubj := c.js.apiSubj(fmt.Sprintf(JSApiConsumerListT, c.stream))
r, err := c.js.nc.Request(clSubj, req, c.js.wait)
if err != nil {
c.err = err
return false
}
var resp JSAPIConsumerListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
c.err = err
return false
}
if resp.Error != nil {
c.err = errors.New(resp.Error.Description)
return false
}

c.pageInfo = &resp.APIPaged
c.page = resp.Consumers
c.offset += len(c.page)
return true
}

// Page returns the current ConsumerInfo page.
func (c *ConsumerLister) Page() []*ConsumerInfo {
return c.page
}

// Err returns any errors found while fetching pages.
func (c *ConsumerLister) Err() error {
return c.err
}

// NewConsumerLister is used to return pages of ConsumerInfo objects.
func (js *js) NewConsumerLister(stream string) *ConsumerLister {
return &ConsumerLister{stream: stream, js: js}
}

// StreamConfig will determine the properties for a stream.
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
Expand Down Expand Up @@ -1214,6 +1363,163 @@ type StreamState struct {
Consumers int `json:"consumer_count"`
}

// UpdateStream updates a Stream.
func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}

req, err := json.Marshal(cfg)
if err != nil {
return nil, err
}

usSubj := js.apiSubj(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name))
r, err := js.nc.Request(usSubj, req, js.wait)
if err != nil {
return nil, err
}
var resp JSApiStreamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}
return resp.StreamInfo, nil
}

// JSAPIStreamDeleteResponse is the response for a Stream delete request.
type JSAPIStreamDeleteResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
}

// DeleteStream deletes a Stream.
func (js *js) DeleteStream(name string) error {
if name == _EMPTY_ {
return ErrStreamNameRequired
}

dsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamDeleteT, name))
r, err := js.nc.Request(dsSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSAPIStreamDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

// JSAPIStreamPurgeResponse.
type JSAPIStreamPurgeResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
Purged uint64 `json:"purged"`
}

// PurgeStream purges messages on a Stream.
func (js *js) PurgeStream(name string) error {
psSubj := js.apiSubj(fmt.Sprintf(JSApiStreamPurgeT, name))
r, err := js.nc.Request(psSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSAPIStreamPurgeResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

// JSAPIStreamNamesRequest is used for Stream Name requests.
type JSAPIStreamNamesRequest struct {
APIPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}

// JSApiStreamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type JSApiStreamListResponse struct {
APIResponse
APIPaged
Streams []*StreamInfo `json:"streams"`
}

// StreamLister fetches pages of StreamInfo objects. This object is not safe
// to use for multiple threads.
type StreamLister struct {
js *js
page []*StreamInfo
err error

offset int
pageInfo *APIPaged
}

// Next fetches the next StreamInfo page.
func (s *StreamLister) Next() bool {
if s.err != nil {
return false
}
if s.pageInfo != nil && s.offset >= s.pageInfo.Total {
return false
}

req, err := json.Marshal(JSAPIStreamNamesRequest{
APIPagedRequest: APIPagedRequest{Offset: s.offset},
})
if err != nil {
s.err = err
return false
}

slSubj := s.js.apiSubj(JSApiStreamList)
r, err := s.js.nc.Request(slSubj, req, s.js.wait)
if err != nil {
s.err = err
return false
}
var resp JSApiStreamListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
s.err = err
return false
}
if resp.Error != nil {
s.err = errors.New(resp.Error.Description)
return false
}

s.pageInfo = &resp.APIPaged
s.page = resp.Streams
s.offset += len(s.page)
return true
}

// Page returns the current StreamInfo page.
func (s *StreamLister) Page() []*StreamInfo {
return s.page
}

// Err returns any errors found while fetching pages.
func (s *StreamLister) Err() error {
return s.err
}

// NewStreamLister is used to return pages of StreamInfo objects.
func (js *js) NewStreamLister() *StreamLister {
return &StreamLister{js: js}
}

// RetentionPolicy determines how messages in a set are retained.
type RetentionPolicy int

Expand Down

0 comments on commit 694bccf

Please sign in to comment.