Skip to content

Commit

Permalink
Merge pull request #1410 from nats-io/2.10.0
Browse files Browse the repository at this point in the history
Merge 2.10.0 to main
  • Loading branch information
piotrpio committed Sep 20, 2023
2 parents 33fb960 + cc6870c commit b3fab9b
Show file tree
Hide file tree
Showing 22 changed files with 1,658 additions and 228 deletions.
5 changes: 5 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,11 @@ func ExampleSubOpt() {
js.Subscribe("foo", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.SkipConsumerLookup())

// Use multiple subject filters.
js.Subscribe("", func(msg *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(msg.Data))
}, nats.Durable("FOO"), nats.ConsumerFilterSubjects("foo", "bar"), nats.BindStream("test_stream"))
}

func ExampleMaxWait() {
Expand Down
14 changes: 7 additions & 7 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.16.7
github.com/nats-io/nats-server/v2 v2.9.22
github.com/nats-io/nkeys v0.4.4
github.com/klauspost/compress v1.17.0
github.com/nats-io/nats-server/v2 v2.10.0
github.com/nats-io/nkeys v0.4.5
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.12.0
golang.org/x/text v0.13.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/sys v0.11.0 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
)
29 changes: 15 additions & 14 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,30 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak=
github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.22 h1:rzl88pqWFFrU4G00ed+JnY+uGHSLZ+3jrxDnJxzKwGA=
github.com/nats-io/nats-server/v2 v2.9.22/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
23 changes: 20 additions & 3 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,33 @@ CRUD operations on consumers can be achieved on 2 levels:
js, _ := jetstream.New(nc)

// create a consumer (this is an idempotent operation)
cons, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
// an error will be returned if consumer already exists and has different configuration.
cons, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})

// create an ephemeral pull consumer by not providing `Durable`
ephemeral, _ := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
ephemeral, _ := js.CreateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
})


// consumer can also be created using CreateOrUpdateConsumer
// this method will either create a consumer if it does not exist
// or update existing consumer (if possible)
cons2 := js.CreateOrUpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
Name: "bar",
})

// consumers can be updated
// an error will be returned if consumer with given name does not exists
// or an illegal property is to be updated (e.g. AckPolicy)
updated, _ := js.UpdateConsumer(ctx, "ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
Description: "updated consumer"
})

// get consumer handle
cons, _ = js.Consumer(ctx, "ORDERS", "foo")

Expand All @@ -280,7 +297,7 @@ js, _ := jetstream.New(nc)
stream, _ := js.Stream(ctx, "ORDERS")

// create consumer
cons, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
cons, _ := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "foo",
AckPolicy: jetstream.AckExplicitPolicy,
})
Expand Down
22 changes: 20 additions & 2 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type (
// CachedInfo returns [*ConsumerInfo] cached on a consumer struct
CachedInfo() *ConsumerInfo
}

createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
Action string `json:"action"`
}
)

// Info returns [ConsumerInfo] for a given consumer
Expand Down Expand Up @@ -84,14 +90,15 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo {
return p.info
}

func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig) (Consumer, error) {
func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
req := createConsumerRequest{
Stream: stream,
Config: &cfg,
Action: action,
}
reqJSON, err := json.Marshal(req)
if err != nil {
Expand All @@ -111,7 +118,7 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
}

var ccSubj string
if cfg.FilterSubject != "" {
if cfg.FilterSubject != "" && len(cfg.FilterSubjects) == 0 {
ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject))
} else {
ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateT, stream, consumerName))
Expand All @@ -128,6 +135,11 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
return nil, resp.Error
}

// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(resp.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported
}

return &pullConsumer{
jetStream: js,
stream: stream,
Expand All @@ -138,6 +150,12 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
}, nil
}

const (
consumerActionCreate = "create"
consumerActionUpdate = "update"
consumerActionCreateOrUpdate = ""
)

func generateConsName() string {
name := nuid.Next()
sha := sha256.New()
Expand Down
5 changes: 5 additions & 0 deletions jetstream/consumer_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type (

// NOTE: FilterSubjects requires nats-server v2.10.0+
FilterSubjects []string `json:"filter_subjects,omitempty"`

// Metadata is additional metadata for the Consumer.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
}

OrderedConsumerConfig struct {
Expand Down
53 changes: 49 additions & 4 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,15 @@ const (
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeConsumerDoesNotExist ErrorCode = 10149

JSErrCodeMessageNotFound ErrorCode = 10037

Expand All @@ -76,9 +81,35 @@ var (
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream subject transform. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

// ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream source subject transform. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

// ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"}

// ErrStreamSourceMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject filters not supported by nats-server"}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

// ErrConsumerExists is returned when attempting to create a consumer with CreateConsumer but a consumer with given name already exists.
ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}}

// ErrConsumerNameExists is returned when attempting to update a consumer with UpdateConsumer but a consumer with given name does not exist.
ErrConsumerDoesNotExist JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerDoesNotExist, Description: "consumer does not exist", Code: 400}}

// ErrMsgNotFound is returned when message with provided sequence number does not exist.
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

Expand All @@ -88,8 +119,22 @@ var (
// ErrConsumerCreate is returned when nats-server reports error when creating consumer (e.g. illegal update).
ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}}

// ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}

// ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer.
ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}

// ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}}

// Client errors

// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
// configuration was already created in the server.
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNameAlreadyInUse JetStreamError = &jsError{message: "consumer name already in use"}

Expand Down
56 changes: 55 additions & 1 deletion jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ type (
// If consumer already exists, it will be updated (if possible).
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
CreateOrUpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// CreateConsumer creates a consumer on a given stream with given config.
// If consumer already exists, ErrConsumerExists is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// UpdateConsumer updates an existing consumer.
// If consumer does not exist, ErrConsumerDoesNotExist is returned.
// Consumer interface is returned, serving as a hook to operate on a consumer (e.g. fetch messages)
UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error)
// OrderedConsumer returns an OrderedConsumer instance.
// OrderedConsumer allows fetching messages from a stream (just like standard consumer),
// for in order delivery of messages. Underlying consumer is re-created when necessary,
Expand Down Expand Up @@ -350,6 +358,22 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, resp.Error
}

// check that input subject transform (if used) is reflected in the returned StreamInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
}
}

return &stream{
jetStream: js,
name: cfg.Name,
Expand Down Expand Up @@ -413,6 +437,22 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, resp.Error
}

// check that input subject transform (if used) is reflected in the returned StreamInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
}
}

return &stream{
jetStream: js,
name: cfg.Name,
Expand Down Expand Up @@ -480,7 +520,21 @@ func (js *jetStream) CreateOrUpdateConsumer(ctx context.Context, stream string,
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg)
return upsertConsumer(ctx, js, stream, cfg, consumerActionCreateOrUpdate)
}

func (js *jetStream) CreateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg, consumerActionCreate)
}

func (js *jetStream) UpdateConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (Consumer, error) {
if err := validateStreamName(stream); err != nil {
return nil, err
}
return upsertConsumer(ctx, js, stream, cfg, consumerActionUpdate)
}

func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg OrderedConsumerConfig) (Consumer, error) {
Expand Down
12 changes: 10 additions & 2 deletions jetstream/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,12 +351,20 @@ func (js *jetStream) CreateKeyValue(ctx context.Context, cfg KeyValueConfig) (Ke
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]
} else {
sourceBucketName = ss.Name
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}

if ss.External == nil || sourceBucketName != cfg.Bucket {
ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}}
}
scfg.Sources = append(scfg.Sources, ss)
}
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}
Expand Down
Loading

0 comments on commit b3fab9b

Please sign in to comment.