Skip to content

Commit

Permalink
*: label external the request source via sessionvar (#44770)
Browse files Browse the repository at this point in the history
ref #44769
  • Loading branch information
nolouch committed Jul 11, 2023
1 parent 295246e commit 3aa21d0
Show file tree
Hide file tree
Showing 19 changed files with 90 additions and 6 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3871,8 +3871,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:GqsAoNiOFxbCJ8U8Lnts8BvdYd6HDWDsIm/oJY1sIMM=",
version = "v2.0.8-0.20230704071705-c0cf773917d9",
sum = "h1:pLUQsFZGE3z7OlZddP+WHkb85rLoxPwRd8CknfSw804=",
version = "v2.0.8-0.20230707070242-178f6fa01aab",
)

go_repository(
Expand Down
7 changes: 7 additions & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
}
builder.RequestSource.RequestSourceInternal = sv.InRestrictedSQL
builder.RequestSource.RequestSourceType = sv.RequestSourceType
builder.RequestSource.ExplicitRequestSourceType = sv.ExplicitRequestSourceType
builder.StoreBatchSize = sv.StoreBatchSize
builder.Request.ResourceGroupName = sv.ResourceGroupName
builder.Request.StoreBusyThreshold = sv.LoadBasedReplicaReadThreshold
Expand Down Expand Up @@ -358,6 +359,12 @@ func (builder *RequestBuilder) SetResourceGroupName(name string) *RequestBuilder
return builder
}

// SetExplicitRequestSourceType sets the explicit request source type.
func (builder *RequestBuilder) SetExplicitRequestSourceType(sourceType string) *RequestBuilder {
builder.RequestSource.ExplicitRequestSourceType = sourceType
return builder
}

