Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support set explicit task type for ddl request #45789

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6924,13 +6924,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "41aee514dad7b095f70a59843b8db9424b54cb1f11baf4f0608e2120768a0ab9",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20230731032349-719e6456f7d5",
sha256 = "6220f72ec9098282705c8faf333f8f8b1fa675e38d320d29aabce010e1c7951f",
strip_prefix = "github.com/glorv/client-go/v2@v2.0.1-0.20230803025210-25c8c1bc7df1",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/glorv/client-go/v2/com_github_glorv_client_go_v2-v2.0.1-0.20230803025210-25c8c1bc7df1.zip",
"http://ats.apps.svc/gomod/github.com/glorv/client-go/v2/com_github_glorv_client_go_v2-v2.0.1-0.20230803025210-25c8c1bc7df1.zip",
"https://cache.hawkingrei.com/gomod/github.com/glorv/client-go/v2/com_github_glorv_client_go_v2-v2.0.1-0.20230803025210-25c8c1bc7df1.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/glorv/client-go/v2/com_github_glorv_client_go_v2-v2.0.1-0.20230803025210-25c8c1bc7df1.zip",
],
)
go_repository(
Expand Down
67 changes: 42 additions & 25 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"github.com/pingcap/tidb/util/ranger"
tikverror "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -306,6 +307,8 @@
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
Expand All @@ -319,6 +322,10 @@
RegionId: region.Region.GetId(),
RegionEpoch: region.Region.GetRegionEpoch(),
Peer: leader,
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: resourceGroupName,
},
RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType),

Check warning on line 328 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L325-L328

Added lines #L325 - L328 were not covered by tests
}
req := &import_sstpb.DuplicateDetectRequest{
Context: reqCtx,
Expand All @@ -338,9 +345,11 @@
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
) (*RemoteDupKVStream, error) {
subCtx, cancel := context.WithCancel(ctx)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType)

Check warning on line 352 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L352

Added line #L352 was not covered by tests
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down Expand Up @@ -398,17 +407,19 @@
// are stored into the errorMgr.
// this object can only be used once, either for local or remote deduplication.
type DupeDetector struct {
tbl table.Table
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
concurrency int
hasDupe atomic.Bool
indexID int64
tbl table.Table
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
concurrency int
hasDupe atomic.Bool
indexID int64
resourceGroupName string
taskType string
}

// NewDupeDetector creates a new DupeDetector.
Expand All @@ -422,23 +433,27 @@
sessOpts *encode.SessionOptions,
concurrency int,
logger log.Logger,
resourceGroupName string,
taskType string,
) (*DupeDetector, error) {
logger = logger.With(zap.String("tableName", tableName))
decoder, err := kv.NewTableKVDecoder(tbl, tableName, sessOpts, logger)
if err != nil {
return nil, errors.Trace(err)
}
return &DupeDetector{
tbl: tbl,
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
concurrency: concurrency,
indexID: sessOpts.IndexID,
tbl: tbl,
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
concurrency: concurrency,
indexID: sessOpts.IndexID,
resourceGroupName: resourceGroupName,
taskType: taskType,
}, nil
}

Expand Down Expand Up @@ -812,7 +827,7 @@
logutil.Key("dupDetectEndKey", kr.EndKey),
)
err := func() error {
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory)
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType)

Check warning on line 830 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L830

Added line #L830 was not covered by tests
if err != nil {
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
}
Expand Down Expand Up @@ -937,6 +952,8 @@
duplicateDB *pebble.DB
keyAdapter KeyAdapter
importClientFactory ImportClientFactory
resourceGroupName string
taskType string
}

// CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which
Expand All @@ -948,7 +965,7 @@
}()

duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx))
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx), local.resourceGroupName, local.taskType)

Check warning on line 968 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L968

Added line #L968 was not covered by tests
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -967,7 +984,7 @@
}()

duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx))
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx), local.resourceGroupName, local.taskType)

Check warning on line 987 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L987

Added line #L987 was not covered by tests
if err != nil {
return false, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestBuildDupTask(t *testing.T) {
}
for _, tc := range testCases {
dupMgr, err := local.NewDupeDetector(tbl, "t", nil, nil, keyspace.CodecV1, nil,
tc.sessOpt, 4, log.FromContext(context.Background()))
tc.sessOpt, 4, log.FromContext(context.Background()), "test", "lightning")
require.NoError(t, err)
tasks, err := local.BuildDuplicateTaskForTest(dupMgr)
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,12 @@
// the scope when pause PD schedulers.
PausePDSchedulerScope config.PausePDSchedulerScope
ResourceGroupName string
TaskType string
RaftKV2SwitchModeDuration time.Duration
}

// NewBackendConfig creates a new BackendConfig.
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName string, raftKV2SwitchModeDuration time.Duration) BackendConfig {
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName, taskType string, raftKV2SwitchModeDuration time.Duration) BackendConfig {

Check warning on line 431 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L431

Added line #L431 was not covered by tests
return BackendConfig{
PDAddr: cfg.TiDB.PdAddr,
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
Expand All @@ -449,6 +450,7 @@
KeyspaceName: keyspaceName,
PausePDSchedulerScope: cfg.TikvImporter.PausePDSchedulerScope,
ResourceGroupName: resourceGroupName,
TaskType: taskType,

Check warning on line 453 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L453

Added line #L453 was not covered by tests
RaftKV2SwitchModeDuration: raftKV2SwitchModeDuration,
}
}
Expand Down Expand Up @@ -1680,6 +1682,8 @@
duplicateDB: local.duplicateDB,
keyAdapter: local.keyAdapter,
importClientFactory: local.importClientFactory,
resourceGroupName: local.ResourceGroupName,
taskType: local.TaskType,

