From 8b7dfe7d74a46b98252dbbe582783cc63a2820b1 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 20 Jul 2023 15:51:02 -0700 Subject: [PATCH] monitoring: track slow consumers per connection type Signed-off-by: Waldemar Quevedo --- .travis.yml | 2 +- server/client.go | 15 +++++++++++++- server/client_test.go | 10 +++++++++ server/events.go | 8 +++++--- server/gateway_test.go | 35 ++++++++++++++++++++++++++++++++ server/leafnode_test.go | 45 +++++++++++++++++++++++++++++++++++++++++ server/monitor.go | 15 ++++++++++++++ server/routes_test.go | 25 +++++++++++++++++++++++ server/server.go | 29 ++++++++++++++++++++++++++ 9 files changed, 179 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 57468b4930..5654f898a9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -41,7 +41,7 @@ jobs: - name: "Run all tests from all other packages" env: TEST_SUITE=non_srv_pkg_tests - name: "Compile with older Go release" - go: 1.18.x + go: 1.19.x env: TEST_SUITE=build_only script: ./scripts/runTestsOnTravis.sh $TEST_SUITE diff --git a/server/client.go b/server/client.go index 3a5d5241c2..c0f9e844a0 100644 --- a/server/client.go +++ b/server/client.go @@ -1723,8 +1723,18 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo return true } - // Slow consumer here.. + // Aggregate slow consumers. atomic.AddInt64(&c.srv.slowConsumers, 1) + switch c.kind { + case CLIENT: + c.srv.scStats.clients.Add(1) + case ROUTER: + c.srv.scStats.routes.Add(1) + case GATEWAY: + c.srv.scStats.gateways.Add(1) + case LEAF: + c.srv.scStats.leafs.Add(1) + } if c.acc != nil { atomic.AddInt64(&c.acc.slowConsumers, 1) } @@ -2224,7 +2234,10 @@ func (c *client) queueOutbound(data []byte) { // Perf wise, it looks like it is faster to optimistically add than // checking current pb+len(data) and then add to pb. c.out.pb -= int64(len(data)) + + // Increment the total and client's slow consumer counters. atomic.AddInt64(&c.srv.slowConsumers, 1) + c.srv.scStats.clients.Add(1) if c.acc != nil { atomic.AddInt64(&c.acc.slowConsumers, 1) } diff --git a/server/client_test.go b/server/client_test.go index 3f488346ae..95051a4f66 100644 --- a/server/client_test.go +++ b/server/client_test.go @@ -2027,8 +2027,18 @@ func TestClientSlowConsumerWithoutConnect(t *testing.T) { if n := atomic.LoadInt64(&s.slowConsumers); n != 1 { return fmt.Errorf("Expected 1 slow consumer, got: %v", n) } + if n := s.scStats.clients.Load(); n != 1 { + return fmt.Errorf("Expected 1 slow consumer, got: %v", n) + } return nil }) + varz, err := s.Varz(nil) + if err != nil { + t.Fatal(err) + } + if varz.SlowConsumersStats.Clients != 1 { + t.Error("Expected a slow consumer client in varz") + } } func TestClientNoSlowConsumerIfConnectExpected(t *testing.T) { diff --git a/server/events.go b/server/events.go index 01ed965da3..75cd9aa0db 100644 --- a/server/events.go +++ b/server/events.go @@ -2043,7 +2043,7 @@ func (s *Server) sendAccConnsUpdate(a *Account, subj ...string) { a.mu.Unlock() } -// Lock shoulc be held on entry +// Lock should be held on entry. func (a *Account) statz() *AccountStat { localConns := a.numLocalConnections() leafConns := a.numLocalLeafNodes() @@ -2055,10 +2055,12 @@ func (a *Account) statz() *AccountStat { NumSubs: a.sl.Count(), Received: DataStats{ Msgs: atomic.LoadInt64(&a.inMsgs), - Bytes: atomic.LoadInt64(&a.inBytes)}, + Bytes: atomic.LoadInt64(&a.inBytes), + }, Sent: DataStats{ Msgs: atomic.LoadInt64(&a.outMsgs), - Bytes: atomic.LoadInt64(&a.outBytes)}, + Bytes: atomic.LoadInt64(&a.outBytes), + }, SlowConsumers: atomic.LoadInt64(&a.slowConsumers), } } diff --git a/server/gateway_test.go b/server/gateway_test.go index ba92179430..d3531e0cc5 100644 --- a/server/gateway_test.go +++ b/server/gateway_test.go @@ -6871,3 +6871,38 @@ func TestGatewaySwitchToInterestOnlyModeImmediately(t *testing.T) { natsFlush(t, nc) checkCount(t, gwcb, 1) } + +func TestGatewaySlowConsumer(t *testing.T) { + gatewayMaxPingInterval = 50 * time.Millisecond + defer func() { gatewayMaxPingInterval = gwMaxPingInterval }() + + ob := testDefaultOptionsForGateway("B") + sb := RunServer(ob) + defer sb.Shutdown() + + oa := testGatewayOptionsFromToWithServers(t, "A", "B", sb) + sa := RunServer(oa) + defer sa.Shutdown() + + waitForInboundGateways(t, sa, 1, 2*time.Second) + waitForOutboundGateways(t, sa, 1, 2*time.Second) + waitForInboundGateways(t, sb, 1, 2*time.Second) + waitForOutboundGateways(t, sb, 1, 2*time.Second) + + c := sa.getOutboundGatewayConnection("B") + c.mu.Lock() + c.out.wdl = time.Nanosecond + c.mu.Unlock() + + <-time.After(250 * time.Millisecond) + got := sa.NumSlowConsumersGateways() + expected := uint64(1) + if got != 1 { + t.Errorf("got: %d, expected: %d", got, expected) + } + got = sb.NumSlowConsumersGateways() + expected = 0 + if got != expected { + t.Errorf("got: %d, expected: %d", got, expected) + } +} diff --git a/server/leafnode_test.go b/server/leafnode_test.go index dddf976474..7ce05bb64d 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -7024,3 +7024,48 @@ func TestLeafNodeSameLocalAccountToMultipleHubs(t *testing.T) { natsPub(t, nch2, "C", []byte("msgC2")) checkNoMsg(subc) } + +func TestLeafNodeSlowConsumer(t *testing.T) { + ao := DefaultOptions() + ao.LeafNode.Host = "127.0.0.1" + ao.LeafNode.Port = -1 + a := RunServer(ao) + defer a.Shutdown() + + c, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", ao.LeafNode.Port)) + if err != nil { + t.Fatalf("Error connecting: %v", err) + } + // Only leafnode slow consumers that made it past connect are tracked + // in the slow consumers counter. + if _, err := c.Write([]byte("CONNECT {}\r\n")); err != nil { + t.Fatalf("Error writing connect: %v", err) + } + if _, err := c.Write([]byte("PING\r\n")); err != nil { + t.Fatalf("Unexpected error writing PING: %v", err) + } + defer c.Close() + // Read info + br := bufio.NewReader(c) + br.ReadLine() + a.mu.Lock() + + checkFor(t, time.Second, 15*time.Millisecond, func() error { + a.grMu.Lock() + defer a.grMu.Unlock() + for _, cli := range a.grTmpClients { + cli.out.wdl = time.Nanosecond + return nil + } + return nil + }) + a.mu.Unlock() + <-time.After(250 * time.Millisecond) + var ( + got = a.NumSlowConsumersLeafs() + expected uint64 = 1 + ) + if got != expected { + t.Errorf("got: %d, expected: %d", got, expected) + } +} diff --git a/server/monitor.go b/server/monitor.go index 1854e1b316..6203cf7b59 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -1214,6 +1214,7 @@ type Varz struct { SystemAccount string `json:"system_account,omitempty"` PinnedAccountFail uint64 `json:"pinned_account_fails,omitempty"` OCSPResponseCache OCSPResponseCacheVarz `json:"ocsp_peer_cache,omitempty"` + SlowConsumersStats *SlowConsumersStats `json:"slow_consumer_stats"` } // JetStreamVarz contains basic runtime information about jetstream @@ -1333,6 +1334,14 @@ type OCSPResponseCacheVarz struct { // Currently, there are no options defined. type VarzOptions struct{} +// SlowConsumersStats contains information about the slow consumers from different type of connections. +type SlowConsumersStats struct { + Clients uint64 `json:"clients"` + Routes uint64 `json:"routes"` + Gateways uint64 `json:"gateways"` + Leafs uint64 `json:"leafs"` +} + func myUptime(d time.Duration) string { // Just use total seconds for uptime, and display days / years tsecs := d / time.Second @@ -1689,6 +1698,12 @@ func (s *Server) updateVarzRuntimeFields(v *Varz, forceUpdate bool, pcpu float64 v.OutMsgs = atomic.LoadInt64(&s.outMsgs) v.OutBytes = atomic.LoadInt64(&s.outBytes) v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers) + v.SlowConsumersStats = &SlowConsumersStats{ + Clients: s.NumSlowConsumersClients(), + Routes: s.NumSlowConsumersRoutes(), + Gateways: s.NumSlowConsumersGateways(), + Leafs: s.NumSlowConsumersLeafs(), + } v.PinnedAccountFail = atomic.LoadUint64(&s.pinnedAccFail) // Make sure to reset in case we are re-using. diff --git a/server/routes_test.go b/server/routes_test.go index 62bf2438c4..73218a5d71 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -3800,6 +3800,31 @@ func TestRouteNoLeakOnSlowConsumer(t *testing.T) { } return nil }) + var got, expected int64 + got = s1.NumSlowConsumers() + expected = 1 + if got != expected { + t.Errorf("got: %d, expected: %d", got, expected) + } + got = int64(s1.NumSlowConsumersRoutes()) + if got != expected { + t.Errorf("got: %d, expected: %d", got, expected) + } + got = int64(s1.NumSlowConsumersClients()) + expected = 0 + if got != expected { + t.Errorf("got: %d, expected: %d", got, expected) + } + varz, err := s1.Varz(nil) + if err != nil { + t.Fatal(err) + } + if varz.SlowConsumersStats.Clients != 0 { + t.Error("Expected no slow consumer clients") + } + if varz.SlowConsumersStats.Routes != 1 { + t.Error("Expected a slow consumer route") + } } func TestRouteNoLeakOnAuthTimeout(t *testing.T) { diff --git a/server/server.go b/server/server.go index eaf88027d4..b677eab751 100644 --- a/server/server.go +++ b/server/server.go @@ -122,6 +122,7 @@ type Server struct { // How often user logon fails due to the issuer account not being pinned. pinnedAccFail uint64 stats + scStats mu sync.RWMutex kp nkeys.KeyPair xkp nkeys.KeyPair @@ -338,6 +339,14 @@ type stats struct { slowConsumers int64 } +// scStats includes the total and per connection counters of Slow Consumers. +type scStats struct { + clients atomic.Uint64 + routes atomic.Uint64 + leafs atomic.Uint64 + gateways atomic.Uint64 +} + // This is used by tests so we can run all server tests with a default route // or leafnode compression mode. For instance: // go test -race -v ./server -cluster_compression=fast @@ -3412,6 +3421,26 @@ func (s *Server) NumSlowConsumers() int64 { return atomic.LoadInt64(&s.slowConsumers) } +// NumSlowConsumersClients will report the number of slow consumers clients. +func (s *Server) NumSlowConsumersClients() uint64 { + return s.scStats.clients.Load() +} + +// NumSlowConsumersRoutes will report the number of slow consumers routes. +func (s *Server) NumSlowConsumersRoutes() uint64 { + return s.scStats.routes.Load() +} + +// NumSlowConsumersGateways will report the number of slow consumers leafs. +func (s *Server) NumSlowConsumersGateways() uint64 { + return s.scStats.gateways.Load() +} + +// NumSlowConsumersLeafs will report the number of slow consumers leafs. +func (s *Server) NumSlowConsumersLeafs() uint64 { + return s.scStats.leafs.Load() +} + // ConfigTime will report the last time the server configuration was loaded. func (s *Server) ConfigTime() time.Time { s.mu.RLock()