From 900b27b7c2ba631a8b37bf0a894fb7f64a073518 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Wed, 20 Sep 2023 17:04:44 +0800 Subject: [PATCH] importinto: cleanup sorted files, writer memory quota, and test (#47092) ref pingcap/tidb#46704 --- br/pkg/lightning/backend/external/BUILD.bazel | 3 +- .../lightning/backend/external/util_test.go | 74 ++++++++++ br/pkg/lightning/backend/external/writer.go | 14 +- .../lightning/backend/external/writer_test.go | 12 +- br/pkg/lightning/backend/local/BUILD.bazel | 1 + br/pkg/lightning/backend/local/local.go | 7 +- br/pkg/lightning/backend/local/local_test.go | 10 ++ disttask/importinto/BUILD.bazel | 11 +- disttask/importinto/dispatcher.go | 88 ++++++++++-- disttask/importinto/dispatcher_test.go | 66 +++++++++ .../importinto/dispatcher_testkit_test.go | 132 +++++++++++++++++- .../importinto/encode_and_sort_operator.go | 61 +++++++- .../encode_and_sort_operator_test.go | 69 ++++++++- disttask/importinto/job_testkit_test.go | 107 ++++++++++++++ disttask/importinto/planner.go | 31 +++- disttask/importinto/scheduler.go | 8 +- .../realtikvtest/importintotest4/BUILD.bazel | 2 + .../importintotest4/global_sort_test.go | 106 ++++++++++++++ 18 files changed, 762 insertions(+), 40 deletions(-) create mode 100644 disttask/importinto/job_testkit_test.go create mode 100644 tests/realtikvtest/importintotest4/global_sort_test.go diff --git a/br/pkg/lightning/backend/external/BUILD.bazel b/br/pkg/lightning/backend/external/BUILD.bazel index 8dc5e30b31ffd..e33c4de6ce698 100644 --- a/br/pkg/lightning/backend/external/BUILD.bazel +++ b/br/pkg/lightning/backend/external/BUILD.bazel @@ -55,8 +55,9 @@ go_test( ], embed = [":external"], flaky = True, - shard_count = 34, + shard_count = 36, deps = [ + "//br/pkg/lightning/backend/kv", "//br/pkg/lightning/common", "//br/pkg/storage", "//kv", diff --git a/br/pkg/lightning/backend/external/util_test.go b/br/pkg/lightning/backend/external/util_test.go index b26b6bfbcde54..119dccac147f9 100644 --- a/br/pkg/lightning/backend/external/util_test.go +++ b/br/pkg/lightning/backend/external/util_test.go @@ -247,3 +247,77 @@ func TestGetMaxOverlapping(t *testing.T) { } require.EqualValues(t, 3, GetMaxOverlapping(points)) } + +func TestSortedKVMeta(t *testing.T) { + summary := []*WriterSummary{ + { + Min: []byte("a"), + Max: []byte("b"), + TotalSize: 123, + MultipleFilesStats: []MultipleFilesStat{ + { + Filenames: [][2]string{ + {"f1", "stat1"}, + {"f2", "stat2"}, + }, + }, + }, + }, + { + Min: []byte("x"), + Max: []byte("y"), + TotalSize: 177, + MultipleFilesStats: []MultipleFilesStat{ + { + Filenames: [][2]string{ + {"f3", "stat3"}, + {"f4", "stat4"}, + }, + }, + }, + }, + } + meta0 := NewSortedKVMeta(summary[0]) + require.Equal(t, []byte("a"), meta0.MinKey) + require.Equal(t, []byte("b"), meta0.MaxKey) + require.Equal(t, uint64(123), meta0.TotalKVSize) + require.Equal(t, []string{"f1", "f2"}, meta0.DataFiles) + require.Equal(t, []string{"stat1", "stat2"}, meta0.StatFiles) + meta1 := NewSortedKVMeta(summary[1]) + require.Equal(t, []byte("x"), meta1.MinKey) + require.Equal(t, []byte("y"), meta1.MaxKey) + require.Equal(t, uint64(177), meta1.TotalKVSize) + require.Equal(t, []string{"f3", "f4"}, meta1.DataFiles) + require.Equal(t, []string{"stat3", "stat4"}, meta1.StatFiles) + + meta0.MergeSummary(summary[1]) + require.Equal(t, []byte("a"), meta0.MinKey) + require.Equal(t, []byte("y"), meta0.MaxKey) + require.Equal(t, uint64(300), meta0.TotalKVSize) + require.Equal(t, []string{"f1", "f2", "f3", "f4"}, meta0.DataFiles) + require.Equal(t, 
[]string{"stat1", "stat2", "stat3", "stat4"}, meta0.StatFiles) + + meta00 := NewSortedKVMeta(summary[0]) + meta00.Merge(meta1) + require.Equal(t, meta0, meta00) +} + +func TestKeyMinMax(t *testing.T) { + require.Equal(t, []byte(nil), NotNilMin(nil, nil)) + require.Equal(t, []byte{}, NotNilMin(nil, []byte{})) + require.Equal(t, []byte(nil), NotNilMin([]byte{}, nil)) + require.Equal(t, []byte("a"), NotNilMin([]byte("a"), nil)) + require.Equal(t, []byte("a"), NotNilMin([]byte("a"), []byte{})) + require.Equal(t, []byte("a"), NotNilMin(nil, []byte("a"))) + require.Equal(t, []byte("a"), NotNilMin([]byte("a"), []byte("b"))) + require.Equal(t, []byte("a"), NotNilMin([]byte("b"), []byte("a"))) + + require.Equal(t, []byte(nil), NotNilMax(nil, nil)) + require.Equal(t, []byte{}, NotNilMax(nil, []byte{})) + require.Equal(t, []byte(nil), NotNilMax([]byte{}, nil)) + require.Equal(t, []byte("a"), NotNilMax([]byte("a"), nil)) + require.Equal(t, []byte("a"), NotNilMax([]byte("a"), []byte{})) + require.Equal(t, []byte("a"), NotNilMax(nil, []byte("a"))) + require.Equal(t, []byte("b"), NotNilMax([]byte("a"), []byte("b"))) + require.Equal(t, []byte("b"), NotNilMax([]byte("b"), []byte("a"))) +} diff --git a/br/pkg/lightning/backend/external/writer.go b/br/pkg/lightning/backend/external/writer.go index 522705540b6e5..d79e25be1b509 100644 --- a/br/pkg/lightning/backend/external/writer.go +++ b/br/pkg/lightning/backend/external/writer.go @@ -38,6 +38,11 @@ import ( var multiFileStatNum = 500 +const ( + // DefaultMemSizeLimit is the default memory size limit for writer. + DefaultMemSizeLimit = 256 * size.MB +) + // rangePropertiesCollector collects range properties for each range. The zero // value of rangePropertiesCollector is not ready to use, should call reset() // first. @@ -61,8 +66,11 @@ func (rc *rangePropertiesCollector) encode() []byte { // WriterSummary is the summary of a writer. type WriterSummary struct { - WriterID string - Seq int + WriterID string + Seq int + // Min and Max are the min and max key written by this writer, both are + // inclusive, i.e. [Min, Max]. + // will be empty if no key is written. Min tidbkv.Key Max tidbkv.Key TotalSize uint64 @@ -90,7 +98,7 @@ type WriterBuilder struct { // NewWriterBuilder creates a WriterBuilder. func NewWriterBuilder() *WriterBuilder { return &WriterBuilder{ - memSizeLimit: 256 * size.MB, + memSizeLimit: DefaultMemSizeLimit, writeBatchCount: 8 * 1024, propSizeDist: 1 * size.MB, propKeysDist: 8 * 1024, diff --git a/br/pkg/lightning/backend/external/writer_test.go b/br/pkg/lightning/backend/external/writer_test.go index bf07607b41e80..c32876e46198c 100644 --- a/br/pkg/lightning/backend/external/writer_test.go +++ b/br/pkg/lightning/backend/external/writer_test.go @@ -26,6 +26,7 @@ import ( "time" "github.com/cockroachdb/pebble" + "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" "github.com/pingcap/tidb/br/pkg/storage" dbkv "github.com/pingcap/tidb/kv" @@ -41,11 +42,13 @@ func TestWriter(t *testing.T) { ctx := context.Background() memStore := storage.NewMemStorage() - writer := NewWriterBuilder(). + w := NewWriterBuilder(). SetPropSizeDistance(100). SetPropKeysDistance(2). 
Build(memStore, "/test", "0")
+	writer := NewEngineWriter(w)
+
 	kvCnt := rand.Intn(10) + 10
 	kvs := make([]common.KvPair, kvCnt)
 	for i := 0; i < kvCnt; i++ {
@@ -58,12 +61,9 @@ func TestWriter(t *testing.T) {
 		_, err = rand.Read(kvs[i].Val)
 		require.NoError(t, err)
 	}
-	for _, pair := range kvs {
-		err := writer.WriteRow(ctx, pair.Key, pair.Val, nil)
-		require.NoError(t, err)
-	}
 
-	err := writer.Close(ctx)
+	require.NoError(t, writer.AppendRows(ctx, nil, kv.MakeRowsFromKvPairs(kvs)))
+	_, err := writer.Close(ctx)
 	require.NoError(t, err)
 
 	slices.SortFunc(kvs, func(i, j common.KvPair) int {
diff --git a/br/pkg/lightning/backend/local/BUILD.bazel b/br/pkg/lightning/backend/local/BUILD.bazel
index 9b867d0c972d6..33fbc53f058eb 100644
--- a/br/pkg/lightning/backend/local/BUILD.bazel
+++ b/br/pkg/lightning/backend/local/BUILD.bazel
@@ -58,6 +58,7 @@ go_library(
         "//util/compress",
         "//util/engine",
         "//util/hack",
+        "//util/intest",
         "//util/mathutil",
         "//util/ranger",
         "@com_github_cockroachdb_pebble//:pebble",
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index cf39bbfb73b52..efb7652307d40 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -59,6 +59,7 @@ import (
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/codec"
 	"github.com/pingcap/tidb/util/engine"
+	"github.com/pingcap/tidb/util/intest"
 	"github.com/pingcap/tidb/util/mathutil"
 	"github.com/tikv/client-go/v2/oracle"
 	tikvclient "github.com/tikv/client-go/v2/tikv"
@@ -939,7 +940,11 @@ func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig
 		if err != nil {
 			return err
 		}
-		store, err := storage.New(ctx, storeBackend, nil)
+		opt := &storage.ExternalStorageOptions{}
+		if intest.InTest {
+			opt.NoCredentials = true
+		}
+		store, err := storage.New(ctx, storeBackend, opt)
 		if err != nil {
 			return err
 		}
diff --git a/br/pkg/lightning/backend/local/local_test.go b/br/pkg/lightning/backend/local/local_test.go
index 87aae043d7ffa..d1ae8846087f3 100644
--- a/br/pkg/lightning/backend/local/local_test.go
+++ b/br/pkg/lightning/backend/local/local_test.go
@@ -2255,3 +2255,13 @@ func TestExternalEngine(t *testing.T) {
 	}
 	require.Equal(t, 100, kvIdx)
 }
+
+func TestGetExternalEngineKVStatistics(t *testing.T) {
+	b := Backend{
+		externalEngine: map[uuid.UUID]common.Engine{},
+	}
+	// non-existent UUID
+	size, count := b.GetExternalEngineKVStatistics(uuid.New())
+	require.Zero(t, size)
+	require.Zero(t, count)
+}
diff --git a/disttask/importinto/BUILD.bazel b/disttask/importinto/BUILD.bazel
index ecab609ebc2c7..f50fe5d07ec0e 100644
--- a/disttask/importinto/BUILD.bazel
+++ b/disttask/importinto/BUILD.bazel
@@ -46,6 +46,7 @@ go_library(
         "//meta/autoid",
         "//metrics",
         "//parser/ast",
+        "//parser/model",
         "//parser/mysql",
         "//resourcemanager/pool/workerpool",
         "//resourcemanager/util",
@@ -59,7 +60,9 @@ go_library(
         "//util/logutil",
         "//util/mathutil",
         "//util/promutil",
+        "//util/size",
         "//util/sqlexec",
+        "@com_github_docker_go_units//:go-units",
         "@com_github_go_sql_driver_mysql//:mysql",
         "@com_github_google_uuid//:uuid",
         "@com_github_pingcap_errors//:errors",
@@ -78,6 +81,7 @@ go_test(
     srcs = [
        "dispatcher_test.go",
        "dispatcher_testkit_test.go",
        "encode_and_sort_operator_test.go",
+       "job_testkit_test.go",
        "metrics_test.go",
        "planner_test.go",
        "subtask_executor_test.go",
@@ -86,12 +90,14 @@ go_test(
     embed = [":importinto"],
     flaky = True,
     race = "on",
-    shard_count = 8,
+    shard_count = 11,
     deps = [
         "//br/pkg/lightning/backend",
+        "//br/pkg/lightning/backend/external",
         "//br/pkg/lightning/checkpoints",
         "//br/pkg/lightning/mydump",
         "//br/pkg/lightning/verification",
+        "//ddl",
         "//disttask/framework/dispatcher",
         "//disttask/framework/planner",
         "//disttask/framework/proto",
@@ -101,9 +107,12 @@ go_test(
         "//domain/infosync",
         "//executor/importer",
         "//meta/autoid",
+        "//parser",
+        "//parser/ast",
         "//parser/model",
         "//testkit",
         "//util/logutil",
+        "//util/mock",
         "//util/sqlexec",
         "@com_github_ngaut_pools//:pools",
         "@com_github_pingcap_errors//:errors",
diff --git a/disttask/importinto/dispatcher.go b/disttask/importinto/dispatcher.go
index e7bc3d464c79e..f0240e16507b3 100644
--- a/disttask/importinto/dispatcher.go
+++ b/disttask/importinto/dispatcher.go
@@ -25,9 +25,11 @@ import (
 	dmysql "github.com/go-sql-driver/mysql"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
 	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
 	"github.com/pingcap/tidb/br/pkg/lightning/common"
 	"github.com/pingcap/tidb/br/pkg/lightning/config"
+	"github.com/pingcap/tidb/br/pkg/lightning/log"
 	"github.com/pingcap/tidb/br/pkg/lightning/metric"
 	"github.com/pingcap/tidb/br/pkg/utils"
 	"github.com/pingcap/tidb/disttask/framework/dispatcher"
@@ -122,7 +124,7 @@ func (t *taskInfo) close(ctx context.Context) {
 
 // ImportDispatcherExt is an extension of ImportDispatcher, exported for test.
 type ImportDispatcherExt struct {
-	globalSort bool
+	GlobalSort bool
 	mu         sync.RWMutex
 	// NOTE: there's actually no need to sync the 2 fields below, since we restrict that only one
 	// task can be running at a time. But we might support task queuing in the future, so leave it for now.
@@ -237,13 +239,16 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(
 			metrics.BytesCounter.WithLabelValues(metric.StateTotalRestore).Add(float64(taskMeta.Plan.TotalFileSize))
 		}
 		jobStep := importer.JobStepImporting
-		if dsp.globalSort {
+		if dsp.GlobalSort {
 			jobStep = importer.JobStepGlobalSorting
 		}
 		if err = startJob(ctx, logger, taskHandle, taskMeta, jobStep); err != nil {
 			return nil, err
 		}
 	case StepEncodeAndSort:
+		failpoint.Inject("failWhenDispatchWriteIngestSubtask", func() {
+			failpoint.Return(nil, errors.New("injected error"))
+		})
 		previousSubtaskMetas, err = taskHandle.GetPreviousSubtaskMetas(gTask.ID, StepEncodeAndSort)
 		if err != nil {
 			return nil, err
 		}
@@ -262,14 +267,11 @@ func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(
 		failpoint.Inject("failWhenDispatchPostProcessSubtask", func() {
 			failpoint.Return(nil, errors.New("injected error after StepImport"))
 		})
-		if err := updateResult(taskHandle, gTask, taskMeta); err != nil {
-			return nil, err
-		}
 		// we need to get the metas where the checksum is stored.
- step := StepImport - if dsp.globalSort { - step = StepEncodeAndSort + if err := updateResult(taskHandle, gTask, taskMeta, dsp.GlobalSort); err != nil { + return nil, err } + step := getStepOfEncode(dsp.GlobalSort) previousSubtaskMetas, err = taskHandle.GetPreviousSubtaskMetas(gTask.ID, step) if err != nil { return nil, err @@ -365,7 +367,7 @@ func (*ImportDispatcherExt) IsRetryableErr(error) bool { func (dsp *ImportDispatcherExt) GetNextStep(_ dispatcher.TaskHandle, task *proto.Task) int64 { switch task.Step { case proto.StepInit: - if dsp.globalSort { + if dsp.GlobalSort { return StepEncodeAndSort } return StepImport @@ -437,7 +439,7 @@ func (dsp *importDispatcher) Init() (err error) { } dsp.BaseDispatcher.Extension = &ImportDispatcherExt{ - globalSort: taskMeta.Plan.CloudStorageURI != "", + GlobalSort: taskMeta.Plan.CloudStorageURI != "", } return dsp.BaseDispatcher.Init() } @@ -537,9 +539,17 @@ func toChunkMap(engineCheckpoints map[int32]*checkpoints.EngineCheckpoint) map[i return chunkMap } +func getStepOfEncode(globalSort bool) int64 { + if globalSort { + return StepEncodeAndSort + } + return StepImport +} + // we will update taskMeta in place and make gTask.Meta point to the new taskMeta. -func updateResult(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta) error { - metas, err := handle.GetPreviousSubtaskMetas(gTask.ID, gTask.Step) +func updateResult(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta, globalSort bool) error { + stepOfEncode := getStepOfEncode(globalSort) + metas, err := handle.GetPreviousSubtaskMetas(gTask.ID, stepOfEncode) if err != nil { return err } @@ -561,9 +571,34 @@ func updateResult(handle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *Tas } } taskMeta.Result.ColSizeMap = columnSizeMap + + if globalSort { + taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, gTask) + if err != nil { + return err + } + } + return updateMeta(gTask, taskMeta) } +func getLoadedRowCountOnGlobalSort(handle dispatcher.TaskHandle, gTask *proto.Task) (uint64, error) { + metas, err := handle.GetPreviousSubtaskMetas(gTask.ID, StepWriteAndIngest) + if err != nil { + return 0, err + } + + var loadedRowCount uint64 + for _, bs := range metas { + var subtaskMeta WriteIngestStepMeta + if err = json.Unmarshal(bs, &subtaskMeta); err != nil { + return 0, errors.Trace(err) + } + loadedRowCount += subtaskMeta.Result.LoadedRowCnt + } + return loadedRowCount, nil +} + func startJob(ctx context.Context, logger *zap.Logger, taskHandle dispatcher.TaskHandle, taskMeta *TaskMeta, jobStep string) error { failpoint.Inject("syncBeforeJobStarted", func() { TestSyncChan <- struct{}{} @@ -610,6 +645,9 @@ func job2Step(ctx context.Context, logger *zap.Logger, taskMeta *TaskMeta, step func (dsp *ImportDispatcherExt) finishJob(ctx context.Context, logger *zap.Logger, taskHandle dispatcher.TaskHandle, gTask *proto.Task, taskMeta *TaskMeta) error { dsp.unregisterTask(ctx, gTask) + if dsp.GlobalSort { + cleanUpGlobalSortedData(ctx, gTask, taskMeta) + } redactSensitiveInfo(gTask, taskMeta) summary := &importer.JobSummary{ImportedRows: taskMeta.Result.LoadedRowCnt} // retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes @@ -628,6 +666,9 @@ func (dsp *ImportDispatcherExt) failJob(ctx context.Context, taskHandle dispatch taskMeta *TaskMeta, logger *zap.Logger, errorMsg string) error { dsp.switchTiKV2NormalMode(ctx, gTask, logger) dsp.unregisterTask(ctx, gTask) + if dsp.GlobalSort { + cleanUpGlobalSortedData(ctx, gTask, taskMeta) + } 
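+	// note: cleanup must run before redactSensitiveInfo below, since
+	// cleanUpGlobalSortedData needs the unredacted statement and cloud
+	// storage URI to build the controller.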
redactSensitiveInfo(gTask, taskMeta)
 	// retry for 3+6+12+24+(30-4)*30 ~= 825s ~= 14 minutes
 	backoffer := backoff.NewExponential(dispatcher.RetrySQLInterval, 2, dispatcher.RetrySQLMaxInterval)
@@ -641,6 +682,29 @@ func (dsp *ImportDispatcherExt) failJob(ctx context.Context, taskHandle dispatch
 	)
 }
 
+func cleanUpGlobalSortedData(ctx context.Context, gTask *proto.Task, taskMeta *TaskMeta) {
+	// we can only clean up files after all write&ingest subtasks are finished,
+	// since they might share the same files.
+	// we don't return an error here: the task itself is already done, and a
+	// cleanup failure should not turn a successful import into a failed one.
+	// TODO: maybe add a way to notify the user that there are files left in
+	// the global sorted storage.
+	logger := logutil.BgLogger().With(zap.Int64("task-id", gTask.ID))
+	callLog := log.BeginTask(logger, "cleanup global sorted data")
+	defer callLog.End(zap.InfoLevel, nil)
+
+	controller, err := buildController(&taskMeta.Plan, taskMeta.Stmt)
+	if err != nil {
+		logger.Warn("failed to build controller", zap.Error(err))
+		return
+	}
+	if err = controller.InitDataStore(ctx); err != nil {
+		logger.Warn("failed to init data store", zap.Error(err))
+		return
+	}
+	if err = external.CleanUpFiles(ctx, controller.GlobalSortStore,
+		strconv.Itoa(int(gTask.ID)), uint(taskMeta.Plan.ThreadCnt)); err != nil {
+		logger.Warn("failed to clean up files of task", zap.Error(err))
+	}
+}
+
 func redactSensitiveInfo(gTask *proto.Task, taskMeta *TaskMeta) {
 	taskMeta.Stmt = ""
 	taskMeta.Plan.Path = ast.RedactURL(taskMeta.Plan.Path)
diff --git a/disttask/importinto/dispatcher_test.go b/disttask/importinto/dispatcher_test.go
index 86fd23938f1c4..88e18106709d2 100644
--- a/disttask/importinto/dispatcher_test.go
+++ b/disttask/importinto/dispatcher_test.go
@@ -21,6 +21,7 @@ import (
 	"testing"
 
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/disttask/framework/dispatcher"
 	"github.com/pingcap/tidb/disttask/framework/proto"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/executor/importer"
@@ -106,3 +107,68 @@ func (s *importIntoSuite) TestUpdateCurrentTask() {
 	require.Equal(s.T(), int64(1), dsp.currTaskID.Load())
 	require.True(s.T(), dsp.disableTiKVImportMode.Load())
 }
+
+func (s *importIntoSuite) TestDispatcherInit() {
+	meta := TaskMeta{
+		Plan: importer.Plan{
+			CloudStorageURI: "",
+		},
+	}
+	bytes, err := json.Marshal(meta)
+	s.NoError(err)
+	dsp := importDispatcher{
+		BaseDispatcher: &dispatcher.BaseDispatcher{
+			Task: &proto.Task{
+				Meta: bytes,
+			},
+		},
+	}
+	s.NoError(dsp.Init())
+	s.False(dsp.Extension.(*ImportDispatcherExt).GlobalSort)
+
+	meta.Plan.CloudStorageURI = "s3://test"
+	bytes, err = json.Marshal(meta)
+	s.NoError(err)
+	dsp = importDispatcher{
+		BaseDispatcher: &dispatcher.BaseDispatcher{
+			Task: &proto.Task{
+				Meta: bytes,
+			},
+		},
+	}
+	s.NoError(dsp.Init())
+	s.True(dsp.Extension.(*ImportDispatcherExt).GlobalSort)
+}
+
+func (s *importIntoSuite) TestGetNextStep() {
+	task := &proto.Task{
+		Step: proto.StepInit,
+	}
+	ext := &ImportDispatcherExt{}
+	for _, nextStep := range []int64{StepImport, StepPostProcess, proto.StepDone} {
+		s.Equal(nextStep, ext.GetNextStep(nil, task))
+		task.Step = nextStep
+	}
+
+	task.Step = proto.StepInit
+	ext = &ImportDispatcherExt{GlobalSort: true}
+	for _, nextStep := range []int64{StepEncodeAndSort, StepWriteAndIngest, StepPostProcess, proto.StepDone} {
+		s.Equal(nextStep, ext.GetNextStep(nil, task))
+		task.Step = nextStep
+	}
+}
+
+func (s *importIntoSuite) TestStr() {
+	s.Equal("init", stepStr(proto.StepInit))
+	s.Equal("import", 
stepStr(StepImport)) + s.Equal("postprocess", stepStr(StepPostProcess)) + s.Equal("encode&sort", stepStr(StepEncodeAndSort)) + s.Equal("write&ingest", stepStr(StepWriteAndIngest)) + s.Equal("done", stepStr(proto.StepDone)) + s.Equal("unknown", stepStr(111)) +} + +func (s *importIntoSuite) TestGetStepOfEncode() { + s.Equal(StepImport, getStepOfEncode(false)) + s.Equal(StepEncodeAndSort, getStepOfEncode(true)) +} diff --git a/disttask/importinto/dispatcher_testkit_test.go b/disttask/importinto/dispatcher_testkit_test.go index fc8bae2681a1b..e8abb97278a69 100644 --- a/disttask/importinto/dispatcher_testkit_test.go +++ b/disttask/importinto/dispatcher_testkit_test.go @@ -22,6 +22,8 @@ import ( "github.com/ngaut/pools" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/disttask/framework/dispatcher" "github.com/pingcap/tidb/disttask/framework/proto" "github.com/pingcap/tidb/disttask/framework/storage" @@ -35,7 +37,7 @@ import ( "github.com/tikv/client-go/v2/util" ) -func TestDispatcherExt(t *testing.T) { +func TestDispatcherExtLocalSort(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) pool := pools.NewResourcePool(func() (pools.Resource, error) { @@ -146,3 +148,131 @@ func TestDispatcherExt(t *testing.T) { require.NoError(t, err) require.Equal(t, "failed", gotJobInfo.Status) } + +func TestDispatcherExtGlobalSort(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return tk.Session(), nil + }, 1, 1, time.Second) + defer pool.Close() + ctx := context.WithValue(context.Background(), "etcd", true) + mgr := storage.NewTaskManager(util.WithInternalSourceType(ctx, "taskManager"), pool) + storage.SetTaskManager(mgr) + dsp, err := dispatcher.NewManager(util.WithInternalSourceType(ctx, "dispatcher"), mgr, "host:port") + require.NoError(t, err) + + // create job + conn := tk.Session().(sqlexec.SQLExecutor) + jobID, err := importer.CreateJob(ctx, conn, "test", "t", 1, + "root", &importer.ImportParameters{}, 123) + require.NoError(t, err) + gotJobInfo, err := importer.GetJob(ctx, conn, jobID, "root", true) + require.NoError(t, err) + require.Equal(t, "pending", gotJobInfo.Status) + logicalPlan := &importinto.LogicalPlan{ + JobID: jobID, + Plan: importer.Plan{ + Path: "gs://test-load/*.csv", + Format: "csv", + DBName: "test", + TableInfo: &model.TableInfo{ + Name: model.NewCIStr("t"), + State: model.StatePublic, + }, + DisableTiKVImportMode: true, + CloudStorageURI: "gs://sort-bucket", + InImportInto: true, + }, + Stmt: `IMPORT INTO db.tb FROM 'gs://test-load/*.csv?endpoint=xxx'`, + EligibleInstances: []*infosync.ServerInfo{{ID: "1"}}, + ChunkMap: map[int32][]importinto.Chunk{ + 1: {{Path: "gs://test-load/1.csv"}}, + 2: {{Path: "gs://test-load/2.csv"}}, + }, + } + bs, err := logicalPlan.ToTaskMeta() + require.NoError(t, err) + task := &proto.Task{ + Type: proto.ImportInto, + Meta: bs, + Step: proto.StepInit, + State: proto.TaskStatePending, + StateUpdateTime: time.Now(), + } + manager, err := storage.GetTaskManager() + require.NoError(t, err) + taskMeta, err := json.Marshal(task) + require.NoError(t, err) + taskID, err := manager.AddNewGlobalTask(importinto.TaskKey(jobID), proto.ImportInto, 1, taskMeta) + require.NoError(t, err) + task.ID = taskID + + // to import stage, job should be running + d := dsp.MockDispatcher(task) + ext := importinto.ImportDispatcherExt{ + 
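+		// exercise the global-sort dispatch path: the task should walk through
+		// the encode&sort, write&ingest, and post-process steps below.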
GlobalSort: true, + } + subtaskMetas, err := ext.OnNextSubtasksBatch(ctx, d, task, ext.GetNextStep(nil, task)) + require.NoError(t, err) + require.Len(t, subtaskMetas, 2) + task.Step = ext.GetNextStep(nil, task) + require.Equal(t, importinto.StepEncodeAndSort, task.Step) + gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) + require.NoError(t, err) + require.Equal(t, "running", gotJobInfo.Status) + require.Equal(t, "global-sorting", gotJobInfo.Step) + // update task/subtask, and finish subtask, so we can go to next stage + subtasks := make([]*proto.Subtask, 0, len(subtaskMetas)) + for _, m := range subtaskMetas { + subtasks = append(subtasks, proto.NewSubtask(task.Step, task.ID, task.Type, "", m)) + } + _, err = manager.UpdateGlobalTaskAndAddSubTasks(task, subtasks, proto.TaskStatePending) + require.NoError(t, err) + gotSubtasks, err := manager.GetSubtasksForImportInto(taskID, task.Step) + require.NoError(t, err) + sortStepMeta := &importinto.ImportStepMeta{ + SortedDataMeta: &external.SortedKVMeta{}, + SortedIndexMetas: map[int64]*external.SortedKVMeta{ + 1: {}, + }, + } + sortStepMetaBytes, err := json.Marshal(sortStepMeta) + require.NoError(t, err) + for _, s := range gotSubtasks { + require.NoError(t, manager.FinishSubtask(s.ID, sortStepMetaBytes)) + } + // to write-and-ingest stage + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/disttask/importinto/mockWriteIngestSpecs", "return(true)")) + t.Cleanup(func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/disttask/importinto/mockWriteIngestSpecs")) + }) + subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, d, task, ext.GetNextStep(nil, task)) + require.NoError(t, err) + require.Len(t, subtaskMetas, 2) + task.Step = ext.GetNextStep(nil, task) + require.Equal(t, importinto.StepWriteAndIngest, task.Step) + gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) + require.NoError(t, err) + require.Equal(t, "running", gotJobInfo.Status) + require.Equal(t, "importing", gotJobInfo.Step) + // on next stage, to post-process stage + subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, d, task, ext.GetNextStep(nil, task)) + require.NoError(t, err) + require.Len(t, subtaskMetas, 1) + task.Step = ext.GetNextStep(nil, task) + require.Equal(t, importinto.StepPostProcess, task.Step) + gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) + require.NoError(t, err) + require.Equal(t, "running", gotJobInfo.Status) + require.Equal(t, "validating", gotJobInfo.Step) + // next stage, done + subtaskMetas, err = ext.OnNextSubtasksBatch(ctx, d, task, ext.GetNextStep(nil, task)) + require.NoError(t, err) + require.Len(t, subtaskMetas, 0) + task.Step = ext.GetNextStep(nil, task) + require.Equal(t, proto.StepDone, task.Step) + gotJobInfo, err = importer.GetJob(ctx, conn, jobID, "root", true) + require.NoError(t, err) + require.Equal(t, "finished", gotJobInfo.Status) +} diff --git a/disttask/importinto/encode_and_sort_operator.go b/disttask/importinto/encode_and_sort_operator.go index f6ac48f57628d..164b2477a4856 100644 --- a/disttask/importinto/encode_and_sort_operator.go +++ b/disttask/importinto/encode_and_sort_operator.go @@ -25,15 +25,24 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/disttask/operator" "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/resourcemanager/pool/workerpool" "github.com/pingcap/tidb/resourcemanager/util" tidbutil "github.com/pingcap/tidb/util" + 
"github.com/pingcap/tidb/util/size" "go.uber.org/atomic" "go.uber.org/zap" ) const ( maxWaitDuration = 30 * time.Second + + // We limit the memory usage of KV deliver to 1GB per concurrency, and data + // KV deliver has external.DefaultMemSizeLimit, the rest of memory is for + // all index KV deliver. + // Note: this size is the memory taken by KV, not the size of taken by golang, + // each KV has additional 24*2 bytes overhead for golang slice. + indexKVTotalBufSize = size.GB - external.DefaultMemSizeLimit ) // encodeAndSortOperator is an operator that encodes and sorts data. @@ -60,7 +69,8 @@ var _ operator.Operator = (*encodeAndSortOperator)(nil) var _ operator.WithSource[*importStepMinimalTask] = (*encodeAndSortOperator)(nil) var _ operator.WithSink[workerpool.None] = (*encodeAndSortOperator)(nil) -func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, sharedVars *SharedVars, subtaskID int64) *encodeAndSortOperator { +func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, + sharedVars *SharedVars, subtaskID int64, indexMemorySizeLimit uint64) *encodeAndSortOperator { subCtx, cancel := context.WithCancel(ctx) op := &encodeAndSortOperator{ ctx: subCtx, @@ -77,7 +87,7 @@ func newEncodeAndSortOperator(ctx context.Context, executor *importStepExecutor, util.ImportInto, int(executor.taskMeta.Plan.ThreadCnt), func() workerpool.Worker[*importStepMinimalTask, workerpool.None] { - return newChunkWorker(ctx, op) + return newChunkWorker(ctx, op, indexMemorySizeLimit) }, ) op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool) @@ -135,7 +145,7 @@ type chunkWorker struct { indexWriter *importer.IndexRouteWriter } -func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker { +func newChunkWorker(ctx context.Context, op *encodeAndSortOperator, indexMemorySizeLimit uint64) *chunkWorker { w := &chunkWorker{ ctx: ctx, op: op, @@ -148,8 +158,9 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker builder := external.NewWriterBuilder(). SetOnCloseFunc(func(summary *external.WriterSummary) { op.sharedVars.mergeIndexSummary(indexID, summary) - }) - prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID))) + }).SetMemorySizeLimit(indexMemorySizeLimit) + prefix := subtaskPrefix(op.taskID, op.subtaskID) + // writer id for index: index/{indexID}/{workerID} writerID := path.Join("index", strconv.Itoa(int(indexID)), workerUUID) writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID) return writer @@ -158,7 +169,8 @@ func newChunkWorker(ctx context.Context, op *encodeAndSortOperator) *chunkWorker // sorted data kv storage path: /{taskID}/{subtaskID}/data/{workerID} builder := external.NewWriterBuilder(). 
SetOnCloseFunc(op.sharedVars.mergeDataSummary)
-		prefix := path.Join(strconv.Itoa(int(op.taskID)), strconv.Itoa(int(op.subtaskID)))
+		prefix := subtaskPrefix(op.taskID, op.subtaskID)
+		// writer id for data: data/{workerID}
 		writerID := path.Join("data", workerUUID)
 		writer := builder.Build(op.tableImporter.GlobalSortStore, prefix, writerID)
 		w.dataWriter = external.NewEngineWriter(writer)
@@ -201,3 +213,40 @@ func (w *chunkWorker) Close() {
 		}
 	}
 }
+
+func subtaskPrefix(taskID, subtaskID int64) string {
+	return path.Join(strconv.Itoa(int(taskID)), strconv.Itoa(int(subtaskID)))
+}
+
+func getWriterMemorySizeLimit(plan *importer.Plan) uint64 {
+	// the result is min(external.DefaultMemSizeLimit, indexKVTotalBufSize / num-of-indexes-that-gen-KV)
+	cnt := getNumOfIndexGenKV(plan.DesiredTableInfo)
+	limit := indexKVTotalBufSize
+	if cnt > 0 {
+		limit = limit / uint64(cnt)
+	}
+	if limit > external.DefaultMemSizeLimit {
+		limit = external.DefaultMemSizeLimit
+	}
+	return limit
+}
+
+func getNumOfIndexGenKV(tblInfo *model.TableInfo) int {
+	var count int
+	var nonClusteredPK bool
+	for _, idxInfo := range tblInfo.Indices {
+		// every public non-primary index generates index KVs
+		if idxInfo.State != model.StatePublic {
+			continue
+		}
+		if idxInfo.Primary && !tblInfo.HasClusteredIndex() {
+			nonClusteredPK = true
+			continue
+		}
+		count++
+	}
+	if nonClusteredPK {
+		count++
+	}
+	return count
+}
diff --git a/disttask/importinto/encode_and_sort_operator_test.go b/disttask/importinto/encode_and_sort_operator_test.go
index 24b29e8fcff9d..3aa2ee0377732 100644
--- a/disttask/importinto/encode_and_sort_operator_test.go
+++ b/disttask/importinto/encode_and_sort_operator_test.go
@@ -25,9 +25,15 @@ import (
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/br/pkg/lightning/backend"
+	"github.com/pingcap/tidb/br/pkg/lightning/backend/external"
+	"github.com/pingcap/tidb/ddl"
 	"github.com/pingcap/tidb/disttask/importinto/mock"
 	"github.com/pingcap/tidb/disttask/operator"
 	"github.com/pingcap/tidb/executor/importer"
+	"github.com/pingcap/tidb/parser"
+	"github.com/pingcap/tidb/parser/ast"
+	"github.com/pingcap/tidb/parser/model"
+	utilmock "github.com/pingcap/tidb/util/mock"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/mock/gomock"
 	"go.uber.org/zap"
@@ -74,7 +80,7 @@ func TestEncodeAndSortOperator(t *testing.T) {
 	}
 
 	source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
-	op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3)
+	op := newEncodeAndSortOperator(context.Background(), executorForParam, nil, 3, 0)
 	op.SetSource(source)
 	require.NoError(t, op.Open())
 	require.Greater(t, len(op.String()), 0)
@@ -94,7 +100,7 @@ func TestEncodeAndSortOperator(t *testing.T) {
 	// cancel on error and log other errors
 	mockErr2 := errors.New("mock err 2")
 	source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
-	op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2)
+	op = newEncodeAndSortOperator(context.Background(), executorForParam, nil, 2, 0)
 	op.SetSource(source)
 	executor1 := mock.NewMockMiniTaskExecutor(ctrl)
 	executor2 := mock.NewMockMiniTaskExecutor(ctrl)
@@ -136,3 +142,62 @@ func TestEncodeAndSortOperator(t *testing.T) {
 	require.NoError(t, err)
 	require.Contains(t, string(content), "mock error should be logged")
 }
+
+func TestGetWriterMemorySizeLimit(t *testing.T) {
+	cases := []struct {
+		createSQL             string
+		numOfIndexGenKV       int
+		writerMemorySizeLimit uint64
+	}{
+		{
+			createSQL:             "create table t (a int)",
+			numOfIndexGenKV:       0,
+			writerMemorySizeLimit: 
external.DefaultMemSizeLimit, + }, + { + createSQL: "create table t (a int primary key clustered)", + numOfIndexGenKV: 0, + writerMemorySizeLimit: external.DefaultMemSizeLimit, + }, + { + createSQL: "create table t (a int primary key nonclustered)", + numOfIndexGenKV: 1, + writerMemorySizeLimit: external.DefaultMemSizeLimit, + }, + { + createSQL: "create table t (a int primary key clustered, b int, key(b))", + numOfIndexGenKV: 1, + writerMemorySizeLimit: external.DefaultMemSizeLimit, + }, + { + createSQL: "create table t (a int primary key clustered, b int, key(b), key(a,b))", + numOfIndexGenKV: 2, + writerMemorySizeLimit: external.DefaultMemSizeLimit, + }, + { + createSQL: "create table t (a int primary key clustered, b int, c int, key(b,c), unique(b), unique(c), key(a,b))", + numOfIndexGenKV: 4, + writerMemorySizeLimit: indexKVTotalBufSize / 4, + }, + { + createSQL: "create table t (a int, b int, c int, primary key(a,b,c) nonclustered, key(b,c), unique(b), unique(c), key(a,b))", + numOfIndexGenKV: 5, + writerMemorySizeLimit: indexKVTotalBufSize / 5, + }, + } + + for _, c := range cases { + p := parser.New() + node, err := p.ParseOneStmt(c.createSQL, "", "") + require.NoError(t, err) + sctx := utilmock.NewContext() + info, err := ddl.MockTableInfo(sctx, node.(*ast.CreateTableStmt), 1) + require.NoError(t, err) + info.State = model.StatePublic + + require.Equal(t, c.numOfIndexGenKV, getNumOfIndexGenKV(info), c.createSQL) + require.Equal(t, c.writerMemorySizeLimit, getWriterMemorySizeLimit(&importer.Plan{ + DesiredTableInfo: info, + }), c.createSQL) + } +} diff --git a/disttask/importinto/job_testkit_test.go b/disttask/importinto/job_testkit_test.go new file mode 100644 index 0000000000000..3638feb2d59d6 --- /dev/null +++ b/disttask/importinto/job_testkit_test.go @@ -0,0 +1,107 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package importinto_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/ngaut/pools" + "github.com/pingcap/tidb/disttask/framework/proto" + "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/disttask/importinto" + "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/util" +) + +func TestGetTaskImportedRows(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return tk.Session(), nil + }, 1, 1, time.Second) + defer pool.Close() + ctx := context.WithValue(context.Background(), "etcd", true) + mgr := storage.NewTaskManager(util.WithInternalSourceType(ctx, "taskManager"), pool) + storage.SetTaskManager(mgr) + manager, err := storage.GetTaskManager() + require.NoError(t, err) + + // local sort + taskMeta := importinto.TaskMeta{ + Plan: importer.Plan{}, + } + bytes, err := json.Marshal(taskMeta) + require.NoError(t, err) + taskID, err := manager.AddNewGlobalTask(importinto.TaskKey(111), proto.ImportInto, 1, bytes) + require.NoError(t, err) + importStepMetas := []*importinto.ImportStepMeta{ + { + Result: importinto.Result{ + LoadedRowCnt: 1, + }, + }, + { + Result: importinto.Result{ + LoadedRowCnt: 2, + }, + }, + } + for _, m := range importStepMetas { + bytes, err := json.Marshal(m) + require.NoError(t, err) + require.NoError(t, manager.AddNewSubTask(taskID, importinto.StepImport, + "", bytes, proto.ImportInto, false)) + } + rows, err := importinto.GetTaskImportedRows(111) + require.NoError(t, err) + require.Equal(t, uint64(3), rows) + + // global sort + taskMeta = importinto.TaskMeta{ + Plan: importer.Plan{ + CloudStorageURI: "s3://test-bucket/test-path", + }, + } + bytes, err = json.Marshal(taskMeta) + require.NoError(t, err) + taskID, err = manager.AddNewGlobalTask(importinto.TaskKey(222), proto.ImportInto, 1, bytes) + require.NoError(t, err) + ingestStepMetas := []*importinto.WriteIngestStepMeta{ + { + Result: importinto.Result{ + LoadedRowCnt: 11, + }, + }, + { + Result: importinto.Result{ + LoadedRowCnt: 22, + }, + }, + } + for _, m := range ingestStepMetas { + bytes, err := json.Marshal(m) + require.NoError(t, err) + require.NoError(t, manager.AddNewSubTask(taskID, importinto.StepWriteAndIngest, + "", bytes, proto.ImportInto, false)) + } + rows, err = importinto.GetTaskImportedRows(222) + require.NoError(t, err) + require.Equal(t, uint64(33), rows) +} diff --git a/disttask/importinto/planner.go b/disttask/importinto/planner.go index 869e22ff2f168..ccd35e108cad4 100644 --- a/disttask/importinto/planner.go +++ b/disttask/importinto/planner.go @@ -22,6 +22,7 @@ import ( "strconv" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend/external" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" "github.com/pingcap/tidb/br/pkg/lightning/common" @@ -222,18 +223,22 @@ func (*PostProcessSpec) ToSubtaskMeta(planCtx planner.PlanCtx) ([]byte, error) { return json.Marshal(postProcessStepMeta) } -func buildController(p *LogicalPlan) (*importer.LoadDataController, error) { +func buildControllerForPlan(p *LogicalPlan) (*importer.LoadDataController, error) { + return buildController(&p.Plan, p.Stmt) +} + +func buildController(plan *importer.Plan, stmt string) (*importer.LoadDataController, error) { idAlloc := kv.NewPanickingAllocators(0) - tbl, err := tables.TableFromMeta(idAlloc, 
p.Plan.TableInfo) + tbl, err := tables.TableFromMeta(idAlloc, plan.TableInfo) if err != nil { return nil, err } - astArgs, err := importer.ASTArgsFromStmt(p.Stmt) + astArgs, err := importer.ASTArgsFromStmt(stmt) if err != nil { return nil, err } - controller, err := importer.NewLoadDataController(&p.Plan, tbl, astArgs) + controller, err := importer.NewLoadDataController(plan, tbl, astArgs) if err != nil { return nil, err } @@ -245,7 +250,7 @@ func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]*ImportSpec, er if len(p.ChunkMap) > 0 { chunkMap = p.ChunkMap } else { - controller, err2 := buildController(p) + controller, err2 := buildControllerForPlan(p) if err2 != nil { return nil, err2 } @@ -276,7 +281,7 @@ func generateImportSpecs(ctx context.Context, p *LogicalPlan) ([]*ImportSpec, er func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]*WriteIngestSpec, error) { ctx := planCtx.Ctx - controller, err2 := buildController(p) + controller, err2 := buildControllerForPlan(p) if err2 != nil { return nil, err2 } @@ -290,6 +295,20 @@ func generateWriteIngestSpecs(planCtx planner.PlanCtx, p *LogicalPlan) ([]*Write if err != nil { return nil, err } + failpoint.Inject("mockWriteIngestSpecs", func() { + failpoint.Return([]*WriteIngestSpec{ + { + WriteIngestStepMeta: &WriteIngestStepMeta{ + KVGroup: dataKVGroup, + }, + }, + { + WriteIngestStepMeta: &WriteIngestStepMeta{ + KVGroup: "1", + }, + }, + }, nil) + }) specs := make([]*WriteIngestSpec, 0, 16) for kvGroup, kvMeta := range kvMetas { splitter, err1 := getRangeSplitter(ctx, controller.GlobalSortStore, kvMeta) diff --git a/disttask/importinto/scheduler.go b/disttask/importinto/scheduler.go index 0c98c8c4f9d4e..ccbb4d8735ffa 100644 --- a/disttask/importinto/scheduler.go +++ b/disttask/importinto/scheduler.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/backend" @@ -50,6 +51,8 @@ type importStepExecutor struct { sharedVars sync.Map logger *zap.Logger + indexMemorySizeLimit uint64 + importCtx context.Context importCancel context.CancelFunc wg sync.WaitGroup @@ -99,6 +102,9 @@ func (s *importStepExecutor) Init(ctx context.Context) error { s.tableImporter.CheckDiskQuota(s.importCtx) }() } + s.indexMemorySizeLimit = getWriterMemorySizeLimit(s.tableImporter.Plan) + s.logger.Info("index writer memory size limit", + zap.String("limit", units.BytesSize(float64(s.indexMemorySizeLimit)))) return nil } @@ -141,7 +147,7 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt s.sharedVars.Store(subtaskMeta.ID, sharedVars) source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask)) - op := newEncodeAndSortOperator(ctx, s, sharedVars, subtask.ID) + op := newEncodeAndSortOperator(ctx, s, sharedVars, subtask.ID, s.indexMemorySizeLimit) op.SetSource(source) pipeline := operator.NewAsyncPipeline(op) if err = pipeline.Execute(); err != nil { diff --git a/tests/realtikvtest/importintotest4/BUILD.bazel b/tests/realtikvtest/importintotest4/BUILD.bazel index 2bdac1eaad3d6..243c775e6acf7 100644 --- a/tests/realtikvtest/importintotest4/BUILD.bazel +++ b/tests/realtikvtest/importintotest4/BUILD.bazel @@ -4,6 +4,7 @@ go_test( name = "importintotest4_test", timeout = "moderate", srcs = [ + "global_sort_test.go", "main_test.go", "split_file_test.go", ], @@ -14,6 +15,7 @@ go_test( "//config", "//disttask/framework/storage", "//disttask/importinto", + "//executor/importer", "//kv", 
"//testkit", "//tests/realtikvtest", diff --git a/tests/realtikvtest/importintotest4/global_sort_test.go b/tests/realtikvtest/importintotest4/global_sort_test.go new file mode 100644 index 0000000000000..0ddd7b93a44cd --- /dev/null +++ b/tests/realtikvtest/importintotest4/global_sort_test.go @@ -0,0 +1,106 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importintotest + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strconv" + "testing" + "time" + + "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/pingcap/tidb/disttask/framework/storage" + "github.com/pingcap/tidb/disttask/importinto" + "github.com/pingcap/tidb/executor/importer" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" +) + +func urlEqual(t *testing.T, expected, actual string) { + urlExpected, err := url.Parse(expected) + require.NoError(t, err) + urlGot, err := url.Parse(actual) + require.NoError(t, err) + // order of query parameters might change + require.Equal(t, urlExpected.Query(), urlGot.Query()) + urlExpected.RawQuery, urlGot.RawQuery = "", "" + require.Equal(t, urlExpected.String(), urlGot.String()) +} + +func (s *mockGCSSuite) TestGlobalSortBasic() { + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gs-basic", Name: "t.1.csv"}, + Content: []byte("1,foo1,bar1,123\n2,foo2,bar2,456\n3,foo3,bar3,789\n"), + }) + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{BucketName: "gs-basic", Name: "t.2.csv"}, + Content: []byte("4,foo4,bar4,123\n5,foo5,bar5,223\n6,foo6,bar6,323\n"), + }) + s.server.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: "sorted"}) + s.prepareAndUseDB("gsort_basic") + s.tk.MustExec(`create table t (a bigint primary key, b varchar(100), c varchar(100), d int, + key(a), key(c,d), key(d));`) + s.enableFailpoint("github.com/pingcap/tidb/parser/ast/forceRedactURL", "return(true)") + sortStorageURI := fmt.Sprintf("gs://sorted/import?endpoint=%s&access-key=aaaaaa&secret-access-key=bbbbbb", gcsEndpoint) + importSQL := fmt.Sprintf(`import into t FROM 'gs://gs-basic/t.*.csv?endpoint=%s' + with __max_engine_size = '1', cloud_storage_uri='%s'`, gcsEndpoint, sortStorageURI) + result := s.tk.MustQuery(importSQL).Rows() + s.Len(result, 1) + jobID, err := strconv.Atoi(result[0][0].(string)) + s.NoError(err) + s.tk.MustQuery("select * from t").Sort().Check(testkit.Rows( + "1 foo1 bar1 123", "2 foo2 bar2 456", "3 foo3 bar3 789", + "4 foo4 bar4 123", "5 foo5 bar5 223", "6 foo6 bar6 323", + )) + + // check all sorted data cleaned up + _, files, err := s.server.ListObjectsWithOptions("sorted", fakestorage.ListOptions{Prefix: "import"}) + s.NoError(err) + s.Len(files, 0) + // check sensitive info is redacted + jobInfo, err := importer.GetJob(context.Background(), s.tk.Session(), int64(jobID), "", true) + s.NoError(err) + redactedSortStorageURI := fmt.Sprintf("gs://sorted/import?endpoint=%s&access-key=xxxxxx&secret-access-key=xxxxxx", 
gcsEndpoint)
+	urlEqual(s.T(), redactedSortStorageURI, jobInfo.Parameters.Options["cloud_storage_uri"].(string))
+	// TODO: enable it once the statistics of the external engine are fixed.
+	//s.Equal(6, jobInfo.Summary.ImportedRows)
+	globalTaskManager, err := storage.GetTaskManager()
+	s.NoError(err)
+	taskKey := importinto.TaskKey(int64(jobID))
+	globalTask, err2 := globalTaskManager.GetGlobalTaskByKey(taskKey)
+	s.NoError(err2)
+	taskMeta := importinto.TaskMeta{}
+	s.NoError(json.Unmarshal(globalTask.Meta, &taskMeta))
+	urlEqual(s.T(), redactedSortStorageURI, taskMeta.Plan.CloudStorageURI)
+
+	s.enableFailpoint("github.com/pingcap/tidb/disttask/importinto/failWhenDispatchWriteIngestSubtask", "return(true)")
+	s.tk.MustExec("truncate table t")
+	result = s.tk.MustQuery(importSQL + ", detached").Rows()
+	s.Len(result, 1)
+	jobID, err = strconv.Atoi(result[0][0].(string))
+	s.NoError(err)
+	s.Eventually(func() bool {
+		globalTask, err2 = globalTaskManager.GetGlobalTaskByKey(importinto.TaskKey(int64(jobID)))
+		s.NoError(err2)
+		return globalTask.State == "failed"
+	}, 10*time.Second, 300*time.Millisecond)
+	// check all sorted data cleaned up
+	_, files, err = s.server.ListObjectsWithOptions("sorted", fakestorage.ListOptions{Prefix: "import"})
+	s.NoError(err)
+	s.Len(files, 0)
+}