From 50722e9ec10de8d3cdafda12c8aadd724ff8e23b Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Fri, 15 Sep 2023 16:11:00 -0700 Subject: [PATCH 1/4] When scaling a consumer down make sure to pop the loopAndForwardProposals go routine Signed-off-by: Derek Collison --- server/consumer.go | 11 +- server/jetstream_cluster_3_test.go | 175 +++++++++++++++++++++++++++++ server/jetstream_helpers_test.go | 15 +++ 3 files changed, 199 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index a4dab3759d..e9175da35a 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1816,8 +1816,12 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { return } - forwardProposals := func() { + forwardProposals := func() error { o.mu.Lock() + if o.node != node || node.State() != Leader { + o.mu.Unlock() + return errors.New("no longer leader") + } proposal := o.phead o.phead, o.ptail = nil, nil o.mu.Unlock() @@ -1839,6 +1843,7 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { if len(entries) > 0 { node.ProposeDirect(entries) } + return nil } // In case we have anything pending on entry. @@ -1850,7 +1855,9 @@ func (o *consumer) loopAndForwardProposals(qch chan struct{}) { forwardProposals() return case <-pch: - forwardProposals() + if err := forwardProposals(); err != nil { + return + } } } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 7786f414bb..3b95396d22 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -18,6 +18,7 @@ package server import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -5384,3 +5385,177 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) { require_True(t, blkSize(fs) == defaultMediumBlockSize) } } + +func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + // Normal Stream + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"*"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "DC", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // We will force an orphan for a certain server. + s := c.randomNonStreamLeader(globalAccountName, "TEST") + + mset, err := s.GlobalAccount().lookupStream("TEST") + require_NoError(t, err) + sgn := mset.raftNode().Group() + mset.clearRaftNode() + + o := mset.lookupConsumer("DC") + require_True(t, o != nil) + ogn := o.raftNode().Group() + o.clearRaftNode() + + require_NoError(t, js.DeleteStream("TEST")) + + // Check that we do in fact have orphans. + require_True(t, s.numRaftNodes() > 1) + + // This function will detect orphans and clean them up. + s.checkForNRGOrphans() + + // Should only be meta NRG left. 
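As an aside on the first patch: the fix is the early return inside forwardProposals once the consumer's node pointer or leadership has changed, which lets the goroutine exit instead of outliving a scale-down. A minimal standalone sketch of that shape, using a made-up proposer interface and in-memory types rather than the server's actual RaftNode:

```go
package main

import (
	"errors"
	"fmt"
)

// proposer is a stand-in for the small slice of the raft node API the loop
// needs; the real interface in the server is richer.
type proposer interface {
	Leader() bool
	ProposeDirect(entries [][]byte) error
}

type memProposer struct{ leader bool }

func (p *memProposer) Leader() bool { return p.leader }

func (p *memProposer) ProposeDirect(entries [][]byte) error {
	fmt.Printf("proposed %d entries\n", len(entries))
	return nil
}

// loopAndForward drains pending proposal batches until it is told to quit or
// loses leadership, mirroring the early return added in the patch.
func loopAndForward(node proposer, pending <-chan [][]byte, qch <-chan struct{}) {
	forward := func(entries [][]byte) error {
		if !node.Leader() {
			return errors.New("no longer leader")
		}
		return node.ProposeDirect(entries)
	}
	for {
		select {
		case <-qch:
			return
		case entries := <-pending:
			if err := forward(entries); err != nil {
				// Returning here is what keeps the goroutine from lingering
				// after the consumer has been scaled down.
				return
			}
		}
	}
}

func main() {
	node := &memProposer{leader: true}
	pending := make(chan [][]byte, 1)
	qch := make(chan struct{})
	done := make(chan struct{})

	go func() {
		loopAndForward(node, pending, qch)
		close(done)
	}()

	pending <- [][]byte{[]byte("op1"), []byte("op2")}
	close(qch) // shut the loop down, as the consumer does on stop/scale-down
	<-done
}
```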
+ require_True(t, s.numRaftNodes() == 1) + require_True(t, s.lookupRaftNode(sgn) == nil) + require_True(t, s.lookupRaftNode(ogn) == nil) +} + +func TestJetStreamClusterRestartThenScaleStreamReplicas(t *testing.T) { + t.Skip("This test takes too long, need to make shorter") + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + s := c.randomNonLeader() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + nc2, producer := jsClientConnect(t, s) + defer nc2.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + c.waitOnStreamLeader(globalAccountName, "TEST") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + end := time.Now().Add(2 * time.Second) + for time.Now().Before(end) { + producer.Publish("foo", []byte(strings.Repeat("A", 128))) + time.Sleep(time.Millisecond) + } + + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + sub, err := js.PullSubscribe("foo", fmt.Sprintf("C-%d", i)) + require_NoError(t, err) + + wg.Add(1) + go func() { + defer wg.Done() + for range time.NewTicker(10 * time.Millisecond).C { + select { + case <-ctx.Done(): + return + default: + } + + msgs, err := sub.Fetch(1) + if err != nil && !errors.Is(err, nats.ErrTimeout) && !errors.Is(err, nats.ErrConnectionClosed) { + t.Logf("Pull Error: %v", err) + } + for _, msg := range msgs { + msg.Ack() + } + } + }() + } + c.lameDuckRestartAll() + c.waitOnStreamLeader(globalAccountName, "TEST") + + // Swap the logger to try to detect the condition after the restart. + loggers := make([]*captureDebugLogger, 3) + for i, srv := range c.servers { + l := &captureDebugLogger{dbgCh: make(chan string, 10)} + loggers[i] = l + srv.SetLogger(l, true, false) + } + condition := `Direct proposal ignored, not leader (state: CLOSED)` + errCh := make(chan error, 10) + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case dl := <-loggers[0].dbgCh: + if strings.Contains(dl, condition) { + errCh <- fmt.Errorf(condition) + } + case dl := <-loggers[1].dbgCh: + if strings.Contains(dl, condition) { + errCh <- fmt.Errorf(condition) + } + case dl := <-loggers[2].dbgCh: + if strings.Contains(dl, condition) { + errCh <- fmt.Errorf(condition) + } + case <-ctx.Done(): + return + } + } + }() + + // Start publishing again for a while. + end = time.Now().Add(2 * time.Second) + for time.Now().Before(end) { + producer.Publish("foo", []byte(strings.Repeat("A", 128))) + time.Sleep(time.Millisecond) + } + + // Try to do a stream edit back to R=1 after doing all the upgrade. + info, _ := js.StreamInfo("TEST") + sconfig := info.Config + sconfig.Replicas = 1 + _, err = js.UpdateStream(&sconfig) + require_NoError(t, err) + + // Leave running for some time after the update. + time.Sleep(2 * time.Second) + + info, _ = js.StreamInfo("TEST") + sconfig = info.Config + sconfig.Replicas = 3 + _, err = js.UpdateStream(&sconfig) + require_NoError(t, err) + + select { + case e := <-errCh: + t.Fatalf("Bad condition on raft node: %v", e) + case <-time.After(2 * time.Second): + // Done + } + + // Stop goroutines and wait for them to exit. 
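The watcher goroutine above selects over three captureDebugLogger channels by hand. The same check can also be written against a single fanned-in channel; here is a small sketch of that idea in plain Go (mergeLines is a hypothetical helper, not something that exists in the test suite):

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
)

// mergeLines fans several log-line channels into one output channel and
// stops forwarding once the context is done or the inputs are closed.
func mergeLines(ctx context.Context, chans ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for _, ch := range chans {
		wg.Add(1)
		go func(c <-chan string) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case l, ok := <-c:
					if !ok {
						return
					}
					select {
					case out <- l:
					case <-ctx.Done():
						return
					}
				}
			}
		}(ch)
	}
	go func() { wg.Wait(); close(out) }()
	return out
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	a, b := make(chan string, 1), make(chan string, 1)
	a <- "raft: all good"
	b <- "Direct proposal ignored, not leader (state: CLOSED)"
	close(a)
	close(b)

	for line := range mergeLines(ctx, a, b) {
		if strings.Contains(line, "not leader") {
			fmt.Println("bad condition seen:", line)
		}
	}
}
```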
+ cancel() + wg.Wait() +} diff --git a/server/jetstream_helpers_test.go b/server/jetstream_helpers_test.go index 505b829306..573db86f99 100644 --- a/server/jetstream_helpers_test.go +++ b/server/jetstream_helpers_test.go @@ -1521,6 +1521,21 @@ func (c *cluster) restartAll() { c.waitOnClusterReady() } +func (c *cluster) lameDuckRestartAll() { + c.t.Helper() + for i, s := range c.servers { + s.lameDuckMode() + s.WaitForShutdown() + if !s.Running() { + opts := c.opts[i] + s, o := RunServerWithConfig(opts.ConfigFile) + c.servers[i] = s + c.opts[i] = o + } + } + c.waitOnClusterReady() +} + func (c *cluster) restartAllSamePorts() { c.t.Helper() for i, s := range c.servers { From 0ac7895b983a4dbb12f28bd680abbc028a643439 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 11 Sep 2023 16:35:16 -0700 Subject: [PATCH 2/4] Add in utility to detect and delete any NRG orphans. Signed-off-by: Derek Collison --- server/jetstream_cluster.go | 73 +++++++++++++++++++++++++++++- server/jetstream_cluster_3_test.go | 2 +- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 52ccf5bddf..4bc068d93e 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1159,6 +1159,65 @@ func (js *jetStream) checkForOrphans() { } } +// Check and delete any orphans we may come across. +func (s *Server) checkForNRGOrphans() { + js, cc := s.getJetStreamCluster() + if js == nil || cc == nil || js.isMetaRecovering() { + // No cluster means no NRGs. Also return if still recovering. + return + } + + // Track which assets R>1 should be on this server. + nrgMap := make(map[string]struct{}) + trackGroup := func(rg *raftGroup) { + // If R>1 track this as a legit NRG. + if rg.node != nil { + nrgMap[rg.Name] = struct{}{} + } + } + // Register our meta. + js.mu.RLock() + meta := cc.meta + if meta == nil { + js.mu.RUnlock() + // Bail with no meta node. + return + } + + ourID := meta.ID() + nrgMap[meta.Group()] = struct{}{} + + // Collect all valid groups from our assignments. + for _, asa := range cc.streams { + for _, sa := range asa { + if sa.Group.isMember(ourID) && sa.Restore == nil { + trackGroup(sa.Group) + for _, ca := range sa.consumers { + if ca.Group.isMember(ourID) { + trackGroup(ca.Group) + } + } + } + } + } + js.mu.RUnlock() + + // Check NRGs that are running. + var needDelete []RaftNode + s.rnMu.RLock() + for name, n := range s.raftNodes { + if _, ok := nrgMap[name]; !ok { + needDelete = append(needDelete, n) + } + } + s.rnMu.RUnlock() + + for _, n := range needDelete { + s.Warnf("Detected orphaned NRG %q, will cleanup", n.Group()) + n.Delete() + } +} + func (js *jetStream) monitorCluster() { s, n := js.server(), js.getMetaGroup() qch, rqch, lch, aq := js.clusterQuitC(), n.QuitC(), n.LeadChangeC(), n.ApplyQ() @@ -1189,6 +1248,8 @@ func (js *jetStream) monitorCluster() { if hs := s.healthz(nil); hs.Error != _EMPTY_ { s.Warnf("%v", hs.Error) } + // Also check for orphaned NRGs. + s.checkForNRGOrphans() } var ( @@ -1269,7 +1330,6 @@ func (js *jetStream) monitorCluster() { go checkHealth() continue } - // FIXME(dlc) - Deal with errors. 
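The checkForNRGOrphans helper above boils down to a set difference: collect the group names this server should be running, then delete any running raft node whose name is not in that set. A stripped-down sketch of that sweep, with stand-in types and made-up group names:

```go
package main

import "fmt"

// raftGroup is a stand-in for the server's NRG handle; only the name and a
// Delete method matter for the orphan sweep.
type raftGroup struct{ name string }

func (g *raftGroup) Delete() { fmt.Println("deleting orphan NRG", g.name) }

// sweepOrphans removes every running group whose name is not in the expected
// set, which is the overall shape of checkForNRGOrphans.
func sweepOrphans(running map[string]*raftGroup, expected map[string]struct{}) {
	var orphans []*raftGroup
	for name, g := range running {
		if _, ok := expected[name]; !ok {
			orphans = append(orphans, g)
		}
	}
	for _, g := range orphans {
		g.Delete()
		delete(running, g.name)
	}
}

func main() {
	// Group names here are invented for illustration.
	running := map[string]*raftGroup{
		"_meta_": {name: "_meta_"},
		"S-R3F":  {name: "S-R3F"}, // stream group left behind after a delete
		"C-R3F":  {name: "C-R3F"}, // consumer group left behind after a delete
	}
	expected := map[string]struct{}{"_meta_": {}}

	sweepOrphans(running, expected)
	fmt.Println("remaining groups:", len(running)) // only the meta group
}
```

Note that the real helper also bails out while the meta layer is still recovering, so legitimately assigned groups are not mistaken for orphans.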
if didSnap, didStreamRemoval, didConsumerRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) if js.hasPeerEntries(ce.Entries) || didStreamRemoval || (didSnap && !isLeader) { @@ -1280,6 +1340,8 @@ func (js *jetStream) monitorCluster() { doSnapshot() } ce.ReturnToPool() + } else { + s.Warnf("Error applying JetStream cluster entries: %v", err) } } aq.recycle(&ces) @@ -2028,6 +2090,15 @@ func (mset *stream) removeNode() { } } +func (mset *stream) clearRaftNode() { + if mset == nil { + return + } + mset.mu.Lock() + defer mset.mu.Unlock() + mset.node = nil +} + // Helper function to generate peer info. // lists and sets for old and new. func genPeerInfo(peers []string, split int) (newPeers, oldPeers []string, newPeerSet, oldPeerSet map[string]bool) { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 3b95396d22..8a24c6d0e9 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5306,7 +5306,7 @@ func TestJetStreamClusterCheckFileStoreBlkSizes(t *testing.T) { nc, js := jsClientConnect(t, c.randomServer()) defer nc.Close() - // Nowmal Stream + // Normal Stream _, err := js.AddStream(&nats.StreamConfig{ Name: "TEST", Subjects: []string{"*"}, From 9f16edd4314c5f469791929cb3949c4a81ef24d3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 24 Sep 2023 13:15:39 -0700 Subject: [PATCH 3/4] Make sure to not forward a message across a route for dq sub when we are a spoke leaf node. Signed-off-by: Derek Collison Signed-off-by: Waldemar Quevedo --- server/client.go | 6 ++++ server/leafnode_test.go | 75 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/server/client.go b/server/client.go index e98c605470..b44745c497 100644 --- a/server/client.go +++ b/server/client.go @@ -4375,6 +4375,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } + // If we are a spoke leaf node make sure to not forward across routes. + // This mimics same behavior for normal subs above. + if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER { + continue + } + // We have taken care of preferring local subs for a message from a route above. // Here we just care about a client or leaf and skipping a leaf and preferring locals. 
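The guard added to processMsgResults keeps a spoke leaf node from handing a deliverable queue message to a subscription reached over a route, mirroring how plain subscriptions are already handled a few lines earlier. A simplified model of that selection rule (stand-in types; the real code also weighs local versus remote preferences):

```go
package main

import "fmt"

type clientKind int

const (
	CLIENT clientKind = iota
	ROUTER
	LEAF
)

type sub struct {
	name string
	kind clientKind
}

// pickQueueCandidates filters queue-group candidates the way the new guard
// does: when the delivering side is a spoke leaf node, subscriptions that
// live across a route are skipped so the message is not forwarded there.
func pickQueueCandidates(fromSpokeLeaf bool, subs []sub) []sub {
	var out []sub
	for _, s := range subs {
		if fromSpokeLeaf && s.kind == ROUTER {
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	subs := []sub{
		{name: "local client", kind: CLIENT},
		{name: "clustered peer via route", kind: ROUTER},
	}
	for _, s := range pickQueueCandidates(true, subs) {
		fmt.Println("eligible:", s.name)
	}
}
```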
if dst := sub.client.kind; dst == ROUTER || dst == LEAF { diff --git a/server/leafnode_test.go b/server/leafnode_test.go index e6577fd373..f29cea5e58 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -5561,3 +5561,78 @@ func TestLeafNodeWithWeightedDQResponsesWithStreamImportAccountsWithUnsub(t *tes closeSubs(rsubs) checkFor(t, time.Second, 200*time.Millisecond, checkInterest) } + +// https://github.com/nats-io/nats-server/issues/4367 +func TestLeafNodeDQMultiAccountExportImport(t *testing.T) { + bConf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + server_name: cluster-b-0 + accounts { + $SYS: { users: [ { user: admin, password: pwd } ] }, + AGG: { + exports: [ { service: "PING.>" } ] + users: [ { user: agg, password: agg } ] + } + } + leaf { listen: 127.0.0.1:-1 } + `)) + + sb, ob := RunServerWithConfig(bConf) + defer sb.Shutdown() + + tmpl := ` + listen: 127.0.0.1:-1 + server_name: %s + jetstream: { store_dir: '%s' } + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + accounts { + $SYS: { users: [ { user: admin, password: pwd } ] }, + A: { + mappings: { "A.>" : ">" } + exports: [ { service: A.> } ] + users: [ { user: a, password: a } ] + }, + AGG: { + imports: [ { service: { subject: A.>, account: A } } ] + users: [ { user: agg, password: agg } ] + }, + } + leaf { + remotes: [ { + urls: [ nats-leaf://agg:agg@127.0.0.1:{LEAF_PORT} ] + account: AGG + } ] + } + ` + tmpl = strings.Replace(tmpl, "{LEAF_PORT}", fmt.Sprintf("%d", ob.LeafNode.Port), 1) + c := createJetStreamCluster(t, tmpl, "cluster-a", "cluster-a-", 3, 22110, false) + defer c.shutdown() + + // Make sure all servers are connected via leafnode to the hub, the b server. + for _, s := range c.servers { + checkLeafNodeConnectedCount(t, s, 1) + } + + // Connect to a server in the cluster and create a DQ listener. + nc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("a", "a")) + defer nc.Close() + + var got atomic.Int32 + + natsQueueSub(t, nc, "PING", "Q", func(m *nats.Msg) { + got.Add(1) + m.Respond([]byte("REPLY")) + }) + + // Now connect to B and send the request. + ncb, _ := jsClientConnect(t, sb, nats.UserInfo("agg", "agg")) + defer ncb.Close() + + _, err := ncb.Request("A.PING", []byte("REQUEST"), time.Second) + require_NoError(t, err) + require_Equal(t, got.Load(), 1) +} From 28eb7c0ac2fec792c9223001445f3befc5de55c3 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 28 Sep 2023 12:52:27 -0700 Subject: [PATCH 4/4] Only setup auto no-auth for $G account iff no authorization block was defined. Signed-off-by: Derek Collison --- server/opts.go | 5 ++++- server/opts_test.go | 17 ++++++++++------- server/routes_test.go | 3 ++- server/server.go | 4 ++-- server/server_test.go | 27 +++++++++++++++++++++++++++ 5 files changed, 45 insertions(+), 11 deletions(-) diff --git a/server/opts.go b/server/opts.go index 1cb531dae0..42b9195f9d 100644 --- a/server/opts.go +++ b/server/opts.go @@ -345,6 +345,9 @@ type Options struct { // OCSP Cache config enables next-gen cache for OCSP features OCSPCacheConfig *OCSPResponseCacheConfig + + // Used to mark that we had a top level authorization block. 
+ authBlockDefined bool } // WebsocketOpts are options for websocket @@ -829,7 +832,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error *errors = append(*errors, err) return } - + o.authBlockDefined = true o.Username = auth.user o.Password = auth.pass o.Authorization = auth.token diff --git a/server/opts_test.go b/server/opts_test.go index d52ae4c447..15267c75f6 100644 --- a/server/opts_test.go +++ b/server/opts_test.go @@ -119,6 +119,7 @@ func TestConfigFile(t *testing.T) { LameDuckDuration: 4 * time.Minute, ConnectErrorReports: 86400, ReconnectErrorReports: 5, + authBlockDefined: true, } opts, err := ProcessConfigFile("./configs/test.conf") @@ -131,13 +132,14 @@ func TestConfigFile(t *testing.T) { func TestTLSConfigFile(t *testing.T) { golden := &Options{ - ConfigFile: "./configs/tls.conf", - Host: "127.0.0.1", - Port: 4443, - Username: "derek", - Password: "foo", - AuthTimeout: 1.0, - TLSTimeout: 2.0, + ConfigFile: "./configs/tls.conf", + Host: "127.0.0.1", + Port: 4443, + Username: "derek", + Password: "foo", + AuthTimeout: 1.0, + TLSTimeout: 2.0, + authBlockDefined: true, } opts, err := ProcessConfigFile("./configs/tls.conf") if err != nil { @@ -282,6 +284,7 @@ func TestMergeOverrides(t *testing.T) { LameDuckDuration: 4 * time.Minute, ConnectErrorReports: 86400, ReconnectErrorReports: 5, + authBlockDefined: true, } fopts, err := ProcessConfigFile("./configs/test.conf") if err != nil { diff --git a/server/routes_test.go b/server/routes_test.go index 505c428f38..5c90fa8d77 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -84,7 +84,8 @@ func TestRouteConfig(t *testing.T) { NoAdvertise: true, ConnectRetries: 2, }, - PidFile: "/tmp/nats-server/nats_cluster_test.pid", + PidFile: "/tmp/nats-server/nats_cluster_test.pid", + authBlockDefined: true, } // Setup URLs diff --git a/server/server.go b/server/server.go index 3ecb4e8750..fd25acf635 100644 --- a/server/server.go +++ b/server/server.go @@ -933,8 +933,8 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) // If we have defined a system account here check to see if its just us and the $G account. // We would do this to add user/pass to the system account. If this is the case add in // no-auth-user for $G. - // Only do this if non-operator mode. - if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ { + // Only do this if non-operator mode and we did not have an authorization block defined. + if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined { // If we come here from config reload, let's not recreate the fake user name otherwise // it will cause currently clients to be disconnected. 
uname := s.sysAccOnlyNoAuthUser diff --git a/server/server_test.go b/server/server_test.go index 64b6b3f230..703c8c063e 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -19,6 +19,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "flag" "fmt" "io" @@ -2068,3 +2069,29 @@ func TestServerRateLimitLogging(t *testing.T) { checkLog(c1, c2) } + +// https://github.com/nats-io/nats-server/discussions/4535 +func TestServerAuthBlockAndSysAccounts(t *testing.T) { + conf := createConfFile(t, []byte(` + listen: 127.0.0.1:-1 + server_name: s-test + authorization { + users = [ { user: "u", password: "pass"} ] + } + accounts { + $SYS: { users: [ { user: admin, password: pwd } ] } + } + `)) + + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + + // This should work of course. + nc, err := nats.Connect(s.ClientURL(), nats.UserInfo("u", "pass")) + require_NoError(t, err) + defer nc.Close() + + // This should not. + _, err = nats.Connect(s.ClientURL()) + require_Error(t, err, nats.ErrAuthorization, errors.New("nats: Authorization Violation")) +}
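The behavior change in the last patch reduces to one extra condition: the implicit no-auth user for $G is only wired up when the configuration did not define a top-level authorization block. A standalone sketch of that predicate, with field names simplified from the real Options struct:

```go
package main

import "fmt"

// options mirrors just the fields the auto no-auth decision looks at.
type options struct {
	trustedOperators int
	noAuthUser       string
	authBlockDefined bool
}

// autoNoAuthForGlobal reports whether the server should map unauthenticated
// connections to the $G account: non-operator mode, only $G and $SYS defined,
// no explicit no_auth_user, and (after this patch) no authorization block.
func autoNoAuthForGlobal(o options, numAccounts int) bool {
	return o.trustedOperators == 0 &&
		numAccounts == 2 &&
		o.noAuthUser == "" &&
		!o.authBlockDefined
}

func main() {
	// With an authorization block present (as in the discussion linked above),
	// the implicit no-auth mapping is no longer set up.
	withAuthBlock := options{authBlockDefined: true}
	fmt.Println("auto no-auth with authorization block:", autoNoAuthForGlobal(withAuthBlock, 2)) // false

	// With only $SYS users defined, the previous convenience behavior remains.
	sysOnly := options{}
	fmt.Println("auto no-auth with only $SYS users:", autoNoAuthForGlobal(sysOnly, 2)) // true
}
```

With the configuration from TestServerAuthBlockAndSysAccounts (an authorization block plus $SYS users), the predicate is false, which is why the unauthenticated connect in that test is now rejected.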