Skip to content

Commit

Permalink
executor: global kill 32bits (local connID part) (pingcap#25385)
Browse files Browse the repository at this point in the history
  • Loading branch information
pingyu committed Jun 6, 2023
1 parent 223564a commit 6ba0501
Show file tree
Hide file tree
Showing 42 changed files with 733 additions and 283 deletions.
1 change: 1 addition & 0 deletions domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ go_library(
"//util/execdetails",
"//util/expensivequery",
"//util/gctuner",
"//util/globalconn",
"//util/intest",
"//util/logutil",
"//util/memory",
Expand Down
2 changes: 0 additions & 2 deletions domain/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func TestNormalSessionPool(t *testing.T) {
svr, err := server.NewServer(conf, nil)
require.NoError(t, err)
svr.SetDomain(domain)
svr.InitGlobalConnID(domain.ServerID)
info.SetSessionManager(svr)

pool := domain.SysSessionPool()
Expand Down Expand Up @@ -117,7 +116,6 @@ func TestAbnormalSessionPool(t *testing.T) {
svr, err := server.NewServer(conf, nil)
require.NoError(t, err)
svr.SetDomain(domain)
svr.InitGlobalConnID(domain.ServerID)
info.SetSessionManager(svr)

pool := domain.SysSessionPool()
Expand Down
39 changes: 33 additions & 6 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import (
"github.com/pingcap/tidb/util/etcd"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/gctuner"
"github.com/pingcap/tidb/util/globalconn"
"github.com/pingcap/tidb/util/intest"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
Expand Down Expand Up @@ -162,8 +163,10 @@ type Domain struct {
serverID uint64
serverIDSession *concurrency.Session
isLostConnectionToPD atomicutil.Int32 // !0: true, 0: false.
onClose func()
sysExecutorFactory func(*Domain) (pools.Resource, error)
connIDAllocator globalconn.Allocator

onClose func()
sysExecutorFactory func(*Domain) (pools.Resource, error)

sysProcesses SysProcesses

Expand Down Expand Up @@ -1141,6 +1144,8 @@ func (do *Domain) Init(
}

if config.GetGlobalConfig().EnableGlobalKill {
do.connIDAllocator = globalconn.NewGlobalAllocator(do.ServerID)

if do.etcdClient != nil {
err := do.acquireServerID(ctx)
if err != nil {
Expand All @@ -1156,6 +1161,8 @@ func (do *Domain) Init(
// set serverID for standalone deployment to enable 'KILL'.
atomic.StoreUint64(&do.serverID, serverIDForStandalone)
}
} else {
do.connIDAllocator = globalconn.NewSimpleAllocator()
}

// step 1: prepare the info/schema syncer which domain reload needed.
Expand Down Expand Up @@ -1509,6 +1516,7 @@ func (p *sessionPool) Put(resource pools.Resource) {
resource.Close()
}
}

func (p *sessionPool) Close() {
p.mu.Lock()
if p.mu.closed {
Expand Down Expand Up @@ -2066,7 +2074,7 @@ func (do *Domain) StatsHandle() *handle.Handle {

// CreateStatsHandle is used only for test.
func (do *Domain) CreateStatsHandle(ctx, initStatsCtx sessionctx.Context) error {
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
h, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.GetAutoAnalyzeProcID)
if err != nil {
return err
}
Expand Down Expand Up @@ -2142,7 +2150,7 @@ func (do *Domain) LoadAndUpdateStatsLoop(ctxs []sessionctx.Context, initStatsCtx
// It should be called only once in BootstrapSession.
func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) error {
ctx.GetSessionVars().InRestrictedSQL = true
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.ServerID)
statsHandle, err := handle.NewHandle(ctx, initStatsCtx, do.statsLease, do.sysSessionPool, &do.sysProcesses, do.GetAutoAnalyzeProcID)
if err != nil {
return err
}
Expand Down Expand Up @@ -2532,12 +2540,31 @@ func (do *Domain) IsLostConnectionToPD() bool {
return do.isLostConnectionToPD.Load() != 0
}

// NextConnID return next connection ID.
func (do *Domain) NextConnID() uint64 {
return do.connIDAllocator.NextID()
}

// ReleaseConnID releases connection ID.
func (do *Domain) ReleaseConnID(connID uint64) {
do.connIDAllocator.Release(connID)
}

// GetAutoAnalyzeProcID returns processID for auto analyze
// TODO: support IDs for concurrent auto-analyze
func (do *Domain) GetAutoAnalyzeProcID() uint64 {
return do.connIDAllocator.GetReservedConnID(reservedConnAnalyze)
}

const (
serverIDEtcdPath = "/tidb/server_id"
refreshServerIDRetryCnt = 3
acquireServerIDRetryInterval = 300 * time.Millisecond
acquireServerIDTimeout = 10 * time.Second
retrieveServerIDSessionTimeout = 10 * time.Second

// reservedConnXXX must be within [0, globalconn.ReservedCount)
reservedConnAnalyze = 0
)

var (
Expand Down Expand Up @@ -2631,8 +2658,8 @@ func (do *Domain) acquireServerID(ctx context.Context) error {
}

for {
// get a random serverID: [1, MaxServerID]
randServerID := rand.Int63n(int64(util.MaxServerID)) + 1 // #nosec G404
// get a random serverID: [1, MaxServerID64]
randServerID := rand.Int63n(int64(globalconn.MaxServerID64)) + 1 // #nosec G404
key := fmt.Sprintf("%s/%v", serverIDEtcdPath, randServerID)
cmp := clientv3.Compare(clientv3.CreateRevision(key), "=", 0)
value := "0"
Expand Down
2 changes: 2 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ go_library(
"//util/execdetails",
"//util/format",
"//util/gcutil",
"//util/globalconn",
"//util/hack",
"//util/hint",
"//util/intest",
Expand Down Expand Up @@ -428,6 +429,7 @@ go_test(
"//util/disk",
"//util/execdetails",
"//util/gcutil",
"//util/globalconn",
"//util/hack",
"//util/logutil",
"//util/mathutil",
Expand Down
4 changes: 2 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
failpoint.Inject("mockKillPendingAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
})
for _, task := range tasks {
taskCh <- task
Expand All @@ -194,7 +194,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
}
failpoint.Inject("mockKillFinishedAnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
})

// If we enabled dynamic prune mode, then we need to generate global stats here for partition tables.
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
for {
failpoint.Inject("mockKillRunningV1AnalyzeJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
})
if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 {
return nil, nil, nil, nil, nil, errors.Trace(exeerrors.ErrQueryInterrupted)
Expand Down
2 changes: 1 addition & 1 deletion executor/analyze_col_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ func readDataAndSendTask(ctx sessionctx.Context, handler *tableResultHandler, me
for {
failpoint.Inject("mockKillRunningV2AnalyzeJob", func() {
dom := domain.GetDomain(ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
})
if atomic.LoadUint32(&ctx.GetSessionVars().Killed) == 1 {
return errors.Trace(exeerrors.ErrQueryInterrupted)
Expand Down
3 changes: 1 addition & 2 deletions executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/ranger"
Expand Down Expand Up @@ -195,7 +194,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
for {
failpoint.Inject("mockKillRunningAnalyzeIndexJob", func() {
dom := domain.GetDomain(e.ctx)
dom.SysProcTracker().KillSysProcess(util.GetAutoAnalyzeProcID(dom.ServerID))
dom.SysProcTracker().KillSysProcess(dom.GetAutoAnalyzeProcID())
})
if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 {
return nil, nil, nil, nil, errors.Trace(exeerrors.ErrQueryInterrupted)
Expand Down
19 changes: 10 additions & 9 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror/exeerrors"
"github.com/pingcap/tidb/util/globalconn"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
Expand Down Expand Up @@ -2574,7 +2575,7 @@ func (e *SimpleExec) executeKillStmt(ctx context.Context, s *ast.KillStmt) error
return nil
}

connID, isTruncated, err := util.ParseGlobalConnID(s.ConnectionID)
gcid, isTruncated, err := globalconn.ParseConnID(s.ConnectionID)
if err != nil {
err1 := errors.New("Parse ConnectionID failed: " + err.Error())
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err1)
Expand All @@ -2590,8 +2591,8 @@ func (e *SimpleExec) executeKillStmt(ctx context.Context, s *ast.KillStmt) error
return nil
}

if connID.ServerID != sm.ServerID() {
if err := killRemoteConn(ctx, e.ctx, &connID, s.Query); err != nil {
if gcid.ServerID != sm.ServerID() {
if err := killRemoteConn(ctx, e.ctx, &gcid, s.Query); err != nil {
err1 := errors.New("KILL remote connection failed: " + err.Error())
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err1)
}
Expand All @@ -2602,14 +2603,14 @@ func (e *SimpleExec) executeKillStmt(ctx context.Context, s *ast.KillStmt) error
return nil
}

func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.GlobalConnID, query bool) error {
if connID.ServerID == 0 {
func killRemoteConn(ctx context.Context, sctx sessionctx.Context, gcid *globalconn.GCID, query bool) error {
if gcid.ServerID == 0 {
return errors.New("Unexpected ZERO ServerID. Please file a bug to the TiDB Team")
}

killExec := &tipb.Executor{
Tp: tipb.ExecType_TypeKill,
Kill: &tipb.Kill{ConnID: connID.ID(), Query: query},
Kill: &tipb.Kill{ConnID: gcid.ToConnID(), Query: query},
}

dagReq := &tipb.DAGRequest{}
Expand All @@ -2628,7 +2629,7 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
SetFromSessionVars(sctx.GetSessionVars()).
SetFromInfoSchema(sctx.GetInfoSchema()).
SetStoreType(kv.TiDB).
SetTiDBServerID(connID.ServerID).
SetTiDBServerID(gcid.ServerID).
Build()
if err != nil {
return err
Expand All @@ -2639,8 +2640,8 @@ func killRemoteConn(ctx context.Context, sctx sessionctx.Context, connID *util.G
return err
}

logutil.BgLogger().Info("Killed remote connection", zap.Uint64("serverID", connID.ServerID),
zap.Uint64("conn", connID.ID()), zap.Bool("query", query))
logutil.BgLogger().Info("Killed remote connection", zap.Uint64("serverID", gcid.ServerID),
zap.Uint64("conn", gcid.ToConnID()), zap.Bool("query", query))
return err
}

Expand Down
9 changes: 5 additions & 4 deletions executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/globalconn"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -71,11 +71,12 @@ func TestKillStmt(t *testing.T) {
// excceed int64
tk.MustExec("kill 9223372036854775808") // 9223372036854775808 == 2^63
result = tk.MustQuery("show warnings")
result.Check(testkit.Rows("Warning 1105 Parse ConnectionID failed: Unexpected connectionID excceeds int64"))
result.Check(testkit.Rows("Warning 1105 Parse ConnectionID failed: unexpected connectionID exceeds int64"))

// local kill
killConnID := util.NewGlobalConnID(connID, true)
tk.MustExec("kill " + strconv.FormatUint(killConnID.ID(), 10))
connIDAllocator := globalconn.NewGlobalAllocator(dom.ServerID)
killConnID := connIDAllocator.NextID()
tk.MustExec("kill " + strconv.FormatUint(killConnID, 10))
result = tk.MustQuery("show warnings")
result.Check(testkit.Rows())

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ require (
github.com/lestrrat-go/jwx/v2 v2.0.6
github.com/mgechev/revive v1.3.2
github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef
github.com/nishanths/predeclared v0.2.2
github.com/opentracing/basictracer-go v1.0.0
github.com/opentracing/opentracing-go v1.2.0
Expand Down Expand Up @@ -225,7 +226,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nbutton23/zxcvbn-go v0.0.0-20210217022336-fa2cb2858354 // indirect
github.com/ncw/directio v1.0.5 // indirect
github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect
Expand Down
2 changes: 1 addition & 1 deletion planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3529,7 +3529,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
b.visitInfo = appendDynamicVisitInfo(b.visitInfo, "CONNECTION_ADMIN", false, err)
b.visitInfo = appendVisitInfoIsRestrictedUser(b.visitInfo, b.ctx, &auth.UserIdentity{Username: pi.User, Hostname: pi.Host}, "RESTRICTED_CONNECTION_ADMIN")
}
} else if raw.ConnectionID == util2.GetAutoAnalyzeProcID(domain.GetDomain(b.ctx).ServerID) {
} else if raw.ConnectionID == domain.GetDomain(b.ctx).GetAutoAnalyzeProcID() {
// Only the users with SUPER or CONNECTION_ADMIN privilege can kill auto analyze.
err := ErrSpecificAccessDenied.GenWithStackByArgs("SUPER or CONNECTION_ADMIN")
b.visitInfo = appendDynamicVisitInfo(b.visitInfo, "CONNECTION_ADMIN", false, err)
Expand Down
3 changes: 2 additions & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ const (
func newClientConn(s *Server) *clientConn {
return &clientConn{
server: s,
connectionID: s.globalConnID.NextID(),
connectionID: s.dom.NextConnID(),
collation: mysql.DefaultCollationID,
alloc: arena.NewAllocator(32 * 1024),
chunkAlloc: chunk.NewAllocator(),
Expand Down Expand Up @@ -330,6 +330,7 @@ func (cc *clientConn) Close() error {

func closeConn(cc *clientConn, connections int) error {
metrics.ConnGauge.Set(float64(connections))
cc.server.dom.ReleaseConnID(cc.connectionID)
if cc.bufReadConn != nil {
err := cc.bufReadConn.Close()
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions server/extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func TestExtractHandler(t *testing.T) {
require.NoError(t, err)
defer server.Close()

dom, err := session.GetDomain(store)
require.NoError(t, err)
server.SetDomain(dom)

client.port = getPortFromTCPAddr(server.listener.Addr())
client.statusPort = getPortFromTCPAddr(server.statusListener.Addr())
go func() {
Expand All @@ -62,8 +66,6 @@ func TestExtractHandler(t *testing.T) {
prepareData4ExtractPlanTask(t, client)
time.Sleep(time.Second)
endTime := time.Now()
dom, err := session.GetDomain(store)
require.NoError(t, err)
eh := &ExtractTaskServeHandler{extractHandler: dom.GetExtractHandle()}
router := mux.NewRouter()
router.Handle("/extract_task/dump", eh)
Expand Down
1 change: 1 addition & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ func (ts *basicHTTPHandlerTestSuite) startServer(t *testing.T) {
ts.port = getPortFromTCPAddr(server.listener.Addr())
ts.statusPort = getPortFromTCPAddr(server.statusListener.Addr())
ts.server = server
ts.server.SetDomain(ts.domain)
go func() {
err := server.Run()
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions server/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/auth"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/intest"
Expand Down Expand Up @@ -90,6 +91,9 @@ func CreateMockServer(t *testing.T, store kv.Storage) *Server {
cfg.Security.AutoTLS = false
server, err := NewServer(cfg, tidbdrv)
require.NoError(t, err)
dom, err := session.GetDomain(store)
require.NoError(t, err)
server.SetDomain(dom)
return server
}

Expand Down
6 changes: 4 additions & 2 deletions server/optimize_trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ func TestDumpOptimizeTraceAPI(t *testing.T) {
require.NoError(t, err)
defer server.Close()

dom, err := session.GetDomain(store)
require.NoError(t, err)
server.SetDomain(dom)

client.port = getPortFromTCPAddr(server.listener.Addr())
client.statusPort = getPortFromTCPAddr(server.statusListener.Addr())
go func() {
Expand All @@ -58,8 +62,6 @@ func TestDumpOptimizeTraceAPI(t *testing.T) {
}()
client.waitUntilServerOnline()

dom, err := session.GetDomain(store)
require.NoError(t, err)
statsHandler := &StatsHandler{dom}

otHandler := &OptimizeTraceHandler{}
Expand Down

0 comments on commit 6ba0501

Please sign in to comment.