
lightning: auto adjust dynamic region configuration #34537

Merged
merged 17 commits into from May 16, 2022
Changes from 14 commits
10 changes: 5 additions & 5 deletions br/pkg/lightning/backend/backend.go
@@ -152,7 +152,7 @@ type AbstractBackend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means there is duplicate detected. For this situation, all data in the engine must be imported.
// It's safe to reset or cleanup this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error

CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

@@ -315,7 +315,7 @@ func (be Backend) CheckDiskQuota(quota int64) (
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
@@ -325,7 +325,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx, regionSplitSize); err != nil {
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
@@ -445,12 +445,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
}

// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error {
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
var err error

for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize)
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
if !common.IsRetryableError(err) {
task.End(zap.ErrorLevel, err)
return err
18 changes: 9 additions & 9 deletions br/pkg/lightning/backend/backend_test.go
@@ -54,7 +54,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
Return(nil).
After(openCall)
importCall := s.mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(nil).
After(closeCall)
s.mockBackend.EXPECT().
@@ -66,7 +66,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
require.NoError(t, err)
closedEngine, err := engine.Close(ctx, nil)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.NoError(t, err)
err = closedEngine.Cleanup(ctx)
require.NoError(t, err)
@@ -250,12 +250,12 @@ func TestImportFailedNoRetry(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.Error(t, err)
require.Regexp(t, "^fake unrecoverable import error", err.Error())
}
@@ -268,14 +268,14 @@ func TestImportFailedWithRetry(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(driver.ErrBadConn, "fake recoverable import error")).
MinTimes(2)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "fake recoverable import error")
}
@@ -288,16 +288,16 @@ func TestImportFailedRecovered(t *testing.T) {

s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(gmysql.ErrInvalidConn)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()

closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.NoError(t, err)
}

59 changes: 50 additions & 9 deletions br/pkg/lightning/backend/local/local.go
@@ -88,10 +88,6 @@ const (
gRPCKeepAliveTimeout = 5 * time.Minute
gRPCBackOffMaxDelay = 10 * time.Minute

// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB
// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096

@@ -823,7 +819,7 @@ func (local *local) WriteToTiKV(
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
if regionSplitSize <= int64(config.SplitRegionSize) {
regionMaxSize = regionSplitSize * 4 / 3
}
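
(Worked numbers, added for clarity: with the default 96 MiB split size, regionMaxSize is bumped to 96 MiB × 4/3 = 128 MiB, so slightly oversized ranges do not trigger an extra split-and-retry.)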

@@ -1328,7 +1324,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
return allErr
}

func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
lf := local.lockEngine(engineUUID, importMutexStateImport)
if lf == nil {
// skip if engine not exist. See the comment of `CloseEngine` for more detail.
@@ -1342,9 +1338,16 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
regionSplitKeys := int64(regionMaxKeyCount)
if regionSplitSize > defaultRegionSplitSize {
regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount))
kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
}
if kvRegionSplitKeys > regionSplitKeys {
regionSplitKeys = kvRegionSplitKeys
}
} else {
log.L().Warn("fail to get region split keys and size", zap.Error(err))
}

// split sorted file into range by 96MB size per file
@@ -1842,3 +1845,41 @@ func (local *local) EngineFileSizes() (res []backend.EngineFileSize) {
})
return
}

func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (int64, int64, error) {
var (
nested struct {
Coprocessor struct {
RegionSplitSize string `json:"region-split-size"`
RegionSplitKeys int64 `json:"region-split-keys"`
} `json:"coprocessor"`
}
)
if err := tls.WithHost(host).GetJSON(ctx, "/config", &nested); err != nil {
return 0, 0, errors.Trace(err)
}
splitSize, err := units.FromHumanSize(nested.Coprocessor.RegionSplitSize)
if err != nil {
return 0, 0, errors.Trace(err)
}

return splitSize, nested.Coprocessor.RegionSplitKeys, nil
}

func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (int64, int64, error) {
stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
if err != nil {
return 0, 0, err
}
for _, store := range stores {
if store.StatusAddress == "" {
continue
}
regionSplitSize, regionSplitKeys, err := getSplitConfFromStore(ctx, store.StatusAddress, tls)
if err == nil {
return regionSplitSize, regionSplitKeys, nil
}
log.L().Warn("get region split size and keys failed", zap.Error(err), zap.String("store", store.StatusAddress))
}
return 0, 0, errors.New("get region split size and keys failed")
}
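
Taken together, the new helpers above mean lightning now reads coprocessor.region-split-size and region-split-keys from a live TiKV store and never splits more finely than the cluster itself is configured to. A minimal sketch of the resulting adjustment, using illustrative names and values that are not part of this PR:

package main

import "fmt"

// Defaults mirroring SplitRegionSize and SplitRegionKeys in br/pkg/lightning/config/const.go.
const (
	defaultSplitSize int64 = 96 << 20  // 96 MiB
	defaultSplitKeys int64 = 1_280_000 // keys per 96 MiB region
)

// adjustSplitConf restates the adjustment at the top of local.ImportEngine:
// whichever of the TiKV-reported and lightning-configured thresholds is larger
// wins, so lightning never asks TiKV to split regions smaller, or with fewer
// keys, than TiKV's own coprocessor settings allow.
func adjustSplitConf(cfgSize, cfgKeys, tikvSize, tikvKeys int64) (int64, int64) {
	if tikvSize > cfgSize {
		cfgSize = tikvSize
	}
	if tikvKeys > cfgKeys {
		cfgKeys = tikvKeys
	}
	return cfgSize, cfgKeys
}

func main() {
	// Example: a cluster whose TiKV nodes use 256 MiB regions and a 3.2M key limit.
	size, keys := adjustSplitConf(defaultSplitSize, defaultSplitKeys, 256<<20, 3_200_000)
	fmt.Printf("split-size=%d split-keys=%d\n", size, keys) // split-size=268435456 split-keys=3200000
}

If the store config cannot be fetched, ImportEngine keeps the configured values and only logs a warning, matching the else branch in the hunk above.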
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/noop/noop.go
@@ -79,7 +79,7 @@ func (b noopBackend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
return nil
}

func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
return nil
}

2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/tidb.go
@@ -432,7 +432,7 @@ func (be *tidbBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table
return nil
}

func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64) error {
func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error {
return nil
}

1 change: 1 addition & 0 deletions br/pkg/lightning/config/config.go
@@ -523,6 +523,7 @@ type TikvImporter struct {
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`
7 changes: 5 additions & 2 deletions br/pkg/lightning/config/const.go
@@ -20,9 +20,12 @@ import (

const (
// mydumper
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid tikv trigger region auto split
SplitRegionSize ByteSize = 96 * units.MiB
SplitRegionKeys int = 1_280_000
MaxSplitRegionSizeRatio int = 10

BufferSizeScale = 5
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
@@ -1807,7 +1807,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
var importErr error
for _, engine := range largeEngines {
// Use a larger split region size to avoid split the same region by many times.
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
importErr = multierr.Append(importErr, err)
}
}
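
(Worked numbers, added for clarity and assuming the defaults in const.go above: the quota-triggered import uses 96 MiB × 10 = 960 MiB and 1,280,000 × 10 = 12,800,000 keys as its split thresholds, ten times the normal values, so repeatedly importing and resetting a large engine does not keep splitting the same regions.)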
11 changes: 10 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
@@ -921,6 +921,8 @@ func (tr *TableRestore) importKV(
) error {
task := closedEngine.Logger().Begin(zap.InfoLevel, "import and cleanup engine")
regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize)
regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys)

if regionSplitSize == 0 && rc.taskMgr != nil {
regionSplitSize = int64(config.SplitRegionSize)
if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
Expand All @@ -932,7 +934,14 @@ func (tr *TableRestore) importKV(
return errors.Trace(err)
}
}
err := closedEngine.Import(ctx, regionSplitSize)
if regionSplitKeys == 0 {
if regionSplitSize > int64(config.SplitRegionSize) {
regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys))
} else {
regionSplitKeys = int64(config.SplitRegionKeys)
}
}
Comment on lines +937 to +943

Contributor: This code looks more like config adjustment; maybe move it into config.Adjust()?

Contributor Author: These lines are indeed adjusting, but it's a bit hard to put them in config.Adjust() because some extra instances are needed there. Maybe leave a TODO here and refactor this in the next PR?

err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys)
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported)
// Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart.
if err == nil && saveCpErr == nil {
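
Following up on the review thread above, a hypothetical sketch of how this fallback could be folded into config.Adjust() in a later PR. This is not code from this change; the local constants stand in for config.SplitRegionSize and config.SplitRegionKeys defined in const.go.

// adjustRegionSplitKeys mirrors the fallback currently written inline in importKV:
// an explicit region-split-keys wins; otherwise the key threshold is derived from
// the size threshold.
func adjustRegionSplitKeys(regionSplitSize, regionSplitKeys int64) int64 {
	const (
		splitRegionSize int64 = 96 << 20  // config.SplitRegionSize
		splitRegionKeys int64 = 1_280_000 // config.SplitRegionKeys
	)
	if regionSplitKeys != 0 {
		return regionSplitKeys // explicitly configured, keep as-is
	}
	if regionSplitSize > splitRegionSize {
		// Scale the key threshold in proportion to the enlarged size threshold,
		// e.g. 960 MiB yields 12,800,000 keys.
		return int64(float64(regionSplitSize) / float64(splitRegionSize) * float64(splitRegionKeys))
	}
	return splitRegionKeys
}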
4 changes: 2 additions & 2 deletions br/pkg/lightning/restore/table_restore_test.go
@@ -831,7 +831,7 @@ func (s *tableRestoreSuite) TestImportKVSuccess() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(nil)
mockBackend.EXPECT().
CleanupEngine(ctx, engineUUID).
@@ -866,7 +866,7 @@ func (s *tableRestoreSuite) TestImportKVFailure() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake import error"))

closedEngine, err := importer.UnsafeCloseEngineWithUUID(ctx, nil, "tag", engineUUID)
8 changes: 4 additions & 4 deletions br/pkg/mock/backend.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion dumpling/export/metrics.go
@@ -68,7 +68,7 @@ func InitMetricsVector(labels prometheus.Labels) {
Namespace: "dumpling",
Subsystem: "write",
Name: "receive_chunk_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Help: "Bucketed histogram of receiving time (s) of chunks",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, labelNames)
errorCount = prometheus.NewCounterVec(