Skip to content

Commit

Permalink
ddl: set context correctly in the setDDLLabelForDiagnosis (#40090) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Aug 17, 2023
1 parent 1420ca4 commit 7c1e445
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 36 deletions.
6 changes: 5 additions & 1 deletion ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type reorgBackfillTask struct {
startKey kv.Key
endKey kv.Key
endInclude bool
source string
}

func (r *reorgBackfillTask) excludedEndKey() kv.Key {
Expand Down Expand Up @@ -500,6 +501,7 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
}
// Build reorg tasks.
job := reorgInfo.Job
source := getDDLRequestSource(job)
for i, keyRange := range kvRanges {
endKey := keyRange.EndKey
endK, err := getRangeEndKey(scheduler.jobCtx, dc.store, job.Priority, prefix, keyRange.StartKey, endKey)
Expand All @@ -517,7 +519,9 @@ func (dc *ddlCtx) handleRangeTasks(scheduler *backfillScheduler, t table.Table,
startKey: keyRange.StartKey,
endKey: endKey,
// If the boundaries overlap, we should ignore the preceding endKey.
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1}
endInclude: endK.Cmp(keyRange.EndKey) != 0 || i == len(kvRanges)-1,
source: source,
}
batchTasks = append(batchTasks, task)

if len(batchTasks) >= backfillTaskChanSize {
Expand Down
8 changes: 6 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,9 +439,9 @@ func (dc *ddlCtx) setDDLSourceForDiagnosis(job *model.Job) {
ctx, exists := dc.jobCtx.jobCtxMap[job.ID]
if !exists {
ctx = NewJobContext()
ctx.setDDLLabelForDiagnosis(job)
dc.jobCtx.jobCtxMap[job.ID] = ctx
}
ctx.setDDLLabelForDiagnosis(job)
}

func (dc *ddlCtx) getResourceGroupTaggerForTopSQL(job *model.Job) tikvrpc.ResourceGroupTagger {
Expand Down Expand Up @@ -1786,7 +1786,11 @@ func (s *session) execute(ctx context.Context, query string, label string) ([]ch
defer func() {
metrics.DDLJobTableDuration.WithLabelValues(label + "-" + metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()
rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(kv.WithInternalSourceType(ctx, kv.InternalTxnDDL), query)

if ctx.Value(kv.RequestSourceKey) == nil {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
}
rs, err := s.Context.(sqlexec.SQLExecutor).ExecuteInternal(ctx, query)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
5 changes: 4 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NewJobContext() *JobContext {
cacheSQL: "",
cacheNormalizedSQL: "",
cacheDigest: nil,
tp: "unknown",
tp: "",
}
}

Expand Down Expand Up @@ -774,6 +774,9 @@ func getDDLRequestSource(job *model.Job) string {
}

func (w *JobContext) setDDLLabelForDiagnosis(job *model.Job) {
if w.tp != "" {
return
}
w.tp = getDDLRequestSource(job)
w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType())
}
Expand Down
9 changes: 8 additions & 1 deletion ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func (c *copReqSender) run() {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
rs, err := p.copCtx.buildTableScan(p.ctx, ver.Ver, task.startKey, task.excludedEndKey())
ctx := kv.WithInternalSourceType(p.ctx, task.source)
rs, err := p.copCtx.buildTableScan(ctx, ver.Ver, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
Expand Down Expand Up @@ -422,6 +423,12 @@ func (c *copContext) buildTableScan(ctx context.Context, startTS uint64, start,
SetFromInfoSchema(c.sessCtx.GetDomainInfoSchema()).
SetConcurrency(1).
Build()
builder.RequestSource.RequestSourceInternal = true
if source := ctx.Value(kv.RequestSourceKey); source != nil {
builder.RequestSource.RequestSourceType = source.(kv.RequestSource).RequestSourceType
} else {
builder.RequestSource.RequestSourceType = kv.InternalTxnDDL
}
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ func updateDDLJob2Table(sctx *session, job *model.Job, updateRawArgs bool) error
// getDDLReorgHandle gets DDL reorg handle.
func getDDLReorgHandle(sess *session, job *model.Job) (element *meta.Element, startKey, endKey kv.Key, physicalTableID int64, err error) {
sql := fmt.Sprintf("select ele_id, ele_type, start_key, end_key, physical_id from mysql.tidb_ddl_reorg where job_id = %d", job.ID)
rows, err := sess.execute(context.Background(), sql, "get_handle")
ctx := kv.WithInternalSourceType(context.Background(), getDDLRequestSource(job))
rows, err := sess.execute(ctx, sql, "get_handle")
if err != nil {
return nil, nil, nil, 0, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"testing"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/server"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util"
"github.com/stretchr/testify/require"
tikvutil "github.com/tikv/client-go/v2/util"
)

func TestKillStmt(t *testing.T) {
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestKillStmt(t *testing.T) {
func TestUserAttributes(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomain(t)
rootTK := testkit.NewTestKit(t, store)
ctx := context.WithValue(context.Background(), tikvutil.RequestSourceKey, tikvutil.RequestSource{RequestSourceInternal: true})
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege)

// https://dev.mysql.com/doc/refman/8.0/en/create-user.html#create-user-comments-attributes
rootTK.MustExec(`CREATE USER testuser COMMENT '1234'`)
Expand Down
28 changes: 15 additions & 13 deletions kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,22 @@ func BackOff(attempts uint) int {
func setRequestSourceForInnerTxn(ctx context.Context, txn Transaction) {
if source := ctx.Value(RequestSourceKey); source != nil {
requestSource := source.(RequestSource)
if !requestSource.RequestSourceInternal {
logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only")
if requestSource.RequestSourceType != "" {
if !requestSource.RequestSourceInternal {
logutil.Logger(ctx).Warn("`RunInNewTxn` should be used by inner txn only")
}
txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal)
txn.SetOption(RequestSourceType, requestSource.RequestSourceType)
return
}
txn.SetOption(RequestSourceInternal, requestSource.RequestSourceInternal)
txn.SetOption(RequestSourceType, requestSource.RequestSourceType)
}
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " +
"the `RequestSourceTypeKey` is missing in the context")
}
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, " +
"the `RequestSourceTypeKey` is missing in the context")
}
}
33 changes: 18 additions & 15 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4130,23 +4130,26 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo
} else {
s.sessionVars.RequestSourceType = stmtLabel
}
} else {
if source := ctx.Value(kv.RequestSourceKey); source != nil {
s.sessionVars.RequestSourceType = source.(kv.RequestSource).RequestSourceType
} else {
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+
"the `RequestSourceTypeKey` is missing in the context",
zap.Bool("internal", s.isInternal()),
zap.String("sql", stmtNode.Text()))
}
return
}
if source := ctx.Value(kv.RequestSourceKey); source != nil {
requestSource := source.(kv.RequestSource)
if requestSource.RequestSourceType != "" {
s.sessionVars.RequestSourceType = requestSource.RequestSourceType
return
}
}
// panic in test mode in case there are requests without source in the future.
// log warnings in production mode.
if flag.Lookup("test.v") != nil || flag.Lookup("check.v") != nil {
panic("unexpected no source type context, if you see this error, " +
"the `RequestSourceTypeKey` is missing in your context")
} else {
logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+
"the `RequestSourceTypeKey` is missing in the context",
zap.Bool("internal", s.isInternal()),
zap.String("sql", stmtNode.Text()))
}
}

// RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver.
Expand Down

0 comments on commit 7c1e445

Please sign in to comment.