Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement new json based api #50

Merged
merged 1 commit into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions api/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ const (
JetStreamMetricConsumerAckPre = JetStreamMetricPrefix + ".CONSUMER_ACK"
)

type JetStreamDeleteConsumerResponse struct {
Error *ApiError `json:"error,omitempty"`
Success bool `json:"success,omitempty"`
}

type JetStreamCreateConsumerResponse struct {
JetStreamResponse
*ConsumerInfo
}

type JetStreamConsumerInfoResponse struct {
JetStreamResponse
*ConsumerInfo
}

type JetStreamConsumersResponse struct {
JetStreamResponse
Consumers []string `json:"streams,omitempty"`
}

type AckPolicy int

const (
Expand Down
46 changes: 46 additions & 0 deletions api/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ func (p RetentionPolicy) MarshalJSON() ([]byte, error) {
}
}

type JetStreamEnabledResponse struct {
JetStreamResponse
Enabled bool `json:"enabled"`
}

type JetStreamAccountStats struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Expand All @@ -180,3 +185,44 @@ type JetStreamAccountLimits struct {
MaxStreams int `json:"max_streams"`
MaxConsumers int `json:"max_consumers"`
}

type ApiError struct {
Code int `json:"code"`
Description string `json:"description,omitempty"`
}

// Error implements error
func (e ApiError) Error() string {
switch {
case e.Description == "" && e.Code == 0:
return "unknown JetStream Error"
case e.Description == "" && e.Code > 0:
return fmt.Sprintf("unknown JetStream %d Error", e.Code)
default:
return e.Description
}
}

// ErrorCode is the JetStream error code
func (e ApiError) ErrorCode() int {
return e.Code
}

type JetStreamResponse struct {
Type string `json:"type"`
Error *ApiError `json:"error,omitempty"`
}

// ToError extracts a standard error from a JetStream response
func (r JetStreamResponse) ToError() error {
if r.Error == nil {
return nil
}

return *r.Error
}

// IsError determines if a standard JetStream API response is a error
func (r JetStreamResponse) IsError() bool {
return r.Error == nil
}
20 changes: 20 additions & 0 deletions api/stream_templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,23 @@ const (
JetStreamTemplateInfoT = "$JS.TEMPLATE.%s.INFO"
JetStreamDeleteTemplateT = "$JS.TEMPLATE.%s.DELETE"
)

type JetStreamDeleteTemplateResponse struct {
JetStreamResponse
Success bool `json:"success,omitempty"`
}

type JetStreamCreateTemplateResponse struct {
JetStreamResponse
*StreamTemplateInfo
}

type JetStreamListTemplatesResponse struct {
JetStreamResponse
Templates []string `json:"streams,omitempty"`
}

type JetStreamTemplateInfoResponse struct {
JetStreamResponse
*StreamTemplateInfo
}
37 changes: 37 additions & 0 deletions api/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,37 @@ type StoredMsg struct {
Time time.Time `json:"time"`
}

type JetStreamDeleteMsgResponse struct {
JetStreamResponse
Success bool `json:"success,omitempty"`
}

type JetStreamCreateStreamResponse struct {
JetStreamResponse
*StreamInfo
}

type JetStreamStreamInfoResponse struct {
JetStreamResponse
*StreamInfo
}

type JetStreamUpdateStreamResponse struct {
JetStreamResponse
*StreamInfo
}

type JetStreamDeleteStreamResponse struct {
JetStreamResponse
Success bool `json:"success,omitempty"`
}

type JetStreamPurgeStreamResponse struct {
Error *ApiError `json:"error,omitempty"`
Success bool `json:"success,omitempty"`
Purged uint64 `json:"purged,omitempty"`
}

// StreamConfig is the configuration for a JetStream Stream Template
//
// NATS Schema Type io.nats.jetstream.api.v1.stream_configuration
Expand Down Expand Up @@ -111,3 +142,9 @@ type StreamState struct {
LastSeq uint64 `json:"last_seq"`
Consumers int `json:"consumer_count"`
}

// Response from JetStreamListStreams
type JetStreamListStreamsResponse struct {
JetStreamResponse
Streams []string `json:"streams,omitempty"`
}
62 changes: 26 additions & 36 deletions consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package jsm

