Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support set explicit task type for ddl request #45789

Merged
merged 4 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6924,13 +6924,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "41aee514dad7b095f70a59843b8db9424b54cb1f11baf4f0608e2120768a0ab9",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20230731032349-719e6456f7d5",
sha256 = "ed4a6bacc74d58cca6eb30c8828a3c138c78895782b407e607dc5c13f3b338e7",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20230809050315-300545a8a3c4",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230731032349-719e6456f7d5.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20230809050315-300545a8a3c4.zip",
],
)
go_repository(
Expand Down
67 changes: 42 additions & 25 deletions br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"github.com/pingcap/tidb/util/ranger"
tikverror "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -306,6 +307,8 @@
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
Expand All @@ -319,6 +322,10 @@
RegionId: region.Region.GetId(),
RegionEpoch: region.Region.GetRegionEpoch(),
Peer: leader,
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: resourceGroupName,
},
RequestSource: kvutil.BuildRequestSource(true, tidbkv.InternalTxnLightning, taskType),

Check warning on line 328 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L325-L328

Added lines #L325 - L328 were not covered by tests
}
req := &import_sstpb.DuplicateDetectRequest{
Context: reqCtx,
Expand All @@ -338,9 +345,11 @@
region *split.RegionInfo,
keyRange tidbkv.KeyRange,
importClientFactory ImportClientFactory,
resourceGroupName string,
taskType string,
) (*RemoteDupKVStream, error) {
subCtx, cancel := context.WithCancel(ctx)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory)
cli, err := getDupDetectClient(subCtx, region, keyRange, importClientFactory, resourceGroupName, taskType)

Check warning on line 352 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L352

Added line #L352 was not covered by tests
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down Expand Up @@ -398,17 +407,19 @@
// are stored into the errorMgr.
// this object can only be used once, either for local or remote deduplication.
type DupeDetector struct {
tbl table.Table
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
concurrency int
hasDupe atomic.Bool
indexID int64
tbl table.Table
tableName string
splitCli split.SplitClient
tikvCli *tikv.KVStore
tikvCodec tikv.Codec
errorMgr *errormanager.ErrorManager
decoder *kv.TableKVDecoder
logger log.Logger
concurrency int
hasDupe atomic.Bool
indexID int64
resourceGroupName string
taskType string
}

// NewDupeDetector creates a new DupeDetector.
Expand All @@ -422,23 +433,27 @@
sessOpts *encode.SessionOptions,
concurrency int,
logger log.Logger,
resourceGroupName string,
taskType string,
) (*DupeDetector, error) {
logger = logger.With(zap.String("tableName", tableName))
decoder, err := kv.NewTableKVDecoder(tbl, tableName, sessOpts, logger)
if err != nil {
return nil, errors.Trace(err)
}
return &DupeDetector{
tbl: tbl,
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
concurrency: concurrency,
indexID: sessOpts.IndexID,
tbl: tbl,
tableName: tableName,
splitCli: splitCli,
tikvCli: tikvCli,
tikvCodec: tikvCodec,
errorMgr: errMgr,
decoder: decoder,
logger: logger,
concurrency: concurrency,
indexID: sessOpts.IndexID,
resourceGroupName: resourceGroupName,
taskType: taskType,

Check warning on line 456 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L445-L456

Added lines #L445 - L456 were not covered by tests
}, nil
}

Expand Down Expand Up @@ -812,7 +827,7 @@
logutil.Key("dupDetectEndKey", kr.EndKey),
)
err := func() error {
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory)
stream, err := NewRemoteDupKVStream(ctx, region, kr, importClientFactory, m.resourceGroupName, m.taskType)

Check warning on line 830 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L830

Added line #L830 was not covered by tests
if err != nil {
return errors.Annotatef(err, "failed to create remote duplicate kv stream")
}
Expand Down Expand Up @@ -937,6 +952,8 @@
duplicateDB *pebble.DB
keyAdapter KeyAdapter
importClientFactory ImportClientFactory
resourceGroupName string
taskType string
}

// CollectLocalDuplicateRows collect duplicate keys from local db. We will store the duplicate keys which
Expand All @@ -948,7 +965,7 @@
}()

duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx))
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx), local.resourceGroupName, local.taskType)

Check warning on line 968 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L968

Added line #L968 was not covered by tests
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -967,7 +984,7 @@
}()

duplicateManager, err := NewDupeDetector(tbl, tableName, local.splitCli, local.tikvCli, local.tikvCodec,
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx))
local.errorMgr, opts, local.dupeConcurrency, log.FromContext(ctx), local.resourceGroupName, local.taskType)

