Skip to content

Commit

Permalink
Move NRG traffic into asset account
Browse files Browse the repository at this point in the history
This adds a new account NRG capability into statsz so that we can
detect when all servers in the cluster support moving traffic into
the asset account, instead of all being in the system account.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed May 21, 2024
1 parent 0b39e9e commit 6ed6d78
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 21 deletions.
1 change: 1 addition & 0 deletions server/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type Account struct {
sqmu sync.Mutex
sl *Sublist
ic *client
sq *sendq
isid uint64
etmr *time.Timer
ctmr *time.Timer
Expand Down
57 changes: 57 additions & 0 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ type ServerInfo struct {
const (
JetStreamEnabled ServerCapability = 1 << iota // Server had JetStream enabled.
BinaryStreamSnapshot // New stream snapshot capability.
AccountNRG // Move NRG traffic out of system account.
)

// Set JetStream capability.
Expand All @@ -281,6 +282,17 @@ func (si *ServerInfo) BinaryStreamSnapshot() bool {
return si.Flags&BinaryStreamSnapshot != 0
}

// Set account NRG capability.
func (si *ServerInfo) SetAccountNRG() {
si.Flags |= AccountNRG
}

// AccountNRG indicates whether or not we support moving the NRG traffic out of the
// system account and into the asset account.
func (si *ServerInfo) AccountNRG() bool {
return si.Flags&AccountNRG != 0
}

// ClientInfo is detailed information about the client forming a connection.
type ClientInfo struct {
Start *time.Time `json:"start,omitempty"`
Expand Down Expand Up @@ -479,6 +491,7 @@ RESET:
// New capability based flags.
si.SetJetStreamEnabled()
si.SetBinaryStreamSnapshot()
si.SetAccountNRG()
}
}
var b []byte
Expand Down Expand Up @@ -1543,7 +1556,9 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
si.AccountNRG(),
})
s.updateNRGAccountStatus()
}

// updateRemoteServer is called when we have an update from a remote server.
Expand Down Expand Up @@ -1590,14 +1605,56 @@ func (s *Server) processNewServer(si *ServerInfo) {
false,
si.JetStreamEnabled(),
si.BinaryStreamSnapshot(),
si.AccountNRG(),
})
}
}
go s.updateNRGAccountStatus()
// Announce ourselves..
// Do this in a separate Go routine.
go s.sendStatszUpdate()
}

// Works out whether all nodes support moving the NRG traffic into
// the account and moves it appropriately.
// Server lock MUST NOT be held on entry.
func (s *Server) updateNRGAccountStatus() {
var raftNodes []RaftNode
s.optsMu.RLock()
supported := s.opts.JetStreamAccountNRG
s.optsMu.RUnlock()
if supported {
s.rnMu.Lock()
raftNodes = make([]RaftNode, 0, len(s.raftNodes))
for _, n := range s.raftNodes {
raftNodes = append(raftNodes, n)
}
s.rnMu.Unlock()
s.mu.Lock()
s.nodeToInfo.Range(func(key, value any) bool {
si := value.(nodeInfo)
if !s.sameDomain(si.domain) {
return true
}
if supported = supported && si.accountNRG; !supported {
return false
}
return true
})
s.mu.Unlock()
}
if s.accountNRG.CompareAndSwap(!supported, supported) {
if supported {
s.Noticef("Moving NRG traffic into asset accounts")
} else {
s.Warnf("Moving NRG traffic back into system account due to old nodes coming online")
}
for _, n := range raftNodes {
n.RecreateInternalSubs(supported)
}
}
}

// If GW is enabled on this server and there are any leaf node connections,
// this function will send a LeafNode connect system event to the super cluster
// to ensure that the GWs are in interest-only mode for this account.
Expand Down
74 changes: 74 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2184,3 +2184,77 @@ func TestJetStreamClusterBusyStreams(t *testing.T) {
}
})
}

func TestJetStreamClusterAccountNRG(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

snc, _ := jsClientConnect(t, c.randomServer(), nats.UserInfo("admin", "s3cr3t!"))
defer snc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Replicas: 3,
})
require_NoError(t, err)

leader := c.streamLeader(globalAccountName, "TEST")
stream, err := leader.gacc.lookupStream("TEST")
require_NoError(t, err)
rg := stream.node.(*raft)

// System account should have interest, but the global account
// shouldn't.
for _, s := range c.servers {
require_True(t, s.sys.account.sl.hasInterest(rg.asubj, true))
require_False(t, s.gacc.sl.hasInterest(rg.asubj, true))
}

// First of all check that the Raft traffic is in the system
// account, as we haven't moved it elsewhere yet.
{
sub, err := snc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}

// Switch on account NRG on all servers in the cluster. Then
// we wait, as we will need statsz to be sent for all servers
// in the cluster.
for _, s := range c.servers {
s.optsMu.Lock()
s.opts.JetStreamAccountNRG = true
s.optsMu.Unlock()
s.updateNRGAccountStatus()
}

// Now check that the traffic has moved into the asset acc.
// In this case the system account should no longer have
// subscriptions for those subjects.
{
sub, err := nc.SubscribeSync(rg.asubj)
require_NoError(t, err)
require_NoError(t, sub.AutoUnsubscribe(1))

msg, err := sub.NextMsg(time.Second * 3)
require_NoError(t, err)
require_True(t, msg != nil)
}

