Skip to content

Commit

Permalink
Implement overflow placement for JetStream streams.
Browse files Browse the repository at this point in the history
This allows stream placement to overflow to adjacent clusters.
We also do more balanced placement based on resources (store or mem). We can continue to expand this as well.
We also introduce an account requirement that stream configs contain a MaxBytes value.

We now track account limits and server limits more distinctly, and do not reserve server resources based on account limits themselves.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jan 7, 2022
1 parent ccc9e16 commit d96e3f3
Show file tree
Hide file tree
Showing 15 changed files with 664 additions and 342 deletions.
48 changes: 36 additions & 12 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3948,6 +3948,11 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
ci = &cis
ci.Service = acc.Name
// Check if we are moving into a share details account from a non-shared
// and add in server and cluster details.
if !share && si.share {
c.addServerAndClusterInfo(ci)
}
}
} else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
ci = c.getClientInfo(share)
Expand Down Expand Up @@ -5034,40 +5039,59 @@ func (ci *ClientInfo) serviceAccount() string {
return ci.Account
}

// addServerAndClusterInfo fills in the server name, the cluster name and,
// when gateways are enabled, the alternate cluster names on ci.
// It is a no-op when ci is nil.
func (c *client) addServerAndClusterInfo(ci *ClientInfo) {
	if ci == nil {
		return
	}
	// Server: leafnode connections report the remote server recorded on the
	// leaf, every other kind reports this server's own name.
	if c.kind == LEAF {
		ci.Server = c.leaf.remoteServer
	} else {
		ci.Server = c.srv.Name()
	}
	// Cluster
	ci.Cluster = c.srv.cachedClusterName()
	// If we have gateways fill in cluster alternates.
	// These will be in RTT asc order.
	if c.srv.gateway.enabled {
		var gws []*client
		c.srv.getOutboundGatewayConnections(&gws)
		// Use a distinct name for the gateway connection so the receiver
		// is not shadowed inside the loop.
		for _, gwc := range gws {
			// The gateway name is read under that connection's own lock.
			gwc.mu.Lock()
			cn := gwc.gw.name
			gwc.mu.Unlock()
			ci.Alternates = append(ci.Alternates, cn)
		}
	}
}

