server, metrics: remove the connection count on server, only use the metrics #51996

Merged
merged 1 commit on Mar 22, 2024
7 changes: 7 additions & 0 deletions pkg/executor/simple.go
@@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
@@ -2850,6 +2851,7 @@ func (e *SimpleExec) executeAdminUnsetBDRRole() error {
}

func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if s.Name.L != "" {
if _, ok := e.is.ResourceGroupByName(s.Name); !ok {
return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O)
@@ -2858,6 +2860,11 @@ func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
} else {
e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName
}
newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if originalResourceGroup != newResourceGroup {
metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec()
metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc()
}
return nil
}

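The hunk above applies the usual pattern for re-labelling a long-lived resource in Prometheus: when a session's resource group changes, the old group's gauge child is decremented and the new group's incremented, so per-group totals stay consistent without a separate counter. A minimal sketch of that pattern, with an illustrative `connections` GaugeVec standing in for TiDB's `metrics.ConnGauge`:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// connections is an illustrative stand-in for metrics.ConnGauge:
// one gauge child per resource group, keyed by a single label.
var connections = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "server_connections"},
	[]string{"resource_group"},
)

// switchGroup moves one connection's count from oldGroup to newGroup,
// mirroring executeSetResourceGroupName above. The no-op guard avoids
// touching the gauge when the statement re-selects the current group.
func switchGroup(oldGroup, newGroup string) {
	if oldGroup == newGroup {
		return
	}
	connections.WithLabelValues(oldGroup).Dec()
	connections.WithLabelValues(newGroup).Inc()
}

func main() {
	connections.WithLabelValues("default").Inc() // connection starts in "default"
	switchGroup("default", "test")               // SET RESOURCE GROUP test
	fmt.Println("connection now counted under \"test\"")
}
```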
6 changes: 3 additions & 3 deletions pkg/metrics/grafana/tidb_summary.json
@@ -163,7 +163,7 @@
"expr": "tidb_server_connections{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"legendFormat": "{{instance}} {{resource_group}}",
"refId": "A"
},
{
@@ -265,7 +265,7 @@
"intervalFactor": 2,
"legendFormat": "quota-{{instance}}",
"refId": "B"
}
}
],
"thresholds": [ ],
"timeFrom": null,
@@ -365,7 +365,7 @@
"intervalFactor": 2,
"legendFormat": "quota-{{instance}}",
"refId": "C"
}
}
],
"thresholds": [ ],
"timeFrom": null,
14 changes: 13 additions & 1 deletion pkg/metrics/grafana/tidb_summary.jsonnet
@@ -111,7 +111,7 @@ local connectionP = graphPanel.new(
.addTarget(
prometheus.target(
'tidb_server_connections{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
legendFormat='{{instance}}',
legendFormat='{{instance}} {{resource_group}}',
)
)
.addTarget(
@@ -133,6 +133,12 @@ local cpuP = graphPanel.new(
'rate(process_cpu_seconds_total{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}[1m])',
legendFormat='{{instance}}',
)
)
.addTarget(
prometheus.target(
'tidb_server_maxprocs{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
legendFormat='quota-{{instance}}',
)
);

local memP = graphPanel.new(
@@ -153,6 +159,12 @@ local memP = graphPanel.new(
'go_memory_classes_heap_objects_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"} + go_memory_classes_heap_unused_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}',
legendFormat='HeapInuse-{{instance}}',
)
)
.addTarget(
prometheus.target(
'tidb_server_memory_quota_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}',
legendFormat='quota-{{instance}}',
)
);

// Query Summary
37 changes: 14 additions & 23 deletions pkg/server/conn.go
@@ -361,29 +361,23 @@ func (cc *clientConn) handshake(ctx context.Context) error {
}

func (cc *clientConn) Close() error {
// Be careful, this function should be re-entrant. It might be called more than once for a single connection.
// Any logic which is not idempotent should be in closeConn() and wrapped with `cc.closeOnce.Do`, like decreasing
// metrics, releasing resources, etc.
//
// TODO: avoid calling this function multiple times. It's not intuitive that a connection can be closed multiple
// times.
cc.server.rwlock.Lock()
delete(cc.server.clients, cc.connectionID)
resourceGroupName, count := "", 0
if ctx := cc.getCtx(); ctx != nil {
resourceGroupName = ctx.GetSessionVars().ResourceGroupName
count = cc.server.ConnNumByResourceGroup[resourceGroupName]
if count <= 1 {
delete(cc.server.ConnNumByResourceGroup, resourceGroupName)
} else {
cc.server.ConnNumByResourceGroup[resourceGroupName]--
}
}
cc.server.rwlock.Unlock()
return closeConn(cc, resourceGroupName, count)
return closeConn(cc)
}

// closeConn is idempotent and thread-safe.
// It will be called on the same `clientConn` more than once to avoid connection leak.
func closeConn(cc *clientConn, resourceGroupName string, count int) error {
func closeConn(cc *clientConn) error {
var err error
cc.closeOnce.Do(func() {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))

if cc.connectionID > 0 {
cc.server.dom.ReleaseConnID(cc.connectionID)
cc.connectionID = 0
@@ -397,8 +391,12 @@ func closeConn(cc *clientConn, resourceGroupName string, count int) error {
}
}
// Close statements and session
// This will release advisory locks, row locks, etc.
// At first, it'll decrease the count of connections in the resource group and update the corresponding gauge.
// Then it'll close the statements and session, which release advisory locks, row locks, etc.
if ctx := cc.getCtx(); ctx != nil {
resourceGroupName := ctx.GetSessionVars().ResourceGroupName
metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec()

err = ctx.Close()
}
})
@@ -407,14 +405,7 @@

func (cc *clientConn) closeWithoutLock() error {
delete(cc.server.clients, cc.connectionID)
name := cc.getCtx().GetSessionVars().ResourceGroupName
count := cc.server.ConnNumByResourceGroup[name]
if count <= 1 {
delete(cc.server.ConnNumByResourceGroup, name)
} else {
cc.server.ConnNumByResourceGroup[name]--
}
return closeConn(cc, name, count-1)
return closeConn(cc)
}

// writeInitialHandshake sends server version, connection ID, server capability, collation, server status
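The rewrite above works because everything non-idempotent now lives inside `cc.closeOnce.Do`: `Close`, `closeWithoutLock`, and the shutdown path in `registerConn` can all funnel into `closeConn` without double-decrementing the gauge. A reduced sketch of that shape, with hypothetical names — the real `clientConn` carries far more state:

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for clientConn: all non-idempotent
// close logic (metric decrement, resource release) sits inside a sync.Once.
type conn struct {
	closeOnce sync.Once
	group     string
	decGauge  func(group string) // e.g. ConnGauge.WithLabelValues(group).Dec
}

// Close is safe to call from multiple paths and multiple goroutines;
// only the first caller runs the body.
func (c *conn) Close() error {
	c.closeOnce.Do(func() {
		c.decGauge(c.group)
	})
	return nil
}

func main() {
	c := &conn{group: "default", decGauge: func(g string) { fmt.Println("dec", g) }}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // racing closers, as in TestCloseConn
		wg.Add(1)
		go func() { defer wg.Done(); _ = c.Close() }()
	}
	wg.Wait() // "dec default" is printed exactly once
}
```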
2 changes: 1 addition & 1 deletion pkg/server/conn_test.go
@@ -2079,7 +2079,7 @@ func TestCloseConn(t *testing.T) {
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
err := closeConn(cc, "", 1)
err := closeConn(cc)
require.NoError(t, err)
}()
}
2 changes: 2 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/errno",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/testkit",
@@ -17,6 +18,7 @@
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
],
66 changes: 66 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
@@ -39,11 +39,13 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testenv"
"github.com/pingcap/tidb/pkg/util/versioninfo"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
@@ -2661,4 +2663,68 @@ func (cli *TestServerClient) RunTestSQLModeIsLoadedBeforeQuery(t *testing.T) {
})
}

func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) {
readConnCount := func(resourceGroupName string) float64 {
metric, err := metrics.ConnGauge.GetMetricWith(map[string]string{
metrics.LblResourceGroup: resourceGroupName,
})
require.NoError(t, err)
output := &dto.Metric{}
metric.Write(output)

return output.GetGauge().GetValue()
}

resourceGroupConnCountReached := func(t *testing.T, resourceGroupName string, expected float64) {
require.Eventually(t, func() bool {
return readConnCount(resourceGroupName) == expected
}, 5*time.Second, 100*time.Millisecond)
}

cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
ctx := context.Background()
dbt.GetDB().SetMaxIdleConns(0)

// start 100 connections
conns := make([]*sql.Conn, 100)
for i := 0; i < 100; i++ {
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
conns[i] = conn
}
resourceGroupConnCountReached(t, "default", 100.0)

// close 50 connections
for i := 0; i < 50; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 50.0)

// close 25 connections
for i := 50; i < 75; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 25.0)

// change the following 25 connections from `default` resource group to `test`
dbt.MustExec("create resource group test RU_PER_SEC = 1000;")
for i := 75; i < 100; i++ {
_, err := conns[i].ExecContext(ctx, "set resource group test")
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 25.0)

// close 25 connections
for i := 75; i < 100; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 0.0)
})
}

//revive:enable:exported
38 changes: 12 additions & 26 deletions pkg/server/server.go
@@ -120,9 +120,8 @@ type Server struct {
socket net.Listener
concurrentLimiter *TokenLimiter

rwlock sync.RWMutex
clients map[uint64]*clientConn
ConnNumByResourceGroup map[string]int
rwlock sync.RWMutex
clients map[uint64]*clientConn

capability uint32
dom *domain.Domain
@@ -240,15 +239,14 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
// NewServer creates a new Server.
func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
s := &Server{
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
clients: make(map[uint64]*clientConn),
ConnNumByResourceGroup: make(map[string]int),
internalSessions: make(map[any]struct{}, 100),
health: uatomic.NewBool(false),
inShutdownMode: uatomic.NewBool(false),
printMDLLogTime: time.Now(),
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
clients: make(map[uint64]*clientConn),
internalSessions: make(map[any]struct{}, 100),
health: uatomic.NewBool(false),
inShutdownMode: uatomic.NewBool(false),
printMDLLogTime: time.Now(),
}
s.capability = defaultCapability
setTxnScope()
@@ -634,27 +632,15 @@ func (s *Server) Close() {
func (s *Server) registerConn(conn *clientConn) bool {
s.rwlock.Lock()
defer s.rwlock.Unlock()
connections := make(map[string]int, 0)
for _, conn := range s.clients {
resourceGroup := conn.getCtx().GetSessionVars().ResourceGroupName
connections[resourceGroup]++
}

logger := logutil.BgLogger()
if s.inShutdownMode.Load() {
logger.Info("close connection directly when shutting down")
for resourceGroupName, count := range s.ConnNumByResourceGroup {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))
}
terror.Log(closeConn(conn, "", 0))
terror.Log(closeConn(conn))
return false
}
s.clients[conn.connectionID] = conn
s.ConnNumByResourceGroup[conn.getCtx().GetSessionVars().ResourceGroupName]++

for name, count := range s.ConnNumByResourceGroup {
metrics.ConnGauge.WithLabelValues(name).Set(float64(count))
}
metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc()
return true
}

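With `ConnNumByResourceGroup` gone, the gauge itself is the only counter: `registerConn` does a single `Inc` and `closeConn` a single `Dec`, instead of recomputing a map under the server lock and calling `Set` per group. A sketch of checking that invariant in-process with `client_golang`'s `testutil` helper — an assumed but standard way to read one gauge child; the metric name is illustrative:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var connections = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "server_connections"},
	[]string{"resource_group"},
)

func main() {
	// Two registrations and one close: the gauge alone tracks the count,
	// with no shadow map to keep in sync.
	connections.WithLabelValues("default").Inc()
	connections.WithLabelValues("default").Inc()
	connections.WithLabelValues("default").Dec()

	// testutil.ToFloat64 reads the current value of a single collector.
	fmt.Println(testutil.ToFloat64(connections.WithLabelValues("default"))) // 1
}
```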
2 changes: 1 addition & 1 deletion pkg/server/tests/commontest/BUILD.bazel
@@ -8,7 +8,7 @@ go_test(
"tidb_test.go",
],
flaky = True,
shard_count = 48,
shard_count = 49,
deps = [
"//pkg/config",
"//pkg/ddl/util",
5 changes: 5 additions & 0 deletions pkg/server/tests/commontest/tidb_test.go
@@ -3077,3 +3077,8 @@ func TestSQLModeIsLoadedBeforeQuery(t *testing.T) {
ts := servertestkit.CreateTidbTestSuite(t)
ts.RunTestSQLModeIsLoadedBeforeQuery(t)
}

func TestConnectionCount(t *testing.T) {
ts := servertestkit.CreateTidbTestSuite(t)
ts.RunTestConnectionCount(t)
}