From 46daa763937d8ac137cb1343e4f96fea7f87b0f2 Mon Sep 17 00:00:00 2001 From: Alex Ullrich Date: Sat, 9 Sep 2023 18:42:38 -0400 Subject: [PATCH] jetstream: add option to pass arbitrary headers on publish This will allow callers to pass custom header values without needing to import anything beyond the jetstream sub-package. --- jetstream/jetstream_test.go | 24 ++++++++++++++++++++++++ jetstream/options.go | 13 +++++++++++++ jetstream/publish.go | 9 +++++++++ 3 files changed, 46 insertions(+) diff --git a/jetstream/jetstream_test.go b/jetstream/jetstream_test.go index e7f2317db..3d922a1a7 100644 --- a/jetstream/jetstream_test.go +++ b/jetstream/jetstream_test.go @@ -16,6 +16,7 @@ package jetstream import ( "errors" "fmt" + "reflect" "testing" "time" @@ -116,3 +117,26 @@ func TestValidateSubject(t *testing.T) { }) } } + +func TestOptions(t *testing.T) { + t.Run("publish", func(t *testing.T) { + t.Run("WithHeader", func(t *testing.T) { + customHeader := "test" + customHeaderValues := []string{"some", "values"} + po := pubOpts{} + sut := WithHeader(customHeader, customHeaderValues) + + err := sut(&po) + if err != nil { + t.Fatalf("expected nil error got: %s", err) + } + if po.headerValues == nil { + t.Fatal("nil headerValues") + } + + if got := po.headerValues.Values(customHeader); !reflect.DeepEqual(got, customHeaderValues) { + t.Fatalf("header %s not set - expected %+v got %+v", customHeader, customHeaderValues, got) + } + }) + }) +} diff --git a/jetstream/options.go b/jetstream/options.go index e1b4b5b33..b42809c63 100644 --- a/jetstream/options.go +++ b/jetstream/options.go @@ -16,6 +16,8 @@ package jetstream import ( "fmt" "time" + + "github.com/nats-io/nats.go" ) type pullOptFunc func(*consumeOpts) error @@ -250,6 +252,17 @@ func WithStreamListSubject(subject string) StreamListOpt { } } +// WithHeader adds an arbitrary header to the underlying message. +func WithHeader(key string, value []string) PublishOpt { + return func(opts *pubOpts) error { + if opts.headerValues == nil { + opts.headerValues = nats.Header{} + } + opts.headerValues[key] = value + return nil + } +} + // WithMsgID sets the message ID used for deduplication. func WithMsgID(id string) PublishOpt { return func(opts *pubOpts) error { diff --git a/jetstream/publish.go b/jetstream/publish.go index 1215687ee..5cee64efc 100644 --- a/jetstream/publish.go +++ b/jetstream/publish.go @@ -52,6 +52,9 @@ type ( // stallWait is the max wait of a async pub ack. stallWait time.Duration + + // headerValues contains any arbitrary headers provided by the caller + headerValues nats.Header } // PubAckFuture is a future for a PubAck. @@ -171,6 +174,12 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) } + if o.headerValues != nil { + for k, v := range o.headerValues { + m.Header[k] = v + } + } + var resp *nats.Msg var err error