From a66c67baa5b41e7c95cf24afa4b6ef02608b082f Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Fri, 14 Apr 2023 12:10:15 +0200 Subject: [PATCH] Fix stream sourcing & mirroring overlap errors When adding or updating sources/mirrors, server was checking if the stream with a given name exists to check for subject overlaps, among other things. However, if sourced/mirrored stream was `External`, checks should not be executed, as not only stream would never be found, but also, if `External` stream had the same name as the sourcing stream, the check would be wrongly performed against itself. Signed-off-by: Tomasz Pietrek --- server/jetstream_cluster_1_test.go | 24 -------- server/jetstream_test.go | 97 ++++++++++++++++++++++++++++++ server/stream.go | 90 ++++++++++++++------------- 3 files changed, 145 insertions(+), 66 deletions(-) diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index 20566f013d..8df84b3798 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -5819,18 +5819,6 @@ func TestJetStreamClusterFailMirrorsAndSources(t *testing.T) { }) } - testPrefix("mirror-bad-deliverprefix", JSStreamExternalDelPrefixOverlapsErrF, StreamConfig{ - Name: "MY_MIRROR_TEST", - Storage: FileStorage, - Mirror: &StreamSource{ - Name: "TEST", - External: &ExternalStream{ - ApiPrefix: "RI.JS.API", - // this will result in test.test.> which test.> would match - DeliverPrefix: "test", - }, - }, - }) testPrefix("mirror-bad-apiprefix", JSStreamExternalApiOverlapErrF, StreamConfig{ Name: "MY_MIRROR_TEST", Storage: FileStorage, @@ -5842,18 +5830,6 @@ func TestJetStreamClusterFailMirrorsAndSources(t *testing.T) { }, }, }) - testPrefix("source-bad-deliverprefix", JSStreamExternalDelPrefixOverlapsErrF, StreamConfig{ - Name: "MY_SOURCE_TEST", - Storage: FileStorage, - Sources: []*StreamSource{{ - Name: "TEST", - External: &ExternalStream{ - ApiPrefix: "RI.JS.API", - DeliverPrefix: "test", - }, - }, - }, - }) testPrefix("source-bad-apiprefix", JSStreamExternalApiOverlapErrF, StreamConfig{ Name: "MY_SOURCE_TEST", Storage: FileStorage, diff --git a/server/jetstream_test.go b/server/jetstream_test.go index cdf3abccf2..7e657f2463 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -19815,3 +19815,100 @@ func TestJetStreamConsumerWithFormattingSymbol(t *testing.T) { _, err = sub.NextMsg(time.Second * 5) require_NoError(t, err) } + +func TestJetStreamStreamUpdateWithExternalSource(t *testing.T) { + ho := DefaultTestOptions + ho.Port = -1 + ho.LeafNode.Host = "127.0.0.1" + ho.LeafNode.Port = -1 + ho.JetStream = true + ho.JetStreamDomain = "hub" + ho.StoreDir = t.TempDir() + hs := RunServer(&ho) + defer hs.Shutdown() + + lu, err := url.Parse(fmt.Sprintf("nats://127.0.0.1:%d", ho.LeafNode.Port)) + require_NoError(t, err) + + lo1 := DefaultTestOptions + lo1.Port = -1 + lo1.ServerName = "a-leaf" + lo1.JetStream = true + lo1.StoreDir = t.TempDir() + lo1.JetStreamDomain = "a-leaf" + lo1.LeafNode.Remotes = []*RemoteLeafOpts{{URLs: []*url.URL{lu}}} + l1 := RunServer(&lo1) + defer l1.Shutdown() + + checkLeafNodeConnected(t, l1) + + // Test sources with `External` provided + ncl, jsl := jsClientConnect(t, l1) + defer ncl.Close() + + // Hub stream. + _, err = jsl.AddStream(&nats.StreamConfig{Name: "stream", Subjects: []string{"leaf"}}) + require_NoError(t, err) + + nch, jsh := jsClientConnect(t, hs) + defer nch.Close() + + // Leaf stream. + // Both streams uses the same name, as we're testing if overlap does not check against itself + // if `External` stream has the same name. + _, err = jsh.AddStream(&nats.StreamConfig{ + Name: "stream", + Subjects: []string{"hub"}, + }) + require_NoError(t, err) + + // Add `Sources`. + // This should not validate subjects overlap against itself. + _, err = jsh.UpdateStream(&nats.StreamConfig{ + Name: "stream", + Subjects: []string{"hub"}, + Sources: []*nats.StreamSource{ + { + Name: "stream", + FilterSubject: "leaf", + External: &nats.ExternalStream{ + APIPrefix: "$JS.a-leaf.API", + }, + }, + }, + }) + require_NoError(t, err) + + // Specifying not existing FilterSubject should also be fine, as we do not validate `External` stream. + _, err = jsh.UpdateStream(&nats.StreamConfig{ + Name: "stream", + Subjects: []string{"hub"}, + Sources: []*nats.StreamSource{ + { + Name: "stream", + FilterSubject: "foo", + External: &nats.ExternalStream{ + APIPrefix: "$JS.a-leaf.API", + }, + }, + }, + }) + require_NoError(t, err) + + // Add one more stream to the Hub, so when we source it, it is not `External`. + _, err = jsh.AddStream(&nats.StreamConfig{Name: "other", Subjects: []string{"other"}}) + require_NoError(t, err) + + _, err = jsh.UpdateStream(&nats.StreamConfig{ + Name: "stream", + Subjects: []string{"hub"}, + Sources: []*nats.StreamSource{ + { + Name: "other", + FilterSubject: "foo", + }, + }, + }) + require_Error(t, err) + require_True(t, strings.Contains(err.Error(), "does not overlap")) +} diff --git a/server/stream.go b/server/stream.go index 3e2e730b6a..7be712af4b 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1098,62 +1098,68 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if len(cfg.Sources) > 0 { return StreamConfig{}, NewJSMirrorWithSourcesError() } - // We do not require other stream to exist anymore, but if we can see it check payloads. - exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name) - if len(subs) > 0 { - streamSubs = append(streamSubs, subs...) - } - if exists { - if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { - return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError() + // Do not perform checks if External is provided, as it could lead to + // checking against itself (if sourced stream name is the same on different JetStream) + if cfg.Mirror.External == nil { + // We do not require other stream to exist anymore, but if we can see it check payloads. + exists, maxMsgSize, subs := hasStream(cfg.Mirror.Name) + if len(subs) > 0 { + streamSubs = append(streamSubs, subs...) } - if !isRecovering && !hasFilterSubjectOverlap(cfg.Mirror.FilterSubject, subs) { - return StreamConfig{}, NewJSStreamInvalidConfigError( - fmt.Errorf("mirror '%s' filter subject '%s' does not overlap with any origin stream subject", - cfg.Mirror.Name, cfg.Mirror.FilterSubject)) + if exists { + if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { + return StreamConfig{}, NewJSMirrorMaxMessageSizeTooBigError() + } + if !isRecovering && !hasFilterSubjectOverlap(cfg.Mirror.FilterSubject, subs) { + return StreamConfig{}, NewJSStreamInvalidConfigError( + fmt.Errorf("mirror '%s' filter subject '%s' does not overlap with any origin stream subject", + cfg.Mirror.Name, cfg.Mirror.FilterSubject)) + } } - } - if cfg.Mirror.External != nil { + // Determine if we are inheriting direct gets. + if exists, ocfg := getStream(cfg.Mirror.Name); exists { + cfg.MirrorDirect = ocfg.AllowDirect + } else if js := s.getJetStream(); js != nil && js.isClustered() { + // Could not find it here. If we are clustered we can look it up. + js.mu.RLock() + if cc := js.cluster; cc != nil { + if as := cc.streams[acc.Name]; as != nil { + if sa := as[cfg.Mirror.Name]; sa != nil { + cfg.MirrorDirect = sa.Config.AllowDirect + } + } + } + js.mu.RUnlock() + } + } else { if cfg.Mirror.External.DeliverPrefix != _EMPTY_ { deliveryPrefixes = append(deliveryPrefixes, cfg.Mirror.External.DeliverPrefix) } if cfg.Mirror.External.ApiPrefix != _EMPTY_ { apiPrefixes = append(apiPrefixes, cfg.Mirror.External.ApiPrefix) } - } - // Determine if we are inheriting direct gets. - if exists, ocfg := getStream(cfg.Mirror.Name); exists { - cfg.MirrorDirect = ocfg.AllowDirect - } else if js := s.getJetStream(); js != nil && js.isClustered() { - // Could not find it here. If we are clustered we can look it up. - js.mu.RLock() - if cc := js.cluster; cc != nil { - if as := cc.streams[acc.Name]; as != nil { - if sa := as[cfg.Mirror.Name]; sa != nil { - cfg.MirrorDirect = sa.Config.AllowDirect - } - } - } - js.mu.RUnlock() + } } if len(cfg.Sources) > 0 { for _, src := range cfg.Sources { - exists, maxMsgSize, subs := hasStream(src.Name) - if len(subs) > 0 { - streamSubs = append(streamSubs, subs...) - } - if exists { - if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { - return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError() + // Do not perform checks if External is provided, as it could lead to + // checking against itself (if sourced stream name is the same on different JetStream) + if src.External == nil { + exists, maxMsgSize, subs := hasStream(src.Name) + if len(subs) > 0 { + streamSubs = append(streamSubs, subs...) } - if !isRecovering && !hasFilterSubjectOverlap(src.FilterSubject, streamSubs) { - return StreamConfig{}, NewJSStreamInvalidConfigError( - fmt.Errorf("source '%s' filter subject '%s' does not overlap with any origin stream subject", - src.Name, src.FilterSubject)) + if exists { + if cfg.MaxMsgSize > 0 && maxMsgSize > 0 && cfg.MaxMsgSize < maxMsgSize { + return StreamConfig{}, NewJSSourceMaxMessageSizeTooBigError() + } + if !isRecovering && !hasFilterSubjectOverlap(src.FilterSubject, streamSubs) { + return StreamConfig{}, NewJSStreamInvalidConfigError( + fmt.Errorf("source '%s' filter subject '%s' does not overlap with any origin stream subject", + src.Name, src.FilterSubject)) + } } - } - if src.External == nil { continue } if src.External.DeliverPrefix != _EMPTY_ {