br, lightning: move *batch* split region logic into common package #52300

Merged 8 commits on Apr 7, 2024. Changes from 7 commits shown below.
6 changes: 3 additions & 3 deletions br/pkg/restore/client.go
@@ -219,7 +219,7 @@ func NewRestoreClient(
return &Client{
pdClient: pdClient,
pdHTTPClient: pdHTTPCli,
toolClient: split.NewSplitClient(pdClient, pdHTTPCli, tlsConf, isRawKv),
toolClient: split.NewSplitClient(pdClient, pdHTTPCli, tlsConf, isRawKv, maxSplitKeysOnce),
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
switchCh: make(chan struct{}),
@@ -555,7 +555,7 @@ func (rc *Client) InitClients(ctx context.Context, backend *backuppb.StorageBack
useTokenBucket = true
}

metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode)
metaClient := split.NewSplitClient(rc.pdClient, rc.pdHTTPClient, rc.tlsConf, isRawKvMode, maxSplitKeysOnce)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, isTxnKvMode, stores, rc.rewriteMode, concurrencyPerStore, useTokenBucket)
}
@@ -1423,7 +1423,7 @@ func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int
execCtx := se.GetSessionCtx().GetRestrictedSQLExecutor()
splitSize, splitKeys := utils.GetRegionSplitInfo(execCtx)
log.Info("get split threshold from tikv config", zap.Uint64("split-size", splitSize), zap.Int64("split-keys", splitKeys))
client := split.NewSplitClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), false)
client := split.NewSplitClient(rc.GetPDClient(), rc.pdHTTPClient, rc.GetTLSConfig(), false, maxSplitKeysOnce)
return NewLogFilesIterWithSplitHelper(logIter, rules, client, splitSize, splitKeys), nil
}

32 changes: 11 additions & 21 deletions br/pkg/restore/split.go
@@ -31,8 +31,9 @@ import (
type Granularity string

const (
FineGrained Granularity = "fine-grained"
CoarseGrained Granularity = "coarse-grained"
FineGrained Granularity = "fine-grained"
CoarseGrained Granularity = "coarse-grained"
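// maxSplitKeysOnce caps how many split keys BR hands to the split client
// per batch-split request; it is passed to split.NewSplitClient as
// splitBatchKeyCnt at every BR call site in this PR.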
maxSplitKeysOnce = 10240
)

type SplitContext struct {
@@ -118,26 +119,15 @@ func (rs *RegionSplitter) executeSplitByRanges(
// Choose the rough region split keys,
// each split region contains 128 regions to be split.
const regionIndexStep = 128
const maxSplitKeysOnce = 10240
curRegionIndex := regionIndexStep
roughScanStartKey := scanStartKey
for {
roughSortedSplitKeys := make([][]byte, 0, maxSplitKeysOnce)
for i := 0; i < maxSplitKeysOnce && curRegionIndex < len(sortedKeys); i += 1 {
roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex])
curRegionIndex += regionIndexStep
}
if len(roughSortedSplitKeys) == 0 {
break
}
if err := rs.executeSplitByKeys(ctx, splitContext, roughScanStartKey, roughSortedSplitKeys); err != nil {

roughSortedSplitKeys := make([][]byte, 0, len(sortedKeys)/regionIndexStep+1)
for curRegionIndex := regionIndexStep; curRegionIndex < len(sortedKeys); curRegionIndex += regionIndexStep {
roughSortedSplitKeys = append(roughSortedSplitKeys, sortedKeys[curRegionIndex])
}
if len(roughSortedSplitKeys) > 0 {
if err := rs.executeSplitByKeys(ctx, splitContext, scanStartKey, roughSortedSplitKeys); err != nil {
return errors.Trace(err)
}
if curRegionIndex >= len(sortedKeys) {
break
}
// update the roughScanStartKey to the last key of roughSortedSplitKeys
roughScanStartKey = roughSortedSplitKeys[len(roughSortedSplitKeys)-1]
}
log.Info("finish spliting regions roughly", zap.Duration("take", time.Since(startTime)))

@@ -228,7 +218,7 @@ func (rs *RegionSplitter) executeSplitByKeys(
func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte, _ bool,
) ([]*split.RegionInfo, error) {
_, newRegions, err := rs.client.SplitWaitAndScatter(ctx, regionInfo, keys)
newRegions, err := rs.client.SplitWaitAndScatter(ctx, regionInfo, keys)
return newRegions, err
}

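Context for the change above: executeSplitByRanges no longer chunks the rough split keys by maxSplitKeysOnce itself; it collects every regionIndexStep-th key in a single pass and leaves the per-request batching to the split client (see client.go below). A minimal standalone sketch of that selection, assuming only the illustrative names shown here:

// pickRoughSplitKeys mirrors the new loop above: every step-th key of the
// sorted input becomes a rough split key, so each rough region later
// receives about step fine-grained splits.
func pickRoughSplitKeys(sortedKeys [][]byte, step int) [][]byte {
	rough := make([][]byte, 0, len(sortedKeys)/step+1)
	for i := step; i < len(sortedKeys); i += step {
		rough = append(rough, sortedKeys[i])
	}
	return rough
}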
4 changes: 3 additions & 1 deletion br/pkg/restore/split/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
"//pkg/util/codec",
"//pkg/util/intest",
"//pkg/util/redact",
"@com_github_docker_go_units//:go-units",
"@com_github_google_btree//:btree",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
@@ -49,12 +50,13 @@ go_test(
name = "split_test",
timeout = "short",
srcs = [
"client_test.go",
"split_test.go",
"sum_sorted_test.go",
],
embed = [":split"],
flaky = True,
shard_count = 12,
shard_count = 13,
deps = [
"//br/pkg/errors",
"//br/pkg/utils",
104 changes: 71 additions & 33 deletions br/pkg/restore/split/client.go
@@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -43,6 +44,12 @@ const (
splitRegionMaxRetryTime = 4
)

var (
// the max total key size in a split region batch.
// our threshold should be smaller than TiKV's raft max entry size (default is 8MB).
maxBatchSplitSize = 6 * units.MiB
)

// SplitClient is an external client used by RegionSplitter.
type SplitClient interface {
// GetStore gets a store by a store id.
@@ -64,7 +71,7 @@ type SplitClient interface {
//
// The scatter step has a few retry times. If it meets error, it will log a
// warning and continue.
SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) (*RegionInfo, []*RegionInfo, error)
SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error)
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
// ScanRegions gets a list of regions, starts from the region that contains key.
@@ -103,7 +110,8 @@ type pdClient struct {
needScatterVal bool
needScatterInit sync.Once

isRawKv bool
isRawKv bool
splitBatchKeyCnt int
}

// NewSplitClient returns a client used by RegionSplitter.
Expand All @@ -112,13 +120,15 @@ func NewSplitClient(
httpCli pdhttp.Client,
tlsConf *tls.Config,
isRawKv bool,
splitBatchKeyCnt int,
) SplitClient {
cli := &pdClient{
client: client,
httpCli: httpCli,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
client: client,
httpCli: httpCli,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
splitBatchKeyCnt: splitBatchKeyCnt,
}
return cli
}
@@ -555,39 +565,67 @@ func (c *pdClient) hasHealthyRegion(ctx context.Context, regionID uint64) (bool,
return len(regionInfo.PendingPeers) == 0, nil
}

func (c *pdClient) SplitWaitAndScatter(
ctx context.Context, region *RegionInfo, keys [][]byte,
) (*RegionInfo, []*RegionInfo, error) {
func (c *pdClient) SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error) {
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
failpoint.Return(nil, nil, errors.New("retryable error"))
failpoint.Return(nil, errors.New("retryable error"))
})
if len(keys) == 0 {
return region, []*RegionInfo{region}, nil
return []*RegionInfo{region}, nil
}

origin, newRegions, err := c.batchSplitRegionsWithOrigin(ctx, region, keys)
if err != nil {
return nil, nil, errors.Trace(err)
}
err = c.waitRegionsSplit(ctx, newRegions)
if err != nil {
brlog.FromContext(ctx).Warn(
"wait regions split failed, will continue anyway",
zap.Error(err),
)
}
if err = ctx.Err(); err != nil {
return nil, nil, errors.Trace(err)
}
var (
start, end = 0, 0
batchSize = 0
newRegions = make([]*RegionInfo, 0, len(keys))
)

err = c.scatterRegions(ctx, newRegions)
if err != nil {
brlog.FromContext(ctx).Warn(
"scatter regions failed, will continue anyway",
zap.Error(err),
)
for end <= len(keys) {
if end == len(keys) ||
batchSize+len(keys[end]) > maxBatchSplitSize ||
end-start >= c.splitBatchKeyCnt {
// split, wait and scatter for this batch
originRegion, newRegionsOfBatch, err := c.batchSplitRegionsWithOrigin(ctx, region, keys[start:end])
if err != nil {
return nil, errors.Trace(err)
}
err = c.waitRegionsSplit(ctx, newRegionsOfBatch)
if err != nil {
brlog.FromContext(ctx).Warn(
"wait regions split failed, will continue anyway",
zap.Error(err),
)
}
if err = ctx.Err(); err != nil {
return nil, errors.Trace(err)
}
err = c.scatterRegions(ctx, newRegionsOfBatch)
if err != nil {
brlog.FromContext(ctx).Warn(
"scatter regions failed, will continue anyway",
zap.Error(err),
)
}

// the region with the max start key is the region that needs to be further split; it
// can be the leftmost region or the rightmost region.
lastRegion := newRegionsOfBatch[len(newRegionsOfBatch)-1]
if bytes.Compare(originRegion.Region.StartKey, lastRegion.Region.StartKey) < 0 {
region = lastRegion
} else {
region = originRegion
}
newRegions = append(newRegions, newRegionsOfBatch...)
batchSize = 0
start = end
}

if end < len(keys) {
batchSize += len(keys[end])
}
end++
}
return origin, newRegions, errors.Trace(ctx.Err())

return newRegions, errors.Trace(ctx.Err())
}

func (c *pdClient) getStoreCount(ctx context.Context) (int, error) {
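The new SplitWaitAndScatter above replaces the single split/wait/scatter pass with a loop that flushes a batch whenever adding the next key would exceed maxBatchSplitSize bytes, or the batch already holds splitBatchKeyCnt keys, keeping each request safely under TiKV's raft max entry size. A self-contained sketch of just the batching decision, with illustrative names (not this PR's API):

// batchSplitKeys mirrors the flush conditions in SplitWaitAndScatter: a
// batch is emitted when the next key would push it past maxBytes, when it
// already holds maxCnt keys, or when the input runs out.
func batchSplitKeys(keys [][]byte, maxBytes, maxCnt int) [][][]byte {
	var batches [][][]byte
	start, size := 0, 0
	for end := 0; end <= len(keys); end++ {
		if end == len(keys) || size+len(keys[end]) > maxBytes || end-start >= maxCnt {
			if end > start {
				batches = append(batches, keys[start:end])
			}
			start, size = end, 0
		}
		if end < len(keys) {
			size += len(keys[end])
		}
	}
	return batches
}

For example, with maxBytes = 7, maxCnt = 100 and eight 2-byte keys (the setup of TestBatchSplit below), this yields batches of 3, 3 and 2 keys.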
51 changes: 51 additions & 0 deletions br/pkg/restore/split/client_test.go
@@ -0,0 +1,51 @@
// Copyright 2024 PingCAP, Inc. Licensed under Apache-2.0.

package split

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestBatchSplit(t *testing.T) {
backup := maxBatchSplitSize
maxBatchSplitSize = 7
t.Cleanup(func() {
maxBatchSplitSize = backup
})

mockPDClient := NewMockPDClientForSplit()
keys := [][]byte{[]byte(""), []byte("")}
setRegions := mockPDClient.SetRegions(keys)
require.Len(t, setRegions, 1)
splitRegion := &RegionInfo{Region: setRegions[0]}
mockClient := &pdClient{
client: mockPDClient,
splitBatchKeyCnt: 100,
isRawKv: true, // make tests more readable
}
ctx := context.Background()

splitKeys := [][]byte{
[]byte("ba"), []byte("bb"), []byte("bc"),
[]byte("bd"), []byte("be"), []byte("bf"),
[]byte("bg"), []byte("bh"),
}
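// With maxBatchSplitSize lowered to 7 and 2-byte keys, a batch flushes after
// 3 keys (6 bytes; a 4th key would make 8 > 7), so the 8 keys above go out
// in batches of 3, 3 and 2, i.e. 3 batch-split calls.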
expectedBatchSplitCnt := 3

_, err := mockClient.SplitWaitAndScatter(ctx, splitRegion, splitKeys)
require.NoError(t, err)

// check split ranges
regions, err := PaginateScanRegion(ctx, mockClient, []byte{'b'}, []byte{'c'}, 5)
require.NoError(t, err)
expected := [][]byte{[]byte("")}
expected = append(expected, splitKeys...)
expected = append(expected, []byte(""))
checkRegionsBoundaries(t, regions, expected)

require.EqualValues(t, expectedBatchSplitCnt, mockPDClient.splitRegions.count)
require.EqualValues(t, len(splitKeys), mockPDClient.scatterRegions.regionCount)
}
2 changes: 1 addition & 1 deletion br/pkg/restore/split/split_test.go
@@ -321,7 +321,7 @@ func TestSplitCtxCancel(t *testing.T) {
client: mockCli,
}

_, _, err := client.SplitWaitAndScatter(ctx, &RegionInfo{}, [][]byte{{1}})
_, err := client.SplitWaitAndScatter(ctx, &RegionInfo{}, [][]byte{{1}})
require.ErrorIs(t, err, context.Canceled)
}

8 changes: 2 additions & 6 deletions br/pkg/restore/split_test.go
@@ -101,13 +101,10 @@ func (c *TestClient) GetRegionByID(ctx context.Context, regionID uint64) (*split
return region, nil
}

func (c *TestClient) SplitWaitAndScatter(
ctx context.Context, regionInfo *split.RegionInfo, keys [][]byte,
) (*split.RegionInfo, []*split.RegionInfo, error) {
func (c *TestClient) SplitWaitAndScatter(_ context.Context, _ *split.RegionInfo, keys [][]byte) ([]*split.RegionInfo, error) {
c.mu.Lock()
defer c.mu.Unlock()
newRegions := make([]*split.RegionInfo, 0)
var region *split.RegionInfo
for _, key := range keys {
var target *split.RegionInfo
splitKey := codec.EncodeBytes([]byte{}, key)
@@ -131,10 +128,9 @@ func (c *TestClient) SplitWaitAndScatter(
c.nextRegionID++
target.Region.StartKey = splitKey
c.regions[target.Region.Id] = target
region = target
newRegions = append(newRegions, newRegion)
}
return region, newRegions, nil
return newRegions, nil
}

func (c *TestClient) GetOperator(context.Context, uint64) (*pdpb.GetOperatorResponse, error) {
1 change: 1 addition & 0 deletions br/pkg/restore/util.go
@@ -515,6 +515,7 @@ func SplitRanges(
client.pdHTTPClient,
client.GetTLSConfig(),
isRawKv,
maxSplitKeysOnce,
))

return splitter.ExecuteSplit(ctx, ranges, client.GetStoreCount(), isRawKv, func(keys [][]byte) {
2 changes: 1 addition & 1 deletion pkg/lightning/backend/local/local.go
@@ -554,7 +554,7 @@ func NewBackend(
pdCli.GetServiceDiscovery(),
pdhttp.WithTLSConfig(tls.TLSConfig()),
).WithBackoffer(retry.InitialBackoffer(time.Second, time.Second, pdutil.PDRequestRetryTime*time.Second))
splitCli := split.NewSplitClient(pdCli, pdHTTPCli, tls.TLSConfig(), false)
splitCli := split.NewSplitClient(pdCli, pdHTTPCli, tls.TLSConfig(), false, config.RegionSplitBatchSize)
importClientFactory := newImportClientFactoryImpl(splitCli, tls, config.MaxConnPerStore, config.ConnCompressType)
var writeLimiter StoreWriteLimiter
if config.StoreWriteBWLimit > 0 {