Skip to content

Commit

Permalink
lightning: move scan-split-scatter regions logic into common package (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Apr 9, 2024
1 parent 1905e89 commit a0b5861
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 640 deletions.
6 changes: 5 additions & 1 deletion br/pkg/restore/split/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/lightning/config",
"//pkg/lightning/log",
"//pkg/store/pdtypes",
"//pkg/util",
"//pkg/util/codec",
"//pkg/util/intest",
"//pkg/util/redact",
Expand All @@ -41,6 +42,7 @@ go_library(
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
Expand All @@ -56,13 +58,15 @@ go_test(
],
embed = [":split"],
flaky = True,
shard_count = 13,
shard_count = 17,
deps = [
"//br/pkg/errors",
"//br/pkg/utils",
"//pkg/kv",
"//pkg/sessionctx/stmtctx",
"//pkg/store/pdtypes",
"//pkg/tablecodec",
"//pkg/types",
"//pkg/util/codec",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
219 changes: 145 additions & 74 deletions br/pkg/restore/split/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@ import (
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/config"
brlog "github.com/pingcap/tidb/pkg/lightning/log"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/intest"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -58,6 +62,11 @@ type SplitClient interface {
GetRegion(ctx context.Context, key []byte) (*RegionInfo, error)
// GetRegionByID gets a region by a region id.
GetRegionByID(ctx context.Context, regionID uint64) (*RegionInfo, error)
// SplitKeysAndScatter splits the related regions of the keys and scatters the
// new regions. It returns the new regions that need to be called with
// WaitRegionsScattered.
SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][]byte) ([]*RegionInfo, error)

// SplitWaitAndScatter splits a region from a batch of keys, waits for the split
// is finished, and scatters the new regions. It will return the original region,
// new regions and error. The input keys should not be encoded.
Expand All @@ -71,6 +80,7 @@ type SplitClient interface {
//
// The scatter step has a few retry times. If it meets error, it will log a
// warning and continue.
// TODO(lance6716): remove this function in interface after BR uses SplitKeysAndScatter.
SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error)
// GetOperator gets the status of operator of the specified region.
GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error)
Expand Down Expand Up @@ -111,10 +121,12 @@ type pdClient struct {
needScatterInit sync.Once

isRawKv bool
splitConcurrency int
splitBatchKeyCnt int
}

// NewSplitClient returns a client used by RegionSplitter.
// TODO(lance6716): replace this function with NewClient.
func NewSplitClient(
client pd.Client,
httpCli pdhttp.Client,
Expand All @@ -133,6 +145,30 @@ func NewSplitClient(
return cli
}

// NewClient creates a SplitClient.
//
// splitBatchKeyCnt controls how many keys are sent to TiKV in a batch in split
// region API. splitConcurrency controls how many regions are split concurrently.
func NewClient(
client pd.Client,
httpCli pdhttp.Client,
tlsConf *tls.Config,
isRawKv bool,
splitBatchKeyCnt int,
splitConcurrency int,
) SplitClient {
cli := &pdClient{
client: client,
httpCli: httpCli,
tlsConf: tlsConf,
storeCache: make(map[uint64]*metapb.Store),
isRawKv: isRawKv,
splitBatchKeyCnt: splitBatchKeyCnt,
splitConcurrency: splitConcurrency,
}
return cli
}

func (c *pdClient) needScatter(ctx context.Context) bool {
c.needScatterInit.Do(func() {
var err error
Expand Down Expand Up @@ -234,80 +270,6 @@ func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64) (*RegionI
}, nil
}

func (c *pdClient) SplitRegion(ctx context.Context, regionInfo *RegionInfo, key []byte) (*RegionInfo, error) {
var peer *metapb.Peer
if regionInfo.Leader != nil {
peer = regionInfo.Leader
} else {
if len(regionInfo.Region.Peers) == 0 {
return nil, errors.Annotate(berrors.ErrRestoreNoPeer, "region does not have peer")
}
peer = regionInfo.Region.Peers[0]
}
storeID := peer.GetStoreId()
store, err := c.GetStore(ctx, storeID)
if err != nil {
return nil, errors.Trace(err)
}
conn, err := grpc.Dial(store.GetAddress(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
config.DefaultGrpcKeepaliveParams)
if err != nil {
return nil, errors.Trace(err)
}
defer conn.Close()

client := tikvpb.NewTikvClient(conn)
resp, err := client.SplitRegion(ctx, &kvrpcpb.SplitRegionRequest{
Context: &kvrpcpb.Context{
RegionId: regionInfo.Region.Id,
RegionEpoch: regionInfo.Region.RegionEpoch,
Peer: peer,
},
SplitKey: key,
})
if err != nil {
return nil, errors.Trace(err)
}
if resp.RegionError != nil {
log.Error("fail to split region",
logutil.Region(regionInfo.Region),
logutil.Key("key", key),
zap.Stringer("regionErr", resp.RegionError))
return nil, errors.Annotatef(berrors.ErrRestoreSplitFailed, "err=%v", resp.RegionError)
}

// BUG: Left is deprecated, it may be nil even if split is succeed!
// Assume the new region is the left one.
newRegion := resp.GetLeft()
if newRegion == nil {
regions := resp.GetRegions()
for _, r := range regions {
if bytes.Equal(r.GetStartKey(), regionInfo.Region.GetStartKey()) {
newRegion = r
break
}
}
}
if newRegion == nil {
return nil, errors.Annotate(berrors.ErrRestoreSplitFailed, "new region is nil")
}
var leader *metapb.Peer
// Assume the leaders will be at the same store.
if regionInfo.Leader != nil {
for _, p := range newRegion.GetPeers() {
if p.GetStoreId() == regionInfo.Leader.GetStoreId() {
leader = p
break
}
}
}
return &RegionInfo{
Region: newRegion,
Leader: leader,
}, nil
}

func splitRegionWithFailpoint(
ctx context.Context,
regionInfo *RegionInfo,
Expand Down Expand Up @@ -572,6 +534,115 @@ func (c *pdClient) hasHealthyRegion(ctx context.Context, regionID uint64) (bool,
return len(regionInfo.PendingPeers) == 0, nil
}

func (c *pdClient) SplitKeysAndScatter(ctx context.Context, sortedSplitKeys [][]byte) ([]*RegionInfo, error) {
if len(sortedSplitKeys) == 0 {
return nil, nil
}
// we need to find the regions that contain the split keys. However, the scan
// region API accepts a key range [start, end) where end key is exclusive, and if
// sortedSplitKeys length is 1, we scan region may return empty result. So we
// increase the end key a bit. If the end key is on the region boundaries, it
// will be skipped by getSplitKeysOfRegions.
scanStart := codec.EncodeBytesExt(nil, sortedSplitKeys[0], c.isRawKv)
lastKey := kv.Key(sortedSplitKeys[len(sortedSplitKeys)-1])
scanEnd := codec.EncodeBytesExt(nil, lastKey.Next(), c.isRawKv)

// mu protects ret, retrySplitKeys, lastSplitErr
mu := sync.Mutex{}
ret := make([]*RegionInfo, 0, len(sortedSplitKeys)+1)
retrySplitKeys := make([][]byte, 0, len(sortedSplitKeys))
var lastSplitErr error

err := utils.WithRetryReturnLastErr(ctx, func() error {
ret = ret[:0]

if len(retrySplitKeys) > 0 {
scanStart = codec.EncodeBytesExt(nil, retrySplitKeys[0], c.isRawKv)
lastKey2 := kv.Key(retrySplitKeys[len(retrySplitKeys)-1])
scanEnd = codec.EncodeBytesExt(nil, lastKey2.Next(), c.isRawKv)
}
regions, err := PaginateScanRegion(ctx, c, scanStart, scanEnd, ScanRegionPaginationLimit)
if err != nil {
return err
}
log.Info("paginate scan regions",
zap.Int("count", len(regions)),
logutil.Key("start", scanStart),
logutil.Key("end", scanEnd))

allSplitKeys := sortedSplitKeys
if len(retrySplitKeys) > 0 {
allSplitKeys = retrySplitKeys
retrySplitKeys = retrySplitKeys[:0]
}
splitKeyMap := GetSplitKeysOfRegions(allSplitKeys, regions, c.isRawKv)
workerPool := tidbutil.NewWorkerPool(uint(c.splitConcurrency), "split keys")
eg, eCtx := errgroup.WithContext(ctx)
for region, splitKeys := range splitKeyMap {
region := region
splitKeys := splitKeys
workerPool.ApplyOnErrorGroup(eg, func() error {
// TODO(lance6716): add error handling to retry from scan or retry from split
newRegions, err2 := c.SplitWaitAndScatter(eCtx, region, splitKeys)
if err2 != nil {
if common.IsContextCanceledError(err2) {
return err2
}
log.Warn("split and scatter region meet error, will retry",
zap.Uint64("region_id", region.Region.Id),
zap.Error(err2))
mu.Lock()
retrySplitKeys = append(retrySplitKeys, splitKeys...)
lastSplitErr = err2
mu.Unlock()
return nil
}

if len(newRegions) != len(splitKeys) {
log.Warn("split key count and new region count mismatch",
zap.Int("new region count", len(newRegions)),
zap.Int("split key count", len(splitKeys)))
}
mu.Lock()
ret = append(ret, newRegions...)
mu.Unlock()
return nil
})
}
if err2 := eg.Wait(); err2 != nil {
return err2
}
if len(retrySplitKeys) == 0 {
return nil
}
slices.SortFunc(retrySplitKeys, bytes.Compare)
return lastSplitErr
}, newSplitBackoffer())
return ret, errors.Trace(err)
}

type splitBackoffer struct {
state utils.RetryState
}

func newSplitBackoffer() *splitBackoffer {
return &splitBackoffer{
state: utils.InitialRetryState(SplitRetryTimes, SplitRetryInterval, SplitMaxRetryInterval),
}
}

func (bo *splitBackoffer) NextBackoff(err error) time.Duration {
if berrors.ErrInvalidRange.Equal(err) {
bo.state.GiveUp()
return 0
}
return bo.state.ExponentialBackoff()
}

func (bo *splitBackoffer) Attempt() int {
return bo.state.Attempt()
}

func (c *pdClient) SplitWaitAndScatter(ctx context.Context, region *RegionInfo, keys [][]byte) ([]*RegionInfo, error) {
failpoint.Inject("failToSplit", func(_ failpoint.Value) {
failpoint.Return(nil, errors.New("retryable error"))
Expand Down

0 comments on commit a0b5861

Please sign in to comment.