Skip to content

Commit

Permalink
Add an opt-in parameter to create the stream and subject if not exist
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Amador <amador.marco@gmail.com>
  • Loading branch information
mfamador committed Jun 4, 2024
1 parent 45d2239 commit acf260b
Showing 1 changed file with 63 additions and 1 deletion.
64 changes: 63 additions & 1 deletion internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -79,6 +80,14 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].
Description("The maximum number of outstanding acks to be allowed before consuming is halted.").
Advanced().
Default(1024)).
Field(service.NewBoolField("create_if_not_exists").
Description("Create the `stream` and `subject` if do not exist.").
Advanced().
Default(false)).
Field(service.NewStringEnumField("storage_type", "memory", "file").
Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage").
Advanced().
Default("memory")).
Field(service.NewDurationField("nak_delay").
Description("An optional delay duration on redelivering the messages when negatively acknowledged.").
Example("1m").
Expand Down Expand Up @@ -122,6 +131,8 @@ type jetStreamReader struct {
nakDelay time.Duration
nakDelayUntilHeader string
maxAckPending int
createIfNotExists bool
storageType string

log *service.Logger

Expand Down Expand Up @@ -192,7 +203,7 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
}
} else {
if j.subject == "" && j.stream == "" {
return nil, errors.New("subject and stream is empty")
return nil, errors.New("subject and stream are empty")
}
}

Expand All @@ -207,6 +218,17 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
}
}

if conf.Contains("create_if_not_exists") {
if j.createIfNotExists, err = conf.FieldBool("create_if_not_exists"); err != nil {
return nil, err
}
}
if conf.Contains("storage_type") {
if j.storageType, err = conf.FieldString("storage_type"); err != nil {
return nil, err
}
}

if conf.Contains("nak_delay") {
if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil {
return nil, err
Expand Down Expand Up @@ -304,7 +326,47 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
natsSub, err = jCtx.QueueSubscribeSync(j.subject, j.queue, options...)
}
}

if err != nil {
if j.createIfNotExists {
var natsErr *nats.APIError
if errors.As(err, &natsErr) {
if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound {
_, err = jCtx.AddStream(&nats.StreamConfig{
Name: j.stream,
Subjects: func() []string {
if j.subject == "" {
return nil
}
return []string{j.subject}
}(),
Storage: func() nats.StorageType {
if j.storageType == "file" {
return nats.FileStorage
}
return nats.MemoryStorage
}(),
})
}
} else if strings.Contains(err.Error(), "does not match consumer") {
// create subject to existent stream .stream
_, err = jCtx.UpdateStream(&nats.StreamConfig{
Name: j.stream,
Subjects: func() []string {
if j.subject == "" {
return nil
}
return []string{j.subject}
}(),
Storage: func() nats.StorageType {
if j.storageType == "file" {
return nats.FileStorage
}
return nats.MemoryStorage
}(),
})
}
}
return err
}

Expand Down

0 comments on commit acf260b

Please sign in to comment.