Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: use common scan-split-scatter logic #52433

Merged
merged 9 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 11 additions & 6 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,13 @@ func NewRestoreClient(
pdHTTPCli pdhttp.Client,
tlsConf *tls.Config,
keepaliveConf keepalive.ClientParameters,
isRawKv bool,
) *Client {
return &Client{
pdClient: pdClient,
pdHTTPClient: pdHTTPCli,
toolClient: split.NewSplitClient(pdClient, pdHTTPCli, tlsConf, isRawKv, maxSplitKeysOnce),
pdClient: pdClient,
pdHTTPClient: pdHTTPCli,
// toolClient reuse the split.SplitClient to do miscellaneous things. It doesn't
// call split related functions so set the arguments to arbitrary values.
toolClient: split.NewClient(pdClient, pdHTTPCli, tlsConf, maxSplitKeysOnce, 3),
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
switchCh: make(chan struct{}),
Expand Down Expand Up @@ -555,7 +556,11 @@ func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBack
useTokenBucket = true
}

metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode, maxSplitKeysOnce)
var splitClientOpts []split.ClientOptionalParameter
if isRawKvMode {
splitClientOpts = append(splitClientOpts, split.WithRawKV())
}
metaClient := split.NewClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, maxSplitKeysOnce, rc.GetStoreCount()+1, splitClientOpts...)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, stores, rc.rewriteMode, concurrencyPerStore, useTokenBucket)
}
Expand Down Expand Up @@ -1423,7 +1428,7 @@ func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int
execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor()
splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx)
log.Info("get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys))
client := split.NewSplitClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), false, maxSplitKeysOnce)
client := split.NewClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), maxSplitKeysOnce, 3)
return NewLogFilesIterWithSplitHelper(logIter, rules, client, splitSize, splitKeys), nil
}

