ddl: use another way to get index uniqueness during task exec init (#50378)

ref #50307, close #50435
tangenta authored Jan 16, 2024
1 parent 44e200d commit a257521
Showing 9 changed files with 70 additions and 45 deletions.
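At a glance: backfillDistExecutor.Init used to load the table inside a transaction and look up the index info for the first element ID just to read its Unique flag; it now decodes the uniqueness flag directly from the DDL job's arguments, so task-executor initialization no longer depends on table metadata. A condensed before/after, paraphrasing the backfilling_dist_executor.go hunk below (error checks omitted, not verbatim code from either revision):

// before: resolve the index info through the table metadata
_, tbl, err := d.getTableByTxn((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, job.TableID)
idx := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleIDs[0])
bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)

// after: decode the flag straight from the job arguments
unique, err := decodeIndexUniqueness(job)
bc, err := ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)

The other half of the commit reworks how getRangeSplitter obtains the region split size and keys from PD; see the backfilling_dist_scheduler.go and pkg/ddl/ingest/config.go hunks below.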
5 changes: 0 additions & 5 deletions br/pkg/lightning/backend/local/local.go
@@ -1289,11 +1289,6 @@ func (local *Backend) ImportEngine(
return err
}

// GetRegionSplitSizeKeys gets the region split size and keys from PD.
func (local *Backend) GetRegionSplitSizeKeys(ctx context.Context) (finalSize int64, finalKeys int64, err error) {
return GetRegionSplitSizeKeys(ctx, local.pdCli, local.tls)
}
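With this method removed, its caller in getRangeSplitter (backfilling_dist_scheduler.go below) switches to the package-level local.GetRegionSplitSizeKeys(ctx, pdCli, tls), passing a PD client taken from the store and a TLS config built by the new ingest.NewDDLTLS helper, instead of reaching the backend through ingest.LitBackCtxMgr.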

// expose these variables to unit test.
var (
testJobToWorkerCh = make(chan *regionJob)
1 change: 1 addition & 0 deletions pkg/ddl/BUILD.bazel
@@ -68,6 +68,7 @@ go_library(
deps = [
"//br/pkg/lightning/backend",
"//br/pkg/lightning/backend/external",
"//br/pkg/lightning/backend/local",
"//br/pkg/lightning/common",
"//br/pkg/lightning/config",
"//br/pkg/storage",
27 changes: 18 additions & 9 deletions pkg/ddl/backfilling_dist_executor.go
@@ -136,18 +136,13 @@ func (s *backfillDistExecutor) Init(ctx context.Context) error {
return errors.Trace(err)
}
job := &bgm.Job
_, tbl, err := d.getTableByTxn((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, job.TableID)

unique, err := decodeIndexUniqueness(job)
if err != nil {
return errors.Trace(err)
}
// We only support adding multiple unique indexes or multiple non-unique indexes,
// we use the first index uniqueness here.
idx := model.FindIndexInfoByID(tbl.Meta().Indices, bgm.EleIDs[0])
if idx == nil {
return errors.Trace(errors.Errorf("index info not found: %d", bgm.EleIDs[0]))
return err
}
pdLeaderAddr := d.store.(tikv.Storage).GetRegionCache().PDClient().GetLeaderAddr()
bc, err := ingest.LitBackCtxMgr.Register(ctx, idx.Unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
bc, err := ingest.LitBackCtxMgr.Register(ctx, unique, job.ID, d.etcdCli, pdLeaderAddr, job.ReorgMeta.ResourceGroupName)
if err != nil {
return errors.Trace(err)
}
@@ -156,6 +151,20 @@ func (s *backfillDistExecutor) Init(ctx context.Context) error {
return nil
}

func decodeIndexUniqueness(job *model.Job) (bool, error) {
unique := make([]bool, 1)
err := job.DecodeArgs(&unique[0])
if err != nil {
err = job.DecodeArgs(&unique)
}
if err != nil {
return false, errors.Trace(err)
}
// We only support adding multiple unique indexes or multiple non-unique indexes,
// we use the first index uniqueness here.
return unique[0], nil
}
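The double DecodeArgs call handles the two shapes a job's arguments can take: a single ADD INDEX job carries one bool, while a job that adds several indexes at once carries a slice of bools. Below is a minimal, self-contained sketch of the same fallback using plain encoding/json; the real code relies on model.Job.DecodeArgs, and the exact JSON layout of the raw arguments here is a simplification for illustration only.

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// firstUniqueness mimics decodeIndexUniqueness: the job's raw arguments are
// treated as a JSON array whose first element is either a bool or a []bool.
// Try the scalar form first, then fall back to the slice form.
func firstUniqueness(rawArgs json.RawMessage) (bool, error) {
	var args []json.RawMessage
	if err := json.Unmarshal(rawArgs, &args); err != nil || len(args) == 0 {
		return false, errors.New("cannot split job args")
	}
	var single bool
	if err := json.Unmarshal(args[0], &single); err == nil {
		return single, nil // single-index job
	}
	var multi []bool
	if err := json.Unmarshal(args[0], &multi); err != nil || len(multi) == 0 {
		return false, errors.New("cannot decode uniqueness flag")
	}
	// Only all-unique or all-non-unique batches are supported, so the
	// first flag is representative of the whole job.
	return multi[0], nil
}

func main() {
	fmt.Println(firstUniqueness(json.RawMessage(`[true, "idx_a"]`)))             // true <nil>
	fmt.Println(firstUniqueness(json.RawMessage(`[[false, false], ["a", "b"]]`))) // false <nil>
}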

func (s *backfillDistExecutor) GetSubtaskExecutor(ctx context.Context, task *proto.Task, summary *execute.Summary) (execute.SubtaskExecutor, error) {
switch task.Step {
case StepReadIndex, StepMergeSort, StepWriteAndIngest:
38 changes: 21 additions & 17 deletions pkg/ddl/backfilling_dist_scheduler.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/ingest"
@@ -127,9 +128,9 @@ func (sch *BackfillingSchedulerExt) OnNextSubtasksBatch(
})
return generateGlobalSortIngestPlan(
ctx,
sch.d.store.(kv.StorageWithPD),
taskHandle,
task,
job.ID,
backfillMeta.CloudStorageURI,
prevStep,
logger)
@@ -333,9 +334,9 @@ calculateRegionBatch(totalRegionCnt int, instanceCnt int, useLocalDisk bool

func generateGlobalSortIngestPlan(
ctx context.Context,
store kv.StorageWithPD,
taskHandle diststorage.TaskHandle,
task *proto.Task,
jobID int64,
cloudStorageURI string,
step proto.Step,
logger *zap.Logger,
@@ -353,7 +354,7 @@ func generateGlobalSortIngestPlan(
return nil, err
}
splitter, err := getRangeSplitter(
ctx, cloudStorageURI, jobID, int64(totalSize), int64(len(instanceIDs)), multiFileStat, logger)
ctx, store, cloudStorageURI, int64(totalSize), int64(len(instanceIDs)), multiFileStat, logger)
if err != nil {
return nil, err
}
@@ -468,8 +469,8 @@ func generateMergePlan(

func getRangeSplitter(
ctx context.Context,
store kv.StorageWithPD,
cloudStorageURI string,
jobID int64,
totalSize int64,
instanceCnt int64,
multiFileStat []external.MultipleFilesStat,
@@ -486,21 +487,24 @@ func getRangeSplitter(

rangeGroupSize := totalSize / instanceCnt
rangeGroupKeys := int64(math.MaxInt64)
bcCtx, ok := ingest.LitBackCtxMgr.Load(jobID)
if !ok {
return nil, errors.Errorf("backend context not found")
}

local := bcCtx.GetLocalBackend()
if local == nil {
return nil, errors.Errorf("local backend not found")
}
maxSizePerRange, maxKeysPerRange, err := local.GetRegionSplitSizeKeys(ctx)
if err != nil {
logger.Warn("fail to get region split keys and size", zap.Error(err))
var maxSizePerRange = int64(config.SplitRegionSize)
var maxKeysPerRange = int64(config.SplitRegionKeys)
if store != nil {
pdCli := store.GetPDClient()
tls, err := ingest.NewDDLTLS()
if err == nil {
size, keys, err := local.GetRegionSplitSizeKeys(ctx, pdCli, tls)
if err == nil {
maxSizePerRange = max(maxSizePerRange, size)
maxKeysPerRange = max(maxKeysPerRange, keys)
} else {
logger.Warn("fail to get region split keys and size", zap.Error(err))
}
} else {
logger.Warn("fail to get region split keys and size", zap.Error(err))
}
}
maxSizePerRange = max(maxSizePerRange, int64(config.SplitRegionSize))
maxKeysPerRange = max(maxKeysPerRange, int64(config.SplitRegionKeys))

return external.NewRangeSplitter(ctx, multiFileStat, extStore,
rangeGroupSize, rangeGroupKeys, maxSizePerRange, maxKeysPerRange, true)
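The rewritten block changes both where the split limits come from and how failures degrade: instead of requiring a registered ingest backend context (and returning an error when it is missing), getRangeSplitter now starts from the static defaults config.SplitRegionSize and config.SplitRegionKeys and only raises them when PD can be reached. Roughly, as a condensed paraphrase of the added lines (warning logs and error returns omitted, not verbatim):

maxSize, maxKeys := int64(config.SplitRegionSize), int64(config.SplitRegionKeys)
if store != nil {
	if tls, err := ingest.NewDDLTLS(); err == nil {
		if size, keys, err := local.GetRegionSplitSizeKeys(ctx, store.GetPDClient(), tls); err == nil {
			maxSize, maxKeys = max(maxSize, size), max(maxKeys, keys)
		}
	}
}

Because max() is applied, the PD-reported values can only raise the limits above the defaults, never lower them, which matches the old behaviour of clamping with the defaults as a floor.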
6 changes: 3 additions & 3 deletions pkg/ddl/ddl.go
@@ -800,9 +800,9 @@ func (d *ddl) prepareWorkers4ConcurrencyDDL() {
reorgCnt := min(max(runtime.GOMAXPROCS(0)/4, 1), reorgWorkerCnt)
// local worker count at least 2 at most 10.
localCnt := min(max(runtime.GOMAXPROCS(0)/4, 2), localWorkerCnt)
d.reorgWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(addIdxWorker), reorgCnt, reorgCnt, 0), reorg)
d.generalDDLWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(generalWorker), generalWorkerCnt, generalWorkerCnt, 0), general)
d.localWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(localWorker), localCnt, localCnt, 0), local)
d.reorgWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(addIdxWorker), reorgCnt, reorgCnt, 0), jobTypeReorg)
d.generalDDLWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(generalWorker), generalWorkerCnt, generalWorkerCnt, 0), jobTypeGeneral)
d.localWorkerPool = newDDLWorkerPool(pools.NewResourcePool(workerFactory(localWorker), localCnt, localCnt, 0), jobTypeLocal)
failpoint.Inject("NoDDLDispatchLoop", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return()
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_workerpool_test.go
@@ -27,7 +27,7 @@ func TestDDLWorkerPool(t *testing.T) {
return wk, nil
}
}
pool := newDDLWorkerPool(pools.NewResourcePool(f(), 1, 2, 0), reorg)
pool := newDDLWorkerPool(pools.NewResourcePool(f(), 1, 2, 0), jobTypeReorg)
pool.close()
pool.put(nil)
}
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
@@ -2126,7 +2126,7 @@ func (w *worker) executeDistTask(reorgInfo *reorgInfo) error {
zap.Int("worker-cnt", workerCntLimit), zap.Int("task-concurrency", concurrency),
zap.String("task-key", taskKey))
taskMeta := &BackfillTaskMeta{
Job: *reorgInfo.Job.Clone(),
Job: *job.Clone(),
EleIDs: elemIDs,
EleTypeKey: reorgInfo.currElement.TypeKey,
CloudStorageURI: w.jobContext(job.ID, job.ReorgMeta).cloudStorageURI,
16 changes: 16 additions & 0 deletions pkg/ddl/ingest/config.go
@@ -17,11 +17,14 @@ package ingest
import (
"context"
"math"
"net"
"path/filepath"
"strconv"
"sync/atomic"

"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lightning "github.com/pingcap/tidb/br/pkg/lightning/config"
tidb "github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/util/logutil"
@@ -80,6 +83,19 @@ func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool) (
return c, err
}

// NewDDLTLS creates a common.TLS from the tidb config for DDL.
func NewDDLTLS() (*common.TLS, error) {
tidbCfg := tidb.GetGlobalConfig()
hostPort := net.JoinHostPort("127.0.0.1", strconv.Itoa(int(tidbCfg.Status.StatusPort)))
return common.NewTLS(
tidbCfg.Security.ClusterSSLCA,
tidbCfg.Security.ClusterSSLCert,
tidbCfg.Security.ClusterSSLKey,
hostPort,
nil, nil, nil,
)
}
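For context, the helper simply bundles the cluster SSL CA, certificate, and key from the global TiDB config into a *common.TLS, using the local status port as the host:port argument that common.NewTLS expects. A hedged usage sketch, mirroring the call added in getRangeSplitter above (pdCli stands for a PD client obtained from a kv.StorageWithPD store):

tls, err := ingest.NewDDLTLS()
if err != nil {
	return err // or log and fall back to the defaults, as getRangeSplitter does
}
size, keys, err := local.GetRegionSplitSizeKeys(ctx, pdCli, tls)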

var (
compactMemory = 1 * size.GB
compactConcurrency = 4
18 changes: 9 additions & 9 deletions pkg/ddl/job_table.go
@@ -65,26 +65,26 @@ type jobType int

func (t jobType) String() string {
switch t {
case general:
case jobTypeGeneral:
return "general"
case reorg:
case jobTypeReorg:
return "reorg"
case local:
case jobTypeLocal:
return "local"
}
return "unknown job type: " + strconv.Itoa(int(t))
}

const (
general jobType = iota
reorg
local
jobTypeGeneral jobType = iota
jobTypeReorg
jobTypeLocal
)
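The rename of general/reorg/local to jobTypeGeneral/jobTypeReorg/jobTypeLocal is most likely forced by the new import: backfilling_dist_scheduler.go now imports the lightning backend package under the name local, and Go forbids declaring the same identifier in both a file block (the import) and the package block (the constant), so a package-level constant named local would no longer compile. A minimal sketch of the clash this avoids (not code from the commit):

package ddl

import "github.com/pingcap/tidb/br/pkg/lightning/backend/local"

type jobType int

// const local jobType = 2 // would fail to compile: "local redeclared in this block"
const jobTypeLocal jobType = 2 // the renamed constant avoids the clash

var _ *local.Backend // reference the import so it is not reported as unused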

func (d *ddl) getJob(se *sess.Session, tp jobType, filter func(*model.Job) (bool, error)) (*model.Job, error) {
not := "not"
label := "get_job_general"
if tp == reorg {
if tp == jobTypeReorg {
not = ""
label = "get_job_reorg"
}
@@ -204,7 +204,7 @@ func (d *ddl) processJobDuringUpgrade(sess *sess.Session, job *model.Job) (isRun
}

func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
return d.getJob(sess, general, func(job *model.Job) (bool, error) {
return d.getJob(sess, jobTypeGeneral, func(job *model.Job) (bool, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
}
@@ -224,7 +224,7 @@ func (d *ddl) getGeneralJob(sess *sess.Session) (*model.Job, error) {
}

func (d *ddl) getReorgJob(sess *sess.Session) (*model.Job, error) {
return d.getJob(sess, reorg, func(job *model.Job) (bool, error) {
return d.getJob(sess, jobTypeReorg, func(job *model.Job) (bool, error) {
if !d.runningJobs.checkRunnable(job) {
return false, nil
}
