Skip to content

Commit

Permalink
remove the connection count on server, only use the metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Keao <yangkeao@chunibyo.icu>
  • Loading branch information
YangKeao committed Mar 21, 2024
1 parent bdc64e5 commit 3ce1491
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 55 deletions.
7 changes: 7 additions & 0 deletions pkg/executor/simple.go
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down Expand Up @@ -2850,6 +2851,7 @@ func (e *SimpleExec) executeAdminUnsetBDRRole() error {
}

func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) error {
originalResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if s.Name.L != "" {
if _, ok := e.is.ResourceGroupByName(s.Name); !ok {
return infoschema.ErrResourceGroupNotExists.GenWithStackByArgs(s.Name.O)
Expand All @@ -2858,6 +2860,11 @@ func (e *SimpleExec) executeSetResourceGroupName(s *ast.SetResourceGroupStmt) er
} else {
e.Ctx().GetSessionVars().ResourceGroupName = resourcegroup.DefaultResourceGroupName
}
newResourceGroup := e.Ctx().GetSessionVars().ResourceGroupName
if originalResourceGroup != newResourceGroup {
metrics.ConnGauge.WithLabelValues(originalResourceGroup).Dec()
metrics.ConnGauge.WithLabelValues(newResourceGroup).Inc()
}
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/metrics/grafana/tidb_summary.json
Expand Up @@ -163,7 +163,7 @@
"expr": "tidb_server_connections{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"legendFormat": "{{instance}} {{resource_group}}",
"refId": "A"
},
{
Expand Down Expand Up @@ -265,7 +265,7 @@
"intervalFactor": 2,
"legendFormat": "quota-{{instance}}",
"refId": "B"
}
}
],
"thresholds": [ ],
"timeFrom": null,
Expand Down Expand Up @@ -365,7 +365,7 @@
"intervalFactor": 2,
"legendFormat": "quota-{{instance}}",
"refId": "C"
}
}
],
"thresholds": [ ],
"timeFrom": null,
Expand Down
14 changes: 13 additions & 1 deletion pkg/metrics/grafana/tidb_summary.jsonnet
Expand Up @@ -111,7 +111,7 @@ local connectionP = graphPanel.new(
.addTarget(
prometheus.target(
'tidb_server_connections{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
legendFormat='{{instance}}',
legendFormat='{{instance}} {{resource_group}}',
)
)
.addTarget(
Expand All @@ -133,6 +133,12 @@ local cpuP = graphPanel.new(
'rate(process_cpu_seconds_total{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}[1m])',
legendFormat='{{instance}}',
)
)
.addTarget(
prometheus.target(
'tidb_server_maxprocs{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance"}',
legendFormat='quota-{{instance}}',
)
);

local memP = graphPanel.new(
Expand All @@ -153,6 +159,12 @@ local memP = graphPanel.new(
'go_memory_classes_heap_objects_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"} + go_memory_classes_heap_unused_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}',
legendFormat='HeapInuse-{{instance}}',
)
)
.addTarget(
prometheus.target(
'tidb_server_memory_quota_bytes{k8s_cluster="$k8s_cluster", tidb_cluster="$tidb_cluster", instance=~"$instance", job="tidb"}',
legendFormat='quota-{{instance}}',
)
);

// Query Summary
Expand Down
39 changes: 16 additions & 23 deletions pkg/server/conn.go
Expand Up @@ -361,29 +361,23 @@ func (cc *clientConn) handshake(ctx context.Context) error {
}

func (cc *clientConn) Close() error {
// Be careful, this function should be re-entrant. It might be called more than once for a single connection.
// Any logic which is not idempotent should be in closeConn() and wrapped with `cc.closeOnce.Do`, like decresing
// metrics, releasing resources, etc.
//
// TODO: avoid calling this function multiple times. It's not intuitive that a connection can be closed multiple
// times.
cc.server.rwlock.Lock()
delete(cc.server.clients, cc.connectionID)
resourceGroupName, count := "", 0
if ctx := cc.getCtx(); ctx != nil {
resourceGroupName = ctx.GetSessionVars().ResourceGroupName
count = cc.server.ConnNumByResourceGroup[resourceGroupName]
if count <= 1 {
delete(cc.server.ConnNumByResourceGroup, resourceGroupName)
} else {
cc.server.ConnNumByResourceGroup[resourceGroupName]--
}
}
cc.server.rwlock.Unlock()
return closeConn(cc, resourceGroupName, count)
return closeConn(cc)
}

// closeConn is idempotent and thread-safe.
// It will be called on the same `clientConn` more than once to avoid connection leak.
func closeConn(cc *clientConn, resourceGroupName string, count int) error {
func closeConn(cc *clientConn) error {
var err error
cc.closeOnce.Do(func() {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))

if cc.connectionID > 0 {
cc.server.dom.ReleaseConnID(cc.connectionID)
cc.connectionID = 0
Expand All @@ -397,8 +391,14 @@ func closeConn(cc *clientConn, resourceGroupName string, count int) error {
}
}
// Close statements and session
// This will release advisory locks, row locks, etc.
// At first, it'll decrese the count of connections in the resource group, update the corresponding gauge.
// Then it'll close the statements and session, which release advisory locks, row locks, etc.
if ctx := cc.getCtx(); ctx != nil {
cc.server.rwlock.Lock()
resourceGroupName := ctx.GetSessionVars().ResourceGroupName
metrics.ConnGauge.WithLabelValues(resourceGroupName).Dec()
cc.server.rwlock.Unlock()

err = ctx.Close()
}
})
Expand All @@ -407,14 +407,7 @@ func closeConn(cc *clientConn, resourceGroupName string, count int) error {

func (cc *clientConn) closeWithoutLock() error {
delete(cc.server.clients, cc.connectionID)
name := cc.getCtx().GetSessionVars().ResourceGroupName
count := cc.server.ConnNumByResourceGroup[name]
if count <= 1 {
delete(cc.server.ConnNumByResourceGroup, name)
} else {
cc.server.ConnNumByResourceGroup[name]--
}
return closeConn(cc, name, count-1)
return closeConn(cc)
}

// writeInitialHandshake sends server version, connection ID, server capability, collation, server status
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/conn_test.go
Expand Up @@ -2079,7 +2079,7 @@ func TestCloseConn(t *testing.T) {
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
err := closeConn(cc, "", 1)
err := closeConn(cc)
require.NoError(t, err)
}()
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/errno",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/testkit",
Expand All @@ -17,6 +18,7 @@ go_library(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_log//:log",
"@com_github_prometheus_client_model//go",
"@com_github_stretchr_testify//require",
"@org_uber_go_zap//:zap",
],
Expand Down
66 changes: 66 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Expand Up @@ -39,11 +39,13 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testenv"
"github.com/pingcap/tidb/pkg/util/versioninfo"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -2661,4 +2663,68 @@ func (cli *TestServerClient) RunTestSQLModeIsLoadedBeforeQuery(t *testing.T) {
})
}