Expand Down
32 changes: 16 additions & 16 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{
func TestCreateTables(t *testing.T) {
m := mc
g := gluetidb.New()
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -106,7 +106,7 @@ func TestCreateTables(t *testing.T) {
func TestIsOnline(t *testing.T) {
m := mc
g := gluetidb.New()
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand All @@ -130,7 +130,7 @@ func TestNeedCheckTargetClusterFresh(t *testing.T) {
defer cluster.Stop()

g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -160,7 +160,7 @@ func TestCheckTargetClusterFresh(t *testing.T) {
defer cluster.Stop()

g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand All @@ -177,7 +177,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) {
defer cluster.Stop()

g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -212,7 +212,7 @@ func TestCheckTargetClusterFreshWithTable(t *testing.T) {
func TestCheckSysTableCompatibility(t *testing.T) {
cluster := mc
g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -288,7 +288,7 @@ func TestCheckSysTableCompatibility(t *testing.T) {
func TestInitFullClusterRestore(t *testing.T) {
cluster := mc
g := gluetidb.New()
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(cluster.PDClient, cluster.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, cluster.Storage)
require.NoError(t, err)

Expand All @@ -313,7 +313,7 @@ func TestInitFullClusterRestore(t *testing.T) {
func TestPreCheckTableClusterIndex(t *testing.T) {
m := mc
g := gluetidb.New()
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(m.PDClient, m.PDHTTPCli, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -402,23 +402,23 @@ func TestGetTSWithRetry(t *testing.T) {
t.Run("PD leader is healthy:", func(t *testing.T) {
retryTimes := -1000
pDClient := fakePDClient{notLeader: false, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
})

t.Run("PD leader failure:", func(t *testing.T) {
retryTimes := -1000
pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg)
_, err := client.GetTSWithRetry(context.Background())
require.Error(t, err)
})

t.Run("PD leader switch successfully", func(t *testing.T) {
retryTimes := 0
pDClient := fakePDClient{notLeader: true, retryTimes: &retryTimes}
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg, false)
client := restore.NewRestoreClient(pDClient, nil, nil, defaultKeepaliveCfg)
_, err := client.GetTSWithRetry(context.Background())
require.NoError(t, err)
})
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) {
g := gluetidb.New()
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -574,7 +574,7 @@ func TestSetSpeedLimit(t *testing.T) {
// 1. The cost of concurrent communication is expected to be less than the cost of serial communication.
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
ctx := context.Background()

recordStores = NewRecordStores()
Expand All @@ -600,7 +600,7 @@ func TestSetSpeedLimit(t *testing.T) {
mockStores[5].Id = SET_SPEED_LIMIT_ERROR // setting a fault store
client = restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)

// Concurrency needs to be less than the number of stores
err = restore.MockCallSetSpeedLimit(ctx, FakeImporterClient{}, client, 2)
Expand Down Expand Up @@ -680,7 +680,7 @@ func TestDeleteRangeQuery(t *testing.T) {
g := gluetidb.New()
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down Expand Up @@ -730,7 +730,7 @@ func TestDeleteRangeQueryExec(t *testing.T) {
client := restore.NewRestoreClient(fakePDClient{
stores: mockStores,
retryTimes: &retryCnt,
}, nil, nil, defaultKeepaliveCfg, false)
}, nil, nil, defaultKeepaliveCfg)
err := client.Init(g, m.Storage)
require.NoError(t, err)

Expand Down
37 changes: 37 additions & 0 deletions br/pkg/restore/import_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,43 @@ func assertRegions(t *testing.T, regions []*split.RegionInfo, keys ...string) {
}
}

// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
func initTestClient(isRawKv bool) *TestClient {
peers := make([]*metapb.Peer, 1)
peers[0] = &metapb.Peer{
Id: 1,
StoreId: 1,
}
keys := [6]string{"", "aay", "bba", "bbh", "cca", ""}
regions := make(map[uint64]*split.RegionInfo)
for i := uint64(1); i < 6; i++ {
startKey := []byte(keys[i-1])
if len(startKey) != 0 {
startKey = codec.EncodeBytesExt([]byte{}, startKey, isRawKv)
}
endKey := []byte(keys[i])
if len(endKey) != 0 {
endKey = codec.EncodeBytesExt([]byte{}, endKey, isRawKv)
}
regions[i] = &split.RegionInfo{
Leader: &metapb.Peer{
Id: i,
},
Region: &metapb.Region{
Id: i,
Peers: peers,
StartKey: startKey,
EndKey: endKey,
},
}
}
stores := make(map[uint64]*metapb.Store)
stores[1] = &metapb.Store{
Id: 1,
}
return NewTestClient(stores, regions, 6)
}

func TestScanSuccess(t *testing.T) {
// region: [, aay), [aay, bba), [bba, bbh), [bbh, cca), [cca, )
cli := initTestClient(false)
Expand Down
87 changes: 8 additions & 79 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,6 @@ const (
maxSplitKeysOnce = 10240
)

type SplitContext struct {
isRawKv bool
storeCount int
onSplit OnSplitFunc
}

// RegionSplitter is a executor of region split by rules.
type RegionSplitter struct {
client split.SplitClient
Expand All @@ -66,9 +60,6 @@ type OnSplitFunc func(key [][]byte)
func (rs *RegionSplitter) ExecuteSplit(
ctx context.Context,
ranges []rtree.Range,
storeCount int,
isRawKv bool,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
log.Info("skip split regions, no range")
Expand Down Expand Up @@ -97,22 +88,12 @@ func (rs *RegionSplitter) ExecuteSplit(
sortedKeys = append(sortedKeys, r.EndKey)
totalRangeSize += r.Size
}
// need use first range's start key to scan region
// and the range size must be greater than 0 here
scanStartKey := sortedRanges[0].StartKey
sctx := SplitContext{
isRawKv: isRawKv,
onSplit: onSplit,
storeCount: storeCount,
}
// the range size must be greater than 0 here
return rs.executeSplitByRanges(ctx, sctx, scanStartKey, sortedKeys)
return rs.executeSplitByRanges(ctx, sortedKeys)
}

func (rs *RegionSplitter) executeSplitByRanges(
ctx context.Context,
splitContext SplitContext,
scanStartKey []byte,
sortedKeys [][]byte,
) error {
startTime := time.Now()
Expand All @@ -125,14 +106,14 @@ func (rs *RegionSplitter) executeSplitByRanges(
roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex])
}
if len(roughSortedSplitKeys) > 0 {
if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, roughSortedSplitKeys); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scanStartKey is the start key. but sortedKeys are all end keys. if didn't scan with start key will cause error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to split the scanStartKey so theoretically we don't need to care about it. For the error I should have fixed in

// we need to find the regions that contain the split keys. However, the scan
// region API accepts a key range [start, end) where end key is exclusive, and if
// sortedSplitKeys length is 1, we scan region may return empty result. So we
// increase the end key a bit. If the end key is on the region boundaries, it
// will be skipped by getSplitKeysOfRegions.
scanStart := codec.EncodeBytesExt(nil, sortedSplitKeys[0], c.isRawKv)
lastKey := kv.Key(sortedSplitKeys[len(sortedSplitKeys)-1])
scanEnd := codec.EncodeBytesExt(nil, lastKey.Next(), c.isRawKv)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update: I fixed a bug and add tests in 59e6f0f

Copy link
Contributor

@3pointer 3pointer Apr 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, We need make sure not use same key as start/end key to scan regions. or we will get an error from PD. I assume here is no chance to get a slice with only one nil/"" value in it.

if err := rs.executeSplitByKeys(ctx, roughSortedSplitKeys); err != nil {
return errors.Trace(err)
}
}
log.Info("finish spliting regions roughly", zap.Duration("take", time.Since(startTime)))

// Then send split requests to each TiKV.
if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, sortedKeys); err != nil {
if err := rs.executeSplitByKeys(ctx, sortedKeys); err != nil {
return errors.Trace(err)
}

Expand All @@ -143,65 +124,13 @@ func (rs *RegionSplitter) executeSplitByRanges(
// executeSplitByKeys will split regions by **sorted** keys with following steps.
// 1. locate regions with correspond keys.
// 2. split these regions with correspond keys.
// 3. make sure new splitted regions are balanced.
// 3. make sure new split regions are balanced.
func (rs *RegionSplitter) executeSplitByKeys(
ctx context.Context,
splitContext SplitContext,
scanStartKey []byte,
sortedKeys [][]byte,
) error {
var mutex sync.Mutex
startTime := time.Now()
minKey := codec.EncodeBytesExt(nil, scanStartKey, splitContext.isRawKv)
maxKey := codec.EncodeBytesExt(nil, sortedKeys[len(sortedKeys)-1], splitContext.isRawKv)
scatterRegions := make([]*split.RegionInfo, 0)
regionsMap := make(map[uint64]*split.RegionInfo)

err := utils.WithRetry(ctx, func() error {
clear(regionsMap)
regions, err := split.PaginateScanRegion(ctx, rs.client, minKey, maxKey, split.ScanRegionPaginationLimit)
if err != nil {
return err
}
splitKeyMap := split.GetSplitKeysOfRegions(sortedKeys, regions, splitContext.isRawKv)
workerPool := util.NewWorkerPool(uint(splitContext.storeCount)+1, "split keys")
eg, ectx := errgroup.WithContext(ctx)
for region, splitKeys := range splitKeyMap {
region := region
keys := splitKeys
sctx := splitContext
workerPool.ApplyOnErrorGroup(eg, func() error {
log.Info("get split keys for split regions",
logutil.Region(region.Region), logutil.Keys(keys))
newRegions, err := rs.splitAndScatterRegions(ectx, region, keys, sctx.isRawKv)
if err != nil {
return err
}
if len(newRegions) != len(keys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
zap.Int("split key count", len(keys)))
}
log.Info("scattered regions", zap.Int("count", len(newRegions)))
mutex.Lock()
for _, r := range newRegions {
regionsMap[r.Region.Id] = r
}
mutex.Unlock()
sctx.onSplit(keys)
return nil
})
}
err = eg.Wait()
if err != nil {
return err
}
for _, r := range regionsMap {
// merge all scatter regions
scatterRegions = append(scatterRegions, r)
}
return nil
}, newSplitBackoffer())
scatterRegions, err := rs.client.SplitKeysAndScatter(ctx, sortedKeys)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -216,7 +145,7 @@ func (rs *RegionSplitter) executeSplitByKeys(
}

func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte, _ bool,
ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte,
) ([]*split.RegionInfo, error) {
newRegions, err := rs.client.SplitWaitAndScatter(ctx, regionInfo, keys)
return newRegions, err
Expand Down Expand Up @@ -409,7 +338,7 @@ func (helper *LogSplitHelper) splitRegionByPoints(
}

helper.pool.ApplyOnErrorGroup(helper.eg, func() error {
newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints, false)
newRegions, errSplit := regionSplitter.splitAndScatterRegions(ctx, region, splitPoints)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it can directly call regionSplitter.client.SplitWaitAndScatter and the function splitAndScatterRegions can be removed.

if errSplit != nil {
log.Warn("failed to split the scaned region", zap.Error(errSplit))
_, startKey, _ := codec.DecodeBytes(region.Region.StartKey, nil)
Expand All @@ -419,7 +348,7 @@ func (helper *LogSplitHelper) splitRegionByPoints(
startKey = point
}

return regionSplitter.ExecuteSplit(ctx, ranges, 3, false, func([][]byte) {})
return regionSplitter.ExecuteSplit(ctx, ranges)
}
select {
case <-ctx.Done():
Expand Down