From a9276b107235481e56e27cc559862fdb8392a491 Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Thu, 23 May 2024 19:37:38 +0100 Subject: [PATCH] Add an opt-in parameter to create the stream and subject if not exist Signed-off-by: Marco Amador --- internal/impl/nats/input_jetstream.go | 64 ++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/internal/impl/nats/input_jetstream.go b/internal/impl/nats/input_jetstream.go index b745438b9..4ea477b9e 100644 --- a/internal/impl/nats/input_jetstream.go +++ b/internal/impl/nats/input_jetstream.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strconv" + "strings" "sync" "time" @@ -79,6 +80,14 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation]. Description("The maximum number of outstanding acks to be allowed before consuming is halted."). Advanced(). Default(1024)). + Field(service.NewBoolField("create_if_not_exists"). + Description("Create the `stream` and `subject` if do not exist."). + Advanced(). + Default(false)). + Field(service.NewStringEnumField("storage_type", "memory", "file"). + Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage"). + Advanced(). + Default("memory")). Field(service.NewDurationField("nak_delay"). Description("An optional delay duration on redelivering the messages when negatively acknowledged."). Example("1m"). @@ -122,6 +131,8 @@ type jetStreamReader struct { nakDelay time.Duration nakDelayUntilHeader string maxAckPending int + createIfNotExists bool + storageType string log *service.Logger @@ -192,7 +203,7 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou } } else { if j.subject == "" && j.stream == "" { - return nil, errors.New("subject and stream is empty") + return nil, errors.New("subject and stream are empty") } } @@ -207,6 +218,17 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou } } + if conf.Contains("create_if_not_exists") { + if j.createIfNotExists, err = conf.FieldBool("create_if_not_exists"); err != nil { + return nil, err + } + } + if conf.Contains("storage_type") { + if j.storageType, err = conf.FieldString("storage_type"); err != nil { + return nil, err + } + } + if conf.Contains("nak_delay") { if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil { return nil, err @@ -304,7 +326,47 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) { natsSub, err = jCtx.QueueSubscribeSync(j.subject, j.queue, options...) } } + if err != nil { + if j.createIfNotExists { + var natsErr *nats.APIError + if errors.As(err, &natsErr) { + if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound { + _, err = jCtx.AddStream(&nats.StreamConfig{ + Name: j.stream, + Subjects: func() []string { + if j.subject == "" { + return nil + } + return []string{j.subject} + }(), + Storage: func() nats.StorageType { + if j.storageType == "file" { + return nats.FileStorage + } + return nats.MemoryStorage + }(), + }) + } + } else if strings.Contains(err.Error(), "does not match consumer") { + // create subject to existent stream .stream + _, err = jCtx.UpdateStream(&nats.StreamConfig{ + Name: j.stream, + Subjects: func() []string { + if j.subject == "" { + return nil + } + return []string{j.subject} + }(), + Storage: func() nats.StorageType { + if j.storageType == "file" { + return nats.FileStorage + } + return nats.MemoryStorage + }(), + }) + } + } return err }