ddl: support concurrent ddl #32169

Merged 27 commits into master from concurrent_ddl_workspace on Jul 20, 2022

Changes from all 27 commits:
e03352f
1. init ddl tables
xiongjiwei Jun 7, 2022
4fb3c22
2. setup concurrent ddl env and add ddl worker pool
xiongjiwei Jun 7, 2022
12f8486
3. add ddl manager to handle ddl job
xiongjiwei Jul 3, 2022
03ba58e
4. reorg handler for concurrent ddl
xiongjiwei Jul 5, 2022
7d3fd2c
5. manage ddl jobs for concurrent ddl
xiongjiwei Jul 7, 2022
e541bbb
6. add metrics for concurrent ddl
xiongjiwei Jun 8, 2022
3225073
7. support multiple tables
xiongjiwei Jun 30, 2022
f91dedb
8. fix test
xiongjiwei Jul 7, 2022
ad93678
9. migrate ddl between table and queue
xiongjiwei Jul 10, 2022
74aeb59
10. check tikv version and set reorg worker count according cpu count
xiongjiwei Jul 18, 2022
7612504
*: add featuretag on tests
hawkingrei Jul 12, 2022
e6eeb84
use a determined table id for 3 tables
xiongjiwei Jul 18, 2022
f0f6f28
remove ctx value
xiongjiwei Jul 19, 2022
1f17ebc
add GetSchemaVersionWithNonEmptyDiff function
xiongjiwei Jul 19, 2022
c659bed
address tangenta and zimulala comment
xiongjiwei Jul 19, 2022
8e9f646
use only one etcd path
xiongjiwei Jul 20, 2022
2200af8
make ActionRenameTable support multi-schema
xiongjiwei Jul 20, 2022
88d7216
reset sql digest to make top sql work correct
xiongjiwei Jul 20, 2022
f9e93b6
add comment
xiongjiwei Jul 20, 2022
ebdb2c3
fix test
xiongjiwei Jul 20, 2022
d8bfb40
remove 0 for schema version lock
xiongjiwei Jul 20, 2022
3b1c623
Merge branch 'master' into concurrent_ddl_workspace
ti-chi-bot Jul 20, 2022
e542bcc
Merge branch 'master' into concurrent_ddl_workspace
ti-chi-bot Jul 20, 2022
2ff012c
Merge branch 'master' into concurrent_ddl_workspace
wjhuang2016 Jul 20, 2022
501f38b
Merge branch 'master' into concurrent_ddl_workspace
wjhuang2016 Jul 20, 2022
c95797c
Merge branch 'master' into concurrent_ddl_workspace
ti-chi-bot Jul 20, 2022
2e4b1d1
Merge branch 'master' into concurrent_ddl_workspace
ti-chi-bot Jul 20, 2022
3 changes: 3 additions & 0 deletions Makefile
@@ -438,6 +438,9 @@ bazel_coverage_test: failpoint-enable bazel_ci_prepare
bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover \
-- //... -//cmd/... -//tests/graceshutdown/... \
-//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test
+ bazel --output_user_root=/home/jenkins/.tidb/tmp coverage --config=ci --@io_bazel_rules_go//go/config:cover_format=go_cover --define gotags=featuretag \
+ -- //... -//cmd/... -//tests/graceshutdown/... \
+ -//tests/globalkilltest/... -//tests/readonlytest/... -//br/pkg/task:task_test

bazel_build: bazel_ci_prepare
mkdir -p bin
1 change: 1 addition & 0 deletions br/pkg/backup/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
"//meta",
"//meta/autoid",
"//parser/model",
"//sessionctx",
"//statistics/handle",
"//util",
"//util/codec",
21 changes: 16 additions & 5 deletions br/pkg/backup/client.go
@@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
@@ -472,21 +473,31 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
}

// WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter.
- func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastBackupTS, backupTS uint64) error {
+ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, store kv.Storage, lastBackupTS, backupTS uint64) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
snapMeta := meta.NewSnapshotMeta(snapshot)
lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS))
lastSnapMeta := meta.NewSnapshotMeta(lastSnapshot)
- lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
+ lastSchemaVersion, err := lastSnapMeta.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
return errors.Trace(err)
}
- allJobs, err := ddl.GetAllDDLJobs(snapMeta)
+ backupSchemaVersion, err := snapMeta.GetSchemaVersionWithNonEmptyDiff()
+ if err != nil {
+ return errors.Trace(err)
+ }
+
+ version, err := store.CurrentVersion(kv.GlobalTxnScope)
+ if err != nil {
+ return errors.Trace(err)
+ }
+ newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
+ allJobs, err := ddl.GetAllDDLJobs(se, newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))
- historyJobs, err := ddl.GetAllHistoryDDLJobs(snapMeta)
+ historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
if err != nil {
return errors.Trace(err)
}
@@ -500,7 +511,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
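Under concurrent DDL, job metadata lives in the tidb_ddl_job system table rather than the old meta-KV job queue, so ddl.GetAllDDLJobs now takes a session and BR reads the job list from the newest snapshot instead of the backup snapshot. GetSchemaVersionWithNonEmptyDiff additionally steps back over schema versions whose diff is not yet committed, keeping the window endpoints stable. The sketch below only restates the updated filter as a standalone predicate; jobInBackupWindow is a hypothetical name, BR writes the condition inline.

// Sketch: the version window applied to each history job in WriteBackupDDLJobs.
func jobInBackupWindow(job *model.Job, lastSchemaVersion, backupSchemaVersion int64) bool {
	done := job.State == model.JobStateDone || job.State == model.JobStateSynced
	return done &&
		job.BinlogInfo != nil &&
		job.BinlogInfo.SchemaVersion > lastSchemaVersion && // newer than the last backup
		job.BinlogInfo.SchemaVersion <= backupSchemaVersion // and visible at backupTS
}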
2 changes: 1 addition & 1 deletion br/pkg/backup/client_test.go
@@ -280,7 +280,7 @@ func TestSkipUnsupportedDDLJob(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
- err = backup.WriteBackupDDLJobs(metaWriter, s.cluster.Storage, lastTS, ts)
+ err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.cluster.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoError(t, err, "Flush failed", err)
4 changes: 2 additions & 2 deletions br/pkg/restore/db_test.go
@@ -194,7 +194,7 @@ func TestFilterDDLJobs(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
- err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts)
+ err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
@@ -258,7 +258,7 @@ func TestFilterDDLJobsV2(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
- err = backup.WriteBackupDDLJobs(metaWriter, s.mock.Storage, lastTS, ts)
+ err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
1 change: 1 addition & 0 deletions br/pkg/task/BUILD.bazel
@@ -34,6 +34,7 @@ go_library(
"//config",
"//kv",
"//parser/mysql",
"//sessionctx",
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//statistics/handle",
3 changes: 2 additions & 1 deletion br/pkg/task/backup.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/types"
@@ -399,7 +400,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}

metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
- err = backup.WriteBackupDDLJobs(metawriter, mgr.GetStorage(), cfg.LastBackupTS, backupTS)
+ err = backup.WriteBackupDDLJobs(metawriter, se.(sessionctx.Context), mgr.GetStorage(), cfg.LastBackupTS, backupTS)
if err != nil {
return errors.Trace(err)
}
4 changes: 4 additions & 0 deletions ddl/BUILD.bazel
@@ -6,16 +6,19 @@ go_library(
"backfilling.go",
"callback.go",
"column.go",
"constant.go",
"ddl.go",
"ddl_algorithm.go",
"ddl_api.go",
"ddl_tiflash_api.go",
"ddl_worker.go",
"ddl_workerpool.go",
"delete_range.go",
"delete_range_util.go",
"foreign_key.go",
"generated_column.go",
"index.go",
"job_table.go",
"mock.go",
"multi_schema_change.go",
"options.go",
@@ -144,6 +147,7 @@ go_test(
"index_change_test.go",
"index_modify_test.go",
"integration_test.go",
"job_table_test.go",
"main_test.go",
"modify_column_test.go",
"multi_schema_change_test.go",
2 changes: 1 addition & 1 deletion ddl/backfilling.go
@@ -409,7 +409,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,

if err != nil {
// Update the reorg handle that has been processed.
- err1 := reorgInfo.UpdateReorgMeta(nextKey)
+ err1 := reorgInfo.UpdateReorgMeta(nextKey, w.sessPool)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblError).Observe(elapsedTime.Seconds())
logutil.BgLogger().Warn("[ddl] backfill worker handle batch tasks failed",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
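UpdateReorgMeta now takes the worker's session pool: under concurrent DDL the reorg checkpoint is persisted to the tidb_ddl_reorg table through SQL instead of the raw meta-KV API. A rough sketch of the shape of such an update, with hypothetical names (the pool's get/put methods and updateReorgHandleSQL are illustrative, not the actual implementation):

// Sketch, assuming a pool that lends out sessions for internal SQL.
func updateReorgCheckpoint(pool *sessionPool, jobID int64, startKey kv.Key) error {
	se, err := pool.get() // borrow a session to write tidb_ddl_reorg
	if err != nil {
		return errors.Trace(err)
	}
	defer pool.put(se)
	// Persist the restart point so a new DDL owner can resume the backfill.
	return updateReorgHandleSQL(se, jobID, startKey)
}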
14 changes: 14 additions & 0 deletions ddl/callback.go
@@ -55,6 +55,10 @@ type Callback interface {
OnJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
OnWatched(ctx context.Context)
+ // OnGetJobBefore is called before getting job.
+ OnGetJobBefore(jobType string)
+ // OnGetJobAfter is called after getting job.
+ OnGetJobAfter(jobType string, job *model.Job)
}

// BaseCallback implements Callback.OnChanged interface.
@@ -86,6 +90,16 @@ func (*BaseCallback) OnWatched(ctx context.Context) {
// Nothing to do.
}

+ // OnGetJobBefore implements Callback.OnGetJobBefore interface.
+ func (c *BaseCallback) OnGetJobBefore(jobType string) {
+ // Nothing to do.
+ }
+
+ // OnGetJobAfter implements Callback.OnGetJobAfter interface.
+ func (c *BaseCallback) OnGetJobAfter(jobType string, job *model.Job) {
+ // Nothing to do.
+ }

// DomainReloader is used to avoid import loop.
type DomainReloader interface {
Reload() error
20 changes: 20 additions & 0 deletions ddl/callback_test.go
@@ -52,6 +52,8 @@ type TestDDLCallback struct {
onJobUpdated func(*model.Job)
OnJobUpdatedExported func(*model.Job)
onWatched func(ctx context.Context)
+ OnGetJobBeforeExported func(string)
+ OnGetJobAfterExported func(string, *model.Job)
}

// OnChanged mock the same behavior with the main DDL hook.
@@ -118,6 +120,24 @@ func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
tc.BaseCallback.OnWatched(ctx)
}

+ // OnGetJobBefore implements Callback.OnGetJobBefore interface.
+ func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
+ if tc.OnGetJobBeforeExported != nil {
+ tc.OnGetJobBeforeExported(jobType)
+ return
+ }
+ tc.BaseCallback.OnGetJobBefore(jobType)
+ }
+
+ // OnGetJobAfter implements Callback.OnGetJobAfter interface.
+ func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
+ if tc.OnGetJobAfterExported != nil {
+ tc.OnGetJobAfterExported(jobType, job)
+ return
+ }
+ tc.BaseCallback.OnGetJobAfter(jobType, job)
+ }

func TestCallback(t *testing.T) {
cb := &BaseCallback{}
require.Nil(t, cb.OnChanged(nil))
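The two exported fields let a test observe the workers' job-fetch loop around the new tidb_ddl_job table. A usage sketch following the existing TestDDLCallback pattern; the Do field, the SetHook wiring, and dom (a domain from test setup) are assumed here rather than shown in this diff:

// Sketch: log every fetch attempt made by the general and reorg worker pools.
hook := &TestDDLCallback{Do: dom}
hook.OnGetJobBeforeExported = func(jobType string) {
	logutil.BgLogger().Info("[test] fetching job", zap.String("type", jobType))
}
hook.OnGetJobAfterExported = func(jobType string, job *model.Job) {
	if job != nil { // the worker may have found no runnable job
		logutil.BgLogger().Info("[test] fetched job", zap.String("type", jobType), zap.Int64("jobID", job.ID))
	}
}
dom.DDL().SetHook(hook)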
8 changes: 4 additions & 4 deletions ddl/column.go
@@ -701,7 +701,7 @@ func (w *worker) doModifyColumnTypeWithData(
// Make sure job args change after `updateVersionAndTableInfoWithCheck`, otherwise, the job args will
// be updated in `updateDDLJob` even if it meets an error in `updateVersionAndTableInfoWithCheck`.
job.SchemaState = model.StateDeleteOnly
- metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn).Set(0)
+ metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String()).Set(0)
job.Args = append(job.Args, changingCol, changingIdxs, rmIdxIDs)
case model.StateDeleteOnly:
// Column from null to not null.
@@ -791,7 +791,7 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
- rh := newReorgHandler(t)
+ rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs))
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
@@ -1062,7 +1062,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
// Write the reorg info to store so the whole reorganize process can recover from panic.
- err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey)
+ err := reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool)
logutil.BgLogger().Info("[ddl] update column and indexes",
zap.Int64("jobID", reorgInfo.Job.ID),
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -1103,7 +1103,7 @@ func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
backfillWorker: newBackfillWorker(sessCtx, id, t, reorgInfo),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues("update_col_rate"),
metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("update_col_rate", reorgInfo.SchemaName, t.Meta().Name.String())),
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
sqlMode: reorgInfo.ReorgMeta.SQLMode,
35 changes: 35 additions & 0 deletions ddl/constant.go
@@ -0,0 +1,35 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import (
"github.com/pingcap/tidb/meta"
)

const (
// JobTable stores the information of DDL jobs.
JobTable = "tidb_ddl_job"
// ReorgTable stores the information of DDL reorganization.
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
// ReorgTableID is the table ID of `tidb_ddl_reorg`.
ReorgTableID = meta.MaxInt48 - 2
// HistoryTableID is the table ID of `tidb_ddl_history`.
HistoryTableID = meta.MaxInt48 - 3
)
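The three tables get fixed, pre-allocated IDs at the top of the 48-bit table-ID space so every TiDB instance agrees on them without going through the ID allocator (see commit e6eeb84, "use a determined table id for 3 tables"). Assuming meta.MaxInt48 is 2^48-1, the reserved IDs work out as follows:

package main

import "fmt"

func main() {
	const maxInt48 = int64(1)<<48 - 1 // assumed value of meta.MaxInt48
	fmt.Println("tidb_ddl_job:    ", maxInt48-1) // 281474976710654
	fmt.Println("tidb_ddl_reorg:  ", maxInt48-2) // 281474976710653
	fmt.Println("tidb_ddl_history:", maxInt48-3) // 281474976710652
}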
4 changes: 2 additions & 2 deletions ddl/db_change_test.go
@@ -1378,7 +1378,7 @@ func prepareTestControlParallelExecSQL(t *testing.T, store kv.Storage, dom *doma
require.NoError(t, err)
txn, err := sess.Txn(true)
require.NoError(t, err)
- jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
+ jobs, err := ddl.GetAllDDLJobs(sess, meta.NewMeta(txn))
require.NoError(t, err)
qLen = len(jobs)
if qLen == 2 {
@@ -1407,7 +1407,7 @@
require.NoError(t, err)
txn, err := sess.Txn(true)
require.NoError(t, err)
- jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
+ jobs, err := ddl.GetAllDDLJobs(sess, meta.NewMeta(txn))
require.NoError(t, err)
qLen = len(jobs)
if qLen == 1 {
4 changes: 3 additions & 1 deletion ddl/db_test.go
@@ -647,7 +647,7 @@ func TestAddExpressionIndexRollback(t *testing.T) {
txn, err := ctx.Txn(true)
require.NoError(t, err)
m := meta.NewMeta(txn)
- element, start, end, physicalID, err := m.GetDDLReorgHandle(currJob)
+ element, start, end, physicalID, err := ddl.NewReorgHandlerForTest(m, testkit.NewTestKit(t, store).Session()).GetDDLReorgHandle(currJob)
require.True(t, meta.ErrDDLReorgElementNotExist.Equal(err))
require.Nil(t, element)
require.Nil(t, start)
@@ -1274,6 +1274,8 @@ func TestCancelJobWriteConflict(t *testing.T) {
stmt := fmt.Sprintf("admin cancel ddl jobs %d", job.ID)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn", `return("no_retry")`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/kv/mockCommitErrorInNewTxn")) }()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL", `return(true)`))
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockCancelConcurencyDDL")) }()
rs, cancelErr = tk2.Session().Execute(context.Background(), stmt)
}
}