From 601166272358fc823ff45d325432e32488b4a402 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 10 May 2022 14:55:39 +0800 Subject: [PATCH 1/7] auto adjust dynamic region configuration --- br/pkg/lightning/backend/local/local.go | 80 +++++++++++++++++++++++-- br/pkg/lightning/config/const.go | 4 +- dumpling/export/metrics.go | 2 +- 3 files changed, 78 insertions(+), 8 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index be55fc7eb9ce..225aa33b4357 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -17,8 +17,10 @@ package local import ( "bytes" "context" + "encoding/json" "fmt" "math" + "net/http" "os" "path/filepath" "strings" @@ -90,8 +92,8 @@ const ( // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 // lower the max-key-count to avoid tikv trigger region auto split - regionMaxKeyCount = 1_280_000 - defaultRegionSplitSize = 96 * units.MiB + regionMaxKeyCount = 144_000_000 + defaultRegionSplitSize = 10 * units.GiB // The max ranges count in a batch to split and scatter. maxBatchSplitRanges = 4096 @@ -1342,9 +1344,16 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID)) return nil } - regionSplitKeys := int64(regionMaxKeyCount) - if regionSplitSize > defaultRegionSplitSize { - regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount)) + kvRegionSplitSize, regionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) + if err == nil { + if kvRegionSplitSize > regionSplitSize { + regionSplitSize = kvRegionSplitSize + } + } else { + regionSplitKeys = int64(regionMaxKeyCount) + if regionSplitSize > defaultRegionSplitSize { + regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount)) + } } // split sorted file into range by 96MB size per file @@ -1842,3 +1851,64 @@ func (local *local) EngineFileSizes() (res []backend.EngineFileSize) { }) return } + +func getSplitConfFromStore(url string, httpClient *http.Client) (int64, int64, error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return 0, 0, errors.Trace(err) + } + resp, err := httpClient.Do(req) + if err != nil { + return 0, 0, errors.Trace(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return 0, 0, errors.Errorf("get split conf from store failed, status code: %d", resp.StatusCode) + } + var ( + nested struct { + Coprocessor struct { + RegionSplitSize string `json:"region-split-size"` + RegionSplitKeys int64 `json:"region-split-keys"` + } `json:"coprocessor"` + } + ) + if err = json.NewDecoder(resp.Body).Decode(&nested); err != nil { + return 0, 0, errors.Trace(err) + } + splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize) + if err != nil { + return 0, 0, errors.Trace(err) + } + + return splitSize, nested.Coprocessor.RegionSplitKeys, nil +} + +func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (int64, int64, error) { + stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone()) + if err != nil { + return 0, 0, err + } + scheme := "http" + httpClient := http.DefaultClient + if tls != nil { + scheme = "https" + httpClient = &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tls.TLSConfig(), + }, + } + } + for _, store := range stores { + if store.StatusAddress == "" { + continue + } + url := fmt.Sprintf("%s://%s/config", scheme, store.StatusAddress) + regionSplitSize, regionSplitKeys, err := getSplitConfFromStore(url, httpClient) + if err == nil { + return regionSplitSize, regionSplitKeys, nil + } + log.L().Warn("get region split size and keys failed", zap.Error(err), zap.String("store", store.StatusAddress)) + } + return 0, 0, errors.New("get region split size and keys failed") +} diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go index bf807f2fe759..e5a23605257e 100644 --- a/br/pkg/lightning/config/const.go +++ b/br/pkg/lightning/config/const.go @@ -21,8 +21,8 @@ import ( const ( // mydumper ReadBlockSize ByteSize = 64 * units.KiB - MaxRegionSize ByteSize = 256 * units.MiB - SplitRegionSize ByteSize = 96 * units.MiB + MaxRegionSize ByteSize = 15 * units.GiB + SplitRegionSize ByteSize = 10 * units.GiB MaxSplitRegionSizeRatio int = 10 BufferSizeScale = 5 diff --git a/dumpling/export/metrics.go b/dumpling/export/metrics.go index cb62caf04122..2a812ea2a06b 100644 --- a/dumpling/export/metrics.go +++ b/dumpling/export/metrics.go @@ -68,7 +68,7 @@ func InitMetricsVector(labels prometheus.Labels) { Namespace: "dumpling", Subsystem: "write", Name: "receive_chunk_duration_time", - Help: "Bucketed histogram of write time (s) of files", + Help: "Bucketed histogram of receiving time (s) of chunks", Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20), }, labelNames) errorCount = prometheus.NewCounterVec( From 161dc225cb7b98983954f44ff3f7047bd1d82dba Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 10 May 2022 16:53:18 +0800 Subject: [PATCH 2/7] update --- br/pkg/lightning/backend/local/local.go | 31 +++---------------------- 1 file changed, 3 insertions(+), 28 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 225aa33b4357..0977986d494f 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -17,10 +17,8 @@ package local import ( "bytes" "context" - "encoding/json" "fmt" "math" - "net/http" "os" "path/filepath" "strings" @@ -1852,19 +1850,7 @@ func (local *local) EngineFileSizes() (res []backend.EngineFileSize) { return } -func getSplitConfFromStore(url string, httpClient *http.Client) (int64, int64, error) { - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - return 0, 0, errors.Trace(err) - } - resp, err := httpClient.Do(req) - if err != nil { - return 0, 0, errors.Trace(err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, 0, errors.Errorf("get split conf from store failed, status code: %d", resp.StatusCode) - } +func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (int64, int64, error) { var ( nested struct { Coprocessor struct { @@ -1873,7 +1859,7 @@ func getSplitConfFromStore(url string, httpClient *http.Client) (int64, int64, e } `json:"coprocessor"` } ) - if err = json.NewDecoder(resp.Body).Decode(&nested); err != nil { + if err := tls.WithHost(host).GetJSON(ctx, "/config", &nested); err != nil { return 0, 0, errors.Trace(err) } splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize) @@ -1889,22 +1875,11 @@ func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) if err != nil { return 0, 0, err } - scheme := "http" - httpClient := http.DefaultClient - if tls != nil { - scheme = "https" - httpClient = &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: tls.TLSConfig(), - }, - } - } for _, store := range stores { if store.StatusAddress == "" { continue } - url := fmt.Sprintf("%s://%s/config", scheme, store.StatusAddress) - regionSplitSize, regionSplitKeys, err := getSplitConfFromStore(url, httpClient) + regionSplitSize, regionSplitKeys, err := getSplitConfFromStore(ctx, store.StatusAddress, tls) if err == nil { return regionSplitSize, regionSplitKeys, nil } From 0887fdbca612413ef784667b7b9fc74f44fd872d Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 May 2022 13:15:39 +0800 Subject: [PATCH 3/7] address comments --- br/pkg/lightning/backend/backend.go | 10 +++---- br/pkg/lightning/backend/backend_test.go | 18 ++++++------- br/pkg/lightning/backend/local/local.go | 15 ++++++----- br/pkg/lightning/backend/noop/noop.go | 2 +- br/pkg/lightning/backend/tidb/tidb.go | 2 +- br/pkg/lightning/config/config.go | 1 + br/pkg/lightning/config/const.go | 5 ++-- br/pkg/lightning/restore/restore.go | 2 +- br/pkg/lightning/restore/table_restore.go | 32 +++++++++++++++++------ br/pkg/mock/backend.go | 8 +++--- 10 files changed, 58 insertions(+), 37 deletions(-) diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index e6c14728ae9a..e090cc053dc3 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -152,7 +152,7 @@ type AbstractBackend interface { // ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected, // it means there is duplicate detected. For this situation, all data in the engine must be imported. // It's safe to reset or cleanup this engine. - ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error + ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error @@ -315,7 +315,7 @@ func (be Backend) CheckDiskQuota(quota int64) ( // into the target and then reset the engine to empty. This method will not // close the engine. Make sure the engine is flushed manually before calling // this method. -func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error { +func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { // DO NOT call be.abstract.CloseEngine()! The engine should still be writable after // calling UnsafeImportAndReset(). closedEngine := ClosedEngine{ @@ -325,7 +325,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID uuid: engineUUID, }, } - if err := closedEngine.Import(ctx, regionSplitSize); err != nil { + if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil { return err } return be.abstract.ResetEngine(ctx, engineUUID) @@ -445,12 +445,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng } // Import the data written to the engine into the target. -func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error { +func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error { var err error for i := 0; i < importMaxRetryTimes; i++ { task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import") - err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize) + err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys) if !common.IsRetryableError(err) { task.End(zap.ErrorLevel, err) return err diff --git a/br/pkg/lightning/backend/backend_test.go b/br/pkg/lightning/backend/backend_test.go index b42ce1815b70..d388e7453383 100644 --- a/br/pkg/lightning/backend/backend_test.go +++ b/br/pkg/lightning/backend/backend_test.go @@ -54,7 +54,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) { Return(nil). After(openCall) importCall := s.mockBackend.EXPECT(). - ImportEngine(ctx, engineUUID, gomock.Any()). + ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()). Return(nil). After(closeCall) s.mockBackend.EXPECT(). @@ -66,7 +66,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) { require.NoError(t, err) closedEngine, err := engine.Close(ctx, nil) require.NoError(t, err) - err = closedEngine.Import(ctx, 1) + err = closedEngine.Import(ctx, 1, 1) require.NoError(t, err) err = closedEngine.Cleanup(ctx) require.NoError(t, err) @@ -250,12 +250,12 @@ func TestImportFailedNoRetry(t *testing.T) { s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil) s.mockBackend.EXPECT(). - ImportEngine(ctx, gomock.Any(), gomock.Any()). + ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()). Return(errors.Annotate(context.Canceled, "fake unrecoverable import error")) closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) require.NoError(t, err) - err = closedEngine.Import(ctx, 1) + err = closedEngine.Import(ctx, 1, 1) require.Error(t, err) require.Regexp(t, "^fake unrecoverable import error", err.Error()) } @@ -268,14 +268,14 @@ func TestImportFailedWithRetry(t *testing.T) { s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil) s.mockBackend.EXPECT(). - ImportEngine(ctx, gomock.Any(), gomock.Any()). + ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()). Return(errors.Annotate(driver.ErrBadConn, "fake recoverable import error")). MinTimes(2) s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes() closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) require.NoError(t, err) - err = closedEngine.Import(ctx, 1) + err = closedEngine.Import(ctx, 1, 1) require.Error(t, err) require.Contains(t, err.Error(), "fake recoverable import error") } @@ -288,16 +288,16 @@ func TestImportFailedRecovered(t *testing.T) { s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil) s.mockBackend.EXPECT(). - ImportEngine(ctx, gomock.Any(), gomock.Any()). + ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()). Return(gmysql.ErrInvalidConn) s.mockBackend.EXPECT(). - ImportEngine(ctx, gomock.Any(), gomock.Any()). + ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()). Return(nil) s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes() closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1) require.NoError(t, err) - err = closedEngine.Import(ctx, 1) + err = closedEngine.Import(ctx, 1, 1) require.NoError(t, err) } diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 0977986d494f..4e79532f9d29 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -90,8 +90,8 @@ const ( // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 // lower the max-key-count to avoid tikv trigger region auto split - regionMaxKeyCount = 144_000_000 - defaultRegionSplitSize = 10 * units.GiB + regionMaxKeyCount = 1_280_000 + defaultRegionSplitSize = 96 * units.MiB // The max ranges count in a batch to split and scatter. maxBatchSplitRanges = 4096 @@ -111,7 +111,7 @@ var ( // Local backend is compatible with TiDB [4.0.0, NextMajorVersion). localMinTiDBVersion = *semver.New("4.0.0") localMinTiKVVersion = *semver.New("4.0.0") - localMinPDVersion = *semver.New("4.0.0") + localMinPDVersion = *semver.New("7.0.0") localMaxTiDBVersion = version.NextMajorVersion() localMaxTiKVVersion = version.NextMajorVersion() localMaxPDVersion = version.NextMajorVersion() @@ -1328,7 +1328,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine, return allErr } -func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error { +func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { lf := local.lockEngine(engineUUID, importMutexStateImport) if lf == nil { // skip if engine not exist. See the comment of `CloseEngine` for more detail. @@ -1342,13 +1342,16 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID)) return nil } - kvRegionSplitSize, regionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) + kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls) if err == nil { if kvRegionSplitSize > regionSplitSize { regionSplitSize = kvRegionSplitSize } + if kvRegionSplitKeys > regionSplitKeys { + regionSplitKeys = kvRegionSplitKeys + } } else { - regionSplitKeys = int64(regionMaxKeyCount) + log.L().Warn("fail to get region split keys and size", zap.Error(err)) if regionSplitSize > defaultRegionSplitSize { regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount)) } diff --git a/br/pkg/lightning/backend/noop/noop.go b/br/pkg/lightning/backend/noop/noop.go index 430c4c5a83e8..2ac3e2b346db 100644 --- a/br/pkg/lightning/backend/noop/noop.go +++ b/br/pkg/lightning/backend/noop/noop.go @@ -79,7 +79,7 @@ func (b noopBackend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, return nil } -func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error { +func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error { return nil } diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index fe93c84c249d..9ae70e7afef1 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -432,7 +432,7 @@ func (be *tidbBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table return nil } -func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64) error { +func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error { return nil } diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index 1955e11c7e06..fee2aaf29deb 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -523,6 +523,7 @@ type TikvImporter struct { MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"` SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"` RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"` + RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"` SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"` DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"` RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"` diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go index e5a23605257e..2cd372241c8c 100644 --- a/br/pkg/lightning/config/const.go +++ b/br/pkg/lightning/config/const.go @@ -21,8 +21,9 @@ import ( const ( // mydumper ReadBlockSize ByteSize = 64 * units.KiB - MaxRegionSize ByteSize = 15 * units.GiB - SplitRegionSize ByteSize = 10 * units.GiB + MaxRegionSize ByteSize = 256 * units.MiB + SplitRegionSize ByteSize = 96 * units.MiB + SplitRegionKeys int = 1_280_000 MaxSplitRegionSizeRatio int = 10 BufferSizeScale = 5 diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 464402549803..c776510ae3c9 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -1807,7 +1807,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) { var importErr error for _, engine := range largeEngines { // Use a larger split region size to avoid split the same region by many times. - if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio)); err != nil { + if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil { importErr = multierr.Append(importErr, err) } } diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 48057c0f1760..d3e1aec0a214 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -921,18 +921,34 @@ func (tr *TableRestore) importKV( ) error { task := closedEngine.Logger().Begin(zap.InfoLevel, "import and cleanup engine") regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize) - if regionSplitSize == 0 && rc.taskMgr != nil { + regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys) + if regionSplitSize == 0 { regionSplitSize = int64(config.SplitRegionSize) - if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { - if len(tasks) > 0 { - regionSplitSize = int64(config.SplitRegionSize) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) + if rc.taskMgr != nil { + if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { + if len(tasks) > 0 { + regionSplitSize = int64(config.SplitRegionSize) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) + } + return nil, nil + }); err != nil { + return errors.Trace(err) + } + } + } + if regionSplitKeys == 0 { + regionSplitKeys = int64(config.SplitRegionKeys) + if rc.taskMgr != nil { + if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { + if len(tasks) > 0 { + regionSplitSize = int64(config.SplitRegionKeys) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) + } + return nil, nil + }); err != nil { + return errors.Trace(err) } - return nil, nil - }); err != nil { - return errors.Trace(err) } } - err := closedEngine.Import(ctx, regionSplitSize) + err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys) saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported) // Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart. if err == nil && saveCpErr == nil { diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go index ee8016f3d292..7eba5180694a 100644 --- a/br/pkg/mock/backend.go +++ b/br/pkg/mock/backend.go @@ -185,17 +185,17 @@ func (mr *MockBackendMockRecorder) FlushEngine(arg0, arg1 interface{}) *gomock.C } // ImportEngine mocks base method. -func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID, arg2 int64) error { +func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID, arg2, arg3 int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ImportEngine", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "ImportEngine", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } // ImportEngine indicates an expected call of ImportEngine. -func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportEngine", reflect.TypeOf((*MockBackend)(nil).ImportEngine), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportEngine", reflect.TypeOf((*MockBackend)(nil).ImportEngine), arg0, arg1, arg2, arg3) } // LocalWriter mocks base method. From 0faf095984245ac18e70633993537b21315b5313 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 May 2022 14:42:51 +0800 Subject: [PATCH 4/7] address comments --- br/pkg/lightning/backend/local/local.go | 12 +++------ br/pkg/lightning/config/const.go | 6 +++-- br/pkg/lightning/restore/table_restore.go | 33 +++++++++++------------ 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 4e79532f9d29..ac9f30038796 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -88,10 +88,6 @@ const ( gRPCKeepAliveTimeout = 5 * time.Minute gRPCBackOffMaxDelay = 10 * time.Minute - // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 - // lower the max-key-count to avoid tikv trigger region auto split - regionMaxKeyCount = 1_280_000 - defaultRegionSplitSize = 96 * units.MiB // The max ranges count in a batch to split and scatter. maxBatchSplitRanges = 4096 @@ -111,7 +107,7 @@ var ( // Local backend is compatible with TiDB [4.0.0, NextMajorVersion). localMinTiDBVersion = *semver.New("4.0.0") localMinTiKVVersion = *semver.New("4.0.0") - localMinPDVersion = *semver.New("7.0.0") + localMinPDVersion = *semver.New("4.0.0") localMaxTiDBVersion = version.NextMajorVersion() localMaxTiKVVersion = version.NextMajorVersion() localMaxPDVersion = version.NextMajorVersion() @@ -823,7 +819,7 @@ func (local *local) WriteToTiKV( // if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split // because the range-properties is not 100% accurate regionMaxSize := regionSplitSize - if regionSplitSize <= defaultRegionSplitSize { + if regionSplitSize <= int64(config.SplitRegionSize) { regionMaxSize = regionSplitSize * 4 / 3 } @@ -1352,8 +1348,8 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi } } else { log.L().Warn("fail to get region split keys and size", zap.Error(err)) - if regionSplitSize > defaultRegionSplitSize { - regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount)) + if regionSplitSize > int64(config.SplitRegionSize) { + regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys)) } } diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go index 2cd372241c8c..23a38ac41117 100644 --- a/br/pkg/lightning/config/const.go +++ b/br/pkg/lightning/config/const.go @@ -20,8 +20,10 @@ import ( const ( // mydumper - ReadBlockSize ByteSize = 64 * units.KiB - MaxRegionSize ByteSize = 256 * units.MiB + ReadBlockSize ByteSize = 64 * units.KiB + MaxRegionSize ByteSize = 256 * units.MiB + // See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360 + // lower the max-key-count to avoid tikv trigger region auto split SplitRegionSize ByteSize = 96 * units.MiB SplitRegionKeys int = 1_280_000 MaxSplitRegionSizeRatio int = 10 diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index d3e1aec0a214..1df6a0900109 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -922,31 +922,28 @@ func (tr *TableRestore) importKV( task := closedEngine.Logger().Begin(zap.InfoLevel, "import and cleanup engine") regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize) regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys) - if regionSplitSize == 0 { - regionSplitSize = int64(config.SplitRegionSize) - if rc.taskMgr != nil { - if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { - if len(tasks) > 0 { + + if rc.taskMgr != nil && (regionSplitSize == 0 || regionSplitKeys == 0) { + if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { + if len(tasks) > 0 { + if regionSplitSize == 0 { regionSplitSize = int64(config.SplitRegionSize) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) } - return nil, nil - }); err != nil { - return errors.Trace(err) + if regionSplitKeys == 0 { + regionSplitKeys = int64(config.SplitRegionKeys) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) + } } + + return nil, nil + }); err != nil { + return errors.Trace(err) } } + if regionSplitSize == 0 { + regionSplitSize = int64(config.SplitRegionSize) + } if regionSplitKeys == 0 { regionSplitKeys = int64(config.SplitRegionKeys) - if rc.taskMgr != nil { - if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { - if len(tasks) > 0 { - regionSplitSize = int64(config.SplitRegionKeys) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) - } - return nil, nil - }); err != nil { - return errors.Trace(err) - } - } } err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys) saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported) From 39c9eb521f0fa54abe3865dbacae2f979fff1921 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 May 2022 14:51:58 +0800 Subject: [PATCH 5/7] update --- br/pkg/lightning/backend/local/local.go | 3 --- br/pkg/lightning/restore/table_restore.go | 20 ++++++++------------ 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index ac9f30038796..a1c4e508193e 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1348,9 +1348,6 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi } } else { log.L().Warn("fail to get region split keys and size", zap.Error(err)) - if regionSplitSize > int64(config.SplitRegionSize) { - regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys)) - } } // split sorted file into range by 96MB size per file diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 1df6a0900109..feedb2d47681 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -923,27 +923,23 @@ func (tr *TableRestore) importKV( regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize) regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys) - if rc.taskMgr != nil && (regionSplitSize == 0 || regionSplitKeys == 0) { + if regionSplitSize == 0 && rc.taskMgr != nil { + regionSplitSize = int64(config.SplitRegionSize) if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { if len(tasks) > 0 { - if regionSplitSize == 0 { - regionSplitSize = int64(config.SplitRegionSize) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) - } - if regionSplitKeys == 0 { - regionSplitKeys = int64(config.SplitRegionKeys) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) - } + regionSplitSize = int64(config.SplitRegionSize) * int64(mathutil.Min(len(tasks), config.MaxSplitRegionSizeRatio)) } - return nil, nil }); err != nil { return errors.Trace(err) } } - if regionSplitSize == 0 { - regionSplitSize = int64(config.SplitRegionSize) - } if regionSplitKeys == 0 { - regionSplitKeys = int64(config.SplitRegionKeys) + if regionSplitSize > int64(config.SplitRegionSize) { + regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys)) + } else { + regionSplitKeys = int64(config.SplitRegionKeys) + } } err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys) saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported) From 75ac1ebc6d70e2e920b3242c4d3c1d257bdebb65 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 May 2022 15:03:29 +0800 Subject: [PATCH 6/7] fix ut --- br/pkg/lightning/restore/table_restore_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go index 5114ab98d318..2083a51f9874 100644 --- a/br/pkg/lightning/restore/table_restore_test.go +++ b/br/pkg/lightning/restore/table_restore_test.go @@ -831,7 +831,7 @@ func (s *tableRestoreSuite) TestImportKVSuccess() { CloseEngine(ctx, nil, engineUUID). Return(nil) mockBackend.EXPECT(). - ImportEngine(ctx, engineUUID, gomock.Any()). + ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()). Return(nil) mockBackend.EXPECT(). CleanupEngine(ctx, engineUUID). @@ -866,7 +866,7 @@ func (s *tableRestoreSuite) TestImportKVFailure() { CloseEngine(ctx, nil, engineUUID). Return(nil) mockBackend.EXPECT(). - ImportEngine(ctx, engineUUID, gomock.Any()). + ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()). Return(errors.Annotate(context.Canceled, "fake import error")) closedEngine, err := importer.UnsafeCloseEngineWithUUID(ctx, nil, "tag", engineUUID) From e306b74c0e9c30456d2393775d8960c8b6513148 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 16 May 2022 15:51:23 +0800 Subject: [PATCH 7/7] address comment --- br/pkg/lightning/backend/local/local.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index a1c4e508193e..ed140ce1c0ef 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -1872,7 +1872,7 @@ func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) return 0, 0, err } for _, store := range stores { - if store.StatusAddress == "" { + if store.StatusAddress == "" || version.IsTiFlash(store) { continue } regionSplitSize, regionSplitKeys, err := getSplitConfFromStore(ctx, store.StatusAddress, tls)