From 34c9222a8f01f3a92431af864ac222ba030be8a3 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Tue, 3 Oct 2023 10:04:11 -0500 Subject: [PATCH] [FIX] direct get APIs can contain duplicate `Nats-*` headers, because it simply appends JSON bytes to existing headers. If the message was onboarded on a republish, this will contain system headers. Since headers are not ordered and can contain multiple values for the same header, this can break KV clients as well as create ambiguity on the stream that yielded the message. Fix #4573 --- server/jetstream_test.go | 49 ++++++++++++++++++++++++++++++++++++++++ server/stream.go | 4 ++++ 2 files changed, 53 insertions(+) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index db2439acdad..5e230d7afee 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21931,3 +21931,52 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) { }, }) } + +func Test_NoDuplicateHeadersOnDirectGet(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + name := nuid.Next() + + req := StreamConfig{ + Name: name, + Storage: MemoryStorage, + Subjects: []string{fmt.Sprintf("%s.>", name)}, + RePublish: &RePublish{ + Source: ">", + Destination: fmt.Sprintf("X%s.>", name), + }, + } + reqJson, err := json.Marshal(req) + require_NoError(t, err) + _, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second) + require_NoError(t, err) + + targetStream := fmt.Sprintf("%s-2", name) + req = StreamConfig{ + Name: targetStream, + Storage: MemoryStorage, + Subjects: []string{fmt.Sprintf("X%s.>", name)}, + AllowDirect: true, + } + reqJson, err = json.Marshal(req) + require_NoError(t, err) + _, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second) + require_NoError(t, err) + + // Now publish a message to the source stream. + sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data") + + getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream) + + r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second) + require_NoError(t, err) + + require_Equal(t, len(r.Header.Values(JSStream)), 1) + require_Equal(t, len(r.Header.Values(JSSubject)), 1) + require_Equal(t, len(r.Header.Values(JSSequence)), 1) + require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1) +} diff --git a/server/stream.go b/server/stream.go index 72ca5f354d1..1eb3ef81fef 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4051,9 +4051,13 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) { hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano))) } else { hdr = copyBytes(hdr) + hdr = removeHeaderIfPresent(hdr, JSStream) hdr = genHeader(hdr, JSStream, name) + hdr = removeHeaderIfPresent(hdr, JSSubject) hdr = genHeader(hdr, JSSubject, sm.subj) + hdr = removeHeaderIfPresent(hdr, JSSequence) hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10)) + hdr = removeHeaderIfPresent(hdr, JSTimeStamp) hdr = genHeader(hdr, JSTimeStamp, ts.Format(time.RFC3339Nano)) } mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, sm.msg, nil, 0))