Check warning on line 987 in br/pkg/lightning/backend/local/duplicate.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/duplicate.go#L987

Added line #L987 was not covered by tests
if err != nil {
return false, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestBuildDupTask(t *testing.T) {
}
for _, tc := range testCases {
dupMgr, err := local.NewDupeDetector(tbl, "t", nil, nil, keyspace.CodecV1, nil,
tc.sessOpt, 4, log.FromContext(context.Background()))
tc.sessOpt, 4, log.FromContext(context.Background()), "test", "lightning")
require.NoError(t, err)
tasks, err := local.BuildDuplicateTaskForTest(dupMgr)
require.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,11 +423,12 @@
// the scope when pause PD schedulers.
PausePDSchedulerScope config.PausePDSchedulerScope
ResourceGroupName string
TaskType string
RaftKV2SwitchModeDuration time.Duration
}

// NewBackendConfig creates a new BackendConfig.
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName string, raftKV2SwitchModeDuration time.Duration) BackendConfig {
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName, taskType string, raftKV2SwitchModeDuration time.Duration) BackendConfig {

Check warning on line 431 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L431

Added line #L431 was not covered by tests
return BackendConfig{
PDAddr: cfg.TiDB.PdAddr,
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
Expand All @@ -449,6 +450,7 @@
KeyspaceName: keyspaceName,
PausePDSchedulerScope: cfg.TikvImporter.PausePDSchedulerScope,
ResourceGroupName: resourceGroupName,
TaskType: taskType,

Check warning on line 453 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L453

Added line #L453 was not covered by tests
RaftKV2SwitchModeDuration: raftKV2SwitchModeDuration,
}
}
Expand Down Expand Up @@ -1680,6 +1682,8 @@
duplicateDB: local.duplicateDB,
keyAdapter: local.keyAdapter,
importClientFactory: local.importClientFactory,
resourceGroupName: local.ResourceGroupName,
taskType: local.TaskType,

Check warning on line 1686 in br/pkg/lightning/backend/local/local.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/local.go#L1685-L1686

Added lines #L1685 - L1686 were not covered by tests
}
}

Expand Down
4 changes: 2 additions & 2 deletions br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: local.ResourceGroupName,
},
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, util.ExplicitTypeLightning),
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType),

Check warning on line 271 in br/pkg/lightning/backend/local/region_job.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/region_job.go#L271

Added line #L271 was not covered by tests
},
}
for _, peer := range region.GetPeers() {
Expand Down Expand Up @@ -600,7 +600,7 @@
ResourceControlContext: &kvrpcpb.ResourceControlContext{
ResourceGroupName: local.ResourceGroupName,
},
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, util.ExplicitTypeLightning),
RequestSource: util.BuildRequestSource(true, kv.InternalTxnLightning, local.TaskType),

Check warning on line 603 in br/pkg/lightning/backend/local/region_job.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/backend/local/region_job.go#L603

Added line #L603 was not covered by tests
}

if supportMultiIngest {
Expand Down
7 changes: 1 addition & 6 deletions br/pkg/lightning/importer/checksum_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,7 @@ func NewChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (
backoffWeight = local.DefaultBackoffWeight
}

explicitRequestSourceType, err := common.GetExplicitRequestSourceTypeFromDB(ctx, rc.db)
if err != nil {
log.FromContext(ctx).Warn("get tidb_request_source_type failed", zap.Error(err), zap.String("tidb_request_source_type", explicitRequestSourceType))
return nil, errors.Trace(err)
}
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, explicitRequestSourceType)
manager = local.NewTiKVChecksumManager(store.GetClient(), rc.pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency), backoffWeight, rc.resourceGroupName, rc.taskType)
} else {
manager = local.NewTiDBChecksumExecutor(rc.db)
}
Expand Down
17 changes: 16 additions & 1 deletion br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/collate"
Expand All @@ -64,6 +65,7 @@ import (
"github.com/pingcap/tidb/util/set"
"github.com/prometheus/client_golang/prometheus"
tikvconfig "github.com/tikv/client-go/v2/config"
kvutil "github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
"go.uber.org/atomic"
"go.uber.org/multierr"
Expand Down Expand Up @@ -235,6 +237,7 @@ type Controller struct {

keyspaceName string
resourceGroupName string
taskType string
}

// LightningStatus provides the finished bytes and total bytes of the current task.
Expand Down Expand Up @@ -274,6 +277,8 @@ type ControllerParam struct {
KeyspaceName string
// ResourceGroup name for current TiDB user
ResourceGroupName string
// TaskType is the source component name use for background task control.
TaskType string
}

// NewImportController creates a new Controller instance.
Expand Down Expand Up @@ -383,6 +388,15 @@ func NewImportControllerWithPauser(
}
}

