Track slow consumers per connection type #4330

Merged: 1 commit, Aug 9, 2023
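This change tracks slow consumers per connection type (clients, routes, gateways, leafnodes) in addition to the existing aggregate counter, exposes the new counters in the /varz monitoring endpoint under slow_consumer_stats, and adds NumSlowConsumersClients/Routes/Gateways/Leafs accessors on the server. As a minimal sketch of how an external monitoring tool might read the new field — the monitoring address and port are assumptions; only the JSON field names come from the diff below:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// slowConsumersStats mirrors the SlowConsumersStats struct exposed by /varz
// under the "slow_consumer_stats" key (see server/monitor.go in this diff).
type slowConsumersStats struct {
	Clients  uint64 `json:"clients"`
	Routes   uint64 `json:"routes"`
	Gateways uint64 `json:"gateways"`
	Leafs    uint64 `json:"leafs"`
}

func main() {
	// Assumes a nats-server with its HTTP monitoring port enabled on 8222.
	resp, err := http.Get("http://127.0.0.1:8222/varz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var varz struct {
		SlowConsumersStats slowConsumersStats `json:"slow_consumer_stats"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&varz); err != nil {
		panic(err)
	}
	sc := varz.SlowConsumersStats
	fmt.Printf("slow consumers: clients=%d routes=%d gateways=%d leafs=%d\n",
		sc.Clients, sc.Routes, sc.Gateways, sc.Leafs)
}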
2 changes: 1 addition & 1 deletion .travis.yml
@@ -41,7 +41,7 @@ jobs:
- name: "Run all tests from all other packages"
env: TEST_SUITE=non_srv_pkg_tests
- name: "Compile with older Go release"
go: 1.18.x
go: 1.19.x
env: TEST_SUITE=build_only

script: ./scripts/runTestsOnTravis.sh $TEST_SUITE
15 changes: 14 additions & 1 deletion server/client.go
@@ -1723,8 +1723,18 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) bool {
return true
}

// Slow consumer here..
// Aggregate slow consumers.
atomic.AddInt64(&c.srv.slowConsumers, 1)
switch c.kind {
case CLIENT:
c.srv.scStats.clients.Add(1)
case ROUTER:
c.srv.scStats.routes.Add(1)
case GATEWAY:
c.srv.scStats.gateways.Add(1)
case LEAF:
c.srv.scStats.leafs.Add(1)
}
if c.acc != nil {
atomic.AddInt64(&c.acc.slowConsumers, 1)
}
@@ -2224,7 +2234,10 @@ func (c *client) queueOutbound(data []byte) {
// Perf wise, it looks like it is faster to optimistically add than
// checking current pb+len(data) and then add to pb.
c.out.pb -= int64(len(data))

// Increment the total and client's slow consumer counters.
atomic.AddInt64(&c.srv.slowConsumers, 1)
c.srv.scStats.clients.Add(1)
if c.acc != nil {
atomic.AddInt64(&c.acc.slowConsumers, 1)
}
10 changes: 10 additions & 0 deletions server/client_test.go
@@ -2027,8 +2027,18 @@ func TestClientSlowConsumerWithoutConnect(t *testing.T) {
if n := atomic.LoadInt64(&s.slowConsumers); n != 1 {
return fmt.Errorf("Expected 1 slow consumer, got: %v", n)
}
if n := s.scStats.clients.Load(); n != 1 {
return fmt.Errorf("Expected 1 slow consumer, got: %v", n)
}
return nil
})
varz, err := s.Varz(nil)
if err != nil {
t.Fatal(err)
}
if varz.SlowConsumersStats.Clients != 1 {
t.Error("Expected a slow consumer client in varz")
}
}

func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) {
8 changes: 5 additions & 3 deletions server/events.go
@@ -2043,7 +2043,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) {
a.mu.Unlock()
}

// Lock shoulc be held on entry
// Lock should be held on entry.
func (a *Account) statz() *AccountStat {
localConns := a.numLocalConnections()
leafConns := a.numLocalLeafNodes()
@@ -2055,10 +2055,12 @@ func (a *Account) statz() *AccountStat {
NumSubs: a.sl.Count(),
Received: DataStats{
Msgs: atomic.LoadInt64(&a.inMsgs),
Bytes: atomic.LoadInt64(&a.inBytes)},
Bytes: atomic.LoadInt64(&a.inBytes),
},
Sent: DataStats{
Msgs: atomic.LoadInt64(&a.outMsgs),
Bytes: atomic.LoadInt64(&a.outBytes)},
Bytes: atomic.LoadInt64(&a.outBytes),
},
SlowConsumers: atomic.LoadInt64(&a.slowConsumers),
}
}
35 changes: 35 additions & 0 deletions server/gateway_test.go
@@ -6871,3 +6871,38 @@ func TestGatewaySwitchToInterestOnlyModeImmediately(t *testing.T) {
natsFlush(t, nc)
checkCount(t, gwcb, 1)
}

func TestGatewaySlowConsumer(t *testing.T) {
gatewayMaxPingInterval = 50 * time.Millisecond
defer func() { gatewayMaxPingInterval = gwMaxPingInterval }()

ob := testDefaultOptionsForGateway("B")
sb := RunServer(ob)
defer sb.Shutdown()

oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb)
sa := RunServer(oa)
defer sa.Shutdown()

waitForInboundGateways(t, sa, 1, 2*time.Second)
waitForOutboundGateways(t, sa, 1, 2*time.Second)
waitForInboundGateways(t, sb, 1, 2*time.Second)
waitForOutboundGateways(t, sb, 1, 2*time.Second)

c := sa.getOutboundGatewayConnection("B")
c.mu.Lock()
c.out.wdl = time.Nanosecond
c.mu.Unlock()

<-time.After(250 * time.Millisecond)
got := sa.NumSlowConsumersGateways()
expected := uint64(1)
if got != 1 {
t.Errorf("got: %d, expected: %d", got, expected)
}
got = sb.NumSlowConsumersGateways()
expected = 0
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
}
45 changes: 45 additions & 0 deletions server/leafnode_test.go
@@ -7024,3 +7024,48 @@ func TestLeafNodeSameLocalAccountToMultipleHubs(t *testing.T) {
natsPub(t, nch2, "C", []byte("msgC2"))
checkNoMsg(subc)
}

func TestLeafNodeSlowConsumer(t *testing.T) {
ao := DefaultOptions()
ao.LeafNode.Host = "127.0.0.1"
ao.LeafNode.Port = -1
a := RunServer(ao)
defer a.Shutdown()

c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ao.LeafNode.Port))
if err != nil {
t.Fatalf("Error connecting: %v", err)
}
// Only leafnode slow consumers that made it past connect are tracked
// in the slow consumers counter.
if _, err := c.Write([]byte("CONNECT {}\r\n")); err != nil {
t.Fatalf("Error writing connect: %v", err)
}
if _, err := c.Write([]byte("PING\r\n")); err != nil {
t.Fatalf("Unexpected error writing PING: %v", err)
}
defer c.Close()
// Read info
br := bufio.NewReader(c)
br.ReadLine()
a.mu.Lock()

checkFor(t, time.Second, 15*time.Millisecond, func() error {
a.grMu.Lock()
defer a.grMu.Unlock()
for _, cli := range a.grTmpClients {
cli.out.wdl = time.Nanosecond
return nil
}
return nil
})
a.mu.Unlock()
<-time.After(250 * time.Millisecond)
var (
got = a.NumSlowConsumersLeafs()
expected uint64 = 1
)
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
}
15 changes: 15 additions & 0 deletions server/monitor.go
@@ -1214,6 +1214,7 @@ type Varz struct {
SystemAccount string `json:"system_account,omitempty"`
PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"`
OCSPResponseCache OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"`
SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"`
}

// JetStreamVarz contains basic runtime information about jetstream
@@ -1333,6 +1334,14 @@ type OCSPResponseCacheVarz struct {
// Currently, there are no options defined.
type VarzOptions struct{}

// SlowConsumersStats contains information about the slow consumers from different types of connections.
type SlowConsumersStats struct {
Clients uint64 `json:"clients"`
Routes uint64 `json:"routes"`
Gateways uint64 `json:"gateways"`
Leafs uint64 `json:"leafs"`
}

func myUptime(d time.Duration) string {
// Just use total seconds for uptime, and display days / years
tsecs := d / time.Second
@@ -1689,6 +1698,12 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64
v.OutMsgs = atomic.LoadInt64(&s.outMsgs)
v.OutBytes = atomic.LoadInt64(&s.outBytes)
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.SlowConsumersStats = &SlowConsumersStats{
Clients: s.NumSlowConsumersClients(),
Routes: s.NumSlowConsumersRoutes(),
Gateways: s.NumSlowConsumersGateways(),
Leafs: s.NumSlowConsumersLeafs(),
}
v.PinnedAccountFail = atomic.LoadUint64(&s.pinnedAccFail)

// Make sure to reset in case we are re-using.
25 changes: 25 additions & 0 deletions server/routes_test.go
@@ -3800,6 +3800,31 @@ func TestRouteNoLeakOnSlowConsumer(t *testing.T) {
}
return nil
})
var got, expected int64
got = s1.NumSlowConsumers()
expected = 1
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
got = int64(s1.NumSlowConsumersRoutes())
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
got = int64(s1.NumSlowConsumersClients())
expected = 0
if got != expected {
t.Errorf("got: %d, expected: %d", got, expected)
}
varz, err := s1.Varz(nil)
if err != nil {
t.Fatal(err)
}
if varz.SlowConsumersStats.Clients != 0 {
t.Error("Expected no slow consumer clients")
}
if varz.SlowConsumersStats.Routes != 1 {
t.Error("Expected a slow consumer route")
}
}

func TestRouteNoLeakOnAuthTimeout(t *testing.T) {
29 changes: 29 additions & 0 deletions server/server.go
@@ -122,6 +122,7 @@ type Server struct {
// How often user logon fails due to the issuer account not being pinned.
pinnedAccFail uint64
stats
scStats
mu sync.RWMutex
kp nkeys.KeyPair
xkp nkeys.KeyPair
@@ -338,6 +339,14 @@ type stats struct {
slowConsumers int64
}

// scStats includes the per connection type counters of slow consumers.
type scStats struct {
clients atomic.Uint64
routes atomic.Uint64
leafs atomic.Uint64
gateways atomic.Uint64
}

// This is used by tests so we can run all server tests with a default route
// or leafnode compression mode. For instance:
// go test -race -v ./server -cluster_compression=fast
@@ -3412,6 +3421,26 @@ func (s *Server) NumSlowConsumers() int64 {
return atomic.LoadInt64(&s.slowConsumers)
}

// NumSlowConsumersClients will report the number of slow consumer clients.
func (s *Server) NumSlowConsumersClients() uint64 {
return s.scStats.clients.Load()
}

// NumSlowConsumersRoutes will report the number of slow consumer routes.
func (s *Server) NumSlowConsumersRoutes() uint64 {
return s.scStats.routes.Load()
}

// NumSlowConsumersGateways will report the number of slow consumer gateways.
func (s *Server) NumSlowConsumersGateways() uint64 {
return s.scStats.gateways.Load()
}

// NumSlowConsumersLeafs will report the number of slow consumer leafnodes.
func (s *Server) NumSlowConsumersLeafs() uint64 {
return s.scStats.leafs.Load()
}

// ConfigTime will report the last time the server configuration was loaded.
func (s *Server) ConfigTime() time.Time {
s.mu.RLock()
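For code embedding the server, or tests in the server package, the per-connection-type counters can be read directly through the new accessors alongside the existing NumSlowConsumers total. A minimal sketch of that usage, assuming a test in the server package; the test name is hypothetical and not part of this PR:

// Hypothetical test (not part of this PR) exercising the new accessors.
func TestSlowConsumerStatsAccessors(t *testing.T) {
	o := DefaultOptions()
	s := RunServer(o)
	defer s.Shutdown()

	// A freshly started server should not have recorded any slow consumers,
	// either in the aggregate counter or in any per-connection-type counter.
	if n := s.NumSlowConsumers(); n != 0 {
		t.Fatalf("expected 0 total slow consumers, got %d", n)
	}
	perType := s.NumSlowConsumersClients() + s.NumSlowConsumersRoutes() +
		s.NumSlowConsumersGateways() + s.NumSlowConsumersLeafs()
	if perType != 0 {
		t.Fatalf("expected 0 per-type slow consumers, got %d", perType)
	}

	// Varz mirrors the same counters under SlowConsumersStats.
	varz, err := s.Varz(nil)
	if err != nil {
		t.Fatal(err)
	}
	if varz.SlowConsumersStats == nil || varz.SlowConsumersStats.Clients != 0 {
		t.Fatalf("unexpected slow_consumer_stats: %+v", varz.SlowConsumersStats)
	}
}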