Skip to content

Commit

Permalink
Add more Stream and Consumer management APIs
Browse files Browse the repository at this point in the history
This adds the follow APIs for Streams.
* UpdateStream
* DeleteStream
* SnapshotStream
* RestoreStream
* PurgeStream

This adds the follow APIs for Consumers.
* ConsumerInfo
* ListConsumers
* ListStreams
* DeleteConsumer
  • Loading branch information
nsurfer committed Dec 16, 2020
1 parent a32679c commit 7910891
Show file tree
Hide file tree
Showing 2 changed files with 414 additions and 5 deletions.
341 changes: 336 additions & 5 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
Expand All @@ -42,12 +43,29 @@ type JetStream interface {
// Management
// Create a stream.
AddStream(cfg *StreamConfig) (*StreamInfo, error)
// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// Update a stream.
UpdateStream(cfg *StreamConfig) (*StreamInfo, error)
// Delete a stream.
DeleteStream(name string) error
// Stream information.
StreamInfo(stream string) (*StreamInfo, error)
// Snapshot stream.
SnapshotStream(name string, cfg *StreamSnapshotConfig) (io.Reader, error)
// Restore stream.
RestoreStream(name string, snapshot io.Reader) error
// Purge stream messages.
PurgeStream(name string) error
// List streams.
ListStreams(offset int) ([]*StreamInfo, error)

// TODO(dlc) - add more
// Create a consumer.
AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, error)
// Delete a consumer.
DeleteConsumer(stream, consumer string) error
// Consumer information.
ConsumerInfo(stream, durable string) (*ConsumerInfo, error)
// List consumers.
ListConsumers(stream string, offset int) ([]*ConsumerInfo, error)
}

// APIError is included in all API responses if there was an error.
Expand Down Expand Up @@ -99,8 +117,7 @@ const (
JSDefaultAPIPrefix = "$JS.API."
// JSApiAccountInfo is for obtaining general information about JetStream.
JSApiAccountInfo = "INFO"
// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"

// JSApiConsumerCreateT is used to create consumers.
JSApiConsumerCreateT = "CONSUMER.CREATE.%s"
// JSApiDurableCreateT is used to create durable consumers.
Expand All @@ -109,10 +126,29 @@ const (
JSApiConsumerInfoT = "CONSUMER.INFO.%s.%s"
// JSApiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
JSApiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
// JSApiDeleteConsumerT is used to delete consumers.
JSApiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
// JSApiConsumerListT is used to return all detailed consumer information
JSApiConsumerListT = "CONSUMER.LIST.%s"

// JSApiStreams can lookup a stream by subject.
JSApiStreams = "STREAM.NAMES"
// JSApiStreamCreateT is the endpoint to create new streams.
JSApiStreamCreateT = "STREAM.CREATE.%s"
// JSApiStreamInfoT is the endpoint to get information on a stream.
JSApiStreamInfoT = "STREAM.INFO.%s"
// JSApiStreamUpdate is the endpoint to update existing streams.
JSApiStreamUpdateT = "STREAM.UPDATE.%s"
// JSApiStreamDeleteT is the endpoint to delete streams.
JSApiStreamDeleteT = "STREAM.DELETE.%s"
// JSApiStreamSnapshotT is the endpoint to snapshot streams.
JSApiStreamSnapshotT = "STREAM.SNAPSHOT.%s"
// JSApiStreamRestoreT is the endpoint to restore a stream from a snapshot.
JSApiStreamRestoreT = "STREAM.RESTORE.%s"
// JSApiPurgeStreamT is the endpoint to purge streams.
JSApiStreamPurgeT = "STREAM.PURGE.%s"
// JSApiStreamListT is the endpoint that will return all detailed stream information
JSApiStreamList = "STREAM.LIST"
)

// JetStream returns a JetStream context for pub/sub interactions.
Expand Down Expand Up @@ -1062,6 +1098,85 @@ func (js *js) AddConsumer(stream string, cfg *ConsumerConfig) (*ConsumerInfo, er
return info.ConsumerInfo, nil
}

// JSApiConsumerDeleteResponse.
type JSApiConsumerDeleteResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
}

func (js *js) DeleteConsumer(stream, durable string) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}

dcSubj := js.apiSubj(fmt.Sprintf(JSApiConsumerDeleteT, stream, durable))
r, err := js.nc.Request(dcSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSApiConsumerDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

func (js *js) ConsumerInfo(stream, durable string) (*ConsumerInfo, error) {
return js.getConsumerInfo(stream, durable)
}

// ApiPagedRequest includes parameters allowing specific pages to be requests from APIs responding with ApiPaged
type ApiPagedRequest struct {
Offset int `json:"offset"`
}

type JSApiConsumersRequest struct {
ApiPagedRequest
}

// ApiPaged includes variables used to create paged responses from the JSON API
type ApiPaged struct {
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}

// JSApiConsumerListResponse.
type JSApiConsumerListResponse struct {
APIResponse
ApiPaged
Consumers []*ConsumerInfo `json:"consumers"`
}

func (js *js) ListConsumers(stream string, offset int) ([]*ConsumerInfo, error) {
if stream == _EMPTY_ {
return nil, ErrStreamNameRequired
}

req, err := json.Marshal(JSApiConsumersRequest{
ApiPagedRequest: ApiPagedRequest{Offset: offset},
})
if err != nil {
return nil, err
}
clSubj := js.apiSubj(fmt.Sprintf(JSApiConsumerListT, stream))
r, err := js.nc.Request(clSubj, req, js.wait)
if err != nil {
return nil, err
}
var resp JSApiConsumerListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}
return resp.Consumers, nil
}