// Grabs the information for this client.
func (c *client) getClientInfo(detailed bool) *ClientInfo {
if c == nil || (c.kind != CLIENT && c.kind != LEAF && c.kind != JETSTREAM) {
return nil
}

// Server name. Defaults to server ID if not set explicitly.
var cn, sn string
// Result
var ci ClientInfo

if detailed {
if c.kind != LEAF {
sn = c.srv.Name()
}
cn = c.srv.cachedClusterName()
c.addServerAndClusterInfo(&ci)
}

c.mu.Lock()
var ci ClientInfo
// RTT and Account are always added.
ci.Account = accForClient(c)
ci.RTT = c.rtt
// Detailed signals additional opt in.
if detailed {
if c.kind == LEAF {
sn = c.leaf.remoteServer
}
ci.Start = &c.start
ci.Host = c.host
ci.ID = c.cid
ci.Name = c.opts.Name
ci.User = c.getRawAuthUser()
ci.Lang = c.opts.Lang
ci.Version = c.opts.Version
ci.Server = sn
ci.Cluster = cn
ci.Jwt = c.opts.JWT
ci.IssuerKey = issuerForClient(c)
ci.NameTag = c.nameTag
Expand Down
8 changes: 4 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -2104,8 +2104,8 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
sendBatch(&wr)
} else {
// Check for API outstanding requests.
if apiOut := atomic.AddInt64(&js.apiCalls, 1); apiOut > maxJSApiOut {
atomic.AddInt64(&js.apiCalls, -1)
if apiOut := atomic.AddInt64(&js.apiInflight, 1); apiOut > maxJSApiOut {
atomic.AddInt64(&js.apiInflight, -1)
sendErr(503, "JetStream API limit exceeded")
s.Warnf("JetStream API limit exceeded: %d calls outstanding", apiOut)
return
Expand All @@ -2115,7 +2115,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
o.mu.Lock()
sendBatch(&wr)
o.mu.Unlock()
atomic.AddInt64(&js.apiCalls, -1)
atomic.AddInt64(&js.apiInflight, -1)
}()
}
}
Expand Down
12 changes: 11 additions & 1 deletion server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1108,5 +1108,15 @@
"help": "Returned when the delivery subject on a Push Consumer is not a valid NATS Subject",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamMaxBytesRequired",
"code": 400,
"error_code": 10113,
"description": "account requires a stream config to have max bytes set",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
]
95 changes: 68 additions & 27 deletions server/events.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2021 The NATS Authors
// Copyright 2018-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -181,6 +181,7 @@ type ClientInfo struct {
RTT time.Duration `json:"rtt,omitempty"`
Server string `json:"server,omitempty"`
Cluster string `json:"cluster,omitempty"`
Alternates []string `json:"alts,omitempty"`
Stop *time.Time `json:"stop,omitempty"`
Jwt string `json:"jwt,omitempty"`
IssuerKey string `json:"issuer_key,omitempty"`
Expand Down Expand Up @@ -669,6 +670,14 @@ func (s *Server) sendStatsz(subj string) {
jStat.Config = &c
js.mu.RUnlock()
jStat.Stats = js.usageStats()
// Update our own usage since we do not echo so we will not hear ourselves.
ourNode := string(getHash(s.serverName()))
if v, ok := s.nodeToInfo.Load(ourNode); ok && v != nil {
ni := v.(nodeInfo)
ni.stats = jStat.Stats
s.nodeToInfo.Store(ourNode, ni)
}
// Metagroup info.
if mg := js.getMetaGroup(); mg != nil {
if mg.Leader() {
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
Expand All @@ -695,9 +704,11 @@ func (s *Server) sendStatsz(subj string) {
func (s *Server) heartbeatStatsz() {
if s.sys.stmr != nil {
// Increase after startup to our max.
s.sys.cstatsz *= 4
if s.sys.cstatsz > s.sys.statsz {
s.sys.cstatsz = s.sys.statsz
if s.sys.cstatsz < s.sys.statsz {
s.sys.cstatsz *= 2
if s.sys.cstatsz > s.sys.statsz {
s.sys.cstatsz = s.sys.statsz
}
}
s.sys.stmr.Reset(s.sys.cstatsz)
}
Expand All @@ -714,7 +725,7 @@ func (s *Server) sendStatszUpdate() {
// startStatszTimer arms the statsz heartbeat timer. It sets the current
// interval (s.sys.cstatsz) to 250ms and schedules heartbeatStatsz, wrapped
// by wrapChk, to run once that interval has elapsed.
func (s *Server) startStatszTimer() {
// We will start by sending out more of these and trail off to the statsz being the max.
s.sys.cstatsz = 250 * time.Millisecond
// Send out the first one after 250ms.
// Send out the first one quickly, we will slowly back off.
s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz))
}

Expand Down Expand Up @@ -1031,10 +1042,10 @@ func (s *Server) processRemoteServerShutdown(sid string) {
})
// Update any state in nodeInfo.
s.nodeToInfo.Range(func(k, v interface{}) bool {
si := v.(nodeInfo)
if si.id == sid {
si.offline = true
s.nodeToInfo.Store(k, si)
ni := v.(nodeInfo)
if ni.id == sid {
ni.offline = true
s.nodeToInfo.Store(k, ni)
return false
}
return true
Expand Down Expand Up @@ -1071,12 +1082,14 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account,
s.Debugf("Received bad server info for remote server shutdown")
return
}
// Additional processing here.
if !s.sameDomain(si.Domain) {
return
}

// JetStream node updates if applicable.
node := string(getHash(si.Name))
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, true, true})
if v, ok := s.nodeToInfo.Load(node); ok && v != nil {
ni := v.(nodeInfo)
ni.offline = true
s.nodeToInfo.Store(node, ni)
}

sid := toks[serverSubjectIndex]
if su := s.sys.servers[sid]; su != nil {
Expand All @@ -1095,44 +1108,66 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
return
}
si := ssm.Server

// JetStream node updates.
if !s.sameDomain(si.Domain) {
return
}

var cfg *JetStreamConfig
var stats *JetStreamStats

if ssm.Stats.JetStream != nil {
cfg = ssm.Stats.JetStream.Config
stats = ssm.Stats.JetStream.Stats
}

node := string(getHash(si.Name))
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, false, si.JetStream})
s.nodeToInfo.Store(node, nodeInfo{
si.Name,
si.Cluster,
si.Domain,
si.ID,
cfg,
stats,
false, si.JetStream,
})
}

// updateRemoteServer is called when we have an update from a remote server.
// This allows us to track remote servers, respond to shutdown messages properly,
// make sure that messages are ordered, and allow us to prune dead servers.
// Lock should be held upon entry.
func (s *Server) updateRemoteServer(ms *ServerInfo) {
su := s.sys.servers[ms.ID]
func (s *Server) updateRemoteServer(si *ServerInfo) {
su := s.sys.servers[si.ID]
if su == nil {
s.sys.servers[ms.ID] = &serverUpdate{ms.Seq, time.Now()}
s.processNewServer(ms)
s.sys.servers[si.ID] = &serverUpdate{si.Seq, time.Now()}
s.processNewServer(si)
} else {
// Should always be going up.
if ms.Seq <= su.seq {
s.Errorf("Received out of order remote server update from: %q", ms.ID)
if si.Seq <= su.seq {
s.Errorf("Received out of order remote server update from: %q", si.ID)
return
}
su.seq = ms.Seq
su.seq = si.Seq
su.ltime = time.Now()
}
}

// processNewServer will hold any logic we want to use when we discover a new server.
// Lock should be held upon entry.
func (s *Server) processNewServer(ms *ServerInfo) {
func (s *Server) processNewServer(si *ServerInfo) {
// Right now we only check if we have leafnode servers and if so send another
// connect update to make sure they switch this account to interest only mode.
s.ensureGWsInterestOnlyForLeafNodes()

// Add to our nodeToInfo
if s.sameDomain(ms.Domain) {
node := string(getHash(ms.Name))
s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.Domain, ms.ID, false, ms.JetStream})
if s.sameDomain(si.Domain) {
node := string(getHash(si.Name))
// Only update if non-existent
if _, ok := s.nodeToInfo.Load(node); !ok {
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, nil, nil, false, si.JetStream})
}
}
// Announce ourselves..
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
Expand Down Expand Up @@ -1378,9 +1413,15 @@ type ServerAPIConnzResponse struct {

// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if !s.EventsEnabled() || reply == _EMPTY_ {
if !s.EventsEnabled() {
return
}

// No reply is a signal that we should use our normal broadcast subject.
if reply == _EMPTY_ {
reply = fmt.Sprintf(serverStatsSubj, s.info.ID)
}

opts := StatszEventOptions{}
if _, msg := c.msgParts(rmsg); len(msg) != 0 {
if err := json.Unmarshal(msg, &opts); err != nil {
Expand Down

0 comments on commit d96e3f3

Please sign in to comment.