Skip to content

Commit

Permalink
ddl: manage job lifecycle in worker (#53747)
Browse files Browse the repository at this point in the history
ref #53246
  • Loading branch information
D3Hunter committed Jun 7, 2024
1 parent 9c500ad commit 1870d5f
Show file tree
Hide file tree
Showing 32 changed files with 488 additions and 217 deletions.
2 changes: 2 additions & 0 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ go_library(
"delete_range.go",
"delete_range_util.go",
"dist_owner.go",
"doc.go",
"foreign_key.go",
"generated_column.go",
"index.go",
Expand Down Expand Up @@ -76,6 +77,7 @@ go_library(
"//pkg/ddl/placement",
"//pkg/ddl/resourcegroup",
"//pkg/ddl/syncer",
"//pkg/ddl/systable",
"//pkg/ddl/util",
"//pkg/distsql",
"//pkg/distsql/context",
Expand Down
18 changes: 9 additions & 9 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@ const (
keyRangesOffset
)

func closePDSchedule() error {
func closePDSchedule(ctx context.Context) error {
closeMap := make(map[string]any)
for _, key := range pdScheduleKey {
closeMap[key] = 0
}
return infosync.SetPDScheduleConfig(context.Background(), closeMap)
return infosync.SetPDScheduleConfig(ctx, closeMap)
}

func savePDSchedule(job *model.Job) error {
retValue, err := infosync.GetPDScheduleConfig(context.Background())
func savePDSchedule(ctx context.Context, job *model.Job) error {
retValue, err := infosync.GetPDScheduleConfig(ctx)
if err != nil {
return err
}
Expand All @@ -94,11 +94,11 @@ func savePDSchedule(job *model.Job) error {
return nil
}

func recoverPDSchedule(pdScheduleParam map[string]any) error {
func recoverPDSchedule(ctx context.Context, pdScheduleParam map[string]any) error {
if pdScheduleParam == nil {
return nil
}
return infosync.SetPDScheduleConfig(context.Background(), pdScheduleParam)
return infosync.SetPDScheduleConfig(ctx, pdScheduleParam)
}

func getStoreGlobalMinSafeTS(s kv.Storage) time.Time {
Expand Down Expand Up @@ -228,7 +228,7 @@ func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context,
if err = gcutil.DisableGC(se); err != nil {
return err
}
if err = closePDSchedule(); err != nil {
if err = closePDSchedule(ctx); err != nil {
return err
}
if err = setTiDBEnableAutoAnalyze(ctx, se, variable.Off); err != nil {
Expand Down Expand Up @@ -664,7 +664,7 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
switch job.SchemaState {
// Stage 1, check and set FlashbackClusterJobID, and update job args.
case model.StateNone:
if err = savePDSchedule(job); err != nil {
if err = savePDSchedule(w.ctx, job); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -801,7 +801,7 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
defer w.sessPool.Put(sess)

err = kv.RunInNewTxn(w.ctx, w.store, true, func(context.Context, kv.Transaction) error {
if err = recoverPDSchedule(pdScheduleValue); err != nil {
if err = recoverPDSchedule(w.ctx, pdScheduleValue); err != nil {
return err
}
if gcEnabled {
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package ddl

import (
"context"
"fmt"
"strings"

Expand Down Expand Up @@ -374,7 +373,7 @@ func (w *worker) verifyRemainRecordsForCheckConstraint(dbInfo *model.DBInfo, tab
// can let the check expression restored string as the filter in where clause directly.
// Prepare internal SQL to fetch data from physical table under this filter.
sql := fmt.Sprintf("select 1 from `%s`.`%s` where not %s limit 1", dbInfo.Name.L, tableInfo.Name.L, constr.ExprString)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
ctx := kv.WithInternalSourceType(w.ctx, kv.InternalTxnDDL)
rows, _, err := sctx.GetRestrictedSQLExecutor().ExecRestrictedSQL(ctx, nil, sql)
if err != nil {
return errors.Trace(err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
d.wg.Run(func() {
d.limitDDLJobs(d.limitJobChV2, d.addBatchLocalDDLJobs)
})
d.sessPool = sess.NewSessionPool(ctxPool, d.store)
d.sessPool = sess.NewSessionPool(ctxPool)

d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)

Expand Down Expand Up @@ -1597,7 +1597,7 @@ type Info struct {
// GetDDLInfoWithNewTxn returns DDL information using a new txn.
func GetDDLInfoWithNewTxn(s sessionctx.Context) (*Info, error) {
se := sess.NewSession(s)
err := se.Begin()
err := se.Begin(context.Background())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1757,7 +1757,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera
idsStr = append(idsStr, strconv.FormatInt(id, 10))
}

err = ns.Begin()
err = ns.Begin(context.Background())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1797,7 +1797,7 @@ func processJobs(process func(*sess.Session, *model.Job, model.AdminCommandOpera
})

// There may be some conflict during the update, try it again
if err = ns.Commit(); err != nil {
if err = ns.Commit(context.Background()); err != nil {
continue
}

Expand Down Expand Up @@ -1848,7 +1848,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp
var jobErrs = make(map[int64]error)

ns := sess.NewSession(se)
err = ns.Begin()
err = ns.Begin(context.Background())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1894,7 +1894,7 @@ func processAllJobs(process func(*sess.Session, *model.Job, model.AdminCommandOp
jobID = jobIDMax + 1
}

err = ns.Commit()
err = ns.Commit(context.Background())
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/ddl/ddl_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -163,14 +164,14 @@ func TestCreateViewConcurrently(t *testing.T) {
counterErr error
counter int
)
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/onDDLCreateView", func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onDDLCreateView", func(job *model.Job) {
counter++
if counter > 1 {
counterErr = fmt.Errorf("create view job should not run concurrently")
return
}
})
failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) {
if job.Type == model.ActionCreateView {
counter--
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/ddl_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ const (
)

// AddHistoryDDLJob record the history job.
func AddHistoryDDLJob(sess *sess.Session, t *meta.Meta, job *model.Job, updateRawArgs bool) error {
err := addHistoryDDLJob2Table(sess, job, updateRawArgs)
func AddHistoryDDLJob(ctx context.Context, sess *sess.Session, t *meta.Meta, job *model.Job, updateRawArgs bool) error {
err := addHistoryDDLJob2Table(ctx, sess, job, updateRawArgs)
if err != nil {
logutil.DDLLogger().Info("failed to add DDL job to history table", zap.Error(err))
}
Expand All @@ -50,12 +50,12 @@ func AddHistoryDDLJob(sess *sess.Session, t *meta.Meta, job *model.Job, updateRa
}

// addHistoryDDLJob2Table adds DDL job to history table.
func addHistoryDDLJob2Table(sess *sess.Session, job *model.Job, updateRawArgs bool) error {
func addHistoryDDLJob2Table(ctx context.Context, sess *sess.Session, job *model.Job, updateRawArgs bool) error {
b, err := job.Encode(updateRawArgs)
if err != nil {
return err
}
_, err = sess.Execute(context.Background(),
_, err = sess.Execute(ctx,
fmt.Sprintf("insert ignore into mysql.tidb_ddl_history(job_id, job_meta, db_name, table_name, schema_ids, table_ids, create_time) values (%d, %s, %s, %s, %s, %s, %v)",
job.ID, util.WrapKey2String(b), strconv.Quote(job.SchemaName), strconv.Quote(job.TableName),
strconv.Quote(strconv.FormatInt(job.SchemaID, 10)),
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/ddl_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ func TestDDLHistoryBasic(t *testing.T) {
newTk := testkit.NewTestKit(t, store)
return newTk.Session(), nil
}, 8, 8, 0)
sessPool := session.NewSessionPool(rs, store)
sessPool := session.NewSessionPool(rs)
sessCtx, err := sessPool.Get()
require.NoError(t, err)
sess := session.NewSession(sessCtx)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnLightning)
err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
return ddl.AddHistoryDDLJob(sess, t, &model.Job{
return ddl.AddHistoryDDLJob(context.Background(), sess, t, &model.Job{
ID: 1,
}, false)
})
Expand All @@ -55,7 +55,7 @@ func TestDDLHistoryBasic(t *testing.T) {

err = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
return ddl.AddHistoryDDLJob(sess, t, &model.Job{
return ddl.AddHistoryDDLJob(context.Background(), sess, t, &model.Job{
ID: 2,
}, false)
})
Expand Down
30 changes: 14 additions & 16 deletions pkg/ddl/ddl_running_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,40 +53,38 @@ func (j *runningJobs) clear() {
j.unfinishedSchema = make(map[string]map[string]struct{})
}

func (j *runningJobs) add(job *model.Job) {
func (j *runningJobs) add(jobID int64, involvedSchemaInfos []model.InvolvingSchemaInfo) {
j.Lock()
defer j.Unlock()
j.processingIDs[job.ID] = struct{}{}
j.processingIDs[jobID] = struct{}{}
j.updateInternalRunningJobIDs()

if _, ok := j.unfinishedIDs[job.ID]; ok {
if _, ok := j.unfinishedIDs[jobID]; ok {
// Already exists, no need to add it again.
return
}
j.unfinishedIDs[job.ID] = struct{}{}
for _, info := range job.GetInvolvingSchemaInfo() {
j.unfinishedIDs[jobID] = struct{}{}
for _, info := range involvedSchemaInfos {
if _, ok := j.unfinishedSchema[info.Database]; !ok {
j.unfinishedSchema[info.Database] = make(map[string]struct{})
}
j.unfinishedSchema[info.Database][info.Table] = struct{}{}
}
}

func (j *runningJobs) remove(job *model.Job) {
func (j *runningJobs) remove(jobID int64, involvedSchemaInfos []model.InvolvingSchemaInfo) {
j.Lock()
defer j.Unlock()
delete(j.processingIDs, job.ID)
delete(j.processingIDs, jobID)
j.updateInternalRunningJobIDs()

if job.IsFinished() || job.IsSynced() {
delete(j.unfinishedIDs, job.ID)
for _, info := range job.GetInvolvingSchemaInfo() {
if db, ok := j.unfinishedSchema[info.Database]; ok {
delete(db, info.Table)
}
if len(j.unfinishedSchema[info.Database]) == 0 {
delete(j.unfinishedSchema, info.Database)
}
delete(j.unfinishedIDs, jobID)
for _, info := range involvedSchemaInfos {
if db, ok := j.unfinishedSchema[info.Database]; ok {
delete(db, info.Table)
}
if len(j.unfinishedSchema[info.Database]) == 0 {
delete(j.unfinishedSchema, info.Database)
}
}
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/ddl/ddl_running_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func TestRunningJobs(t *testing.T) {
require.True(t, runnable)
job1 := mkJob(1, "db1.t1", "db1.t2")
job2 := mkJob(2, "db2.t3")
j.add(job1)
j.add(job2)
j.add(job1.ID, job1.GetInvolvingSchemaInfo())
j.add(job2.ID, job2.GetInvolvingSchemaInfo())
require.Equal(t, "1,2", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db1.t1"))
require.False(t, runnable)
Expand All @@ -79,37 +79,37 @@ func TestRunningJobs(t *testing.T) {
require.True(t, runnable)

job3 := mkJob(3, "db1.*")
j.add(job3)
j.add(job3.ID, job3.GetInvolvingSchemaInfo())
require.Equal(t, "1,2,3", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db1.t100"))
require.False(t, runnable)

job4 := mkJob(4, "db4.")
j.add(job4)
j.add(job4.ID, job4.GetInvolvingSchemaInfo())
require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db4.t100"))
require.True(t, runnable)

job5 := mkJob(5, "*.*")
j.add(job5)
j.add(job5.ID, job5.GetInvolvingSchemaInfo())
require.Equal(t, "1,2,3,4,5", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db100.t100"))
require.False(t, runnable)

job5.State = model.JobStateDone
j.remove(job5)
j.remove(job5.ID, job5.GetInvolvingSchemaInfo())
require.Equal(t, "1,2,3,4", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db100.t100"))
require.True(t, runnable)

job3.State = model.JobStateDone
j.remove(job3)
j.remove(job3.ID, job3.GetInvolvingSchemaInfo())
require.Equal(t, "1,2,4", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db1.t100"))
require.True(t, runnable)

job1.State = model.JobStateDone
j.remove(job1)
j.remove(job1.ID, job1.GetInvolvingSchemaInfo())
require.Equal(t, "2,4", orderedAllIDs(j.allIDs()))
runnable = j.checkRunnable(mkJob(0, "db1.t1"))
require.True(t, runnable)
Expand All @@ -119,15 +119,15 @@ func TestOwnerRetireThenToBeOwner(t *testing.T) {
j := newRunningJobs()
require.Equal(t, "", j.allIDs())
job := mkJob(1, "test.t1")
j.add(job)
j.add(job.ID, job.GetInvolvingSchemaInfo())
require.False(t, j.checkRunnable(job))
// retire
j.clear()
// to be owner, try to start a new job.
require.False(t, j.checkRunnable(job))
// previous job removed.
j.remove(job)
j.remove(job.ID, job.GetInvolvingSchemaInfo())
require.True(t, j.checkRunnable(job))
j.add(job)
j.add(job.ID, job.GetInvolvingSchemaInfo())
require.False(t, j.checkRunnable(job))
}
Loading

0 comments on commit 1870d5f

Please sign in to comment.