Skip to content

Commit

Permalink
Enable in JetStream options
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed May 20, 2024
1 parent a58eb98 commit a88c956
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 19 deletions.
39 changes: 21 additions & 18 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1614,25 +1614,28 @@ func (s *Server) processNewServer(si *ServerInfo) {
// the account and moves it appropriately.
// Server lock MUST NOT be held on entry.
func (s *Server) updateNRGAccountStatus() {
supported := true
s.rnMu.Lock()
raftNodes := make([]RaftNode, 0, len(s.raftNodes))
for _, n := range s.raftNodes {
raftNodes = append(raftNodes, n)
}
s.rnMu.Unlock()
s.mu.Lock()
s.nodeToInfo.Range(func(key, value any) bool {
si := value.(nodeInfo)
if !s.sameDomain(si.domain) {
return true
}
if supported = supported && si.accountNRG; !supported {
return false
var raftNodes []RaftNode
supported := s.opts.JetStreamAccountNRG
if supported {
s.rnMu.Lock()
raftNodes = make([]RaftNode, 0, len(s.raftNodes))
for _, n := range s.raftNodes {
raftNodes = append(raftNodes, n)
}
return true
})
s.mu.Unlock()
s.rnMu.Unlock()
s.mu.Lock()
s.nodeToInfo.Range(func(key, value any) bool {
si := value.(nodeInfo)
if !s.sameDomain(si.domain) {
return true
}
if supported = supported && si.accountNRG; !supported {
return false
}
return true
})
s.mu.Unlock()
}
if s.accountNRG.CompareAndSwap(!supported, supported) {
if supported {
s.Noticef("Moving NRG traffic into asset accounts")
Expand Down
3 changes: 3 additions & 0 deletions server/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ type Options struct {
JetStreamLimits JSLimitOpts
JetStreamTpm JSTpmOpts
JetStreamMaxCatchup int64
JetStreamAccountNRG bool
StoreDir string `json:"-"`
SyncInterval time.Duration `json:"-"`
SyncAlways bool `json:"-"`
Expand Down Expand Up @@ -2310,6 +2311,8 @@ func parseJetStream(v any, opts *Options, errors *[]error, warnings *[]error) er
return &configErr{tk, fmt.Sprintf("%s %s", strings.ToLower(mk), err)}
}
opts.JetStreamMaxCatchup = s
case "account_nrg":
opts.JetStreamAccountNRG = mv.(bool)
default:
if !tk.IsUsedVariable() {
err := &unknownConfigFieldErr{
Expand Down
2 changes: 1 addition & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
// Setup our internal subscriptions for proposals, votes and append entries.
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
if err := n.RecreateInternalSubs(n.s.accountNRG.Load()); err != nil {
if err := n.RecreateInternalSubs(n.s.opts.JetStreamAccountNRG); err != nil {
n.shutdown(true)
return nil, err
}
Expand Down

0 comments on commit a88c956

Please sign in to comment.