diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go
index e6c14728ae9a..e090cc053dc3 100644
--- a/br/pkg/lightning/backend/backend.go
+++ b/br/pkg/lightning/backend/backend.go
@@ -152,7 +152,7 @@ type AbstractBackend interface {
 	// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
 	// it means there is duplicate detected. For this situation, all data in the engine must be imported.
 	// It's safe to reset or cleanup this engine.
-	ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error
+	ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
 
 	CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
@@ -315,7 +315,7 @@ func (be Backend) CheckDiskQuota(quota int64) (
 // into the target and then reset the engine to empty. This method will not
 // close the engine. Make sure the engine is flushed manually before calling
 // this method.
-func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
+func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
 	// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
 	// calling UnsafeImportAndReset().
 	closedEngine := ClosedEngine{
@@ -325,7 +325,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
 			uuid: engineUUID,
 		},
 	}
-	if err := closedEngine.Import(ctx, regionSplitSize); err != nil {
+	if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
 		return err
 	}
 	return be.abstract.ResetEngine(ctx, engineUUID)
@@ -445,12 +445,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
 }
 
 // Import the data written to the engine into the target.
-func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error {
+func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
 	var err error
 
 	for i := 0; i < importMaxRetryTimes; i++ {
 		task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
-		err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize)
+		err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
 		if !common.IsRetryableError(err) {
 			task.End(zap.ErrorLevel, err)
 			return err
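Note for reviewers: the interface widening above fans out to every `AbstractBackend` implementation. The following is a minimal, hypothetical stub (not part of this patch; the real no-op cases are the `noop` and `tidb` backends further down) sketching the new contract — backends that never split regions simply accept and ignore both thresholds:

```go
package stub

import (
	"context"

	"github.com/google/uuid"
)

// stubBackend sketches the widened AbstractBackend method. Both split
// thresholds now travel together: regionSplitSize caps a region's byte size,
// regionSplitKeys caps its key count.
type stubBackend struct{}

func (stubBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
	// A splitting backend would pre-split and scatter regions so each stays
	// under regionSplitSize bytes and regionSplitKeys keys before ingest.
	return nil
}
```

Threading the key threshold through `ClosedEngine.Import` keeps it stable across the retry loop instead of being re-derived inside the backend on every attempt.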
diff --git a/br/pkg/lightning/backend/backend_test.go b/br/pkg/lightning/backend/backend_test.go
index b42ce1815b70..d388e7453383 100644
--- a/br/pkg/lightning/backend/backend_test.go
+++ b/br/pkg/lightning/backend/backend_test.go
@@ -54,7 +54,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
 		Return(nil).
 		After(openCall)
 	importCall := s.mockBackend.EXPECT().
-		ImportEngine(ctx, engineUUID, gomock.Any()).
+		ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
 		Return(nil).
 		After(closeCall)
 	s.mockBackend.EXPECT().
@@ -66,7 +66,7 @@
 	require.NoError(t, err)
 	closedEngine, err := engine.Close(ctx, nil)
 	require.NoError(t, err)
-	err = closedEngine.Import(ctx, 1)
+	err = closedEngine.Import(ctx, 1, 1)
 	require.NoError(t, err)
 	err = closedEngine.Cleanup(ctx)
 	require.NoError(t, err)
@@ -250,12 +250,12 @@ func TestImportFailedNoRetry(t *testing.T) {
 	s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
 	s.mockBackend.EXPECT().
-		ImportEngine(ctx, gomock.Any(), gomock.Any()).
+		ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
 		Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))
 
 	closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
 	require.NoError(t, err)
-	err = closedEngine.Import(ctx, 1)
+	err = closedEngine.Import(ctx, 1, 1)
 	require.Error(t, err)
 	require.Regexp(t, "^fake unrecoverable import error", err.Error())
 }
@@ -268,14 +268,14 @@ func TestImportFailedWithRetry(t *testing.T) {
 	s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
 	s.mockBackend.EXPECT().
-		ImportEngine(ctx, gomock.Any(), gomock.Any()).
+		ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
 		Return(errors.Annotate(driver.ErrBadConn, "fake recoverable import error")).
 		MinTimes(2)
 	s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()
 
 	closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
 	require.NoError(t, err)
-	err = closedEngine.Import(ctx, 1)
+	err = closedEngine.Import(ctx, 1, 1)
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "fake recoverable import error")
 }
@@ -288,16 +288,16 @@ func TestImportFailedRecovered(t *testing.T) {
 	s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
 	s.mockBackend.EXPECT().
-		ImportEngine(ctx, gomock.Any(), gomock.Any()).
+		ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
 		Return(gmysql.ErrInvalidConn)
 	s.mockBackend.EXPECT().
-		ImportEngine(ctx, gomock.Any(), gomock.Any()).
+		ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
 		Return(nil)
 	s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()
 
 	closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
 	require.NoError(t, err)
-	err = closedEngine.Import(ctx, 1)
+	err = closedEngine.Import(ctx, 1, 1)
 	require.NoError(t, err)
 }
diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go
index be55fc7eb9ce..ed140ce1c0ef 100644
--- a/br/pkg/lightning/backend/local/local.go
+++ b/br/pkg/lightning/backend/local/local.go
@@ -88,10 +88,6 @@ const (
 	gRPCKeepAliveTimeout = 5 * time.Minute
 	gRPCBackOffMaxDelay  = 10 * time.Minute
 
-	// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
-	// lower the max-key-count to avoid tikv trigger region auto split
-	regionMaxKeyCount = 1_280_000
-
 	defaultRegionSplitSize = 96 * units.MiB
 
 	// The max ranges count in a batch to split and scatter.
 	maxBatchSplitRanges = 4096
@@ -823,7 +819,7 @@ func (local *local) WriteToTiKV(
 	// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
 	// because the range-properties is not 100% accurate
 	regionMaxSize := regionSplitSize
-	if regionSplitSize <= defaultRegionSplitSize {
+	if regionSplitSize <= int64(config.SplitRegionSize) {
 		regionMaxSize = regionSplitSize * 4 / 3
 	}
@@ -1328,7 +1324,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
 	return allErr
 }
 
-func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
+func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
 	lf := local.lockEngine(engineUUID, importMutexStateImport)
 	if lf == nil {
 		// skip if engine not exist. See the comment of `CloseEngine` for more detail.
@@ -1342,9 +1338,16 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
 		log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
 		return nil
 	}
-	regionSplitKeys := int64(regionMaxKeyCount)
-	if regionSplitSize > defaultRegionSplitSize {
-		regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount))
+	kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
+	if err == nil {
+		if kvRegionSplitSize > regionSplitSize {
+			regionSplitSize = kvRegionSplitSize
+		}
+		if kvRegionSplitKeys > regionSplitKeys {
+			regionSplitKeys = kvRegionSplitKeys
+		}
+	} else {
+		log.L().Warn("fail to get region split keys and size", zap.Error(err))
 	}
 
 	// split sorted file into range by 96MB size per file
@@ -1842,3 +1845,41 @@ func (local *local) EngineFileSizes() (res []backend.EngineFileSize) {
 	})
 	return
 }
+
+func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (int64, int64, error) {
+	var (
+		nested struct {
+			Coprocessor struct {
+				RegionSplitSize string `json:"region-split-size"`
+				RegionSplitKeys int64  `json:"region-split-keys"`
+			} `json:"coprocessor"`
+		}
+	)
+	if err := tls.WithHost(host).GetJSON(ctx, "/config", &nested); err != nil {
+		return 0, 0, errors.Trace(err)
+	}
+	splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize)
+	if err != nil {
+		return 0, 0, errors.Trace(err)
+	}
+
+	return splitSize, nested.Coprocessor.RegionSplitKeys, nil
+}
+
+func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (int64, int64, error) {
+	stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
+	if err != nil {
+		return 0, 0, err
+	}
+	for _, store := range stores {
+		if store.StatusAddress == "" || version.IsTiFlash(store) {
+			continue
+		}
+		regionSplitSize, regionSplitKeys, err := getSplitConfFromStore(ctx, store.StatusAddress, tls)
+		if err == nil {
+			return regionSplitSize, regionSplitKeys, nil
+		}
+		log.L().Warn("get region split size and keys failed", zap.Error(err), zap.String("store", store.StatusAddress))
+	}
+	return 0, 0, errors.New("get region split size and keys failed")
+}
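The new `getSplitConfFromStore`/`getRegionSplitSizeKeys` helpers ask a live TiKV store for its `coprocessor.region-split-size` and `region-split-keys` settings through the status-port `/config` endpoint, and `ImportEngine` only ever raises its own thresholds to match — so Lightning never splits regions more finely than TiKV itself would, and a fetch failure degrades gracefully to the configured defaults (hence the `Warn` rather than an error return). Below is a standalone, plain-HTTP sketch of the same round trip; names are hypothetical, and the real code goes through `common.TLS` so it also works on TLS clusters and falls back to the next store on error:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/docker/go-units"
)

// splitConf mirrors the fragment of TiKV's /config response that the patch
// decodes: region-split-size is human-readable text such as "96MiB", while
// region-split-keys is a plain integer.
type splitConf struct {
	Coprocessor struct {
		RegionSplitSize string `json:"region-split-size"`
		RegionSplitKeys int64  `json:"region-split-keys"`
	} `json:"coprocessor"`
}

func fetchSplitConf(statusAddr string) (size, keys int64, err error) {
	resp, err := http.Get("http://" + statusAddr + "/config")
	if err != nil {
		return 0, 0, err
	}
	defer resp.Body.Close()

	var conf splitConf
	if err := json.NewDecoder(resp.Body).Decode(&conf); err != nil {
		return 0, 0, err
	}
	// Same parsing as the patch: units.FromHumanSize converts the
	// human-readable size string into a byte count.
	size, err = units.FromHumanSize(conf.Coprocessor.RegionSplitSize)
	if err != nil {
		return 0, 0, err
	}
	return size, conf.Coprocessor.RegionSplitKeys, nil
}

func main() {
	// 20180 is TiKV's default status port; adjust for your deployment.
	size, keys, err := fetchSplitConf("127.0.0.1:20180")
	if err != nil {
		fmt.Println("falling back to built-in defaults:", err)
		return
	}
	fmt.Printf("region-split-size=%d bytes, region-split-keys=%d\n", size, keys)
}
```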
diff --git a/br/pkg/lightning/backend/noop/noop.go b/br/pkg/lightning/backend/noop/noop.go
index 430c4c5a83e8..2ac3e2b346db 100644
--- a/br/pkg/lightning/backend/noop/noop.go
+++ b/br/pkg/lightning/backend/noop/noop.go
@@ -79,7 +79,7 @@ func (b noopBackend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
 	return nil
 }
 
-func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
+func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
 	return nil
 }
diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go
index fe93c84c249d..9ae70e7afef1 100644
--- a/br/pkg/lightning/backend/tidb/tidb.go
+++ b/br/pkg/lightning/backend/tidb/tidb.go
@@ -432,7 +432,7 @@ func (be *tidbBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table
 	return nil
 }
 
-func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64) error {
+func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error {
 	return nil
 }
diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go
index 1955e11c7e06..fee2aaf29deb 100644
--- a/br/pkg/lightning/config/config.go
+++ b/br/pkg/lightning/config/config.go
@@ -523,6 +523,7 @@ type TikvImporter struct {
 	MaxKVPairs       int      `toml:"max-kv-pairs" json:"max-kv-pairs"`
 	SendKVPairs      int      `toml:"send-kv-pairs" json:"send-kv-pairs"`
 	RegionSplitSize  ByteSize `toml:"region-split-size" json:"region-split-size"`
+	RegionSplitKeys  int      `toml:"region-split-keys" json:"region-split-keys"`
 	SortedKVDir      string   `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
 	DiskQuota        ByteSize `toml:"disk-quota" json:"disk-quota"`
 	RangeConcurrency int      `toml:"range-concurrency" json:"range-concurrency"`
diff --git a/br/pkg/lightning/config/const.go b/br/pkg/lightning/config/const.go
index bf807f2fe759..23a38ac41117 100644
--- a/br/pkg/lightning/config/const.go
+++ b/br/pkg/lightning/config/const.go
@@ -20,9 +20,12 @@ import (
 const (
 	// mydumper
-	ReadBlockSize ByteSize = 64 * units.KiB
-	MaxRegionSize ByteSize = 256 * units.MiB
+	ReadBlockSize ByteSize = 64 * units.KiB
+	MaxRegionSize ByteSize = 256 * units.MiB
+	// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
+	// lower the max-key-count to avoid tikv trigger region auto split
 	SplitRegionSize ByteSize = 96 * units.MiB
+	SplitRegionKeys int      = 1_280_000
 	MaxSplitRegionSizeRatio int = 10
 
 	BufferSizeScale = 5
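With the config change above, `region-split-keys` becomes a first-class `[tikv-importer]` option, and the old `regionMaxKeyCount` constant moves into the config package as `SplitRegionKeys`, next to `SplitRegionSize`. A hedged sketch of how the new knob would read from a TOML file (the stand-in struct below is simplified and hypothetical; the real `TikvImporter.RegionSplitSize` is a `ByteSize` that accepts strings like `"96MiB"`, and Lightning's own loader has many more fields):

```go
package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// tikvImporter mirrors only the two knobs this patch touches; the real
// struct lives in br/pkg/lightning/config.
type tikvImporter struct {
	RegionSplitSize string `toml:"region-split-size"`
	RegionSplitKeys int    `toml:"region-split-keys"`
}

func main() {
	// Hypothetical lightning.toml fragment. Leaving region-split-keys
	// unset (0) lets importKV derive it from region-split-size, as shown
	// in table_restore.go below.
	doc := `
[tikv-importer]
region-split-size = "192MiB"
region-split-keys = 2560000
`
	var cfg struct {
		TikvImporter tikvImporter `toml:"tikv-importer"`
	}
	if _, err := toml.Decode(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg.TikvImporter)
}
```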
diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go
index 464402549803..c776510ae3c9 100644
--- a/br/pkg/lightning/restore/restore.go
+++ b/br/pkg/lightning/restore/restore.go
@@ -1807,7 +1807,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
 	var importErr error
 	for _, engine := range largeEngines {
 		// Use a larger split region size to avoid split the same region by many times.
-		if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
+		if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
 			importErr = multierr.Append(importErr, err)
 		}
 	}
diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go
index 48057c0f1760..feedb2d47681 100644
--- a/br/pkg/lightning/restore/table_restore.go
+++ b/br/pkg/lightning/restore/table_restore.go
@@ -921,6 +921,8 @@ func (tr *TableRestore) importKV(
 ) error {
 	task := closedEngine.Logger().Begin(zap.InfoLevel, "import and cleanup engine")
 	regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize)
+	regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys)
+
 	if regionSplitSize == 0 && rc.taskMgr != nil {
 		regionSplitSize = int64(config.SplitRegionSize)
 		if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
@@ -932,7 +934,14 @@ func (tr *TableRestore) importKV(
 			return errors.Trace(err)
 		}
 	}
-	err := closedEngine.Import(ctx, regionSplitSize)
+	if regionSplitKeys == 0 {
+		if regionSplitSize > int64(config.SplitRegionSize) {
+			regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys))
+		} else {
+			regionSplitKeys = int64(config.SplitRegionKeys)
+		}
+	}
+	err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys)
 	saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported)
 	// Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart.
 	if err == nil && saveCpErr == nil {
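When `region-split-keys` is left unset, `importKV` scales the key threshold linearly with the size threshold, so for typical row sizes both limits trip at roughly the same point. A small, self-contained sketch of that fallback, with the constants inlined from the config package:

```go
package main

import "fmt"

const (
	splitRegionSize = 96 << 20  // config.SplitRegionSize: 96 MiB
	splitRegionKeys = 1_280_000 // config.SplitRegionKeys
)

// deriveSplitKeys reproduces importKV's fallback: scale the key limit by the
// same factor the size limit was raised by, and never go below the default.
func deriveSplitKeys(regionSplitSize int64) int64 {
	if regionSplitSize > splitRegionSize {
		return int64(float64(regionSplitSize) / float64(splitRegionSize) * float64(splitRegionKeys))
	}
	return splitRegionKeys
}

func main() {
	// A 10x size threshold yields a 10x key threshold: 12,800,000.
	fmt.Println(deriveSplitKeys(10 * splitRegionSize))
}
```

Note that `enforceDiskQuota` in restore.go does not rely on this fallback: it passes `SplitRegionKeys*MaxSplitRegionSizeRatio` explicitly alongside the enlarged split size.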
diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go
index 5114ab98d318..2083a51f9874 100644
--- a/br/pkg/lightning/restore/table_restore_test.go
+++ b/br/pkg/lightning/restore/table_restore_test.go
@@ -831,7 +831,7 @@ func (s *tableRestoreSuite) TestImportKVSuccess() {
 		CloseEngine(ctx, nil, engineUUID).
 		Return(nil)
 	mockBackend.EXPECT().
-		ImportEngine(ctx, engineUUID, gomock.Any()).
+		ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
 		Return(nil)
 	mockBackend.EXPECT().
 		CleanupEngine(ctx, engineUUID).
@@ -866,7 +866,7 @@ func (s *tableRestoreSuite) TestImportKVFailure() {
 		CloseEngine(ctx, nil, engineUUID).
 		Return(nil)
 	mockBackend.EXPECT().
-		ImportEngine(ctx, engineUUID, gomock.Any()).
+		ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
 		Return(errors.Annotate(context.Canceled, "fake import error"))
 
 	closedEngine, err := importer.UnsafeCloseEngineWithUUID(ctx, nil, "tag", engineUUID)
diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go
index ee8016f3d292..7eba5180694a 100644
--- a/br/pkg/mock/backend.go
+++ b/br/pkg/mock/backend.go
@@ -185,17 +185,17 @@ func (mr *MockBackendMockRecorder) FlushEngine(arg0, arg1 interface{}) *gomock.C
 }
 
 // ImportEngine mocks base method.
-func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID, arg2 int64) error {
+func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID, arg2, arg3 int64) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "ImportEngine", arg0, arg1, arg2)
+	ret := m.ctrl.Call(m, "ImportEngine", arg0, arg1, arg2, arg3)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // ImportEngine indicates an expected call of ImportEngine.
-func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1, arg2 interface{}) *gomock.Call {
+func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportEngine", reflect.TypeOf((*MockBackend)(nil).ImportEngine), arg0, arg1, arg2)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportEngine", reflect.TypeOf((*MockBackend)(nil).ImportEngine), arg0, arg1, arg2, arg3)
 }
 
 // LocalWriter mocks base method.
diff --git a/dumpling/export/metrics.go b/dumpling/export/metrics.go
index cb62caf04122..2a812ea2a06b 100644
--- a/dumpling/export/metrics.go
+++ b/dumpling/export/metrics.go
@@ -68,7 +68,7 @@ func InitMetricsVector(labels prometheus.Labels) {
 			Namespace: "dumpling",
 			Subsystem: "write",
 			Name:      "receive_chunk_duration_time",
-			Help:      "Bucketed histogram of write time (s) of files",
+			Help:      "Bucketed histogram of receiving time (s) of chunks",
 			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 20),
 		}, labelNames)
 	errorCount = prometheus.NewCounterVec(