Skip to content

Commit

Permalink
[ADDED] CreateConsumer and UpdateConsumer methods to work with server…
Browse files Browse the repository at this point in the history
… 2.10 capabilities (#1379)

Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Aug 29, 2023
1 parent cfe06a5 commit 2d59abc
Show file tree
Hide file tree
Showing 7 changed files with 468 additions and 11 deletions.
23 changes: 20 additions & 3 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,33 @@ CRUD operations on consumers can be achieved on 2 levels:
js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
// an error will be returned if consumer already exists and has different configuration.
cons, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})

// create an ephemeral pull consumer by not providing `Durable`
ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
ephemeral, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
})


// consumer can also be created using CreateOrUpdateConsumer
// this method will either create a consumer if it does not exist
// or update existing consumer (if possible)
cons2 := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Name: "bar",
})

// consumers can be updated
// an error will be returned if consumer with given name does not exists
// or an illegal property is to be updated (e.g. AckPolicy)
updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer"
})

// get consumer handle
cons, _ = js.Consumer(ctx, "ORDERS", "foo")

Expand All @@ -280,7 +297,7 @@ js, _ := jetstream.New(nc)
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
cons, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
15 changes: 14 additions & 1 deletion jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type (
// CachedInfo returns [*ConsumerInfo] cached on a consumer struct
CachedInfo() *ConsumerInfo
}

createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
Action string `json:"action"`
}
)

// Info returns [ConsumerInfo] for a given consumer
Expand Down Expand Up @@ -84,14 +90,15 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo {
return p.info
}

func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig) (Consumer, error) {
func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
req := createConsumerRequest{
Stream: stream,
Config: &cfg,
Action: action,
}
reqJSON, err := json.Marshal(req)
if err != nil {
Expand Down Expand Up @@ -143,6 +150,12 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
}, nil
}

const (
consumerActionCreate = "create"
consumerActionUpdate = "update"
consumerActionCreateOrUpdate = ""
)

func generateConsName() string {
name := nuid.Next()
sha := sha256.New()
Expand Down
8 changes: 8 additions & 0 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ const (
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeConsumerDoesNotExist ErrorCode = 10149

JSErrCodeMessageNotFound ErrorCode = 10037

Expand Down Expand Up @@ -102,6 +104,12 @@ var (
// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

// ErrConsumerExists is returned when attempting to create a consumer with CreateConsumer but a consumer with given name already exists.
ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}}

// ErrConsumerNameExists is returned when attempting to update a consumer with UpdateConsumer but a consumer with given name does not exist.
ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}}

// ErrMsgNotFound is returned when message with provided sequence number does not exist.
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

Expand Down
24 changes: 23 additions & 1 deletion jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ type (
// If consumer already exists, it will be updated (if possible).
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// CreateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, ErrConsumerExists is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// UpdateConsumer updates an existing consumer.
// If consumer does not exist, ErrConsumerDoesNotExist is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
// for in order delivery of messages. Underlying consumer is re-created when necessary,
Expand Down Expand Up @@ -518,7 +526,21 @@ func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string,
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg)
return upsertConsumer(ctx, js, stream, cfg, consumerActionCreateOrUpdate)
}

func (js *jetStream) CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg, consumerActionCreate)
}

func (js *jetStream) UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg, consumerActionUpdate)
}

func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error) {
Expand Down
26 changes: 20 additions & 6 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ type (
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

// CreateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, an ErrConsumerExists is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

// UpdateConsumer updates an existing consumer with given config.
// If consumer does not exist, an ErrConsumerDoesNotExist is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages).
UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error)

// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
// for in order delivery of messages. Underlying consumer is re-created when necessary,
Expand All @@ -74,6 +84,7 @@ type (
// ConsumerNames returns a ConsumerNameLister enabling iterating over a channel of consumer names
ConsumerNames(context.Context) ConsumerNameLister
}

RawStreamMsg struct {
Subject string
Sequence uint64
Expand All @@ -100,11 +111,6 @@ type (
*ConsumerInfo
}

createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
}

StreamPurgeOpt func(*StreamPurgeRequest) error

StreamPurgeRequest struct {
Expand Down Expand Up @@ -194,7 +200,15 @@ type (
)

func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg)
return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreateOrUpdate)
}

func (s *stream) CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreate)
}

func (s *stream) UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) {
return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionUpdate)
}

