diff --git a/server/errors.json b/server/errors.json index 84c4b33884..123889ef7d 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1423,7 +1423,7 @@ "constant": "JSSourceMultipleFiltersNotAllowed", "code": 400, "error_code": 10144, - "description": "source with multiple subject filters cannot also have a single subject filter", + "description": "source with multiple subject transforms cannot also have a single subject filter", "comment": "", "help": "", "url": "", @@ -1478,5 +1478,35 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSMirrorMultipleFiltersNotAllowed", + "code": 400, + "error_code": 10150, + "description": "mirror with multiple subject transforms cannot also have a single subject filter", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSMirrorInvalidSubjectFilter", + "code": 400, + "error_code": 10151, + "description": "mirror subject filter is invalid", + "comment": "", + "help": "", + "url": "", + "deprecates": "" + }, + { + "constant": "JSMirrorOverlappingSubjectFilters", + "code": 400, + "error_code": 10152, + "description": "mirror subject filters can not overlap", + "comment": "", + "help": "", + "url": "", + "deprecates": "" } ] diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 2ca8a6d3f7..a39e612c3c 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -233,9 +233,18 @@ const ( // JSMirrorInvalidStreamName mirrored stream name is invalid JSMirrorInvalidStreamName ErrorIdentifier = 10142 + // JSMirrorInvalidSubjectFilter mirror subject filter is invalid + JSMirrorInvalidSubjectFilter ErrorIdentifier = 10151 + // JSMirrorMaxMessageSizeTooBigErr stream mirror must have max message size >= source JSMirrorMaxMessageSizeTooBigErr ErrorIdentifier = 10030 + // JSMirrorMultipleFiltersNotAllowed mirror with multiple subject transforms cannot also have a single subject filter + JSMirrorMultipleFiltersNotAllowed ErrorIdentifier = 10150 + + // JSMirrorOverlappingSubjectFilters mirror subject filters can not overlap + JSMirrorOverlappingSubjectFilters ErrorIdentifier = 10152 + // JSMirrorWithFirstSeqErr stream mirrors can not have first sequence configured JSMirrorWithFirstSeqErr ErrorIdentifier = 10143 @@ -305,7 +314,7 @@ const ( // JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046 - // JSSourceMultipleFiltersNotAllowed source with multiple subject filters cannot also have a single subject filter + // JSSourceMultipleFiltersNotAllowed source with multiple subject transforms cannot also have a single subject filter JSSourceMultipleFiltersNotAllowed ErrorIdentifier = 10144 // JSSourceOverlappingSubjectFilters source filters can not overlap @@ -528,7 +537,10 @@ var ( JSMemoryResourcesExceededErr: {Code: 500, ErrCode: 10028, Description: "insufficient memory resources available"}, JSMirrorConsumerSetupFailedErrF: {Code: 500, ErrCode: 10029, Description: "{err}"}, JSMirrorInvalidStreamName: {Code: 400, ErrCode: 10142, Description: "mirrored stream name is invalid"}, + JSMirrorInvalidSubjectFilter: {Code: 400, ErrCode: 10151, Description: "mirror subject filter is invalid"}, JSMirrorMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10030, Description: "stream mirror must have max message size >= source"}, + JSMirrorMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10150, Description: "mirror with multiple subject transforms cannot also have a single subject filter"}, + JSMirrorOverlappingSubjectFilters: {Code: 400, ErrCode: 10152, Description: "mirror subject filters can not overlap"}, JSMirrorWithFirstSeqErr: {Code: 400, ErrCode: 10143, Description: "stream mirrors can not have first sequence configured"}, JSMirrorWithSourcesErr: {Code: 400, ErrCode: 10031, Description: "stream mirrors can not also contain other sources"}, JSMirrorWithStartSeqAndTimeErr: {Code: 400, ErrCode: 10032, Description: "stream mirrors can not have both start seq and start time configured"}, @@ -552,7 +564,7 @@ var ( JSSourceInvalidSubjectFilter: {Code: 400, ErrCode: 10145, Description: "source subject filter is invalid"}, JSSourceInvalidTransformDestination: {Code: 400, ErrCode: 10146, Description: "source transform destination is invalid"}, JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"}, - JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject filters cannot also have a single subject filter"}, + JSSourceMultipleFiltersNotAllowed: {Code: 400, ErrCode: 10144, Description: "source with multiple subject transforms cannot also have a single subject filter"}, JSSourceOverlappingSubjectFilters: {Code: 400, ErrCode: 10147, Description: "source filters can not overlap"}, JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"}, JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"}, @@ -1451,6 +1463,16 @@ func NewJSMirrorInvalidStreamNameError(opts ...ErrorOption) *ApiError { return ApiErrors[JSMirrorInvalidStreamName] } +// NewJSMirrorInvalidSubjectFilterError creates a new JSMirrorInvalidSubjectFilter error: "mirror subject filter is invalid" +func NewJSMirrorInvalidSubjectFilterError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSMirrorInvalidSubjectFilter] +} + // NewJSMirrorMaxMessageSizeTooBigError creates a new JSMirrorMaxMessageSizeTooBigErr error: "stream mirror must have max message size >= source" func NewJSMirrorMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -1461,6 +1483,26 @@ func NewJSMirrorMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError { return ApiErrors[JSMirrorMaxMessageSizeTooBigErr] } +// NewJSMirrorMultipleFiltersNotAllowedError creates a new JSMirrorMultipleFiltersNotAllowed error: "mirror with multiple subject transforms cannot also have a single subject filter" +func NewJSMirrorMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSMirrorMultipleFiltersNotAllowed] +} + +// NewJSMirrorOverlappingSubjectFiltersError creates a new JSMirrorOverlappingSubjectFilters error: "mirror subject filters can not overlap" +func NewJSMirrorOverlappingSubjectFiltersError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSMirrorOverlappingSubjectFilters] +} + // NewJSMirrorWithFirstSeqError creates a new JSMirrorWithFirstSeqErr error: "stream mirrors can not have first sequence configured" func NewJSMirrorWithFirstSeqError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) @@ -1715,7 +1757,7 @@ func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError { return ApiErrors[JSSourceMaxMessageSizeTooBigErr] } -// NewJSSourceMultipleFiltersNotAllowedError creates a new JSSourceMultipleFiltersNotAllowed error: "source with multiple subject filters cannot also have a single subject filter" +// NewJSSourceMultipleFiltersNotAllowedError creates a new JSSourceMultipleFiltersNotAllowed error: "source with multiple subject transforms cannot also have a single subject filter" func NewJSSourceMultipleFiltersNotAllowedError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) if ae, ok := eopts.err.(*ApiError); ok { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 1147f8a5e2..297bddfb28 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11435,13 +11435,13 @@ func TestJetStreamMirrorBasics(t *testing.T) { // Clear subjects. cfg.Subjects = nil - // Source + // Mirrored scfg := &nats.StreamConfig{ Name: "S1", Subjects: []string{"foo", "bar", "baz"}, } - // Create source stream + // Create mirrored stream createStreamOk(scfg) // Now create our mirror stream. @@ -11471,7 +11471,7 @@ func TestJetStreamMirrorBasics(t *testing.T) { return nil }) - // Purge the source stream. + // Purge the mirrored stream. if err := js.PurgeStream("S1"); err != nil { t.Fatalf("Unexpected purge error: %v", err) } @@ -11551,6 +11551,101 @@ func TestJetStreamMirrorBasics(t *testing.T) { } return nil }) + + // Test subject filtering and transformation + createStreamServerStreamConfig := func(cfg *StreamConfig, errToCheck uint16) { + t.Helper() + req, err := json.Marshal(cfg) + require_NoError(t, err) + + rm, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), req, time.Second) + require_NoError(t, err) + + var resp JSApiStreamCreateResponse + if err := json.Unmarshal(rm.Data, &resp); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if errToCheck == 0 { + if resp.Error != nil { + t.Fatalf("Unexpected error: %+v", resp.Error) + } + } else { + if resp.Error.ErrCode != errToCheck { + t.Fatalf("Expected error %+v, got: %+v", errToCheck, resp.Error) + } + } + } + + // check for errors + createStreamServerStreamConfig(&StreamConfig{ + Name: "MBAD", + Storage: FileStorage, + Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: "foo3"}}}, + }, ApiErrors[JSMirrorMultipleFiltersNotAllowed].ErrCode) + + createStreamServerStreamConfig(&StreamConfig{ + Name: "MBAD", + Storage: FileStorage, + Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: ".*.", Destination: "foo3"}}}, + }, ApiErrors[JSMirrorInvalidSubjectFilter].ErrCode) + + createStreamServerStreamConfig(&StreamConfig{ + Name: "MBAD", + Storage: FileStorage, + Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "*", Destination: "{{wildcard(2)}}"}}}, + }, ApiErrors[JSStreamCreateErrF].ErrCode) + + createStreamServerStreamConfig(&StreamConfig{ + Name: "MBAD", + Storage: FileStorage, + Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: ""}, {Source: "foo", Destination: "bar"}}}, + }, ApiErrors[JSMirrorOverlappingSubjectFilters].ErrCode) + + createStreamServerStreamConfig(&StreamConfig{ + Name: "M5", + Storage: FileStorage, + Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransformDest: "foo2"}, + }, 0) + + createStreamServerStreamConfig(&StreamConfig{ + Name: "M6", + Storage: FileStorage, + Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "bar", Destination: "bar2"}, {Source: "baz", Destination: "baz2"}}}, + }, 0) + + // Send 100 messages on foo (there should already be 50 messages on bar and 100 on baz in the stream) + for i := 0; i < 100; i++ { + if _, err := js.Publish("foo", []byte("OK")); err != nil { + t.Fatalf("Unexpected publish error: %v", err) + } + } + + var f = func(streamName string, subject string, subjectNumMsgs uint64, streamNumMsg uint64, firstSeq uint64, lastSeq uint64) func() error { + return func() error { + si, err := js2.StreamInfo(streamName, &nats.StreamInfoRequest{SubjectsFilter: ">"}) + require_NoError(t, err) + if ss, ok := si.State.Subjects[subject]; !ok { + t.Log("Expected messages with the transformed subject") + } else { + if ss != subjectNumMsgs { + t.Fatalf("Expected %d messages on the transformed subject but got %d", subjectNumMsgs, ss) + } + } + if si.State.Msgs != streamNumMsg { + return fmt.Errorf("Expected %d stream messages, got state: %+v", streamNumMsg, si.State) + } + if si.State.FirstSeq != firstSeq || si.State.LastSeq != lastSeq { + return fmt.Errorf("Expected first sequence=%d and last sequence=%d, but got state: %+v", firstSeq, lastSeq, si.State) + } + return nil + } + } + + checkFor(t, 2*time.Second, 100*time.Millisecond, f("M5", "foo2", 100, 100, 251, 350)) + checkFor(t, 2*time.Second, 100*time.Millisecond, f("M6", "bar2", 50, 150, 101, 250)) + checkFor(t, 2*time.Second, 100*time.Millisecond, f("M6", "baz2", 100, 150, 101, 250)) + } func TestJetStreamMirrorUpdatePreventsSubjects(t *testing.T) { diff --git a/server/stream.go b/server/stream.go index d99ff9493e..bd63866113 100644 --- a/server/stream.go +++ b/server/stream.go @@ -458,7 +458,37 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } } - // Setup our internal indexed names here for sources and check if the transform (if any) is valid. + // If mirror, check if the transforms (if any) are valid. + if cfg.Mirror != nil { + if len(cfg.Mirror.SubjectTransforms) == 0 { + if cfg.Mirror.FilterSubject != _EMPTY_ && !IsValidSubject(cfg.Mirror.FilterSubject) { + jsa.mu.Unlock() + return nil, fmt.Errorf("subject filter '%s' for the mirror %w", cfg.Mirror.FilterSubject, ErrBadSubject) + } + if cfg.Mirror.SubjectTransformDest != _EMPTY_ { + if _, err = NewSubjectTransform(cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest); err != nil { + jsa.mu.Unlock() + return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror %w", cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest, err) + } + } + } else { + for _, st := range cfg.Mirror.SubjectTransforms { + if st.Source != _EMPTY_ && !IsValidSubject(st.Source) { + jsa.mu.Unlock() + return nil, fmt.Errorf("invalid subject transform source '%s' for the mirror: %w", st.Source, ErrBadSubject) + } + // check the transform, if any, is valid + if st.Destination != _EMPTY_ { + if _, err = NewSubjectTransform(st.Source, st.Destination); err != nil { + jsa.mu.Unlock() + return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror: %w", st.Source, st.Destination, err) + } + } + } + } + } + + // Setup our internal indexed names here for sources and check if the transforms (if any) are valid. for _, ssi := range cfg.Sources { if len(ssi.SubjectTransforms) == 0 { // check the filter, if any, is valid @@ -1209,11 +1239,24 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } if len(cfg.Subjects) > 0 { return StreamConfig{}, NewJSMirrorWithSubjectsError() - } if len(cfg.Sources) > 0 { return StreamConfig{}, NewJSMirrorWithSourcesError() } + if (cfg.Mirror.FilterSubject != _EMPTY_ || cfg.Mirror.SubjectTransformDest != _EMPTY_) && len(cfg.Mirror.SubjectTransforms) != 0 { + return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError() + } + // Check subject filters overlap. + for outer, tr := range cfg.Mirror.SubjectTransforms { + if !IsValidSubject(tr.Source) { + return StreamConfig{}, NewJSMirrorInvalidSubjectFilterError() + } + for inner, innertr := range cfg.Mirror.SubjectTransforms { + if inner != outer && subjectIsSubsetMatch(tr.Source, innertr.Source) { + return StreamConfig{}, NewJSMirrorOverlappingSubjectFiltersError() + } + } + } // Do not perform checks if External is provided, as it could lead to // checking against itself (if sourced stream name is the same on different JetStream) if cfg.Mirror.External == nil { @@ -1303,12 +1346,13 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } } continue - } - if src.External.DeliverPrefix != _EMPTY_ { - deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix) - } - if src.External.ApiPrefix != _EMPTY_ { - apiPrefixes = append(apiPrefixes, src.External.ApiPrefix) + } else { + if src.External.DeliverPrefix != _EMPTY_ { + deliveryPrefixes = append(deliveryPrefixes, src.External.DeliverPrefix) + } + if src.External.ApiPrefix != _EMPTY_ { + apiPrefixes = append(apiPrefixes, src.External.ApiPrefix) + } } } @@ -1698,7 +1742,11 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) si = &sourceInfo{name: s.Name, iname: s.iname} for i := range s.SubjectTransforms { // err can be ignored as already validated in config check - si.trs[i], _ = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination) + var err error + si.trs[i], err = NewSubjectTransform(s.SubjectTransforms[i].Source, s.SubjectTransforms[i].Destination) + if err != nil { + mset.srv.Errorf("Unable to get subject transform for source: %v", err) + } } } @@ -2167,6 +2215,23 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { js, stype := mset.js, mset.cfg.Storage mset.mu.Unlock() + // Do the subject transform if there's one + if mset.mirror.tr != nil { + m.subj = mset.mirror.tr.TransformSubject(m.subj) + } else { + for _, tr := range mset.mirror.trs { + if tr == nil { + continue + } else { + tsubj, err := tr.Match(m.subj) + if err == nil { + m.subj = tsubj + break + } + } + } + } + s := mset.srv var err error if node != nil { @@ -2377,7 +2442,26 @@ func (mset *stream) setupMirrorConsumer() error { // Filters if mset.cfg.Mirror.FilterSubject != _EMPTY_ { req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject + if mset.cfg.Mirror.SubjectTransformDest != _EMPTY_ { + var err error + mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest) + if err != nil { + mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) + } + } + } + + var filters []string + for _, tr := range mset.cfg.Mirror.SubjectTransforms { + // will not fail as already checked before that the transform will work + subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination) + if err != nil { + mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) + } + mirror.trs = append(mirror.trs, subjectTransform) + filters = append(filters, tr.Source) } + req.Config.FilterSubjects = filters respCh := make(chan *JSApiConsumerCreateResponse, 1) reply := infoReplySubject() @@ -3177,12 +3261,19 @@ func (mset *stream) startingSequenceForSources() { // Check for transform. if ssi.SubjectTransformDest != _EMPTY_ { // no need to check the error as already validated that it will not before - si.tr, _ = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) + var err error + si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) + if err != nil { + mset.srv.Errorf("Unable to get subject transform for source: %v", err) + } } } else { var trs []*subjectTransform for _, str := range ssi.SubjectTransforms { - tr, _ := NewSubjectTransform(str.Source, str.Destination) + tr, err := NewSubjectTransform(str.Source, str.Destination) + if err != nil { + mset.srv.Errorf("Unable to get subject transform for source: %v", err) + } trs = append(trs, tr) } si = &sourceInfo{name: ssi.Name, iname: ssi.iname, trs: trs}