// StreamConfig will determine the properties for a stream.
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
Expand Down Expand Up @@ -1149,6 +1264,222 @@ type StreamState struct {
Consumers int `json:"consumer_count"`
}

func (js *js) UpdateStream(cfg *StreamConfig) (*StreamInfo, error) {
if cfg == nil || cfg.Name == _EMPTY_ {
return nil, ErrStreamNameRequired
}

req, err := json.Marshal(cfg)
if err != nil {
return nil, err
}

usSubj := js.apiSubj(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name))
r, err := js.nc.Request(usSubj, req, js.wait)
if err != nil {
return nil, err
}
var resp JSApiStreamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}
return resp.StreamInfo, nil
}

// JSApiStreamDeleteResponse stream removal.
type JSApiStreamDeleteResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
}

const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"

func (js *js) DeleteStream(name string) error {
if name == _EMPTY_ {
return ErrStreamNameRequired
}

dsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamDeleteT, name))
r, err := js.nc.Request(dsSubj, nil, js.wait)
if err != nil {
return err
}
var resp JSApiStreamDeleteResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

type StreamSnapshotConfig struct {
// Subject to deliver the chunks to for the snapshot.
DeliverSubject string `json:"deliver_subject"`
// Do not include consumers in the snapshot.
NoConsumers bool `json:"no_consumers,omitempty"`
// Optional chunk size preference.
// Best to just let server select.
ChunkSize int `json:"chunk_size,omitempty"`
// Check all message's checksums prior to snapshot.
CheckMsgs bool `json:"jsck,omitempty"`
}

// JSApiStreamSnapshotResponse is the direct response to the snapshot request.
type JSApiStreamSnapshotResponse struct {
APIResponse
// Estimate of number of blocks for the messages.
NumBlks int `json:"num_blks"`
// Block size limit as specified by the stream.
BlkSize int `json:"blk_size"`
}

func (js *js) SnapshotStream(name string, cfg *StreamSnapshotConfig) (io.Reader, error) {
req, err := json.Marshal(cfg)
if err != nil {
return nil, err
}

ssSubj := js.apiSubj(fmt.Sprintf(JSApiStreamSnapshotT, name))
r, err := js.nc.Request(ssSubj, req, time.Second)
if err != nil {
return nil, err
}
var resp JSApiStreamSnapshotResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}

// Setup to process snapshot chunks.
var snapshot []byte
done := make(chan bool)
sub, err := js.nc.Subscribe(cfg.DeliverSubject, func(m *Msg) {
// EOF
if len(m.Data) == 0 {
done <- true
return
}
// Could be writing to a file here too.
snapshot = append(snapshot, m.Data...)
// Flow ack
m.Respond(nil)
})
if err != nil {
return nil, err
}
defer sub.Unsubscribe()
// Wait to receive the snapshot.
select {
case <-done:
case <-time.After(5 * time.Second):
return nil, errors.New("nats: snapshot timeout exceeded")
}

return bytes.NewReader(snapshot), nil
}

// JSApiStreamRestoreResponse is the direct response to the restore request.
type JSApiStreamRestoreResponse struct {
APIResponse
// Subject to deliver the chunks to for the snapshot restore.
DeliverSubject string `json:"deliver_subject"`
}

func (js *js) RestoreStream(name string, snapshot io.Reader) error {
rsSubj := js.apiSubj(fmt.Sprintf(JSApiStreamRestoreT, name))
r, err := js.nc.Request(rsSubj, nil, time.Second)
if err != nil {
return err
}
var resp JSApiStreamRestoreResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}

// Can be any size message.
var chunk [512]byte
for r := snapshot; ; {
n, err := r.Read(chunk[:])
if err != nil {
break
}
js.nc.Request(resp.DeliverSubject, chunk[:n], time.Second)
}
js.nc.Request(resp.DeliverSubject, nil, time.Second)
return nil
}

// JSApiStreamPurgeResponse.
type JSApiStreamPurgeResponse struct {
APIResponse
Success bool `json:"success,omitempty"`
Purged uint64 `json:"purged"`
}

func (js *js) PurgeStream(name string) error {
psSubj := js.apiSubj(fmt.Sprintf(JSApiStreamPurgeT, name))
r, err := js.nc.Request(psSubj, nil, time.Second)
if err != nil {
return err
}
var resp JSApiStreamPurgeResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return err
}
if resp.Error != nil {
return errors.New(resp.Error.Description)
}
return nil
}

type JSApiStreamNamesRequest struct {
ApiPagedRequest
// These are filters that can be applied to the list.
Subject string `json:"subject,omitempty"`
}

// JSApiStreamListResponse list of detailed stream information.
// A nil request is valid and means all streams.
type JSApiStreamListResponse struct {
APIResponse
ApiPaged
Streams []*StreamInfo `json:"streams"`
}

func (js *js) ListStreams(offset int) ([]*StreamInfo, error) {
req, err := json.Marshal(JSApiStreamNamesRequest{
ApiPagedRequest: ApiPagedRequest{Offset: offset},
})
if err != nil {
return nil, err
}

slSubj := js.apiSubj(JSApiStreamList)
r, err := js.nc.Request(slSubj, req, time.Second)
if err != nil {
return nil, err
}
var resp JSApiStreamListResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}

return resp.Streams, nil
}

// RetentionPolicy determines how messages in a set are retained.
type RetentionPolicy int

Expand Down
Loading

0 comments on commit 7910891

Please sign in to comment.