Skip to content

Commit

Permalink
Merge branch 'master' into issue_39029
Browse files Browse the repository at this point in the history
  • Loading branch information
fengou1 committed Dec 12, 2022
2 parents 6ed5e77 + 965ed00 commit ec535c6
Show file tree
Hide file tree
Showing 42 changed files with 2,153 additions and 132 deletions.
6 changes: 3 additions & 3 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")

http_archive(
name = "io_bazel_rules_go",
sha256 = "ae013bf35bd23234d1dea46b079f1e05ba74ac0321423830119d3e787ec73483",
sha256 = "56d8c5a5c91e1af73eca71a6fab2ced959b67c86d12ba37feedb0a2dfea441a6",
urls = [
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.36.0/rules_go-v0.36.0.zip",
"https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
"https://github.com/bazelbuild/rules_go/releases/download/v0.37.0/rules_go-v0.37.0.zip",
],
)

Expand Down
99 changes: 74 additions & 25 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,97 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/copr"
"github.com/pingcap/tidb/store/driver/backoff"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/topsql"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
)

type backfillWorkerType byte
type backfillerType byte

const (
typeAddIndexWorker backfillWorkerType = 0
typeUpdateColumnWorker backfillWorkerType = 1
typeCleanUpIndexWorker backfillWorkerType = 2
typeAddIndexMergeTmpWorker backfillWorkerType = 3
typeAddIndexWorker backfillerType = 0
typeUpdateColumnWorker backfillerType = 1
typeCleanUpIndexWorker backfillerType = 2
typeAddIndexMergeTmpWorker backfillerType = 3

// InstanceLease is the instance lease.
InstanceLease = 1 * time.Minute
)

func (bT backfillerType) String() string {
switch bT {
case typeAddIndexWorker:
return "add index"
case typeUpdateColumnWorker:
return "update column"
case typeCleanUpIndexWorker:
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
default:
return "unknown"
}
}

// BackfillJob is for a tidb_ddl_backfill table's record.
type BackfillJob struct {
ID int64
JobID int64
EleID int64
EleKey []byte
Tp backfillerType
State model.JobState
StoreID int64
InstanceID string
InstanceLease types.Time
// range info
CurrKey []byte
StartKey []byte
EndKey []byte

StartTS uint64
FinishTS uint64
RowCount int64
Meta *model.BackfillMeta
}

// AbbrStr returns the BackfillJob's info without the Meta info.
func (bj *BackfillJob) AbbrStr() string {
return fmt.Sprintf("ID:%d, JobID:%d, EleID:%d, Type:%s, State:%s, InstanceID:%s, InstanceLease:%s",
bj.ID, bj.JobID, bj.EleID, bj.Tp, bj.State, bj.InstanceID, bj.InstanceLease)
}

// GetOracleTime returns the current time from TS.
func GetOracleTime(se *session) (time.Time, error) {
txn, err := se.Txn(true)
if err != nil {
return time.Time{}, err
}
return oracle.GetTimeFromTS(txn.StartTS()).UTC(), nil
}

// GetLeaseGoTime returns a types.Time by adding a lease.
func GetLeaseGoTime(currTime time.Time, lease time.Duration) types.Time {
leaseTime := currTime.Add(lease)
return types.NewTime(types.FromGoTime(leaseTime.In(time.UTC)), mysql.TypeTimestamp, types.MaxFsp)
}