// The global account should now have interest and the
// system account shouldn't.
for _, s := range c.servers {
require_False(t, s.sys.account.sl.hasInterest(rg.asubj, true))
require_True(t, s.gacc.sl.hasInterest(rg.asubj, true))
}
}
3 changes: 3 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ type Options struct {
JetStreamLimits JSLimitOpts
JetStreamTpm JSTpmOpts
JetStreamMaxCatchup int64
JetStreamAccountNRG bool
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
Expand Down Expand Up @@ -2310,6 +2311,8 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
opts.JetStreamMaxCatchup = s
case "account_nrg":
opts.JetStreamAccountNRG = mv.(bool)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
77 changes: 62 additions & 15 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type RaftNode interface {
Stop()
Delete()
Wipe()
RecreateInternalSubs(acc bool) error
}

type WAL interface {
Expand Down Expand Up @@ -128,6 +129,7 @@ type raft struct {

created time.Time // Time that the group was created
accName string // Account name of the asset this raft group is for
acc *Account // Account that NRG traffic will be sent/received in
group string // Raft group
sd string // Store directory
id string // Node ID
Expand Down Expand Up @@ -346,8 +348,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
s.mu.RUnlock()
return nil, ErrNoSysAccount
}
sq := s.sys.sq
sacc := s.sys.account
hash := s.sys.shash
s.mu.RUnlock()

Expand Down Expand Up @@ -375,9 +375,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
acks: make(map[uint64]map[string]struct{}),
pae: make(map[uint64]*appendEntry),
s: s,
c: s.createInternalSystemClient(),
js: s.getJetStream(),
sq: sq,
quit: make(chan struct{}),
reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"),
votes: newIPQueue[*voteResponse](s, qpfx+"vresp"),
Expand All @@ -391,7 +389,14 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
observer: cfg.Observer,
extSt: ps.domainExt,
}
n.c.registerWithAccount(sacc)

// Setup our internal subscriptions for proposals, votes and append entries.
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.RecreateInternalSubs(n.s.opts.JetStreamAccountNRG); err != nil {
n.shutdown(true)
return nil, err
}

if atomic.LoadInt32(&s.logging.debug) > 0 {
n.dflag = true
Expand Down Expand Up @@ -488,14 +493,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
}
}

// Setup our internal subscriptions for proposals, votes and append entries.
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.createInternalSubs(); err != nil {
n.shutdown(true)
return nil, err
}

n.debug("Started")

// Check if we need to start in observer mode due to lame duck status.
Expand Down Expand Up @@ -524,6 +521,57 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
return n, nil
}

func (n *raft) RecreateInternalSubs(acc bool) error {
n.Lock()
defer n.Unlock()

// Need to cancel any in-progress catch-ups, otherwise the
// inboxes are about to be pulled out from underneath it in
// the next step...
n.cancelCatchup()

// If we have an existing client then tear down any existing
// subscriptions and close the internal client.
if c := n.c; c != nil {
var subs []*subscription
c.mu.Lock()
for _, sub := range c.subs {
subs = append(subs, sub)
}
c.mu.Unlock()
for _, sub := range subs {
n.unsubscribe(sub)
}
c.closeConnection(InternalClient)
}

// Look up which account we think we should be participating
// on. This will either be the system account (default) or it
// will be the account that the asset is resident in.
var nrgAcc *Account
if n.s.sys != nil {
nrgAcc = n.s.sys.account
}
if acc { // Should we setup in the asset account?
var err error
if nrgAcc, err = n.s.lookupAccount(n.accName); err != nil {
return err
}
}
c := n.s.createInternalSystemClient()
c.registerWithAccount(nrgAcc)
if nrgAcc.sq == nil {
nrgAcc.sq = n.s.newSendQ(nrgAcc)
}
n.c = c
n.sq = nrgAcc.sq
n.acc = nrgAcc

// Recreate any internal subscriptions for voting, append
// entries etc in the new account.
return n.createInternalSubs()
}

// outOfResources checks to see if we are out of resources.
func (n *raft) outOfResources() bool {
js := n.js
Expand Down Expand Up @@ -1734,9 +1782,8 @@ func (n *raft) unsubscribe(sub *subscription) {
}
}

// Lock should be held.
func (n *raft) createInternalSubs() error {
n.Lock()
defer n.Unlock()
n.vsubj, n.vreply = fmt.Sprintf(raftVoteSubj, n.group), n.newInbox()
n.asubj, n.areply = fmt.Sprintf(raftAppendSubj, n.group), n.newInbox()
n.psubj = fmt.Sprintf(raftPropSubj, n.group)
Expand Down
2 changes: 1 addition & 1 deletion server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -2065,7 +2065,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
// check to be consistent and future proof. but will be same domain
if s.sameDomain(info.Domain) {
s.nodeToInfo.Store(rHash,
nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false})
nodeInfo{rn, s.info.Version, s.info.Cluster, info.Domain, id, nil, nil, nil, false, info.JetStream, false, false})
}
}

Expand Down
8 changes: 5 additions & 3 deletions server/sendq.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ type sendq struct {
mu sync.Mutex
q *ipQueue[*outMsg]
s *Server
a *Account
}

func (s *Server) newSendQ() *sendq {
sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")}
func (s *Server) newSendQ(acc *Account) *sendq {
sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ"), a: acc}
s.startGoRoutine(sq.internalLoop)
return sq
}
Expand All @@ -44,8 +45,9 @@ func (sq *sendq) internalLoop() {

defer s.grWG.Done()

//c := s.createInternalAccountClient()
c := s.createInternalSystemClient()
c.registerWithAccount(s.SystemAccount())
c.registerWithAccount(sq.a)
c.noIcb = true

defer c.closeConnection(ClientClosed)
Expand Down

0 comments on commit 6ed6d78

Please sign in to comment.