func (cli *TestServerClient) RunTestConnectionCount(t *testing.T) {
readConnCount := func(resourceGroupName string) float64 {
metric, err := metrics.ConnGauge.GetMetricWith(map[string]string{
metrics.LblResourceGroup: resourceGroupName,
})
require.NoError(t, err)
output := &dto.Metric{}
metric.Write(output)

return output.GetGauge().GetValue()
}

resourceGroupConnCountReached := func(t *testing.T, resourceGroupName string, expected float64) {
require.Eventually(t, func() bool {
return readConnCount(resourceGroupName) == expected
}, 5*time.Second, 100*time.Millisecond)
}

cli.RunTests(t, nil, func(dbt *testkit.DBTestKit) {
ctx := context.Background()
dbt.GetDB().SetMaxIdleConns(0)

// start 100 connections
conns := make([]*sql.Conn, 100)
for i := 0; i < 100; i++ {
conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
conns[i] = conn
}
resourceGroupConnCountReached(t, "default", 100.0)

// close 50 connections
for i := 0; i < 50; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 50.0)

// close 25 connections
for i := 50; i < 75; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 25.0)

// change the following 25 connections from `default` resource group to `test`
dbt.MustExec("create resource group test RU_PER_SEC = 1000;")
for i := 75; i < 100; i++ {
_, err := conns[i].ExecContext(ctx, "set resource group test")
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 25.0)

// close 25 connections
for i := 75; i < 100; i++ {
err := conns[i].Close()
require.NoError(t, err)
}
resourceGroupConnCountReached(t, "default", 0.0)
resourceGroupConnCountReached(t, "test", 0.0)
})
}