Check warning on line 1686 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L1685-L1686

Added lines #L1685 - L1686 were not covered by tests
}
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (local *Backend) writeToTiKV(ctx context.Context, j *regionJob) error {
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: local.ResourceGroupName,
},
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, util.ExplicitTypeLightning),
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType),
},
}
for _, peer := range region.GetPeers() {
Expand Down Expand Up @@ -600,7 +600,7 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: local.ResourceGroupName,
},
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, util.ExplicitTypeLightning),
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType),
}

if supportMultiIngest {
Expand Down
7 changes: 1 addition & 6 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@
backoffWeight = local.DefaultBackoffWeight
}

explicitRequestSourceType, err := common.GetExplicitRequestSourceTypeFromDB(ctx, rc.db)
if err != nil {
log.FromContext(ctx).Warn("get tidb_request_source_type failed", zap.Error(err), zap.String("tidb_request_source_type", explicitRequestSourceType))
return nil, errors.Trace(err)
}
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, rc.taskType)

Check warning on line 56 in br/pkg/lightning/importer/checksum_helper.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/checksum_helper.go#L56

Added line #L56 was not covered by tests
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
Expand All @@ -64,6 +65,7 @@
"github.com/pingcap/tidb/util/set"
"github.com/prometheus/client_golang/prometheus"
tikvconfig "github.com/tikv/client-go/v2/config"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
Expand Down Expand Up @@ -235,6 +237,7 @@

keyspaceName string
resourceGroupName string
taskType string
}

// LightningStatus provides the finished bytes and total bytes of the current task.
Expand Down Expand Up @@ -274,6 +277,8 @@
KeyspaceName string
// ResourceGroup name for current TiDB user
ResourceGroupName string
// TaskType is the source component name use for background task control.
TaskType string
}

// NewImportController creates a new Controller instance.
Expand Down Expand Up @@ -383,6 +388,15 @@
}
}

taskType, err := common.GetExplicitRequestSourceTypeFromDB(ctx, db)
if err != nil {
return nil, errors.Annotatef(err, "get system variable '%s' failed", variable.TiDBExplicitRequestSourceType)
}
if taskType == "" {
taskType = kvutil.ExplicitTypeLightning
}
p.TaskType = taskType

Check warning on line 399 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L391-L399

Added lines #L391 - L399 were not covered by tests
isRaftKV2, err := common.IsRaftKV2(ctx, db)
if err != nil {
log.FromContext(ctx).Warn("check isRaftKV2 failed", zap.Error(err))
Expand All @@ -391,7 +405,7 @@
if isRaftKV2 {
raftKV2SwitchModeDuration = cfg.Cron.SwitchMode.Duration
}
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, raftKV2SwitchModeDuration)
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, p.TaskType, raftKV2SwitchModeDuration)

Check warning on line 408 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L408

Added line #L408 was not covered by tests
backendObj, err = local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
Expand Down Expand Up @@ -494,6 +508,7 @@

keyspaceName: p.KeyspaceName,
resourceGroupName: p.ResourceGroupName,
taskType: p.TaskType,

Check warning on line 511 in br/pkg/lightning/importer/import.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/import.go#L511

Added line #L511 was not covered by tests
}

return rc, nil
Expand Down
3 changes: 3 additions & 0 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util/topsql"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -789,6 +790,7 @@ func iterateSnapshotKeys(ctx *JobContext, store kv.Storage, priority int, keyPre
snap.SetOption(kv.Priority, priority)
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
Expand Down Expand Up @@ -838,6 +840,7 @@ func getRangeEndKey(ctx *JobContext, store kv.Storage, priority int, keyPrefix k
}
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
it, err := snap.IterReverse(endKey.Next(), nil)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1443,7 +1444,7 @@ func (w *updateColumnWorker) cleanRowMap() {
// BackfillData will backfill the table record in a transaction. A lock corresponds to a rowKey if the value of rowKey is changed.
func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it cannot use WithInternalSourceType? it should also be ddl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ddlJobSourceType() returns something like "ddl_add_index", so in our current check logic, this can't be recognized as ddl.

errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikvrpc"
kvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -700,7 +701,7 @@ func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) {
return
}
w.tp = getDDLRequestSource(jobType)
w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType())
w.ddlJobCtx = kv.WithInternalSourceAndTaskType(w.ddlJobCtx, w.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
}

func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
Expand Down
5 changes: 3 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1739,7 +1740,7 @@ func (w *addIndexTxnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx

oprStartTime := time.Now()
jobID := handleRange.getJobID()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) {
taskCtx.finishTS = txn.StartTS()
taskCtx.addedCount = 0
Expand Down Expand Up @@ -2088,7 +2089,7 @@ func (w *cleanUpIndexWorker) BackfillData(handleRange reorgBackfillTask) (taskCt
})

oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
Expand Down