Skip to content

Commit

Permalink
br: handle region leader miss (#52822) (#52855)
Browse files Browse the repository at this point in the history
close #50501, close #51124
  • Loading branch information
ti-chi-bot committed May 23, 2024
1 parent 8a21519 commit c149ae8
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 7 deletions.
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/common"
Expand Down Expand Up @@ -313,7 +314,8 @@ func getDupDetectClient(
) (import_sstpb.ImportSST_DuplicateDetectClient, error) {
leader := region.Leader
if leader == nil {
leader = region.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", region.Region.Id)
}
importClient, err := importClientFactory.Create(ctx, leader.GetStoreId())
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/localhelper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (c *testSplitClient) BatchSplitRegionsWithOrigin(
StartKey: startKey,
EndKey: key,
},
Leader: target.Leader,
}
c.regions[c.nextRegionID] = newRegion
c.regionsInfo.SetRegion(pdtypes.NewRegionInfo(newRegion.Region, newRegion.Leader))
Expand Down
4 changes: 3 additions & 1 deletion br/pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
sst "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
Expand Down Expand Up @@ -627,7 +628,8 @@ func (local *Backend) doIngest(ctx context.Context, j *regionJob) (*sst.IngestRe

leader := j.region.Leader
if leader == nil {
leader = j.region.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", j.region.Region.Id)
}

cli, err := clientFactory.Create(ctx, leader.StoreId)
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,8 @@ func (importer *FileImporter) ingestSSTs(
) (*import_sstpb.IngestResponse, error) {
leader := regionInfo.Leader
if leader == nil {
leader = regionInfo.Region.GetPeers()[0]
return nil, errors.Annotatef(berrors.ErrPDLeaderNotFound,
"region id %d has no leader", regionInfo.Region.Id)
}
reqCtx := &kvrpcpb.Context{
RegionId: regionInfo.Region.GetId(),
Expand Down
8 changes: 5 additions & 3 deletions br/pkg/restore/import_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func TestEpochNotMatch(t *testing.T) {
{Id: 43},
},
},
Leader: &metapb.Peer{Id: 43},
Leader: &metapb.Peer{Id: 43, StoreId: 1},
}
newRegion := pdtypes.NewRegionInfo(info.Region, info.Leader)
mergeRegion := func() {
Expand Down Expand Up @@ -304,7 +304,8 @@ func TestRegionSplit(t *testing.T) {
EndKey: codec.EncodeBytes(nil, []byte("aayy")),
},
Leader: &metapb.Peer{
Id: 43,
Id: 43,
StoreId: 1,
},
},
{
Expand All @@ -314,7 +315,8 @@ func TestRegionSplit(t *testing.T) {
EndKey: target.Region.EndKey,
},
Leader: &metapb.Peer{
Id: 45,
Id: 45,
StoreId: 1,
},
},
}
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/restore/split/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,23 @@ func CheckRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
}

cur := regions[0]
if cur.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", cur.Region.Id)
}
if cur.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", cur.Region.Id)
}
for _, r := range regions[1:] {
if r.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", r.Region.Id)
}
if r.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", r.Region.Id)
}
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s",
Expand Down
63 changes: 62 additions & 1 deletion br/pkg/restore/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ func initTestClient(isRawKv bool) *TestClient {
}
regions[i] = &split.RegionInfo{
Leader: &metapb.Peer{
Id: i,
Id: i,
StoreId: 1,
},
Region: &metapb.Region{
Id: i,
Expand Down Expand Up @@ -693,6 +694,10 @@ func TestRegionConsistency(t *testing.T) {
"region 6's endKey not equal to next region 8's startKey(.*?)",
[]*split.RegionInfo{
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 1,
},
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("b")),
Expand All @@ -701,6 +706,10 @@ func TestRegionConsistency(t *testing.T) {
},
},
{
Leader: &metapb.Peer{
Id: 8,
StoreId: 1,
},
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("e")),
Expand All @@ -709,6 +718,58 @@ func TestRegionConsistency(t *testing.T) {
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("c")),
codec.EncodeBytes([]byte{}, []byte("e")),
"region 6's leader is nil(.*?)",
[]*split.RegionInfo{
{
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("c")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
RegionEpoch: nil,
},
},
{
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
EndKey: codec.EncodeBytes([]byte{}, []byte("e")),
},
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("c")),
codec.EncodeBytes([]byte{}, []byte("e")),
"region 6's leader's store id is 0(.*?)",
[]*split.RegionInfo{
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("c")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
RegionEpoch: nil,
},
},
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
EndKey: codec.EncodeBytes([]byte{}, []byte("e")),
},
},
},
},
}
for _, ca := range cases {
err := split.CheckRegionConsistency(ca.startKey, ca.endKey, ca.regions)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func TestPaginateScanRegion(t *testing.T) {
Id: i + 1,
Peers: peers,
},
Leader: peers[0],
}

if i != 0 {
Expand Down Expand Up @@ -222,6 +223,7 @@ func TestPaginateScanRegion(t *testing.T) {
StartKey: endKey,
EndKey: []byte{},
},
Leader: peers[0],
}
regionsMap[num] = ri
regions = append(regions, ri)
Expand Down

0 comments on commit c149ae8

Please sign in to comment.