diff --git a/server/client.go b/server/client.go index e3364c8a80f..273dc49925a 100644 --- a/server/client.go +++ b/server/client.go @@ -3968,6 +3968,24 @@ func (c *client) setupResponseServiceImport(acc *Account, si *serviceImport, tra return rsi } +func removeAllNatsHeadersIfPresent(hdr []byte) []byte { + for { + start := bytes.Index(hdr, []byte(JSHeaderPrefix)) + // key can't be first and we want to check that it is preceded by a '\n' + if start < 1 || hdr[start-1] != '\n' { + return hdr + } + end := bytes.Index(hdr[start:], []byte(_CRLF_)) + if end < 0 { + return hdr + } + hdr = append(hdr[:start], hdr[start+end+len(_CRLF_):]...) + if len(hdr) <= len(emptyHdrLine) { + return nil + } + } +} + // Will remove a header if present. func removeHeaderIfPresent(hdr []byte, key string) []byte { start := bytes.Index(hdr, []byte(key)) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index db2439acdad..3249f828e83 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -21931,3 +21931,106 @@ func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) { }, }) } + +func TestJetStreamNoDuplicateHeadersOnDirectGet(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, _ := jsClientConnect(t, s) + defer nc.Close() + + name := nuid.Next() + + req := StreamConfig{ + Name: name, + Storage: MemoryStorage, + Subjects: []string{fmt.Sprintf("%s.>", name)}, + RePublish: &RePublish{ + Source: ">", + Destination: fmt.Sprintf("X%s.>", name), + }, + } + reqJson, err := json.Marshal(req) + require_NoError(t, err) + _, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second) + require_NoError(t, err) + + targetStream := fmt.Sprintf("%s-2", name) + req = StreamConfig{ + Name: targetStream, + Storage: MemoryStorage, + Subjects: []string{fmt.Sprintf("X%s.>", name)}, + AllowDirect: true, + } + reqJson, err = json.Marshal(req) + require_NoError(t, err) + _, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, req.Name), reqJson, time.Second) + require_NoError(t, err) + + // Now publish a message to the source stream. + sendStreamMsg(t, nc, fmt.Sprintf("%s.data", name), "data") + + getSubj := fmt.Sprintf(JSDirectMsgGetT, targetStream) + + r, err := nc.Request(getSubj, []byte("{\"seq\":1}"), time.Second) + require_NoError(t, err) + + require_Equal(t, len(r.Header.Values(JSStream)), 1) + require_Equal(t, len(r.Header.Values(JSSubject)), 1) + require_Equal(t, len(r.Header.Values(JSSequence)), 1) + require_Equal(t, len(r.Header.Values(JSTimeStamp)), 1) +} + +func TestRemoveAllJetStreamHeadersIfPresent(t *testing.T) { + // copied private fn from nats.go + headerBytes := func(h nats.Header) ([]byte, error) { + var hdr []byte + if len(h) == 0 { + return hdr, nil + } + + var b bytes.Buffer + _, err := b.WriteString(hdrLine) + if err != nil { + return nil, err + } + + err = http.Header(h).Write(&b) + if err != nil { + return nil, err + } + + _, err = b.WriteString("\r\n") + if err != nil { + return nil, err + } + + return b.Bytes(), nil + } + + h := nats.Header{} + // expect empty to become nil + hb, err := headerBytes(h) + require_NoError(t, err) + hb = removeAllNatsHeadersIfPresent(hb) + if hb != nil { + t.Fatalf("expected headers to be nil") + } + + // expect Nats-* to be removed, thus be nil + h.Add("Nats-Test", "test") + h.Add("Nats-Something-Else", "test") + hb, err = headerBytes(h) + require_NoError(t, err) + hb = removeAllNatsHeadersIfPresent(hb) + if hb != nil { + t.Fatalf("expected headers to be nil") + } + + // expect one header to remain + h.Add("Something-Else", "hey") + hb, err = headerBytes(h) + require_NoError(t, err) + hb = removeAllNatsHeadersIfPresent(hb) + require_Equal(t, "NATS/1.0\r\nSomething-Else: hey\r\n\r\n", string(hb)) +} diff --git a/server/stream.go b/server/stream.go index 72ca5f354d1..3e7871cda2e 100644 --- a/server/stream.go +++ b/server/stream.go @@ -341,6 +341,7 @@ const ( // Headers for republished messages and direct gets. const ( + JSHeaderPrefix = "Nats-" JSStream = "Nats-Stream" JSSequence = "Nats-Sequence" JSTimeStamp = "Nats-Time-Stamp" @@ -4399,6 +4400,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte, } } + // if we are NOT doing subject remapping, this is a message to on-board it shouldn't store Nats-* headers + if tsubj == _EMPTY_ { + hdr = removeAllNatsHeadersIfPresent(hdr) + } + // Store actual msg. if lseq == 0 && ts == 0 { seq, ts, err = store.StoreMsg(subject, hdr, msg)