func (builder *RequestBuilder) verifyTxnScope() error {
txnScope := builder.TxnScope
if txnScope == "" || txnScope == kv.GlobalReplicaScope || builder.is == nil {
Expand Down
1 change: 1 addition & 0 deletions executor/analyze_col.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
SetConcurrency(e.concurrency).
SetMemTracker(e.memTracker).
SetResourceGroupName(e.ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(e.ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions executor/analyze_idx.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
SetKeepOrder(true).
SetConcurrency(e.concurrency).
SetResourceGroupName(e.ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(e.ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (c *checksumContext) buildTableRequest(ctx sessionctx.Context, tableID int6
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency()).
SetResourceGroupName(ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
}

Expand All @@ -266,6 +267,7 @@ func (c *checksumContext) buildIndexRequest(ctx sessionctx.Context, tableID int6
SetStartTS(c.StartTs).
SetConcurrency(ctx.GetSessionVars().DistSQLScanConcurrency()).
SetResourceGroupName(ctx.GetSessionVars().ResourceGroupName).
SetExplicitRequestSourceType(ctx.GetSessionVars().ExplicitRequestSourceType).
Build()
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20230704071705-c0cf773917d9
github.com/tikv/client-go/v2 v2.0.8-0.20230707070242-178f6fa01aab
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -977,8 +977,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4=
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20230704071705-c0cf773917d9 h1:GqsAoNiOFxbCJ8U8Lnts8BvdYd6HDWDsIm/oJY1sIMM=
github.com/tikv/client-go/v2 v2.0.8-0.20230704071705-c0cf773917d9/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88=
github.com/tikv/client-go/v2 v2.0.8-0.20230707070242-178f6fa01aab h1:pLUQsFZGE3z7OlZddP+WHkb85rLoxPwRd8CknfSw804=
github.com/tikv/client-go/v2 v2.0.8-0.20230707070242-178f6fa01aab/go.mod h1:4KkKqjJgKlvvWMyNqdnAlYFfV4QjEj1fEb5Hb/FoT88=
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935 h1:a5SATBxu/0Z6qNnz4KXDN91gDA06waaYcHM6dkb6lz4=
github.com/tikv/pd/client v0.0.0-20230613052906-7158cb319935/go.mod h1:YmNkj9UT8IjwFov9k3oquH0UgIUHniUaQT3jXKgZYbM=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
Expand Down
4 changes: 3 additions & 1 deletion kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ const (
RequestSourceInternal
// RequestSourceType set request source type of the current statement.
RequestSourceType
// ExplicitRequestSourceType is a complement of RequestSourceType, it may specified by the client or users.
ExplicitRequestSourceType
// ReplicaReadAdjuster set the adjust function of cop requsts.
ReplicaReadAdjuster
// ScanBatchSize set the iter scan batch size.
Expand Down Expand Up @@ -188,7 +190,7 @@ const (
// Do not classify different tools by now.
InternalTxnTools = "tools"
// InternalTxnBR is the type of BR usage.
InternalTxnBR = InternalTxnTools
InternalTxnBR = "br"
// InternalTxnTrace handles the trace statement.
InternalTxnTrace = "Trace"
// InternalTxnTTL is the type of TTL usage
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ func (s *session) doCommit(ctx context.Context) error {
s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
s.txn.SetOption(kv.ExplicitRequestSourceType, sessVars.ExplicitRequestSourceType)
if sessVars.StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
Expand Down
4 changes: 4 additions & 0 deletions session/sessiontest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ go_test(
"//util/memory",
"//util/sqlexec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@org_uber_go_goleak//:goleak",
],
)
35 changes: 35 additions & 0 deletions session/sessiontest/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
Expand All @@ -48,6 +50,8 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
)

func TestSchemaCheckerSQL(t *testing.T) {
Expand Down Expand Up @@ -2434,3 +2438,34 @@ func TestSQLModeOp(t *testing.T) {
a = mysql.SetSQLMode(s, mysql.ModeAllowInvalidDates)
require.Equal(t, mysql.ModeNoBackslashEscapes|mysql.ModeOnlyFullGroupBy|mysql.ModeAllowInvalidDates, a)
}

func TestRequestSource(t *testing.T) {
store := testkit.CreateMockStore(t, mockstore.WithStoreType(mockstore.MockTiKV))
tk := testkit.NewTestKit(t, store)
withCheckInterceptor := func(source string) interceptor.RPCInterceptor {
return interceptor.NewRPCInterceptor("kv-request-source-verify", func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
requestSource := ""
switch r := req.Req.(type) {
case *kvrpcpb.PrewriteRequest:
requestSource = r.GetContext().GetRequestSource()
case *kvrpcpb.CommitRequest:
requestSource = r.GetContext().GetRequestSource()
case *coprocessor.Request:
requestSource = r.GetContext().GetRequestSource()
}
require.Equal(t, source, requestSource)
return next(target, req)
}
})
}
ctx := context.Background()
tk.MustExecWithContext(ctx, "use test")
tk.MustExecWithContext(ctx, "create table t(a int primary key, b int)")
tk.MustExecWithContext(ctx, "set @@tidb_request_source_type = 'lightning'")
tk.MustQueryWithContext(ctx, "select @@tidb_request_source_type").Check(testkit.Rows("lightning"))
insertCtx := interceptor.WithRPCInterceptor(context.Background(), withCheckInterceptor("external_Insert_lightning"))
tk.MustExecWithContext(insertCtx, "insert into t values(1, 1)")
selectCtx := interceptor.WithRPCInterceptor(context.Background(), withCheckInterceptor("external_Select_lightning"))
tk.MustExecWithContext(selectCtx, "select count(*) from t;")
}
12 changes: 12 additions & 0 deletions sessionctx/sessionstates/session_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,18 @@ func TestSessionCtx(t *testing.T) {
`└─TableFullScan_4 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo`))
},
},
{
// check request source
setFunc: func(tk *testkit.TestKit) any {
tk.MustExec(`set @@tidb_request_source_type="lightning"`)
require.Equal(t, "lightning", tk.Session().GetSessionVars().ExplicitRequestSourceType)
return nil
},
checkFunc: func(tk *testkit.TestKit, param any) {
tk.MustExec(`select count(*) from test.t1`)
tk.MustQuery(`select @@tidb_request_source_type`).Check(testkit.Rows("lightning"))
},
},
}

for _, tt := range tests {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_library(
"@com_github_tikv_client_go_v2//kv",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//util",
"@com_github_twmb_murmur3//:murmur3",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
Expand Down
2 changes: 2 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,8 @@ type SessionVars struct {

// RequestSourceType is the type of inner request.
RequestSourceType string
// ExplicitRequestSourceType is the type of origin external request.
ExplicitRequestSourceType string

// MemoryDebugModeMinHeapInUse indicated the minimum heapInUse threshold that triggers the memoryDebugMode.
MemoryDebugModeMinHeapInUse int64
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/pingcap/tidb/util/versioninfo"
tikvcfg "github.com/tikv/client-go/v2/config"
tikvstore "github.com/tikv/client-go/v2/kv"
tikvcliutil "github.com/tikv/client-go/v2/util"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -385,6 +386,12 @@ var defaultSysVars = []*SysVar{
{Scope: ScopeSession, Name: TiDBUseAlloc, Value: BoolToOnOff(DefTiDBUseAlloc), Type: TypeBool, ReadOnly: true, GetSession: func(s *SessionVars) (string, error) {
return BoolToOnOff(s.preUseChunkAlloc), nil
}},
{Scope: ScopeSession, Name: TiDBExplicitRequestSourceType, Value: "", Type: TypeEnum, PossibleValues: tikvcliutil.ExplicitTypeList, GetSession: func(s *SessionVars) (string, error) {
return s.ExplicitRequestSourceType, nil
}, SetSession: func(s *SessionVars, val string) error {
s.ExplicitRequestSourceType = val
return nil
}},
/* The system variables below have INSTANCE scope */
{Scope: ScopeInstance, Name: TiDBLogFileMaxDays, Value: strconv.Itoa(config.GetGlobalConfig().Log.File.MaxDays), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt32, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
maxAge, err := strconv.ParseInt(val, 10, 32)
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ const (

// TiDBUseAlloc indicates whether the last statement used chunk alloc
TiDBUseAlloc = "last_sql_use_alloc"

// TiDBExplicitRequestSourceType indicates the source of the request, it's a complement of RequestSourceType.
// The value maybe "lightning", "br", "dumpling" etc.
TiDBExplicitRequestSourceType = "tidb_request_source_type"
)

// TiDB system variable names that both in session and global scope.
Expand Down
1 change: 1 addition & 0 deletions store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
"@com_github_tikv_client_go_v2//txnkv/txnlock",
"@com_github_tikv_client_go_v2//txnkv/txnsnapshot",
"@com_github_tikv_client_go_v2//util",
Expand Down
2 changes: 2 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -88,6 +89,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
}
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
ctx = context.WithValue(ctx, util.RequestSourceKey, req.RequestSource)
ctx = interceptor.WithRPCInterceptor(ctx, interceptor.GetRPCInterceptorFromCtx(ctx))
enabledRateLimitAction := option.EnabledRateLimitAction
sessionMemTracker := option.SessionMemTracker
it, errRes := c.BuildCopIterator(ctx, req, vars, option)
Expand Down
2 changes: 2 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.KVTxn.SetRequestSourceInternal(val.(bool))
case kv.RequestSourceType:
txn.KVTxn.SetRequestSourceType(val.(string))
case kv.ExplicitRequestSourceType:
txn.KVTxn.SetExplicitRequestSourceType(val.(string))
case kv.ReplicaReadAdjuster:
txn.KVTxn.GetSnapshot().SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster))
case kv.TxnSource:
Expand Down

0 comments on commit 3aa21d0

Please sign in to comment.