func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) {
Expand Down
181 changes: 181 additions & 0 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,187 @@ func TestJetStream_CreateOrUpdateConsumer(t *testing.T) {
}
}

func TestJetStream_CreateConsumer(t *testing.T) {
tests := []struct {
name string
consumerConfig jetstream.ConsumerConfig
shouldCreate bool
stream string
withError error
}{
{
name: "create consumer",
consumerConfig: jetstream.ConsumerConfig{Durable: "dur"},
stream: "foo",
shouldCreate: true,
},
{
name: "consumer already exists, error",
consumerConfig: jetstream.ConsumerConfig{Durable: "dur", Description: "test consumer"},
stream: "foo",
withError: jetstream.ErrConsumerExists,
},
{
name: "stream does not exist",
stream: "abc",
consumerConfig: jetstream.ConsumerConfig{Durable: "dur"},
withError: jetstream.ErrStreamNotFound,
},
}

srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var sub *nats.Subscription
if test.consumerConfig.FilterSubject != "" {
sub, err = nc.SubscribeSync(fmt.Sprintf("$JS.API.CONSUMER.CREATE.foo.*.%s", test.consumerConfig.FilterSubject))
} else {
sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*")
}
c, err := js.CreateConsumer(ctx, test.stream, test.consumerConfig)
if test.withError != nil {
if !errors.Is(err, test.withError) {
t.Fatalf("Expected error: %v; got: %v", test.withError, err)
}
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if test.shouldCreate {
if _, err := sub.NextMsgWithContext(ctx); err != nil {
t.Fatalf("Expected request on %s; got %s", sub.Subject, err)
}
}
ci, err := s.Consumer(ctx, c.CachedInfo().Name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy {
t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy)
}
if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) {
t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects)
}
})
}
}

func TestJetStream_UpdateConsumer(t *testing.T) {
tests := []struct {
name string
consumerConfig jetstream.ConsumerConfig
shouldUpdate bool
stream string
withError error
}{
{
name: "update consumer",
consumerConfig: jetstream.ConsumerConfig{Name: "testcons", Description: "updated consumer"},
stream: "foo",
shouldUpdate: true,
},
{
name: "illegal update",
consumerConfig: jetstream.ConsumerConfig{Name: "testcons", AckPolicy: jetstream.AckNonePolicy},
stream: "foo",
withError: jetstream.ErrConsumerCreate,
},
{
name: "consumer does not exist",
consumerConfig: jetstream.ConsumerConfig{Name: "abc"},
stream: "foo",
withError: jetstream.ErrConsumerDoesNotExist,
},
{
name: "stream does not exist",
consumerConfig: jetstream.ConsumerConfig{Name: "testcons"},
stream: "abc",
withError: jetstream.ErrStreamNotFound,
},
}

srv := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, srv)
nc, err := nats.Connect(srv.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

js, err := jetstream.New(nc)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = s.CreateConsumer(ctx, jetstream.ConsumerConfig{Name: "testcons"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var sub *nats.Subscription
if test.consumerConfig.FilterSubject != "" {
sub, err = nc.SubscribeSync(fmt.Sprintf("$JS.API.CONSUMER.CREATE.foo.*.%s", test.consumerConfig.FilterSubject))
} else {
sub, err = nc.SubscribeSync("$JS.API.CONSUMER.CREATE.foo.*")
}
c, err := js.UpdateConsumer(ctx, test.stream, test.consumerConfig)
if test.withError != nil {
if !errors.Is(err, test.withError) {
t.Fatalf("Expected error: %v; got: %v", test.withError, err)
}
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if test.shouldUpdate {
if _, err := sub.NextMsgWithContext(ctx); err != nil {
t.Fatalf("Expected request on %s; got %s", sub.Subject, err)
}
}
ci, err := s.Consumer(ctx, c.CachedInfo().Name)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if ci.CachedInfo().Config.AckPolicy != test.consumerConfig.AckPolicy {
t.Fatalf("Invalid ack policy; want: %s; got: %s", test.consumerConfig.AckPolicy, ci.CachedInfo().Config.AckPolicy)
}
if !reflect.DeepEqual(test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects) {
t.Fatalf("Invalid filter subjects; want: %v; got: %v", test.consumerConfig.FilterSubjects, ci.CachedInfo().Config.FilterSubjects)
}
})
}
}

func TestJetStream_Consumer(t *testing.T) {
tests := []struct {
name string
Expand Down

0 comments on commit 2d59abc

Please sign in to comment.