Skip to content

Commit

Permalink
Fix stream config update of source transforms (#4400)
Browse files Browse the repository at this point in the history
 - [x] Build is green in Travis CI
- [X] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

Fixes potential out of range access during some stream source transform
configuration updates and tiny clean up
Fixes stream sourcing message header parsing for multi-subject transform
in sources
  • Loading branch information
neilalexander committed Aug 16, 2023
2 parents c2d1e6d + 0cc43ac commit 7670cf5
Showing 1 changed file with 20 additions and 23 deletions.
43 changes: 20 additions & 23 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1730,21 +1730,21 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

if len(s.SubjectTransforms) == 0 {
si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
// Check for transform.
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
}
// set for transform if any
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
}
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
for i := range s.SubjectTransforms {
// err can be ignored as already validated in config check
var err error
si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination)
if err != nil {
mset.mu.Unlock()
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
}
Expand Down Expand Up @@ -2442,12 +2442,11 @@ func (mset *stream) setupMirrorConsumer() error {
// Filters
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
if mset.cfg.Mirror.SubjectTransformDest != _EMPTY_ {
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
}

Expand Down Expand Up @@ -3138,7 +3137,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
// Generate a new (2.10) style source header (stream name, sequence number, source filter, source destination transform).
func (si *sourceInfo) genSourceHeader(reply string) string {
var b strings.Builder
iNameParts := strings.Fields(si.iname)
iNameParts := strings.Split(si.iname, " ")

b.WriteString(iNameParts[0])
b.WriteByte(' ')
Expand Down Expand Up @@ -3187,7 +3186,7 @@ func streamAndSeq(shdr string) (string, string, uint64) {
return streamAndSeqFromAckReply(shdr)
}
// New version which is stream index name <SPC> sequence
fields := strings.Fields(shdr)
fields := strings.Split(shdr, " ")
nFields := len(fields)

if nFields != 2 && nFields <= 3 {
Expand Down Expand Up @@ -3258,14 +3257,12 @@ func (mset *stream) startingSequenceForSources() {

if len(ssi.SubjectTransforms) == 0 {
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Check for transform.
if ssi.SubjectTransformDest != _EMPTY_ {
// no need to check the error as already validated that it will not before
var err error
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
// Set the transform if any
// technically no need to check the error as already validated that it will not before
var err error
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
var trs []*subjectTransform
Expand Down

0 comments on commit 7670cf5

Please sign in to comment.