Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: redirect the DML writes to a temp index during creating index #37709

Merged
merged 31 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
07a093f
*: redirect the kv writes to a temp index during creating index
tangenta Sep 8, 2022
6595320
fix linter revive
tangenta Sep 8, 2022
34e4125
update bazel
tangenta Sep 8, 2022
1b7c476
remove unnecessary change
tangenta Sep 8, 2022
99b9991
address comment
tangenta Sep 9, 2022
b7db256
Update tablecodec/tablecodec.go
tangenta Sep 9, 2022
4cc9327
Update tablecodec/tablecodec.go
tangenta Sep 9, 2022
17a43cd
Update tablecodec/tablecodec.go
tangenta Sep 9, 2022
61225b0
Update tablecodec/tablecodec.go
tangenta Sep 9, 2022
85b8788
Update table/tables/mutation_checker.go
tangenta Sep 9, 2022
de76bf5
address comment
tangenta Sep 9, 2022
7a66dcb
Update tablecodec/tablecodec.go
tangenta Sep 13, 2022
4154f3d
Update tablecodec/tablecodec.go
tangenta Sep 13, 2022
74211fd
Update ddl/index.go
tangenta Sep 13, 2022
b09d33d
Update ddl/index_merge_tmp.go
tangenta Sep 13, 2022
82b6b25
Update ddl/index.go
tangenta Sep 13, 2022
5b9d2b9
address comment
tangenta Sep 13, 2022
23b45ca
revert useless changes
tangenta Sep 13, 2022
93e41ae
add ut for TempIndexKey2IndexKey
tangenta Sep 13, 2022
e4449e3
refine the parameters of TempIndexKey2IndexKey
tangenta Sep 13, 2022
84336d2
refine the parameters of TempIndexKey2IndexKey
tangenta Sep 13, 2022
e524ad2
fix realtikvtest
tangenta Sep 13, 2022
d5f0bda
refine variable name and license
tangenta Sep 13, 2022
b78398c
Merge branch 'master' into add-index-merge
tangenta Sep 13, 2022
88653a3
Merge branch 'master' into add-index-merge
Benjamin2037 Sep 14, 2022
f24db25
Merge branch 'master' into add-index-merge
tangenta Sep 14, 2022
e4a1ba7
Merge branch 'master' into add-index-merge
Benjamin2037 Sep 14, 2022
a3038a1
address comment
tangenta Sep 15, 2022
2bda745
simplify if
tangenta Sep 15, 2022
ead1325
address comment
tangenta Sep 15, 2022
6a70056
Merge branch 'master' into add-index-merge
ti-chi-bot Sep 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_library(
"foreign_key.go",
"generated_column.go",
"index.go",
"index_merge_tmp.go",
"job_table.go",
"mock.go",
"multi_schema_change.go",
Expand Down Expand Up @@ -164,6 +165,7 @@ go_test(
"fail_test.go",
"foreign_key_test.go",
"index_change_test.go",
"index_merge_tmp_test.go",
"index_modify_test.go",
"integration_test.go",
"job_table_test.go",
Expand Down Expand Up @@ -193,6 +195,7 @@ go_test(
shard_count = 50,
deps = [
"//config",
"//ddl/ingest",
"//ddl/placement",
"//ddl/schematracker",
"//ddl/testutil",
Expand Down
28 changes: 22 additions & 6 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ import (
type backfillWorkerType byte

const (
typeAddIndexWorker backfillWorkerType = 0
typeUpdateColumnWorker backfillWorkerType = 1
typeCleanUpIndexWorker backfillWorkerType = 2
typeAddIndexWorker backfillWorkerType = 0
typeUpdateColumnWorker backfillWorkerType = 1
typeCleanUpIndexWorker backfillWorkerType = 2
typeAddIndexMergeTmpWorker backfillWorkerType = 3
)

// By now the DDL jobs that need backfilling include:
Expand Down Expand Up @@ -116,6 +117,8 @@ func (bWT backfillWorkerType) String() string {
return "update column"
case typeCleanUpIndexWorker:
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
default:
return "unknown"
}
Expand Down Expand Up @@ -466,15 +469,21 @@ func (dc *ddlCtx) handleRangeTasks(sessPool *sessionPool, t table.Table, workers
batchTasks := make([]*reorgBackfillTask, 0, len(workers))
physicalTableID := reorgInfo.PhysicalTableID

var prefix kv.Key
if reorgInfo.mergingTmpIdx {
prefix = t.IndexPrefix()
} else {
prefix = t.RecordPrefix()
}
// Build reorg tasks.
for i, keyRange := range kvRanges {
endKey := keyRange.EndKey
endK, err := getRangeEndKey(reorgInfo.d.jobContext(reorgInfo.Job), workers[0].sessCtx.GetStore(), workers[0].priority, t.RecordPrefix(), keyRange.StartKey, endKey)
endK, err := getRangeEndKey(reorgInfo.d.jobContext(reorgInfo.Job), workers[0].sessCtx.GetStore(), workers[0].priority, prefix, keyRange.StartKey, endKey)
if err != nil {
logutil.BgLogger().Info("[ddl] send range task to workers, get reverse key failed", zap.Error(err))
} else {
logutil.BgLogger().Info("[ddl] send range task to workers, change end key",
zap.String("end key", tryDecodeToHandleString(endKey)), zap.String("current end key", tryDecodeToHandleString(endK)))
zap.String("end key", hex.EncodeToString(endKey)), zap.String("current end key", hex.EncodeToString(endK)))
endKey = endK
}

Expand Down Expand Up @@ -646,9 +655,16 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.Physic

switch bfWorkerType {
case typeAddIndexWorker:
idxWorker := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
idxWorker, err := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
if err != nil {
return errors.Trace(err)
}
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
case typeAddIndexMergeTmpWorker:
tmpIdxWorker := newMergeTempIndexWorker(sessCtx, i, t, reorgInfo, jc)
backfillWorkers = append(backfillWorkers, tmpIdxWorker.backfillWorker)
go tmpIdxWorker.backfillWorker.run(reorgInfo.d, tmpIdxWorker, job)
case typeUpdateColumnWorker:
// Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting.
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
Expand Down
3 changes: 2 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,8 +797,9 @@ func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, j

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs))
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
Expand Down
9 changes: 2 additions & 7 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,14 +514,9 @@ func jobNeedGC(job *model.Job) bool {
return false
}
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
if job.State != model.JobStateRollbackDone {
tangenta marked this conversation as resolved.
Show resolved Hide resolved
break
}
// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
return true
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn:
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionModifyColumn,
model.ActionAddIndex, model.ActionAddPrimaryKey:
return true
case model.ActionMultiSchemaChange:
for _, sub := range job.MultiSchemaInfo.SubJobs {
Expand Down
14 changes: 11 additions & 3 deletions ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,22 +269,30 @@ func TestJobNeedGC(t *testing.T) {
job := &model.Job{Type: model.ActionAddIndex, State: model.JobStateCancelled}
require.False(t, ddl.JobNeedGCForTest(job))

job = &model.Job{Type: model.ActionAddIndex, State: model.JobStateDone}
job = &model.Job{Type: model.ActionAddColumn, State: model.JobStateDone}
require.False(t, ddl.JobNeedGCForTest(job))
job = &model.Job{Type: model.ActionAddIndex, State: model.JobStateDone}
require.True(t, ddl.JobNeedGCForTest(job))
job = &model.Job{Type: model.ActionAddPrimaryKey, State: model.JobStateDone}
require.False(t, ddl.JobNeedGCForTest(job))
require.True(t, ddl.JobNeedGCForTest(job))
job = &model.Job{Type: model.ActionAddIndex, State: model.JobStateRollbackDone}
require.True(t, ddl.JobNeedGCForTest(job))
job = &model.Job{Type: model.ActionAddPrimaryKey, State: model.JobStateRollbackDone}
require.True(t, ddl.JobNeedGCForTest(job))

job = &model.Job{Type: model.ActionMultiSchemaChange, State: model.JobStateDone, MultiSchemaInfo: &model.MultiSchemaInfo{
SubJobs: []*model.SubJob{
{Type: model.ActionAddIndex, State: model.JobStateDone},
{Type: model.ActionAddColumn, State: model.JobStateDone},
{Type: model.ActionRebaseAutoID, State: model.JobStateDone},
}}}
require.False(t, ddl.JobNeedGCForTest(job))
job = &model.Job{Type: model.ActionMultiSchemaChange, State: model.JobStateDone, MultiSchemaInfo: &model.MultiSchemaInfo{
SubJobs: []*model.SubJob{
{Type: model.ActionAddIndex, State: model.JobStateDone},
{Type: model.ActionAddColumn, State: model.JobStateDone},
{Type: model.ActionRebaseAutoID, State: model.JobStateDone},
}}}
require.True(t, ddl.JobNeedGCForTest(job))
job = &model.Job{Type: model.ActionMultiSchemaChange, State: model.JobStateDone, MultiSchemaInfo: &model.MultiSchemaInfo{
SubJobs: []*model.SubJob{
{Type: model.ActionAddIndex, State: model.JobStateDone},
Expand Down
29 changes: 18 additions & 11 deletions ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,27 +322,34 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
}
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
tableID := job.TableID
var indexID int64
var ifExists bool
var partitionIDs []int64
if err := job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil {
return errors.Trace(err)
}
// Determine the physicalIDs to be added.
physicalIDs := []int64{job.TableID}
if len(partitionIDs) > 0 {
for _, pid := range partitionIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1)
elemID := ea.allocForIndexID(pid, indexID)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil {
physicalIDs = partitionIDs
}
// Determine the index IDs to be added.
tempIdxID := tablecodec.TempIndexPrefix | indexID
var indexIDs []int64
if job.State == model.JobStateRollbackDone {
indexIDs = []int64{indexID, tempIdxID}
} else {
indexIDs = []int64{tempIdxID}
}
for _, pid := range physicalIDs {
for _, iid := range indexIDs {
startKey := tablecodec.EncodeTableIndexPrefix(pid, iid)
endKey := tablecodec.EncodeTableIndexPrefix(pid, iid+1)
elemID := ea.allocForIndexID(pid, iid)
if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("physical ID is %d", pid)); err != nil {
return errors.Trace(err)
}
}
} else {
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
elemID := ea.allocForIndexID(tableID, indexID)
return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("table ID is %d", tableID))
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
Expand Down