//revive:enable:exported
38 changes: 12 additions & 26 deletions pkg/server/server.go
Expand Up @@ -120,9 +120,8 @@ type Server struct {
socket net.Listener
concurrentLimiter *TokenLimiter

rwlock sync.RWMutex
clients map[uint64]*clientConn
ConnNumByResourceGroup map[string]int
rwlock sync.RWMutex
clients map[uint64]*clientConn

capability uint32
dom *domain.Domain
Expand Down Expand Up @@ -240,15 +239,14 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
// NewServer creates a new Server.
func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
s := &Server{
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
clients: make(map[uint64]*clientConn),
ConnNumByResourceGroup: make(map[string]int),
internalSessions: make(map[any]struct{}, 100),
health: uatomic.NewBool(false),
inShutdownMode: uatomic.NewBool(false),
printMDLLogTime: time.Now(),
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
clients: make(map[uint64]*clientConn),
internalSessions: make(map[any]struct{}, 100),
health: uatomic.NewBool(false),
inShutdownMode: uatomic.NewBool(false),
printMDLLogTime: time.Now(),
}
s.capability = defaultCapability
setTxnScope()
Expand Down Expand Up @@ -634,27 +632,15 @@ func (s *Server) Close() {
func (s *Server) registerConn(conn *clientConn) bool {
s.rwlock.Lock()
defer s.rwlock.Unlock()
connections := make(map[string]int, 0)
for _, conn := range s.clients {
resourceGroup := conn.getCtx().GetSessionVars().ResourceGroupName
connections[resourceGroup]++
}

logger := logutil.BgLogger()
if s.inShutdownMode.Load() {
logger.Info("close connection directly when shutting down")
for resourceGroupName, count := range s.ConnNumByResourceGroup {
metrics.ConnGauge.WithLabelValues(resourceGroupName).Set(float64(count))
}
terror.Log(closeConn(conn, "", 0))
terror.Log(closeConn(conn))
return false
}
s.clients[conn.connectionID] = conn
s.ConnNumByResourceGroup[conn.getCtx().GetSessionVars().ResourceGroupName]++

for name, count := range s.ConnNumByResourceGroup {
metrics.ConnGauge.WithLabelValues(name).Set(float64(count))
}
metrics.ConnGauge.WithLabelValues(conn.getCtx().GetSessionVars().ResourceGroupName).Inc()
return true
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/tests/commontest/BUILD.bazel
Expand Up @@ -8,7 +8,7 @@ go_test(
"tidb_test.go",
],
flaky = True,
shard_count = 48,
shard_count = 49,
deps = [
"//pkg/config",
"//pkg/ddl/util",
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/tests/commontest/tidb_test.go
Expand Up @@ -3077,3 +3077,8 @@ func TestSQLModeIsLoadedBeforeQuery(t *testing.T) {
ts := servertestkit.CreateTidbTestSuite(t)
ts.RunTestSQLModeIsLoadedBeforeQuery(t)
}

func TestConnectionCount(t *testing.T) {
ts := servertestkit.CreateTidbTestSuite(t)
ts.RunTestConnectionCount(t)
}

0 comments on commit 3ce1491

Please sign in to comment.