// By now the DDL jobs that need backfilling include:
// 1: add-index
// 2: modify-column-type
Expand Down Expand Up @@ -110,21 +174,6 @@ const (
// Instead, it is divided into batches, each time a kv transaction completes the backfilling
// of a partial batch.

func (bWT backfillWorkerType) String() string {
switch bWT {
case typeAddIndexWorker:
return "add index"
case typeUpdateColumnWorker:
return "update column"
case typeCleanUpIndexWorker:
return "clean up index"
case typeAddIndexMergeTmpWorker:
return "merge temporary index"
default:
return "unknown"
}
}

type backfiller interface {
BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error)
AddMetricInfo(float64)
Expand Down Expand Up @@ -191,13 +240,13 @@ type backfillWorker struct {
resultCh chan *backfillResult
table table.Table
priority int
tp backfillWorkerType
tp backfillerType
ctx context.Context
cancel func()
}

func newBackfillWorker(ctx context.Context, sessCtx sessionctx.Context, id int, t table.PhysicalTable,
reorgInfo *reorgInfo, tp backfillWorkerType) *backfillWorker {
reorgInfo *reorgInfo, tp backfillerType) *backfillWorker {
bfCtx, cancel := context.WithCancel(ctx)
return &backfillWorker{
id: id,
Expand Down Expand Up @@ -611,7 +660,7 @@ type backfillScheduler struct {
ctx context.Context
reorgInfo *reorgInfo
sessPool *sessionPool
tp backfillWorkerType
tp backfillerType
tbl table.PhysicalTable
decodeColMap map[int64]decoder.Column
jobCtx *JobContext
Expand All @@ -628,7 +677,7 @@ type backfillScheduler struct {
const backfillTaskChanSize = 1024

func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sessionPool,
tp backfillWorkerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
tp backfillerType, tbl table.PhysicalTable, decColMap map[int64]decoder.Column,
jobCtx *JobContext) *backfillScheduler {
return &backfillScheduler{
ctx: ctx,
Expand Down Expand Up @@ -805,7 +854,7 @@ func (b *backfillScheduler) Close() {
//
// The above operations are completed in a transaction.
// Finally, update the concurrent processing of the total number of rows, and store the completed handle value.
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sessionPool, t table.PhysicalTable, bfWorkerType backfillerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()

Expand Down
2 changes: 1 addition & 1 deletion ddl/concurrentddltest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "concurrentddltest_test",
timeout = "long",
timeout = "moderate",
srcs = [
"main_test.go",
"switch_test.go",
Expand Down
46 changes: 46 additions & 0 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ const (
ReorgTable = "tidb_ddl_reorg"
// HistoryTable stores the history DDL jobs.
HistoryTable = "tidb_ddl_history"
// BackfillTable stores the information of backfill jobs.
BackfillTable = "tidb_ddl_backfill"
// BackfillHistoryTable stores the information of history backfill jobs.
BackfillHistoryTable = "tidb_ddl_backfill_history"

// JobTableID is the table ID of `tidb_ddl_job`.
JobTableID = meta.MaxInt48 - 1
Expand All @@ -34,11 +38,53 @@ const (
HistoryTableID = meta.MaxInt48 - 3
// MDLTableID is the table ID of `tidb_mdl_info`.
MDLTableID = meta.MaxInt48 - 4
// BackfillTableID is the table ID of `tidb_ddl_backfill`.
BackfillTableID = meta.MaxInt48 - 5
// BackfillHistoryTableID is the table ID of `tidb_ddl_backfill_history`.
BackfillHistoryTableID = meta.MaxInt48 - 6

// JobTableSQL is the CREATE TABLE SQL of `tidb_ddl_job`.
JobTableSQL = "create table " + JobTable + "(job_id bigint not null, reorg int, schema_ids text(65535), table_ids text(65535), job_meta longblob, type int, processing int, primary key(job_id))"
// ReorgTableSQL is the CREATE TABLE SQL of `tidb_ddl_reorg`.
ReorgTableSQL = "create table " + ReorgTable + "(job_id bigint not null, ele_id bigint, ele_type blob, start_key blob, end_key blob, physical_id bigint, reorg_meta longblob, unique key(job_id, ele_id, ele_type(20)))"
// HistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_history`.
HistoryTableSQL = "create table " + HistoryTable + "(job_id bigint not null, job_meta longblob, db_name char(64), table_name char(64), schema_ids text(65535), table_ids text(65535), create_time datetime, primary key(job_id))"
// BackfillTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill`.
BackfillTableSQL = "create table " + BackfillTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
// BackfillHistoryTableSQL is the CREATE TABLE SQL of `tidb_ddl_backfill_history`.
BackfillHistoryTableSQL = "create table " + BackfillHistoryTable + `(
id bigint not null,
ddl_job_id bigint not null,
ele_id bigint not null,
ele_key blob,
store_id bigint,
type int,
exec_id blob default null,
exec_lease Time,
state int,
curr_key blob,
start_key blob,
end_key blob,
start_ts bigint,
finish_ts bigint,
row_count bigint,
backfill_meta longblob,
unique key(ddl_job_id, ele_id, ele_key(20), id))`
)
3 changes: 3 additions & 0 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (d *ddl) SetInterceptor(i Interceptor) {
// JobNeedGCForTest is only used for test.
var JobNeedGCForTest = jobNeedGC

// NewSession is only used for test.
var NewSession = newSession

// GetMaxRowID is used for test.
func GetMaxRowID(store kv.Storage, priority int, t table.Table, startHandle, endHandle kv.Key) (kv.Key, error) {
return getRangeEndKey(NewJobContext(), store, priority, t.RecordPrefix(), startHandle, endHandle)
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func (d *ddl) addBatchDDLJobs2Table(tasks []*limitJobTask) error {
jobTasks[i] = job
injectModifyJobArgFailPoint(job)
}

sess.SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
err = insertDDLJobs2Table(newSession(sess), true, jobTasks...)
}
Expand Down
Loading

0 comments on commit ec535c6

Please sign in to comment.