Skip to content

Commit

Permalink
br, lightning: move *batch* split region logic into common package (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Apr 7, 2024
1 parent 2c4c5bb commit bdffd30
Show file tree
Hide file tree
Showing 11 changed files with 187 additions and 203 deletions.
6 changes: 3 additions & 3 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func NewRestoreClient(
return &Client{
pdClient: pdClient,
pdHTTPClient: pdHTTPCli,
toolClient: split.NewSplitClient(pdClient, pdHTTPCli, tlsConf, isRawKv),
toolClient: split.NewSplitClient(pdClient, pdHTTPCli, tlsConf, isRawKv, maxSplitKeysOnce),
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
switchCh: make(chan struct{}),
Expand Down Expand Up @@ -555,7 +555,7 @@ func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBack
useTokenBucket = true
}

metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode, maxSplitKeysOnce)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, stores, rc.rewriteMode, concurrencyPerStore, useTokenBucket)
}
Expand Down Expand Up @@ -1423,7 +1423,7 @@ func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int
execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor()
splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx)
log.Info("get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys))
client := split.NewSplitClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), false)
client := split.NewSplitClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), false, maxSplitKeysOnce)
return NewLogFilesIterWithSplitHelper(logIter, rules, client, splitSize, splitKeys), nil
}

Expand Down
32 changes: 11 additions & 21 deletions br/pkg/restore/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
type Granularity string

const (
FineGrained Granularity = "fine-grained"
CoarseGrained Granularity = "coarse-grained"
FineGrained Granularity = "fine-grained"
CoarseGrained Granularity = "coarse-grained"
maxSplitKeysOnce = 10240
)

type SplitContext struct {
Expand Down Expand Up @@ -118,26 +119,15 @@ func (rs *RegionSplitter) executeSplitByRanges(
// Choose the rough region split keys,
// each split region contains 128 regions to be split.
const regionIndexStep = 128
const maxSplitKeysOnce = 10240
curRegionIndex := regionIndexStep
roughScanStartKey := scanStartKey
for {
roughSortedSplitKeys := make([][]byte, 0, maxSplitKeysOnce)
for i := 0; i < maxSplitKeysOnce && curRegionIndex < len(sortedKeys); i += 1 {
roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex])
curRegionIndex += regionIndexStep
}
if len(roughSortedSplitKeys) == 0 {
break
}
if err := rs.executeSplitByKeys(ctx, splitContext, roughScanStartKey, roughSortedSplitKeys); err != nil {

roughSortedSplitKeys := make([][]byte, 0, len(sortedKeys)/regionIndexStep+1)
for curRegionIndex := regionIndexStep; curRegionIndex < len(sortedKeys); curRegionIndex += regionIndexStep {
roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex])
}
if len(roughSortedSplitKeys) > 0 {
if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, roughSortedSplitKeys); err != nil {
return errors.Trace(err)
}
if curRegionIndex >= len(sortedKeys) {
break
}
// update the roughScanStartKey to the last key of roughSortedSplitKeys
roughScanStartKey = roughSortedSplitKeys[len(roughSortedSplitKeys)-1]
}
log.Info("finish spliting regions roughly", zap.Duration("take", time.Since(startTime)))

Expand Down Expand Up @@ -228,7 +218,7 @@ func (rs *RegionSplitter) executeSplitByKeys(
func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte, _ bool,
) ([]*split.RegionInfo, error) {
_, newRegions, err := rs.client.SplitWaitAndScatter(ctx, regionInfo, keys)
newRegions, err := rs.client.SplitWaitAndScatter(ctx, regionInfo, keys)
return newRegions, err
}

