diff --git a/server/accounts.go b/server/accounts.go index 4576c9fea8..c22c3b94c3 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -61,6 +61,7 @@ type Account struct { sqmu sync.Mutex sl *Sublist ic *client + sq *sendq isid uint64 etmr *time.Timer ctmr *time.Timer diff --git a/server/events.go b/server/events.go index 9c214cc6b3..76ee5eb68c 100644 --- a/server/events.go +++ b/server/events.go @@ -256,6 +256,7 @@ type ServerInfo struct { const ( JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled. BinaryStreamSnapshot // New stream snapshot capability. + AccountNRG // Move NRG traffic out of system account. ) // Set JetStream capability. @@ -281,6 +282,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool { return si.Flags&BinaryStreamSnapshot != 0 } +// Set account NRG capability. +func (si *ServerInfo) SetAccountNRG() { + si.Flags |= AccountNRG +} + +// AccountNRG indicates whether or not we support moving the NRG traffic out of the +// system account and into the asset account. +func (si *ServerInfo) AccountNRG() bool { + return si.Flags&AccountNRG != 0 +} + // ClientInfo is detailed information about the client forming a connection. type ClientInfo struct { Start *time.Time `json:"start,omitempty"` @@ -479,6 +491,7 @@ RESET: // New capability based flags. si.SetJetStreamEnabled() si.SetBinaryStreamSnapshot() + si.SetAccountNRG() } } var b []byte @@ -1543,7 +1556,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) + s.updateNRGAccountStatus() } // updateRemoteServer is called when we have an update from a remote server. @@ -1590,14 +1605,56 @@ func (s *Server) processNewServer(si *ServerInfo) { false, si.JetStreamEnabled(), si.BinaryStreamSnapshot(), + si.AccountNRG(), }) } } + go s.updateNRGAccountStatus() // Announce ourselves.. // Do this in a separate Go routine. go s.sendStatszUpdate() } +// Works out whether all nodes support moving the NRG traffic into +// the account and moves it appropriately. +// Server lock MUST NOT be held on entry. +func (s *Server) updateNRGAccountStatus() { + var raftNodes []RaftNode + s.optsMu.RLock() + supported := s.opts.JetStreamAccountNRG + s.optsMu.RUnlock() + if supported { + s.rnMu.Lock() + raftNodes = make([]RaftNode, 0, len(s.raftNodes)) + for _, n := range s.raftNodes { + raftNodes = append(raftNodes, n) + } + s.rnMu.Unlock() + s.mu.Lock() + s.nodeToInfo.Range(func(key, value any) bool { + si := value.(nodeInfo) + if !s.sameDomain(si.domain) { + return true + } + if supported = supported && si.accountNRG; !supported { + return false + } + return true + }) + s.mu.Unlock() + } + if s.accountNRG.CompareAndSwap(!supported, supported) { + if supported { + s.Noticef("Moving NRG traffic into asset accounts") + } else { + s.Warnf("Moving NRG traffic back into system account due to old nodes coming online") + } + for _, n := range raftNodes { + n.RecreateInternalSubs(supported) + } + } +} + // If GW is enabled on this server and there are any leaf node connections, // this function will send a LeafNode connect system event to the super cluster // to ensure that the GWs are in interest-only mode for this account. diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 0013e5db89..f7d4401dee 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2184,3 +2184,77 @@ func TestJetStreamClusterBusyStreams(t *testing.T) { } }) } + +func TestJetStreamClusterAccountNRG(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!")) + defer snc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: nats.MemoryStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + leader := c.streamLeader(globalAccountName, "TEST") + stream, err := leader.gacc.lookupStream("TEST") + require_NoError(t, err) + rg := stream.node.(*raft) + + // System account should have interest, but the global account + // shouldn't. + for _, s := range c.servers { + require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_False(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } + + // First of all check that the Raft traffic is in the system + // account, as we haven't moved it elsewhere yet. + { + sub, err := snc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // Switch on account NRG on all servers in the cluster. Then + // we wait, as we will need statsz to be sent for all servers + // in the cluster. + for _, s := range c.servers { + s.optsMu.Lock() + s.opts.JetStreamAccountNRG = true + s.optsMu.Unlock() + s.updateNRGAccountStatus() + } + + // Now check that the traffic has moved into the asset acc. + // In this case the system account should no longer have + // subscriptions for those subjects. + { + sub, err := nc.SubscribeSync(rg.asubj) + require_NoError(t, err) + require_NoError(t, sub.AutoUnsubscribe(1)) + + msg, err := sub.NextMsg(time.Second * 3) + require_NoError(t, err) + require_True(t, msg != nil) + } + + // The global account should now have interest and the + // system account shouldn't. + for _, s := range c.servers { + require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true)) + require_True(t, s.gacc.sl.hasInterest(rg.asubj, true)) + } +} diff --git a/server/opts.go b/server/opts.go index e717f26ea4..8fe776a5fa 100644 --- a/server/opts.go +++ b/server/opts.go @@ -317,6 +317,7 @@ type Options struct { JetStreamLimits JSLimitOpts JetStreamTpm JSTpmOpts JetStreamMaxCatchup int64 + JetStreamAccountNRG bool StoreDir string `json:"-"` SyncInterval time.Duration `json:"-"` SyncAlways bool `json:"-"` @@ -2310,6 +2311,8 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)} } opts.JetStreamMaxCatchup = s + case "account_nrg": + opts.JetStreamAccountNRG = mv.(bool) default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/raft.go b/server/raft.go index d374c85163..dbb105dc94 100644 --- a/server/raft.go +++ b/server/raft.go @@ -76,6 +76,7 @@ type RaftNode interface { Stop() Delete() Wipe() + RecreateInternalSubs(acc bool) error } type WAL interface { @@ -128,6 +129,7 @@ type raft struct { created time.Time // Time that the group was created accName string // Account name of the asset this raft group is for + acc *Account // Account that NRG traffic will be sent/received in group string // Raft group sd string // Store directory id string // Node ID @@ -346,8 +348,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe s.mu.RUnlock() return nil, ErrNoSysAccount } - sq := s.sys.sq - sacc := s.sys.account hash := s.sys.shash s.mu.RUnlock() @@ -375,9 +375,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe acks: make(map[uint64]map[string]struct{}), pae: make(map[uint64]*appendEntry), s: s, - c: s.createInternalSystemClient(), js: s.getJetStream(), - sq: sq, quit: make(chan struct{}), reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), @@ -391,7 +389,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe observer: cfg.Observer, extSt: ps.domainExt, } - n.c.registerWithAccount(sacc) + + // Setup our internal subscriptions for proposals, votes and append entries. + // If we fail to do this for some reason then this is fatal — we cannot + // continue setting up or the Raft node may be partially/totally isolated. + if err := n.RecreateInternalSubs(n.s.opts.JetStreamAccountNRG); err != nil { + n.shutdown(true) + return nil, err + } if atomic.LoadInt32(&s.logging.debug) > 0 { n.dflag = true @@ -488,14 +493,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe } } - // Setup our internal subscriptions for proposals, votes and append entries. - // If we fail to do this for some reason then this is fatal — we cannot - // continue setting up or the Raft node may be partially/totally isolated. - if err := n.createInternalSubs(); err != nil { - n.shutdown(true) - return nil, err - } - n.debug("Started") // Check if we need to start in observer mode due to lame duck status. @@ -524,6 +521,57 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe return n, nil } +func (n *raft) RecreateInternalSubs(acc bool) error { + n.Lock() + defer n.Unlock() + + // Need to cancel any in-progress catch-ups, otherwise the + // inboxes are about to be pulled out from underneath it in + // the next step... + n.cancelCatchup() + + // If we have an existing client then tear down any existing + // subscriptions and close the internal client. + if c := n.c; c != nil { + var subs []*subscription + c.mu.Lock() + for _, sub := range c.subs { + subs = append(subs, sub) + } + c.mu.Unlock() + for _, sub := range subs { + n.unsubscribe(sub) + } + c.closeConnection(InternalClient) + } + + // Look up which account we think we should be participating + // on. This will either be the system account (default) or it + // will be the account that the asset is resident in. + var nrgAcc *Account + if n.s.sys != nil { + nrgAcc = n.s.sys.account + } + if acc { // Should we setup in the asset account? + var err error + if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil { + return err + } + } + c := n.s.createInternalSystemClient() + c.registerWithAccount(nrgAcc) + if nrgAcc.sq == nil { + nrgAcc.sq = n.s.newSendQ(nrgAcc) + } + n.c = c + n.sq = nrgAcc.sq + n.acc = nrgAcc + + // Recreate any internal subscriptions for voting, append + // entries etc in the new account. + return n.createInternalSubs() +} + // outOfResources checks to see if we are out of resources. func (n *raft) outOfResources() bool { js := n.js @@ -1734,9 +1782,8 @@ func (n *raft) unsubscribe(sub *subscription) { } } +// Lock should be held. func (n *raft) createInternalSubs() error { - n.Lock() - defer n.Unlock() n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox() n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox() n.psubj = fmt.Sprintf(raftPropSubj, n.group) diff --git a/server/route.go b/server/route.go index 7bbcccddf8..f5354a18f8 100644 --- a/server/route.go +++ b/server/route.go @@ -2065,7 +2065,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // check to be consistent and future proof. but will be same domain if s.sameDomain(info.Domain) { s.nodeToInfo.Store(rHash, - nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false}) + nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false}) } } diff --git a/server/sendq.go b/server/sendq.go index 0287c5548a..2e7b5d0345 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -29,10 +29,11 @@ type sendq struct { mu sync.Mutex q *ipQueue[*outMsg] s *Server + a *Account } -func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")} +func (s *Server) newSendQ(acc *Account) *sendq { + sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc} s.startGoRoutine(sq.internalLoop) return sq } @@ -44,8 +45,9 @@ func (sq *sendq) internalLoop() { defer s.grWG.Done() + //c := s.createInternalAccountClient() c := s.createInternalSystemClient() - c.registerWithAccount(s.SystemAccount()) + c.registerWithAccount(sq.a) c.noIcb = true defer c.closeConnection(ClientClosed) diff --git a/server/server.go b/server/server.go index 33cd155e58..d408240060 100644 --- a/server/server.go +++ b/server/server.go @@ -358,6 +358,9 @@ type Server struct { // Queue to process JS API requests that come from routes (or gateways) jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] + + // Whether account NRG is supported cluster-wide or not. + accountNRG atomic.Bool } // For tracking JS nodes. @@ -373,6 +376,7 @@ type nodeInfo struct { offline bool js bool binarySnapshots bool + accountNRG bool } // Make sure all are 64bits for atomic use @@ -762,7 +766,7 @@ func NewServer(opts *Options) (*Server, error) { opts.Tags, &JetStreamConfig{MaxMemory: opts.JetStreamMaxMemory, MaxStore: opts.JetStreamMaxStore, CompressOK: true}, nil, - false, true, true, + false, true, true, true, }) } @@ -1740,7 +1744,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sendq: newIPQueue[*pubMsg](s, "System sendQ"), recvq: newIPQueue[*inSysMsg](s, "System recvQ"), resetCh: make(chan struct{}), - sq: s.newSendQ(), + sq: s.newSendQ(acc), statsz: eventsHBInterval, orphMax: 5 * eventsHBInterval, chkOrph: 3 * eventsHBInterval,