import (
"encoding/json"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -96,36 +95,23 @@ func NewConsumerFromDefault(stream string, dflt api.ConsumerConfig, opts ...Cons
}

func createDurableConsumer(req api.CreateConsumerRequest, opts *reqoptions) (name string, err error) {
jreq, err := json.Marshal(req)
var resp api.JetStreamCreateConsumerResponse
err = jsonRequest(fmt.Sprintf(api.JetStreamCreateConsumerT, req.Stream, req.Config.Durable), req, &resp, opts)
if err != nil {
return "", err
}

_, err = request(fmt.Sprintf(api.JetStreamCreateConsumerT, req.Stream, req.Config.Durable), jreq, opts)
if err != nil {
return "", err
}

return req.Config.Durable, nil
return resp.Name, nil
}

func createEphemeralConsumer(req api.CreateConsumerRequest, opts *reqoptions) (name string, err error) {
jreq, err := json.Marshal(req)
if err != nil {
return "", err
}

response, err := request(fmt.Sprintf(api.JetStreamCreateEphemeralConsumerT, req.Stream), jreq, opts)
var resp api.JetStreamCreateConsumerResponse
err = jsonRequest(fmt.Sprintf(api.JetStreamCreateEphemeralConsumerT, req.Stream), req, &resp, opts)
if err != nil {
return "", err
}

parts := strings.Split(string(response.Data), " ")
if len(parts) != 2 {
return "", fmt.Errorf("invalid ephemeral OK response from server: %q", response.Data)
}

return parts[1], nil
return resp.Name, nil
}

// NewConsumer creates a consumer based on DefaultConsumer modified by opts
Expand All @@ -140,7 +126,7 @@ func LoadOrNewConsumer(stream string, name string, opts ...ConsumerOption) (cons

// LoadOrNewConsumerFromDefault loads a consumer by name if known else creates a new one with these properties based on template
func LoadOrNewConsumerFromDefault(stream string, name string, template api.ConsumerConfig, opts ...ConsumerOption) (consumer *Consumer, err error) {
cfg, err := NewConsumerConfiguration(DefaultConsumer, opts...)
cfg, err := NewConsumerConfiguration(template, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -170,7 +156,7 @@ func LoadConsumer(stream string, name string, opts ...RequestOption) (consumer *
},
}

err = loadConfigForConsumer(consumer)
err = loadConfigForConsumer(consumer, conn)
if err != nil {
return nil, err
}
Expand All @@ -195,8 +181,8 @@ func NewConsumerConfiguration(dflt api.ConsumerConfig, opts ...ConsumerOption) (
return cfg, nil
}

func loadConfigForConsumer(consumer *Consumer) (err error) {
info, err := loadConsumerInfo(consumer.stream, consumer.name, consumer.cfg.conn)
func loadConfigForConsumer(consumer *Consumer, opts *reqoptions) (err error) {
info, err := loadConsumerInfo(consumer.stream, consumer.name, opts)
if err != nil {
return err
}
Expand All @@ -207,18 +193,14 @@ func loadConfigForConsumer(consumer *Consumer) (err error) {
}

func loadConsumerInfo(s string, c string, opts *reqoptions) (info api.ConsumerInfo, err error) {
response, err := request(fmt.Sprintf(api.JetStreamConsumerInfoT, s, c), nil, opts)
if err != nil {
return info, err
}
var resp api.JetStreamConsumerInfoResponse

info = api.ConsumerInfo{}
err = json.Unmarshal(response.Data, &info)
err = jsonRequest(fmt.Sprintf(api.JetStreamConsumerInfoT, s, c), nil, &resp, opts)
if err != nil {
return info, err
}

return info, nil
return *resp.ConsumerInfo, nil
}

func DeliverySubject(s string) ConsumerOption {
Expand Down Expand Up @@ -382,8 +364,15 @@ func ConsumerConnection(opts ...RequestOption) ConsumerOption {
}

// Reset reloads the Consumer configuration from the JetStream server
func (c *Consumer) Reset() error {
return loadConfigForConsumer(c)
func (c *Consumer) Reset(opts ...RequestOption) error {
ropts, err := newreqoptions(opts...)
if err != nil {
if err != nil {
return err
}
}

return loadConfigForConsumer(c, ropts)
}

// NextSubject returns the subject used to retrieve the next message for pull-based Consumers, empty when not a pull-base consumer
Expand Down Expand Up @@ -572,16 +561,17 @@ func (c *Consumer) Configuration() (config api.ConsumerConfig) {

// Delete deletes the Consumer, after this the Consumer object should be disposed
func (c *Consumer) Delete() (err error) {
response, err := request(fmt.Sprintf(api.JetStreamDeleteConsumerT, c.StreamName(), c.Name()), nil, c.cfg.conn)
var resp api.JetStreamDeleteConsumerResponse
err = jsonRequest(fmt.Sprintf(api.JetStreamDeleteConsumerT, c.StreamName(), c.Name()), nil, &resp, c.cfg.conn)
if err != nil {
return err
}

if IsOKResponse(response) {
if resp.Success {
return nil
}

return fmt.Errorf("unknown response while removing consumer %s: %q", c.Name(), response.Data)
return fmt.Errorf("unknown response while removing consumer %s", c.Name())
}

func (c *Consumer) Name() string { return c.name }
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/nats-io/jsm.go
go 1.14

require (
github.com/nats-io/nats-server/v2 v2.1.7-0.20200430031153-acbd41c3d66f
github.com/nats-io/nats-server/v2 v2.1.7-0.20200430204921-43c01007048c
github.com/nats-io/nats.go v1.9.2
github.com/xeipuuv/gojsonschema v1.2.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/minio/highwayhash v1.0.0 h1:iMSDhgUILCr0TNm8LWlSjF8N0ZIj2qbO8WHp6Q/J2
github.com/minio/highwayhash v1.0.0/go.mod h1:xQboMTeM9nY9v/LlAOxFctujiv5+Aq2hR5dxBpaMbdc=
github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI=
github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU=
github.com/nats-io/nats-server/v2 v2.1.7-0.20200430031153-acbd41c3d66f h1:5yKmsEQehcniAWXesyuBv+kBQ4J2TJ9iYEoKVk9iRow=
github.com/nats-io/nats-server/v2 v2.1.7-0.20200430031153-acbd41c3d66f/go.mod h1:XvxaI3tOtf8QAnaPwk4uHqFsJV85H4saugcWr3IO8Rk=
github.com/nats-io/nats-server/v2 v2.1.7-0.20200430204921-43c01007048c h1:I4ezd96ET5KG1eJCU4OVnhHU1y7aPWk9M9/r7HbSFLU=
github.com/nats-io/nats-server/v2 v2.1.7-0.20200430204921-43c01007048c/go.mod h1:XvxaI3tOtf8QAnaPwk4uHqFsJV85H4saugcWr3IO8Rk=
github.com/nats-io/nats.go v1.9.2 h1:oDeERm3NcZVrPpdR/JpGdWHMv3oJ8yY30YwxKq+DU2s=
github.com/nats-io/nats.go v1.9.2/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
Expand Down
Loading