taskType, err := common.GetExplicitRequestSourceTypeFromDB(ctx, db)
if err != nil {
return nil, errors.Annotatef(err, "get system variable '%s' failed", variable.TiDBExplicitRequestSourceType)
}
if taskType == "" {
taskType = kvutil.ExplicitTypeLightning
}
p.TaskType = taskType

isRaftKV2, err := common.IsRaftKV2(ctx, db)
if err != nil {
log.FromContext(ctx).Warn("check isRaftKV2 failed", zap.Error(err))
Expand All @@ -391,7 +405,7 @@ func NewImportControllerWithPauser(
if isRaftKV2 {
raftKV2SwitchModeDuration = cfg.Cron.SwitchMode.Duration
}
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, raftKV2SwitchModeDuration)
backendConfig := local.NewBackendConfig(cfg, maxOpenFiles, p.KeyspaceName, p.ResourceGroupName, p.TaskType, raftKV2SwitchModeDuration)
backendObj, err = local.NewBackend(ctx, tls, backendConfig, regionSizeGetter)
if err != nil {
return nil, common.NormalizeOrWrapErr(common.ErrUnknown, err)
Expand Down Expand Up @@ -494,6 +508,7 @@ func NewImportControllerWithPauser(

keyspaceName: p.KeyspaceName,
resourceGroupName: p.ResourceGroupName,
taskType: p.TaskType,
}

return rc, nil
Expand Down
3 changes: 3 additions & 0 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util/topsql"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -775,6 +776,7 @@ func iterateSnapshotKeys(ctx *JobContext, store kv.Storage, priority int, keyPre
snap.SetOption(kv.Priority, priority)
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
if tagger := ctx.getResourceGroupTaggerForTopSQL(); tagger != nil {
snap.SetOption(kv.ResourceGroupTagger, tagger)
}
Expand Down Expand Up @@ -824,6 +826,7 @@ func GetRangeEndKey(ctx *JobContext, store kv.Storage, priority int, keyPrefix k
}
snap.SetOption(kv.RequestSourceInternal, true)
snap.SetOption(kv.RequestSourceType, ctx.ddlJobSourceType())
snap.SetOption(kv.ExplicitRequestSourceType, kvutil.ExplicitTypeDDL)
it, err := snap.IterReverse(endKey, nil)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
decoder "github.com/pingcap/tidb/util/rowDecoder"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1439,7 +1440,7 @@ func (w *updateColumnWorker) cleanRowMap() {
// BackfillData will backfill the table record in a transaction. A lock corresponds to a rowKey if the value of rowKey is changed.
func (w *updateColumnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it cannot use WithInternalSourceType? it should also be ddl?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ddlJobSourceType() returns something like "ddl_add_index", so in our current check logic, this can't be recognized as ddl.

errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
Expand Down
3 changes: 2 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/tikv/client-go/v2/tikvrpc"
kvutil "github.com/tikv/client-go/v2/util"
clientv3 "go.etcd.io/etcd/client/v3"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -700,7 +701,7 @@ func (w *JobContext) setDDLLabelForDiagnosis(jobType model.ActionType) {
return
}
w.tp = getDDLRequestSource(jobType)
w.ddlJobCtx = kv.WithInternalSourceType(w.ddlJobCtx, w.ddlJobSourceType())
w.ddlJobCtx = kv.WithInternalSourceAndTaskType(w.ddlJobCtx, w.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
}

func (w *worker) HandleJobDone(d *ddlCtx, job *model.Job, t *meta.Meta) error {
Expand Down
5 changes: 3 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1732,7 +1733,7 @@

oprStartTime := time.Now()
jobID := handleRange.getJobID()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) (err error) {
taskCtx.finishTS = txn.StartTS()
taskCtx.addedCount = 0
Expand Down Expand Up @@ -2081,7 +2082,7 @@
})

oprStartTime := time.Now()
ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)

Check warning on line 2085 in ddl/index.go

View check run for this annotation

Codecov / codecov/patch

ddl/index.go#L2085

Added line #L2085 was not covered by tests
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
Expand Down