Expand Down
4 changes: 3 additions & 1 deletion br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/util/codec",
"//pkg/util/intest",
"//pkg/util/redact",
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand All @@ -49,12 +50,13 @@ go_test(
name = "split_test",
timeout = "short",
srcs = [
"client_test.go",
"split_test.go",
"sum_sorted_test.go",
],
embed = [":split"],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//br/pkg/errors",
"//br/pkg/utils",
Expand Down
112 changes: 79 additions & 33 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
Expand Down Expand Up @@ -43,6 +44,12 @@ const (
splitRegionMaxRetryTime = 4
)

var (
	// maxBatchSplitSize is the max total key size in a split region batch.
	// Our threshold should be smaller than TiKV's raft max entry size
	// (default is 8MB). Declared as a var (not a const) so tests can lower
	// it to force multiple batches.
	maxBatchSplitSize = 6 * units.MiB
)

// SplitClient is an external client used by RegionSplitter.
type SplitClient interface {
// GetStore gets a store by a store id.
Expand All @@ -64,7 +71,7 @@ type SplitClient interface {
//
// The scatter step has a few retry times. If it meets error, it will log a
// warning and continue.
SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) (*RegionInfo, []*RegionInfo, error)
SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error)
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// ScanRegions gets a list of regions, starts from the region that contains key.
Expand Down Expand Up @@ -103,7 +110,8 @@ type pdClient struct {
needScatterVal bool
needScatterInit sync.Once

isRawKv bool
isRawKv bool
splitBatchKeyCnt int
}

// NewSplitClient returns a client used by RegionSplitter.
Expand All @@ -112,13 +120,15 @@ func NewSplitClient(
httpCli pdhttp.Client,
tlsConf *tls.Config,
isRawKv bool,
splitBatchKeyCnt int,
) SplitClient {
cli := &pdClient{
client: client,
httpCli: httpCli,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
client: client,
httpCli: httpCli,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
splitBatchKeyCnt: splitBatchKeyCnt,
}
return cli
}
Expand Down Expand Up @@ -452,6 +462,13 @@ func sendSplitRegionRequest(
return false, resp, nil
}

// batchSplitRegionsWithOrigin calls the batch split region API and groups the
// returned regions into two groups: the region with the same ID as the origin,
// and the other regions. The former does not need to be scattered while the
// latter need to be scattered.
//
// Depending on the TiKV configuration right-derive-when-split, the origin region
// can be the first return region or the last return region.
func (c *pdClient) batchSplitRegionsWithOrigin(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) (*RegionInfo, []*RegionInfo, error) {
Expand Down Expand Up @@ -555,39 +572,68 @@ func (c *pdClient) hasHealthyRegion(ctx context.Context, regionID uint64) (bool,
return len(regionInfo.PendingPeers) == 0, nil
}

func (c *pdClient) SplitWaitAndScatter(
ctx context.Context, region *RegionInfo, keys [][]byte,
) (*RegionInfo, []*RegionInfo, error) {
func (c *pdClient) SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error) {
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
failpoint.Return(nil, nil, errors.New("retryable error"))
failpoint.Return(nil, errors.New("retryable error"))
})
if len(keys) == 0 {
return region, []*RegionInfo{region}, nil
return []*RegionInfo{region}, nil
}

origin, newRegions, err := c.batchSplitRegionsWithOrigin(ctx, region, keys)
if err != nil {
return nil, nil, errors.Trace(err)
}
err = c.waitRegionsSplit(ctx, newRegions)
if err != nil {
brlog.FromContext(ctx).Warn(
"wait regions split failed, will continue anyway",
zap.Error(err),
)
}
if err = ctx.Err(); err != nil {
return nil, nil, errors.Trace(err)
}
var (
start, end = 0, 0
batchSize = 0
newRegions = make([]*RegionInfo, 0, len(keys))
)

err = c.scatterRegions(ctx, newRegions)
if err != nil {
brlog.FromContext(ctx).Warn(
"scatter regions failed, will continue anyway",
zap.Error(err),
)
for end <= len(keys) {
if end == len(keys) ||
batchSize+len(keys[end]) > maxBatchSplitSize ||
end-start >= c.splitBatchKeyCnt {
// split, wait and scatter for this batch
originRegion, newRegionsOfBatch, err := c.batchSplitRegionsWithOrigin(ctx, region, keys[start:end])
if err != nil {
return nil, errors.Trace(err)
}
err = c.waitRegionsSplit(ctx, newRegionsOfBatch)
if err != nil {
brlog.FromContext(ctx).Warn(
"wait regions split failed, will continue anyway",
zap.Error(err),
)
}
if err = ctx.Err(); err != nil {
return nil, errors.Trace(err)
}
err = c.scatterRegions(ctx, newRegionsOfBatch)
if err != nil {
brlog.FromContext(ctx).Warn(
"scatter regions failed, will continue anyway",
zap.Error(err),
)
}

// the region with the max start key is the one that needs to be further
// split; depending on whether the origin region is the first or the last
// returned region, compare the origin region with the last new region.
lastNewRegion := newRegionsOfBatch[len(newRegionsOfBatch)-1]
if bytes.Compare(originRegion.Region.StartKey, lastNewRegion.Region.StartKey) < 0 {
region = lastNewRegion
} else {
region = originRegion
}
newRegions = append(newRegions, newRegionsOfBatch...)
batchSize = 0
start = end
}

if end < len(keys) {
batchSize += len(keys[end])
}
end++
}
return origin, newRegions, errors.Trace(ctx.Err())

return newRegions, errors.Trace(ctx.Err())
}

func (c *pdClient) getStoreCount(ctx context.Context) (int, error) {
Expand Down
51 changes: 51 additions & 0 deletions br/pkg/restore/split/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package split

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

// TestBatchSplit checks that SplitWaitAndScatter splits one region in several
// batches when the split keys exceed the byte-size limit (maxBatchSplitSize),
// that the resulting region boundaries are exactly the split keys, and that
// every newly created region is scattered.
func TestBatchSplit(t *testing.T) {
	// Lower the package-level byte-size threshold so the small test keys
	// overflow a single batch; restore the default when the test finishes.
	backup := maxBatchSplitSize
	maxBatchSplitSize = 7
	t.Cleanup(func() {
		maxBatchSplitSize = backup
	})

	// Start from a single region covering the whole key space ("", "").
	mockPDClient := NewMockPDClientForSplit()
	keys := [][]byte{[]byte(""), []byte("")}
	setRegions := mockPDClient.SetRegions(keys)
	require.Len(t, setRegions, 1)
	splitRegion := &RegionInfo{Region: setRegions[0]}
	// splitBatchKeyCnt is set high (100) so only the byte-size limit, not the
	// key-count limit, triggers batching in this test.
	mockClient := &pdClient{
		client:           mockPDClient,
		splitBatchKeyCnt: 100,
		isRawKv:          true, // make tests more readable
	}
	ctx := context.Background()

	// 8 keys of 2 bytes each: with maxBatchSplitSize = 7 a batch holds at
	// most 3 keys (3*2 <= 7 < 4*2), so splitting needs 3 batch calls.
	splitKeys := [][]byte{
		[]byte("ba"), []byte("bb"), []byte("bc"),
		[]byte("bd"), []byte("be"), []byte("bf"),
		[]byte("bg"), []byte("bh"),
	}
	expectedBatchSplitCnt := 3

	_, err := mockClient.SplitWaitAndScatter(ctx, splitRegion, splitKeys)
	require.NoError(t, err)

	// check split ranges: the scanned regions must tile the key space at
	// exactly the split keys.
	regions, err := PaginateScanRegion(ctx, mockClient, []byte{'b'}, []byte{'c'}, 5)
	require.NoError(t, err)
	expected := [][]byte{[]byte("")}
	expected = append(expected, splitKeys...)
	expected = append(expected, []byte(""))
	checkRegionsBoundaries(t, regions, expected)

	// One batch-split RPC per batch, and one scattered region per split key.
	require.EqualValues(t, expectedBatchSplitCnt, mockPDClient.splitRegions.count)
	require.EqualValues(t, len(splitKeys), mockPDClient.scatterRegions.regionCount)
}
2 changes: 1 addition & 1 deletion br/pkg/restore/split/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func TestSplitCtxCancel(t *testing.T) {
client: mockCli,
}

_, _, err := client.SplitWaitAndScatter(ctx, &RegionInfo{}, [][]byte{{1}})
_, err := client.SplitWaitAndScatter(ctx, &RegionInfo{}, [][]byte{{1}})
require.ErrorIs(t, err, context.Canceled)
}

Expand Down
8 changes: 2 additions & 6 deletions br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,10 @@ func (c *TestClient) GetRegionByID(ctx context.Context, regionID uint64) (*split
return region, nil
}

func (c *TestClient) SplitWaitAndScatter(
ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte,
) (*split.RegionInfo, []*split.RegionInfo, error) {
func (c *TestClient) SplitWaitAndScatter(_ context.Context, _ *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
newRegions := make([]*split.RegionInfo, 0)
var region *split.RegionInfo
for _, key := range keys {
var target *split.RegionInfo
splitKey := codec.EncodeBytes([]byte{}, key)
Expand All @@ -131,10 +128,9 @@ func (c *TestClient) SplitWaitAndScatter(
c.nextRegionID++
target.Region.StartKey = splitKey
c.regions[target.Region.Id] = target
region = target
newRegions = append(newRegions, newRegion)
}
return region, newRegions, nil
return newRegions, nil
}

func (c *TestClient) GetOperator(context.Context, uint64) (*pdpb.GetOperatorResponse, error) {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/restore/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ func SplitRanges(
client.pdHTTPClient,
client.GetTLSConfig(),
isRawKv,
maxSplitKeysOnce,
))

return splitter.ExecuteSplit(ctx, ranges, client.GetStoreCount(), isRawKv, func(keys [][]byte) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func NewBackend(
pdCli.GetServiceDiscovery(),
pdhttp.WithTLSConfig(tls.TLSConfig()),
).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second))
splitCli := split.NewSplitClient(pdCli, pdHTTPCli, tls.TLSConfig(), false)
splitCli := split.NewSplitClient(pdCli, pdHTTPCli, tls.TLSConfig(), false, config.RegionSplitBatchSize)
importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {
Expand Down

0 comments on commit bdffd30

Please sign in to comment.