Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement overflow placement for JetStream streams. #2771

Merged
merged 4 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 41 additions & 14 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3948,11 +3948,19 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt
if err := json.Unmarshal(getHeader(ClientInfoHdr, msg[:c.pa.hdr]), &cis); err == nil {
ci = &cis
ci.Service = acc.Name
// Check if we are moving into a share details account from a non-shared
// and add in server and cluster details.
if !share && si.share {
c.addServerAndClusterInfo(ci)
}
}
} else if c.kind != LEAF || c.pa.hdr < 0 || len(getHeader(ClientInfoHdr, msg[:c.pa.hdr])) == 0 {
ci = c.getClientInfo(share)
} else if c.kind == LEAF && si.share {
// We have a leaf header here for ci, augment as above.
ci = c.getClientInfo(si.share)
}

// Set clientInfo if present.
if ci != nil {
if b, _ := json.Marshal(ci); b != nil {
msg = c.setHeader(ClientInfoHdr, string(b), msg)
Expand Down Expand Up @@ -5034,40 +5042,59 @@ func (ci *ClientInfo) serviceAccount() string {
return ci.Account
}

// Add in our server and cluster information to this client info.
func (c *client) addServerAndClusterInfo(ci *ClientInfo) {
if ci == nil {
return
}
// Server
if c.kind != LEAF {
ci.Server = c.srv.Name()
} else if c.kind == LEAF {
ci.Server = c.leaf.remoteServer
}
// Cluster
ci.Cluster = c.srv.cachedClusterName()
// If we have gateways fill in cluster alternates.
// These will be in RTT asc order.
if c.srv.gateway.enabled {
var gws []*client
c.srv.getOutboundGatewayConnections(&gws)
for _, c := range gws {
c.mu.Lock()
cn := c.gw.name
c.mu.Unlock()
ci.Alternates = append(ci.Alternates, cn)
}
}
}

// Grabs the information for this client.
func (c *client) getClientInfo(detailed bool) *ClientInfo {
if c == nil || (c.kind != CLIENT && c.kind != LEAF && c.kind != JETSTREAM) {
if c == nil || (c.kind != CLIENT && c.kind != LEAF && c.kind != JETSTREAM && c.kind != ACCOUNT) {
return nil
}

// Server name. Defaults to server ID if not set explicitly.
var cn, sn string
// Result
var ci ClientInfo

if detailed {
if c.kind != LEAF {
sn = c.srv.Name()
}
cn = c.srv.cachedClusterName()
c.addServerAndClusterInfo(&ci)
}

c.mu.Lock()
var ci ClientInfo
// RTT and Account are always added.
ci.Account = accForClient(c)
ci.RTT = c.rtt
// Detailed signals additional opt in.
if detailed {
if c.kind == LEAF {
sn = c.leaf.remoteServer
}
ci.Start = &c.start
ci.Host = c.host
ci.ID = c.cid
ci.Name = c.opts.Name
ci.User = c.getRawAuthUser()
ci.Lang = c.opts.Lang
ci.Version = c.opts.Version
ci.Server = sn
ci.Cluster = cn
ci.Jwt = c.opts.JWT
ci.IssuerKey = issuerForClient(c)
ci.NameTag = c.nameTag
Expand Down
8 changes: 4 additions & 4 deletions server/consumer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -2104,8 +2104,8 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
sendBatch(&wr)
} else {
// Check for API outstanding requests.
if apiOut := atomic.AddInt64(&js.apiCalls, 1); apiOut > maxJSApiOut {
atomic.AddInt64(&js.apiCalls, -1)
if apiOut := atomic.AddInt64(&js.apiInflight, 1); apiOut > maxJSApiOut {
atomic.AddInt64(&js.apiInflight, -1)
sendErr(503, "JetStream API limit exceeded")
s.Warnf("JetStream API limit exceeded: %d calls outstanding", apiOut)
return
Expand All @@ -2115,7 +2115,7 @@ func (o *consumer) processNextMsgReq(_ *subscription, c *client, _ *Account, _,
o.mu.Lock()
sendBatch(&wr)
o.mu.Unlock()
atomic.AddInt64(&js.apiCalls, -1)
atomic.AddInt64(&js.apiInflight, -1)
}()
}
}
Expand Down
12 changes: 11 additions & 1 deletion server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1108,5 +1108,15 @@
"help": "Returned when the delivery subject on a Push Consumer is not a valid NATS Subject",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamMaxBytesRequired",
"code": 400,
"error_code": 10113,
"description": "account requires a stream config to have max bytes set",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
]
95 changes: 68 additions & 27 deletions server/events.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018-2021 The NATS Authors
// Copyright 2018-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -181,6 +181,7 @@ type ClientInfo struct {
RTT time.Duration `json:"rtt,omitempty"`
Server string `json:"server,omitempty"`
Cluster string `json:"cluster,omitempty"`
Alternates []string `json:"alts,omitempty"`
ripienaar marked this conversation as resolved.
Show resolved Hide resolved
Stop *time.Time `json:"stop,omitempty"`
Jwt string `json:"jwt,omitempty"`
IssuerKey string `json:"issuer_key,omitempty"`
Expand Down Expand Up @@ -669,6 +670,14 @@ func (s *Server) sendStatsz(subj string) {
jStat.Config = &c
js.mu.RUnlock()
jStat.Stats = js.usageStats()
// Update our own usage since we do not echo so we will not hear ourselves.
ourNode := string(getHash(s.serverName()))
if v, ok := s.nodeToInfo.Load(ourNode); ok && v != nil {
ni := v.(nodeInfo)
ni.stats = jStat.Stats
s.nodeToInfo.Store(ourNode, ni)
}
// Metagroup info.
if mg := js.getMetaGroup(); mg != nil {
if mg.Leader() {
if ci := s.raftNodeToClusterInfo(mg); ci != nil {
Expand All @@ -695,9 +704,11 @@ func (s *Server) sendStatsz(subj string) {
func (s *Server) heartbeatStatsz() {
if s.sys.stmr != nil {
// Increase after startup to our max.
s.sys.cstatsz *= 4
if s.sys.cstatsz > s.sys.statsz {
s.sys.cstatsz = s.sys.statsz
if s.sys.cstatsz < s.sys.statsz {
s.sys.cstatsz *= 2
if s.sys.cstatsz > s.sys.statsz {
s.sys.cstatsz = s.sys.statsz
}
}
s.sys.stmr.Reset(s.sys.cstatsz)
}
Expand All @@ -714,7 +725,7 @@ func (s *Server) sendStatszUpdate() {
func (s *Server) startStatszTimer() {
// We will start by sending out more of these and trail off to the statsz being the max.
s.sys.cstatsz = 250 * time.Millisecond
// Send out the first one after 250ms.
// Send out the first one quickly, we will slowly back off.
s.sys.stmr = time.AfterFunc(s.sys.cstatsz, s.wrapChk(s.heartbeatStatsz))
}

Expand Down Expand Up @@ -1031,10 +1042,10 @@ func (s *Server) processRemoteServerShutdown(sid string) {
})
// Update any state in nodeInfo.
s.nodeToInfo.Range(func(k, v interface{}) bool {
si := v.(nodeInfo)
if si.id == sid {
si.offline = true
s.nodeToInfo.Store(k, si)
ni := v.(nodeInfo)
if ni.id == sid {
ni.offline = true
s.nodeToInfo.Store(k, ni)
return false
}
return true
Expand Down Expand Up @@ -1071,12 +1082,14 @@ func (s *Server) remoteServerShutdown(sub *subscription, c *client, _ *Account,
s.Debugf("Received bad server info for remote server shutdown")
return
}
// Additional processing here.
if !s.sameDomain(si.Domain) {
return
}

// JetStream node updates if applicable.
node := string(getHash(si.Name))
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, true, true})
if v, ok := s.nodeToInfo.Load(node); ok && v != nil {
ni := v.(nodeInfo)
ni.offline = true
s.nodeToInfo.Store(node, ni)
}

sid := toks[serverSubjectIndex]
if su := s.sys.servers[sid]; su != nil {
Expand All @@ -1095,44 +1108,66 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
return
}
si := ssm.Server

// JetStream node updates.
if !s.sameDomain(si.Domain) {
return
}

var cfg *JetStreamConfig
var stats *JetStreamStats

if ssm.Stats.JetStream != nil {
cfg = ssm.Stats.JetStream.Config
stats = ssm.Stats.JetStream.Stats
}

node := string(getHash(si.Name))
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, false, si.JetStream})
s.nodeToInfo.Store(node, nodeInfo{
si.Name,
si.Cluster,
si.Domain,
si.ID,
cfg,
stats,
false, si.JetStream,
})
}

// updateRemoteServer is called when we have an update from a remote server.
// This allows us to track remote servers, respond to shutdown messages properly,
// make sure that messages are ordered, and allow us to prune dead servers.
// Lock should be held upon entry.
func (s *Server) updateRemoteServer(ms *ServerInfo) {
su := s.sys.servers[ms.ID]
func (s *Server) updateRemoteServer(si *ServerInfo) {
su := s.sys.servers[si.ID]
if su == nil {
s.sys.servers[ms.ID] = &serverUpdate{ms.Seq, time.Now()}
s.processNewServer(ms)
s.sys.servers[si.ID] = &serverUpdate{si.Seq, time.Now()}
s.processNewServer(si)
} else {
// Should always be going up.
if ms.Seq <= su.seq {
s.Errorf("Received out of order remote server update from: %q", ms.ID)
if si.Seq <= su.seq {
s.Errorf("Received out of order remote server update from: %q", si.ID)
return
}
su.seq = ms.Seq
su.seq = si.Seq
su.ltime = time.Now()
}
}

// processNewServer will hold any logic we want to use when we discover a new server.
// Lock should be held upon entry.
func (s *Server) processNewServer(ms *ServerInfo) {
func (s *Server) processNewServer(si *ServerInfo) {
// Right now we only check if we have leafnode servers and if so send another
// connect update to make sure they switch this account to interest only mode.
s.ensureGWsInterestOnlyForLeafNodes()

// Add to our nodeToName
if s.sameDomain(ms.Domain) {
node := string(getHash(ms.Name))
s.nodeToInfo.Store(node, nodeInfo{ms.Name, ms.Cluster, ms.Domain, ms.ID, false, ms.JetStream})
if s.sameDomain(si.Domain) {
node := string(getHash(si.Name))
// Only update if non-existent
if _, ok := s.nodeToInfo.Load(node); !ok {
s.nodeToInfo.Store(node, nodeInfo{si.Name, si.Cluster, si.Domain, si.ID, nil, nil, false, si.JetStream})
}
}
// Announce ourselves..
s.sendStatsz(fmt.Sprintf(serverStatsSubj, s.info.ID))
Expand Down Expand Up @@ -1378,9 +1413,15 @@ type ServerAPIConnzResponse struct {

// statszReq is a request for us to respond with current statsz.
func (s *Server) statszReq(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if !s.EventsEnabled() || reply == _EMPTY_ {
if !s.EventsEnabled() {
return
}

// No reply is a signal that we should use our normal broadcast subject.
if reply == _EMPTY_ {
reply = fmt.Sprintf(serverStatsSubj, s.info.ID)
}

opts := StatszEventOptions{}
if _, msg := c.msgParts(rmsg); len(msg) != 0 {
if err := json.Unmarshal(msg, &opts); err != nil {
Expand Down