Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream subject transform improvements #3827

Merged
merged 2 commits into from
Jan 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -1830,7 +1830,7 @@ func (a *Account) addServiceImport(dest *Account, from, to string, claim *jwt.Im
// Create a transform. Do so in reverse such that $ symbols only exist in to
if tr, err = NewSubjectTransform(to, transformTokenize(from)); err != nil {
a.mu.Unlock()
return nil, fmt.Errorf("failed to create mapping transform for service import subject %q to %q: %v",
return nil, fmt.Errorf("failed to create mapping transform for service import subject from %q to %q: %v",
from, to, err)
} else {
// un-tokenize and reverse transform so we get the transform needed
Expand Down Expand Up @@ -2376,7 +2376,7 @@ func (a *Account) AddMappedStreamImportWithClaim(account *Account, from, to stri
} else {
// Create a transform
if tr, err = NewSubjectTransform(from, transformTokenize(to)); err != nil {
return fmt.Errorf("failed to create mapping transform for stream import subject %q to %q: %v",
return fmt.Errorf("failed to create mapping transform for stream import subject from %q to %q: %v",
from, to, err)
}
to, _ = transformUntokenize(to)
Expand Down
84 changes: 70 additions & 14 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,16 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
if len(cfg.Sources) > 0 {
for i, ssi := range cfg.Sources {
ssi.setIndexName(i)
// check the filter, if any, is valid
if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform for the source not valid %w", err)
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
}
Expand Down Expand Up @@ -486,7 +491,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream input subject transform not valid %w", err)
return nil, fmt.Errorf("stream subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
}
Expand All @@ -496,7 +501,7 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("stream republish transform not valid %w", err)
return nil, fmt.Errorf("stream republish transform from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
}
// Assign our transform for republishing.
mset.tr = tr
Expand Down Expand Up @@ -1305,7 +1310,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish destination forms a cycle"))
}
if _, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination); err != nil {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish not valid"))
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream configuration for republish with transform from '%s' to '%s' not valid", cfg.RePublish.Source, cfg.RePublish.Destination))
}
}

Expand Down Expand Up @@ -1520,13 +1525,15 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

// Check for Sources.
if len(cfg.Sources) > 0 || len(ocfg.Sources) > 0 {
current := make(map[string]string)
currentFilter := make(map[string]string)
currentTransformDest := make(map[string]string)
for _, s := range ocfg.Sources {
current[s.iname] = s.FilterSubject
currentFilter[s.iname] = s.FilterSubject
currentTransformDest[s.iname] = s.SubjectTransformDest
}
for i, s := range cfg.Sources {
s.setIndexName(i)
if oFilter, ok := current[s.iname]; !ok {
if oFilter, ok := currentFilter[s.iname]; !ok {
if mset.sources == nil {
mset.sources = make(map[string]*sourceInfo)
}
Expand All @@ -1536,7 +1543,8 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
return err
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err)
}
}

Expand All @@ -1545,6 +1553,13 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
mset.setSourceConsumer(s.iname, si.sseq+1, time.Time{})
} else if oFilter != s.FilterSubject {
if si, ok := mset.sources[s.iname]; ok {
if s.SubjectTransformDest != _EMPTY_ {
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err)
}
}
filterOverlap := true
if oFilter != _EMPTY_ && s.FilterSubject != _EMPTY_ {
newFilter := strings.Split(s.FilterSubject, tsep)
Expand All @@ -1564,11 +1579,26 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
mset.setSourceConsumer(s.iname, si.sseq+1, time.Now())
}
}
} else if currentTransformDest[s.iname] != s.SubjectTransformDest {
// transform destination has changed
if si, ok := mset.sources[s.iname]; ok {
if s.SubjectTransformDest == _EMPTY_ {
// remove the transform
si.tr = nil
} else {
// update the transform with the new destination if it's valid
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err)
}
}
}
}
delete(current, s.iname)
delete(currentFilter, s.iname)
}
// What is left in current needs to be deleted.
for iname := range current {
// What is left in currentFilter needs to be deleted.
for iname := range currentFilter {
mset.cancelSourceConsumer(iname)
delete(mset.sources, iname)
}
Expand All @@ -1592,17 +1622,40 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)
if cfg.RePublish.Source == _EMPTY_ {
cfg.RePublish.Source = fwcs
}
if cfg.RePublish.Destination == _EMPTY_ {
cfg.RePublish.Destination = fwcs
}
tr, err := NewSubjectTransform(cfg.RePublish.Source, cfg.RePublish.Destination)
if err != nil {
jsa.mu.Unlock()
return fmt.Errorf("stream configuration for republish not valid")
mset.mu.Unlock()
return fmt.Errorf("stream configuration for republish from '%s' to '%s' %w", cfg.RePublish.Source, cfg.RePublish.Destination, err)
}
// Assign our transform for republishing.
mset.tr = tr
} else {
mset.tr = nil
}

// Check for changes to subject transform
if ocfg.SubjectTransform == nil && cfg.SubjectTransform != nil {
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform != nil &&
(ocfg.SubjectTransform.Source != cfg.SubjectTransform.Source || ocfg.SubjectTransform.Destination != cfg.SubjectTransform.Destination) {
tr, err := NewSubjectTransform(cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination)
if err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream configuration for subject transform from '%s' to '%s' %w", cfg.SubjectTransform.Source, cfg.SubjectTransform.Destination, err)
}
mset.itr = tr
} else if ocfg.SubjectTransform != nil && cfg.SubjectTransform == nil {
mset.itr = nil
}

js := mset.js

if targetTier := tierName(cfg); mset.tier != targetTier {
Expand Down Expand Up @@ -2547,7 +2600,10 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if si := mset.sources[iname]; si != nil && si.sub != nil {
si.err = nil
if ccr.Error != nil || ccr.ConsumerInfo == nil {
mset.srv.Warnf("JetStream error response for create source consumer: %+v", ccr.Error)
// Note: this warning can happen a few times when starting up the server when sourcing streams are
// defined, this is normal as the streams are re-created in no particular order and it is possible
// that a stream sourcing another could come up before all of its sources have been recreated.
mset.srv.Warnf("JetStream error response for stream %s create source consumer %s: %+v", mset.cfg.Name, si.name, ccr.Error)
si.err = ccr.Error
// Let's retry as soon as possible, but we are gated by sourceConsumerRetryThreshold
retry = true
Expand Down