Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] CreateConsumer and UpdateConsumer methods to work with server 2.10 capabilities #1379

Merged
merged 2 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 12 additions & 3 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,25 @@ CRUD operations on consumers can be achieved on 2 levels:
js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
// an error will be returned if consumer already exists and has different configuration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should still have an example using CreateOrUpdateConsumer() (since it is still there)

cons, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})

// create an ephemeral pull consumer by not providing `Durable`
ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
ephemeral, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
})

// consumers can be updated
// an error will be returned if consumer with given name does not exists
// or an illegal property is to be updated (e.g. AckPolicy)
updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer"
})

// get consumer handle
cons, _ = js.Consumer(ctx, "ORDERS", "foo")

Expand All @@ -280,7 +289,7 @@ js, _ := jetstream.New(nc)
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
cons, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
48 changes: 47 additions & 1 deletion jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ type (
// CachedInfo returns [*ConsumerInfo] cached on a consumer struct
CachedInfo() *ConsumerInfo
}

createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
Action consumerAction `json:"action"`
}

consumerAction int
)

// Info returns [ConsumerInfo] for a given consumer
Expand Down Expand Up @@ -84,14 +92,15 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo {
return p.info
}

func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig) (Consumer, error) {
func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action consumerAction) (Consumer, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
req := createConsumerRequest{
Stream: stream,
Config: &cfg,
Action: action,
}
reqJSON, err := json.Marshal(req)
if err != nil {
Expand Down Expand Up @@ -143,6 +152,43 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
}, nil
}

const (
actionCreateOrUpdate consumerAction = iota
actionUpdate
actionCreate
)

const (
actionUpdateString = "update"
actionCreateString = "create"
actionCreateOrUpdateString = ""
)

func (a consumerAction) String() string {
switch a {
case actionCreateOrUpdate:
return actionCreateOrUpdateString
case actionCreate:
return actionCreateString
case actionUpdate:
return actionUpdateString
}
return actionCreateOrUpdateString
}

func (a consumerAction) MarshalJSON() ([]byte, error) {
switch a {
case actionCreate:
return json.Marshal(actionCreateString)
case actionUpdate:
return json.Marshal(actionUpdateString)
case actionCreateOrUpdate:
return json.Marshal(actionCreateOrUpdateString)
default:
return nil, fmt.Errorf("can not marshal %v", a)
}
}

func generateConsName() string {
name := nuid.Next()
sha := sha256.New()
Expand Down
8 changes: 8 additions & 0 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ const (
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeConsumerDoesNotExist ErrorCode = 10149

JSErrCodeMessageNotFound ErrorCode = 10037

Expand Down Expand Up @@ -102,6 +104,12 @@ var (
// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

// ErrConsumerExists is returned when attempting to create a consumer with CreateConsumer but a consumer with given name already exists.
ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}}

// ErrConsumerNameExists is returned when attempting to update a consumer with UpdateConsumer but a consumer with given name does not exist.
ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}}

// ErrMsgNotFound is returned when message with provided sequence number does not exist.
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

Expand Down
24 changes: 23 additions & 1 deletion jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ type (
// If consumer already exists, it will be updated (if possible).
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// CreateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, ErrConsumerExists is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// UpdateConsumer updates an existing consumer.
// If consumer does not exist, ErrConsumerDoesNotExist is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
// for in order delivery of messages. Underlying consumer is re-created when necessary,
Expand Down Expand Up @@ -518,7 +526,21 @@ func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string,
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg)
return upsertConsumer(ctx, js, stream, cfg, actionCreateOrUpdate)
}

func (js *jetStream) CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg, actionCreate)
}

func (js *jetStream) UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg, actionUpdate)
}

func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error) {
Expand Down
26 changes: 20 additions & 6 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ type (
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

// CreateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, an ErrConsumerExists is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

// UpdateConsumer updates an existing consumer with given config.
// If consumer does not exist, an ErrConsumerDoesNotExist is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
// for in order delivery of messages. Underlying consumer is re-created when necessary,
Expand All @@ -74,6 +84,7 @@ type (
// ConsumerNames returns a ConsumerNameLister enabling iterating over a channel of consumer names
ConsumerNames(context.Context) ConsumerNameLister
}

RawStreamMsg struct {
Subject string
Sequence uint64
Expand All @@ -100,11 +111,6 @@ type (
*ConsumerInfo
}

createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
}

StreamPurgeOpt func(*StreamPurgeRequest) error

StreamPurgeRequest struct {
Expand Down Expand Up @@ -194,7 +200,15 @@ type (
)

func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg)
return upsertConsumer(ctx, s.jetStream, s.name, cfg, actionCreateOrUpdate)
}

func (s *stream) CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg, actionCreate)
}

func (s *stream) UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg, actionUpdate)
}

func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) {
Expand Down