From c2d3ef1021fa1b5dd436d3ea15490757eceaf210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Tue, 15 Aug 2023 16:35:19 -0700 Subject: [PATCH 1/2] Fix potential out of range for stream source transform update. Clean up un-needed if statement as it's ok to call NewSubjectTransform with an empty destination (ie no transformation) it will return nil MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jean-Noël Moyne --- server/stream.go | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/server/stream.go b/server/stream.go index bd63866113..278ed65bb2 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1730,21 +1730,21 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) if len(s.SubjectTransforms) == 0 { si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject} - // Check for transform. - if s.SubjectTransformDest != _EMPTY_ { - var err error - if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { - mset.mu.Unlock() - return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err) - } + // set for transform if any + var err error + if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { + mset.mu.Unlock() + return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err) } } else { si = &sourceInfo{name: s.Name, iname: s.iname} + si.trs = make([]*subjectTransform, len(s.SubjectTransforms)) for i := range s.SubjectTransforms { // err can be ignored as already validated in config check var err error si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination) if err != nil { + mset.mu.Unlock() mset.srv.Errorf("Unable to get subject transform for source: %v", err) } } @@ -2442,12 +2442,11 @@ func (mset *stream) setupMirrorConsumer() error { // Filters if mset.cfg.Mirror.FilterSubject != _EMPTY_ { req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject - if mset.cfg.Mirror.SubjectTransformDest != _EMPTY_ { - var err error - mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest) - if err != nil { - mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) - } + // Set transform if any + var err error + mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest) + if err != nil { + mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) } } @@ -3258,14 +3257,12 @@ func (mset *stream) startingSequenceForSources() { if len(ssi.SubjectTransforms) == 0 { si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject} - // Check for transform. - if ssi.SubjectTransformDest != _EMPTY_ { - // no need to check the error as already validated that it will not before - var err error - si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) - if err != nil { - mset.srv.Errorf("Unable to get subject transform for source: %v", err) - } + // Set the transform if any + // technically no need to check the error as already validated that it will not before + var err error + si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) + if err != nil { + mset.srv.Errorf("Unable to get subject transform for source: %v", err) } } else { var trs []*subjectTransform From 0cc43acb84ae522502b3ed2664093150ac0635b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Tue, 15 Aug 2023 19:22:09 -0700 Subject: [PATCH 2/2] Fix Nats-Stream-Source header parsing when using multi-filter transforms MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jean-Noël Moyne --- server/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/stream.go b/server/stream.go index 278ed65bb2..9fe4b92ff1 100644 --- a/server/stream.go +++ b/server/stream.go @@ -3137,7 +3137,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { // Generate a new (2.10) style source header (stream name, sequence number, source filter, source destination transform). func (si *sourceInfo) genSourceHeader(reply string) string { var b strings.Builder - iNameParts := strings.Fields(si.iname) + iNameParts := strings.Split(si.iname, " ") b.WriteString(iNameParts[0]) b.WriteByte(' ') @@ -3186,7 +3186,7 @@ func streamAndSeq(shdr string) (string, string, uint64) { return streamAndSeqFromAckReply(shdr) } // New version which is stream index name sequence - fields := strings.Fields(shdr) + fields := strings.Split(shdr, " ") nFields := len(fields) if nFields != 2 && nFields <= 3 {