diff --git a/server/errors.json b/server/errors.json index e0c6864817..ab7f15e916 100644 --- a/server/errors.json +++ b/server/errors.json @@ -1378,5 +1378,15 @@ "help": "", "url": "", "deprecates": "" + }, + { + "constant": "JSSourceDuplicateDetected", + "code": 400, + "error_code": 10140, + "description": "duplicate source configuration detected", + "comment": "source stream, filter and transform (plus external if present) must form a unique combination", + "help": "", + "url": "", + "deprecates": "" } ] \ No newline at end of file diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 1c1ac3a410..8a39a94c04 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -5252,7 +5252,7 @@ func TestJetStreamClusterLeaderStepdown(t *testing.T) { } } -func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) { +func TestJetStreamClusterSourcesFilteringAndUpdating(t *testing.T) { c := createJetStreamClusterExplicit(t, "MSR", 5) defer c.shutdown() @@ -5295,6 +5295,7 @@ func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) { require_NoError(t, err) defer js.DeleteStream("TEST") + // Create M stream with a single source on "foo" _, err = js.AddStream(&nats.StreamConfig{ Name: "M", Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "foo"}}, @@ -5303,13 +5304,14 @@ func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) { require_NoError(t, err) defer js.DeleteStream("M") + // check a message on "bar" doesn't get sourced sendBatch("bar", 100) checkSync(100, 0) + // check a message on "foo" does get sourced sendBatch("foo", 100) - // The source stream remains at 100 msgs as it filters for foo checkSync(200, 100) - // change filter subject + // change remove the source on "foo" and add a new source on "bar" _, err = js.UpdateStream(&nats.StreamConfig{ Name: "M", Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "bar"}}, @@ -5317,40 +5319,62 @@ func TestJetStreamClusterSourcesFilterSubjectUpdate(t *testing.T) { }) require_NoError(t, err) - sendBatch("foo", 100) - // The source stream remains at 100 msgs as it filters for bar - checkSync(300, 100) + // as it is a new source (never been sourced before) it starts sourcing at the start of TEST + // and therefore sources the message on "bar" that is in TEST + checkSync(200, 200) - sendBatch("bar", 100) - checkSync(400, 200) - - // test unsuspected re delivery by sending to filtered subject + // new messages on "foo" are being filtered as it's not being currently sourced sendBatch("foo", 100) - checkSync(500, 200) + checkSync(300, 200) + // new messages on "bar" are being sourced + sendBatch("bar", 100) + checkSync(400, 300) - // change filter subject to foo, as the internal sequence number does not cover the previously filtered tail end + // re-add the source for "foo" keep the source on "bar" _, err = js.UpdateStream(&nats.StreamConfig{ Name: "M", - Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "foo"}}, + Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "bar"}, {Name: "TEST", FilterSubject: "foo"}}, Replicas: 2, }) require_NoError(t, err) - // The filter was completely switched, which is why we only receive new messages - checkSync(500, 200) - sendBatch("foo", 100) - checkSync(600, 300) + + // check the 'backfill' of messages on "foo" that were published while the source was inactive + checkSync(400, 400) + + // causes startingSequenceForSources() to be called + nc.Close() + c.stopAll() + c.restartAll() + c.waitOnStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "M") + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // check that it restarted the sources' consumers at the right place + checkSync(400, 400) + + // check both sources are still active sendBatch("bar", 100) - checkSync(700, 300) + checkSync(500, 500) + sendBatch("foo", 100) + checkSync(600, 600) - // change filter subject to *, as the internal sequence number does not cover the previously filtered tail end - _, err = js.UpdateStream(&nats.StreamConfig{ - Name: "M", - Sources: []*nats.StreamSource{{Name: "TEST", FilterSubject: "*"}}, - Replicas: 2, - }) - require_NoError(t, err) - // no send was necessary as we received previously filtered messages - checkSync(700, 400) + // Check that purging the stream and does not cause the sourcing of the messages + js.PurgeStream("M") + checkSync(600, 0) + + // Even after a leader change or restart + nc.Close() + c.stopAll() + c.restartAll() + c.waitOnStreamLeader("$G", "TEST") + c.waitOnStreamLeader("$G", "M") + + nc, js = jsClientConnect(t, c.randomServer()) + defer nc.Close() + + checkSync(600, 0) } func TestJetStreamClusterSourcesUpdateOriginError(t *testing.T) { diff --git a/server/jetstream_errors_generated.go b/server/jetstream_errors_generated.go index 2d4eed6aa1..9a5328e50b 100644 --- a/server/jetstream_errors_generated.go +++ b/server/jetstream_errors_generated.go @@ -278,6 +278,9 @@ const ( // JSSourceConsumerSetupFailedErrF General source consumer setup failure string ({err}) JSSourceConsumerSetupFailedErrF ErrorIdentifier = 10045 + // JSSourceDuplicateDetected source stream, filter and transform must form a unique combination (duplicate source configuration detected) + JSSourceDuplicateDetected ErrorIdentifier = 10140 + // JSSourceMaxMessageSizeTooBigErr stream source must have max message size >= target JSSourceMaxMessageSizeTooBigErr ErrorIdentifier = 10046 @@ -513,6 +516,7 @@ var ( JSSequenceNotFoundErrF: {Code: 400, ErrCode: 10043, Description: "sequence {seq} not found"}, JSSnapshotDeliverSubjectInvalidErr: {Code: 400, ErrCode: 10015, Description: "deliver subject not valid"}, JSSourceConsumerSetupFailedErrF: {Code: 500, ErrCode: 10045, Description: "{err}"}, + JSSourceDuplicateDetected: {Code: 400, ErrCode: 10140, Description: "duplicate source configuration detected"}, JSSourceMaxMessageSizeTooBigErr: {Code: 400, ErrCode: 10046, Description: "stream source must have max message size >= target"}, JSStorageResourcesExceededErr: {Code: 500, ErrCode: 10047, Description: "insufficient storage resources available"}, JSStreamAssignmentErrF: {Code: 500, ErrCode: 10048, Description: "{err}"}, @@ -1585,6 +1589,16 @@ func NewJSSourceConsumerSetupFailedError(err error, opts ...ErrorOption) *ApiErr } } +// NewJSSourceDuplicateDetectedError creates a new JSSourceDuplicateDetected error: "duplicate source configuration detected" +func NewJSSourceDuplicateDetectedError(opts ...ErrorOption) *ApiError { + eopts := parseOpts(opts) + if ae, ok := eopts.err.(*ApiError); ok { + return ae + } + + return ApiErrors[JSSourceDuplicateDetected] +} + // NewJSSourceMaxMessageSizeTooBigError creates a new JSSourceMaxMessageSizeTooBigErr error: "stream source must have max message size >= target" func NewJSSourceMaxMessageSizeTooBigError(opts ...ErrorOption) *ApiError { eopts := parseOpts(opts) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 54566aba84..3803010a21 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11415,12 +11415,12 @@ func TestJetStreamSourceBasics(t *testing.T) { return nil }) - m, err := js.GetMsg("MS", 1) + ss, err := js.SubscribeSync("foo2.foo", nats.BindStream("MS")) require_NoError(t, err) - - if m.Subject != "foo2.foo" { - t.Fatalf("Expected message subject foo2.foo, got %s", m.Subject) - } + // we must have at least one message on the transformed subject name (ie no timeout) + _, err = ss.NextMsg(time.Millisecond) + require_NoError(t, err) + ss.Drain() // Test Source Updates ncfg := &nats.StreamConfig{ @@ -11461,11 +11461,11 @@ func TestJetStreamSourceBasics(t *testing.T) { return nil }) // Double check first starting. - m, err = js.GetMsg("FMS", 1) + m, err := js.GetMsg("FMS", 1) require_NoError(t, err) if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ { t.Fatalf("Expected a header, got none") - } else if _, sseq := streamAndSeq(shdr); sseq != 26 { + } else if _, _, sseq := streamAndSeq(shdr); sseq != 26 { t.Fatalf("Expected header sequence of 26, got %d", sseq) } @@ -11493,7 +11493,7 @@ func TestJetStreamSourceBasics(t *testing.T) { } if shdr := m.Header.Get(JSStreamSource); shdr == _EMPTY_ { t.Fatalf("Expected a header, got none") - } else if _, sseq := streamAndSeq(shdr); sseq != 11 { + } else if _, _, sseq := streamAndSeq(shdr); sseq != 11 { t.Fatalf("Expected header sequence of 11, got %d", sseq) } } diff --git a/server/stream.go b/server/stream.go index 7336302573..2be6565524 100644 --- a/server/stream.go +++ b/server/stream.go @@ -446,8 +446,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt // Setup our internal indexed names here for sources and check if the transform (if any) is valid. if len(cfg.Sources) > 0 { - for i, ssi := range cfg.Sources { - ssi.setIndexName(i) + for _, ssi := range cfg.Sources { + ssi.setIndexName() // check the filter, if any, is valid if ssi.FilterSubject != _EMPTY_ && !IsValidSubject(ssi.FilterSubject) { jsa.mu.Unlock() @@ -620,15 +620,31 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt return mset, nil } -// Sets the index name. Usually just the stream name and the source's index (so you can source from the same stream more than once) -// but when the stream is external we will -// use additional information in case the stream names are the same. -func (ssi *StreamSource) setIndexName(i int) { +// Composes the index name. Contains the stream name, subject filter, and transform destination +// when the stream is external we will use additional information in case the (external) stream names are the same. +func (ssi *StreamSource) composeIName() string { + var iName = ssi.Name + if ssi.External != nil { - ssi.iname = strconv.Itoa(i) + ":" + ssi.Name + ":" + getHash(ssi.External.ApiPrefix) - } else { - ssi.iname = strconv.Itoa(i) + ":" + ssi.Name + iName = iName + ":" + getHash(ssi.External.ApiPrefix) + } + + filter := ssi.FilterSubject + // normalize filter and destination in case they are empty + if filter == _EMPTY_ { + filter = fwcs + } + destination := ssi.SubjectTransformDest + if destination == _EMPTY_ { + destination = fwcs } + + return strings.Join([]string{iName, filter, destination}, " ") +} + +// Sets the index name. +func (ssi *StreamSource) setIndexName() { + ssi.iname = ssi.composeIName() } func (mset *stream) streamAssignment() *streamAssignment { @@ -1160,6 +1176,15 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } } if len(cfg.Sources) > 0 { + // check for duplicates + var iNames = make(map[string]struct{}) + for _, src := range cfg.Sources { + if _, ok := iNames[src.composeIName()]; !ok { + iNames[src.composeIName()] = struct{}{} + } else { + return StreamConfig{}, NewJSSourceDuplicateDetectedError() + } + } for _, src := range cfg.Sources { exists, maxMsgSize, subs := hasStream(src.Name) if len(subs) > 0 { @@ -1536,15 +1561,14 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) // Check for Sources. if len(cfg.Sources) > 0 || len(ocfg.Sources) > 0 { - currentFilter := make(map[string]string) - currentTransformDest := make(map[string]string) + currentIName := make(map[string]struct{}) for _, s := range ocfg.Sources { - currentFilter[s.iname] = s.FilterSubject - currentTransformDest[s.iname] = s.SubjectTransformDest + currentIName[s.iname] = struct{}{} } - for i, s := range cfg.Sources { - s.setIndexName(i) - if oFilter, ok := currentFilter[s.iname]; !ok { + for _, s := range cfg.Sources { + s.setIndexName() + if _, ok := currentIName[s.iname]; !ok { + // new source if mset.sources == nil { mset.sources = make(map[string]*sourceInfo) } @@ -1558,60 +1582,18 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err) } } - mset.sources[s.iname] = si - mset.setStartingSequenceForSource(s.iname) + mset.setStartingSequenceForSource(s.iname, s.External) mset.setSourceConsumer(s.iname, si.sseq+1, time.Time{}) - } else if oFilter != s.FilterSubject { - if si, ok := mset.sources[s.iname]; ok { - if s.SubjectTransformDest != _EMPTY_ { - var err error - if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { - mset.mu.Unlock() - return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err) - } - } - filterOverlap := true - if oFilter != _EMPTY_ && s.FilterSubject != _EMPTY_ { - newFilter := strings.Split(s.FilterSubject, tsep) - oldFilter := strings.Split(oFilter, tsep) - if !isSubsetMatchTokenized(oldFilter, newFilter) && - !isSubsetMatchTokenized(newFilter, oldFilter) { - filterOverlap = false - } - } - if filterOverlap { - // si.sseq is the last message we received - // if upstream has more messages (with a bigger sequence number) - // that we used to filter, now we get them - mset.setSourceConsumer(s.iname, si.sseq+1, time.Time{}) - } else { - // since the filter has no overlap at all, we will request messages starting now - mset.setSourceConsumer(s.iname, si.sseq+1, time.Now()) - } - } - } else if currentTransformDest[s.iname] != s.SubjectTransformDest { - // transform destination has changed - if si, ok := mset.sources[s.iname]; ok { - if s.SubjectTransformDest == _EMPTY_ { - // remove the transform - si.tr = nil - } else { - // update the transform with the new destination if it's valid - var err error - if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { - mset.mu.Unlock() - return fmt.Errorf("stream source subject transform from '%s' to '%s' %w", s.FilterSubject, s.SubjectTransformDest, err) - } - } - } + } else { + // source already exists + delete(currentIName, s.iname) } - delete(currentFilter, s.iname) } - // What is left in currentFilter needs to be deleted. - for iname := range currentFilter { - mset.cancelSourceConsumer(iname) - delete(mset.sources, iname) + // What is left in cuurentIName needs to be deleted. + for iName := range currentIName { + mset.cancelSourceConsumer(iName) + delete(mset.sources, iName) } } } @@ -2370,16 +2352,19 @@ func (mset *stream) streamSource(iname string) *StreamSource { return nil } -func (mset *stream) retrySourceConsumer(sname string) { +func (mset *stream) retrySourceConsumer(iName string) { mset.mu.Lock() defer mset.mu.Unlock() - si := mset.sources[sname] + si := mset.sources[iName] if si == nil { return } - mset.setStartingSequenceForSource(sname) - mset.retrySourceConsumerAtSeq(sname, si.sseq+1) + var ss = mset.streamSource(iName) + if ss != nil { + mset.setStartingSequenceForSource(iName, ss.External) + mset.retrySourceConsumerAtSeq(iName, si.sseq+1) + } } // Same than setSourceConsumer but simply issue a debug statement indicating @@ -2861,11 +2846,12 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { return true } -// Generate a new style source header. +// Generate a new (2.10) style source header (stream name, sequence number, source filter, source destination transform). func (si *sourceInfo) genSourceHeader(reply string) string { var b strings.Builder + iNameParts := strings.Fields(si.iname) - b.WriteString(si.iname) + b.WriteString(iNameParts[0]) b.WriteByte(' ') // Grab sequence as text here from reply subject. var tsa [expectedNumReplyTokens]string @@ -2881,11 +2867,16 @@ func (si *sourceInfo) genSourceHeader(reply string) string { seq = tokens[5] } b.WriteString(seq) + + b.WriteByte(' ') + b.WriteString(iNameParts[1]) + b.WriteByte(' ') + b.WriteString(iNameParts[2]) return b.String() } // Original version of header that stored ack reply direct. -func streamAndSeqFromAckReply(reply string) (string, uint64) { +func streamAndSeqFromAckReply(reply string) (string, string, uint64) { tsa := [expectedNumReplyTokens]string{} start, tokens := 0, tsa[:0] for i := 0; i < len(reply); i++ { @@ -2895,27 +2886,36 @@ func streamAndSeqFromAckReply(reply string) (string, uint64) { } tokens = append(tokens, reply[start:]) if len(tokens) != expectedNumReplyTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { - return _EMPTY_, 0 + return _EMPTY_, _EMPTY_, 0 } - return tokens[2], uint64(parseAckReplyNum(tokens[5])) + return tokens[2], _EMPTY_, uint64(parseAckReplyNum(tokens[5])) } -// Extract the stream (indexed name) and sequence from the source header. -func streamAndSeq(shdr string) (string, uint64) { +// Extract the stream name, the source index name and the message sequence number from the source header. +// Uses the filter and transform arguments to provide backwards compatibility +func streamAndSeq(shdr string) (string, string, uint64) { if strings.HasPrefix(shdr, jsAckPre) { return streamAndSeqFromAckReply(shdr) } // New version which is stream index name sequence fields := strings.Fields(shdr) - if len(fields) != 2 { - return _EMPTY_, 0 + nFields := len(fields) + + if nFields != 2 && nFields <= 3 { + return _EMPTY_, _EMPTY_, 0 } - return fields[0], uint64(parseAckReplyNum(fields[1])) + + if nFields >= 4 { + return fields[0], strings.Join([]string{fields[0], fields[2], fields[3]}, " "), uint64(parseAckReplyNum(fields[1])) + } else { + return fields[0], _EMPTY_, uint64(parseAckReplyNum(fields[1])) + } + } // Lock should be held. -func (mset *stream) setStartingSequenceForSource(sname string) { - si := mset.sources[sname] +func (mset *stream) setStartingSequenceForSource(iName string, external *ExternalStream) { + si := mset.sources[iName] if si == nil { return } @@ -2939,8 +2939,8 @@ func (mset *stream) setStartingSequenceForSource(sname string) { if len(ss) == 0 { continue } - iname, sseq := streamAndSeq(string(ss)) - if iname == sname { + streamName, indexName, sseq := streamAndSeq(string(ss)) + if indexName == si.iname || (indexName == _EMPTY_ && (streamName == si.name || (external != nil && streamName == si.name+":"+getHash(external.ApiPrefix)))) { si.sseq = sseq si.dseq = 0 return @@ -2960,9 +2960,9 @@ func (mset *stream) startingSequenceForSources() { // Always reset here. mset.sources = make(map[string]*sourceInfo) - for i, ssi := range mset.cfg.Sources { + for _, ssi := range mset.cfg.Sources { if ssi.iname == _EMPTY_ { - ssi.setIndexName(i) + ssi.setIndexName() } si := &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject} // Check for transform. @@ -3016,16 +3016,29 @@ func (mset *stream) startingSequenceForSources() { if len(ss) == 0 { continue } - name, sseq := streamAndSeq(string(ss)) - // Only update active in case we have older ones in here that got configured out. - if si := mset.sources[name]; si != nil { - if _, ok := seqs[name]; !ok { - seqs[name] = sseq - if len(seqs) == expected { - return + + var update = func(iName string, seq uint64) { + // Only update active in case we have older ones in here that got configured out. + if si := mset.sources[iName]; si != nil { + if _, ok := seqs[iName]; !ok { + seqs[iName] = seq } } } + + streamName, iName, sSeq := streamAndSeq(string(ss)) + if iName == _EMPTY_ { // Pre-2.10 message header means it's a match for any source using that stream name + for _, ssi := range mset.cfg.Sources { + if streamName == ssi.Name || streamName == ssi.Name+":"+getHash(ssi.External.ApiPrefix) { + update(ssi.iname, sSeq) + } + } + } else { + update(iName, sSeq) + } + if len(seqs) == expected { + return + } } }