From 6ed6d784bf77e13a18dcd84842b2ed85fb3fb741 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 21 Feb 2024 16:15:15 +0000 Subject: [PATCH] Move NRG traffic into asset account This adds a new account NRG capability into statsz so that we can detect when all servers in the cluster support moving traffic into the asset account, instead of all being in the system account. Signed-off-by: Neil Twigg --- server/accounts.go | 1 + server/events.go | 57 ++++++++++++++++++++++ server/jetstream_cluster_4_test.go | 74 ++++++++++++++++++++++++++++ server/opts.go | 3 ++ server/raft.go | 77 ++++++++++++++++++++++++------ server/route.go | 2 +- server/sendq.go | 8 ++-- server/server.go | 8 +++- 8 files changed, 209 insertions(+), 21 deletions(-) diff --git a/server/accounts.go b/server/accounts.go index 4576c9fea8..c22c3b94c3 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -61,6 +61,7 @@ type Account struct { sqmu sync.Mutex sl *Sublist ic *client + sq *sendq isid uint64 etmr *time.Timer ctmr *time.Timer diff --git a/server/events.go b/server/events.go index 9c214cc6b3..76ee5eb68c 100644 --- a/server/events.go +++ b/server/events.go @@ -256,6 +256,7 @@ type ServerInfo struct { const ( JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. BinaryStreamSnapshot // New stream snapshot capability. + AccountNRG // Move NRG traffic out of system account. ) // Set JetStream capability. @@ -281,6 +282,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool { return si.Flags&BinaryStreamSnapshot != 0 } +// Set account NRG capability. +func (si *ServerInfo) SetAccountNRG() { + si.Flags |= AccountNRG +} + +// AccountNRG indicates whether or not we support moving the NRG traffic out of the +// system account and into the asset account. +func (si *ServerInfo) AccountNRG() bool { + return si.Flags&AccountNRG != 0 +} + // ClientInfo is detailed information about the client forming a connection. type ClientInfo struct { Start *time.Time `json:"start,omitempty"` @@ -479,6 +491,7 @@ RESET: // New capability based flags. si.SetJetStreamEnabled() si.SetBinaryStreamSnapshot() + si.SetAccountNRG() } } var b []byte @@ -1543,7 +1556,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) + s.updateNRGAccountStatus() } // updateRemoteServer is called when we have an update from a remote server. @@ -1590,14 +1605,56 @@ func (s *Server) processNewServer(si *ServerInfo) { false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) } } + go s.updateNRGAccountStatus() // Announce ourselves.. // Do this in a separate Go routine. go s.sendStatszUpdate() } +// Works out whether all nodes support moving the NRG traffic into +// the account and moves it appropriately. +// Server lock MUST NOT be held on entry. +func (s *Server) updateNRGAccountStatus() { + var raftNodes []RaftNode + s.optsMu.RLock() + supported := s.opts.JetStreamAccountNRG + s.optsMu.RUnlock() + if supported { + s.rnMu.Lock() + raftNodes = make([]RaftNode, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + raftNodes = append(raftNodes, n) + } + s.rnMu.Unlock() + s.mu.Lock() + s.nodeToInfo.Range(func(key, value any) bool { + si := value.(nodeInfo) + if !s.sameDomain(si.domain) { + return true + } + if supported = supported && si.accountNRG; !supported { + return false + } + return true + }) + s.mu.Unlock() + } + if s.accountNRG.CompareAndSwap(!supported, supported) { + if supported { + s.Noticef("Moving NRG traffic into asset accounts") + } else { + s.Warnf("Moving NRG traffic back into system account due to old nodes coming online") + } + for _, n := range raftNodes { + n.RecreateInternalSubs(supported) + } + } +} + // If GW is enabled on this server and there are any leaf node connections, // this function will send a LeafNode connect system event to the super cluster // to ensure that the GWs are in interest-only mode for this account. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 0013e5db89..f7d4401dee 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2184,3 +2184,77 @@ func TestJetStreamClusterBusyStreams(t *testing.T) { } }) } + +func TestJetStreamClusterAccountNRG(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + leader := c.streamLeader(globalAccountName, "TEST") + stream, err := leader.gacc.lookupStream("TEST") + require_NoError(t, err) + rg := stream.node.(*raft) + + // System account should have interest, but the global account + // shouldn't. + for _, s := range c.servers { + require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_False(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } + + // First of all check that the Raft traffic is in the system + // account, as we haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // Switch on account NRG on all servers in the cluster. Then + // we wait, as we will need statsz to be sent for all servers + // in the cluster. + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamAccountNRG = true + s.optsMu.Unlock() + s.updateNRGAccountStatus() + } + + // Now check that the traffic has moved into the asset acc. + // In this case the system account should no longer have + // subscriptions for those subjects. + { + sub, err := nc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // The global account should now have interest and the + // system account shouldn't. + for _, s := range c.servers { + require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_True(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } +} diff --git a/server/opts.go b/server/opts.go index e717f26ea4..8fe776a5fa 100644 --- a/server/opts.go +++ b/server/opts.go @@ -317,6 +317,7 @@ type Options struct { JetStreamLimits JSLimitOpts JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 + JetStreamAccountNRG bool StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` SyncAlways bool `json:"-"` @@ -2310,6 +2311,8 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)} } opts.JetStreamMaxCatchup = s + case "account_nrg": + opts.JetStreamAccountNRG = mv.(bool) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index d374c85163..dbb105dc94 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,6 +76,7 @@ type RaftNode interface { Stop() Delete() Wipe() + RecreateInternalSubs(acc bool) error } type WAL interface { @@ -128,6 +129,7 @@ type raft struct { created time.Time // Time that the group was created accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in group string // Raft group sd string // Store directory id string // Node ID @@ -346,8 +348,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe s.mu.RUnlock() return nil, ErrNoSysAccount } - sq := s.sys.sq - sacc := s.sys.account hash := s.sys.shash s.mu.RUnlock() @@ -375,9 +375,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe acks: make(map[uint64]map[string]struct{}), pae: make(map[uint64]*appendEntry), s: s, - c: s.createInternalSystemClient(), js: s.getJetStream(), - sq: sq, quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), @@ -391,7 +389,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe observer: cfg.Observer, extSt: ps.domainExt, } - n.c.registerWithAccount(sacc) + + // Setup our internal subscriptions for proposals, votes and append entries. + // If we fail to do this for some reason then this is fatal — we cannot + // continue setting up or the Raft node may be partially/totally isolated. + if err := n.RecreateInternalSubs(n.s.opts.JetStreamAccountNRG); err != nil { + n.shutdown(true) + return nil, err + } if atomic.LoadInt32(&s.logging.debug) > 0 { n.dflag = true @@ -488,14 +493,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } } - // Setup our internal subscriptions for proposals, votes and append entries. - // If we fail to do this for some reason then this is fatal — we cannot - // continue setting up or the Raft node may be partially/totally isolated. - if err := n.createInternalSubs(); err != nil { - n.shutdown(true) - return nil, err - } - n.debug("Started") // Check if we need to start in observer mode due to lame duck status. @@ -524,6 +521,57 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe return n, nil } +func (n *raft) RecreateInternalSubs(acc bool) error { + n.Lock() + defer n.Unlock() + + // Need to cancel any in-progress catch-ups, otherwise the + // inboxes are about to be pulled out from underneath it in + // the next step... + n.cancelCatchup() + + // If we have an existing client then tear down any existing + // subscriptions and close the internal client. + if c := n.c; c != nil { + var subs []*subscription + c.mu.Lock() + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + } + + // Look up which account we think we should be participating + // on. This will either be the system account (default) or it + // will be the account that the asset is resident in. + var nrgAcc *Account + if n.s.sys != nil { + nrgAcc = n.s.sys.account + } + if acc { // Should we setup in the asset account? + var err error + if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil { + return err + } + } + c := n.s.createInternalSystemClient() + c.registerWithAccount(nrgAcc) + if nrgAcc.sq == nil { + nrgAcc.sq = n.s.newSendQ(nrgAcc) + } + n.c = c + n.sq = nrgAcc.sq + n.acc = nrgAcc + + // Recreate any internal subscriptions for voting, append + // entries etc in the new account. + return n.createInternalSubs() +} + // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { js := n.js @@ -1734,9 +1782,8 @@ func (n *raft) unsubscribe(sub *subscription) { } } +// Lock should be held. func (n *raft) createInternalSubs() error { - n.Lock() - defer n.Unlock() n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox() n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox() n.psubj = fmt.Sprintf(raftPropSubj, n.group) diff --git a/server/route.go b/server/route.go index 7bbcccddf8..f5354a18f8 100644 --- a/server/route.go +++ b/server/route.go @@ -2065,7 +2065,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) } } diff --git a/server/sendq.go b/server/sendq.go index 0287c5548a..2e7b5d0345 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -29,10 +29,11 @@ type sendq struct { mu sync.Mutex q *ipQueue[*outMsg] s *Server + a *Account } -func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")} +func (s *Server) newSendQ(acc *Account) *sendq { + sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc} s.startGoRoutine(sq.internalLoop) return sq } @@ -44,8 +45,9 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() + //c := s.createInternalAccountClient() c := s.createInternalSystemClient() - c.registerWithAccount(s.SystemAccount()) + c.registerWithAccount(sq.a) c.noIcb = true defer c.closeConnection(ClientClosed) diff --git a/server/server.go b/server/server.go index 33cd155e58..d408240060 100644 --- a/server/server.go +++ b/server/server.go @@ -358,6 +358,9 @@ type Server struct { // Queue to process JS API requests that come from routes (or gateways) jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + + // Whether account NRG is supported cluster-wide or not. + accountNRG atomic.Bool } // For tracking JS nodes. @@ -373,6 +376,7 @@ type nodeInfo struct { offline bool js bool binarySnapshots bool + accountNRG bool } // Make sure all are 64bits for atomic use @@ -762,7 +766,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, true, + false, true, true, true, }) } @@ -1740,7 +1744,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sendq: newIPQueue[*pubMsg](s, "System sendQ"), recvq: newIPQueue[*inSysMsg](s, "System recvQ"), resetCh: make(chan struct{}), - sq: s.newSendQ(), + sq: s.newSendQ(acc), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval,