Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

operator: fix some error checks #5629

Merged
merged 4 commits into from Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 19 additions & 24 deletions server/schedule/operator/step.go
Expand Up @@ -404,22 +404,24 @@ type PromoteLearner struct {
}

// ConfVerChanged returns the delta value for version increased by this step.
// It is also used by ChangePeerV2Leave. Since there are currently four roles,
// we need to confirm whether it is a Voter, not a DemotingVoter, etc.
func (pl PromoteLearner) ConfVerChanged(region *core.RegionInfo) uint64 {
peer := region.GetStoreVoter(pl.ToStore)
return typeutil.BoolToUint64(peer.GetId() == pl.PeerID)
return typeutil.BoolToUint64(peer.GetId() == pl.PeerID && peer.GetRole() == metapb.PeerRole_Voter)
Copy link
Member Author

@HunDunDM HunDunDM Oct 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, IncomingVoter and DemotingVoter are also considered Voter, so additional checks are required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should add these comments to code, which is convenient to read code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some comment

}

func (pl PromoteLearner) String() string {
return fmt.Sprintf("promote learner peer %v on store %v to voter", pl.PeerID, pl.ToStore)
}

// IsFinish checks if current step is finished.
// IsFinish checks if current step is finished. It is also used by ChangePeerV2Leave.
func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool {
if peer := region.GetStoreVoter(pl.ToStore); peer != nil {
if peer.GetId() != pl.PeerID {
log.Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain-voter", peer.GetId()))
}
return peer.GetId() == pl.PeerID
return peer.GetId() == pl.PeerID && peer.GetRole() == metapb.PeerRole_Voter
}
return false
}
Expand Down Expand Up @@ -643,9 +645,10 @@ func (dv DemoteVoter) String() string {
}

// ConfVerChanged returns the delta value for version increased by this step.
func (dv DemoteVoter) ConfVerChanged(region *core.RegionInfo) bool {
peer := region.GetStoreLearner(dv.ToStore)
return peer.GetId() == dv.PeerID
func (dv DemoteVoter) ConfVerChanged(region *core.RegionInfo) uint64 {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfVerChanged will be called when the next steps are performed, and it is necessary to consider that the Peer may have been removed in the future.

peer := region.GetStorePeer(dv.ToStore)
// the demoting peer may be removed later.
return typeutil.BoolToUint64(peer == nil || (peer.GetId() == dv.PeerID && peer.GetRole() == metapb.PeerRole_Learner))
}

// IsFinish checks if current step is finished.
Expand Down Expand Up @@ -700,7 +703,8 @@ func (cpe ChangePeerV2Enter) ConfVerChanged(region *core.RegionInfo) uint64 {
}
}
for _, dv := range cpe.DemoteVoters {
peer := region.GetStoreVoter(dv.ToStore)
peer := region.GetStorePeer(dv.ToStore)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be Learner here, the previous GetStoreVoter was used incorrectly.

// the demoting peer may be removed later.
if peer != nil && (peer.GetId() != dv.PeerID || !core.IsLearnerOrDemotingVoter(peer)) {
return 0
}
Expand All @@ -715,16 +719,16 @@ func (cpe ChangePeerV2Enter) IsFinish(region *core.RegionInfo) bool {
if peer != nil && peer.GetId() != pl.PeerID {
log.Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain-voter", peer.GetId()))
}
if peer.GetId() != pl.PeerID || peer.GetRole() != metapb.PeerRole_IncomingVoter {
if peer.GetId() != pl.PeerID || !core.IsVoterOrIncomingVoter(peer) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allow to promote directly.

return false
}
}
for _, dv := range cpe.DemoteVoters {
peer := region.GetStoreVoter(dv.ToStore)
peer := region.GetStorePeer(dv.ToStore)
if peer != nil && peer.GetId() != dv.PeerID {
log.Warn("obtain unexpected peer", zap.String("expect", dv.String()), zap.Uint64("obtain-learner", peer.GetId()))
}
if peer.GetId() != dv.PeerID || peer.GetRole() != metapb.PeerRole_DemotingVoter {
if peer.GetId() != dv.PeerID || !core.IsLearnerOrDemotingVoter(peer) {
return false
}
}
Expand All @@ -740,12 +744,10 @@ func (cpe ChangePeerV2Enter) CheckInProgress(_ ClusterInformer, region *core.Reg
return errors.New("peer does not exist")
}
switch peer.GetRole() {
case metapb.PeerRole_Learner:
case metapb.PeerRole_Learner, metapb.PeerRole_Voter:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allow to promote directly.

notInJointState = true
case metapb.PeerRole_IncomingVoter:
inJointState = true
case metapb.PeerRole_Voter:
return errors.New("peer already is a voter")
case metapb.PeerRole_DemotingVoter:
return errors.New("cannot promote a demoting voter")
default:
Expand All @@ -758,12 +760,10 @@ func (cpe ChangePeerV2Enter) CheckInProgress(_ ClusterInformer, region *core.Reg
return errors.New("peer does not exist")
}
switch peer.GetRole() {
case metapb.PeerRole_Voter:
case metapb.PeerRole_Voter, metapb.PeerRole_Learner:
notInJointState = true
case metapb.PeerRole_DemotingVoter:
inJointState = true
case metapb.PeerRole_Learner:
return errors.New("peer already is a learner")
case metapb.PeerRole_IncomingVoter:
return errors.New("cannot demote a incoming voter")
default:
Expand Down Expand Up @@ -833,13 +833,12 @@ func (cpl ChangePeerV2Leave) String() string {
// ConfVerChanged returns the delta value for version increased by this step.
func (cpl ChangePeerV2Leave) ConfVerChanged(region *core.RegionInfo) uint64 {
for _, pl := range cpl.PromoteLearners {
peer := region.GetStoreVoter(pl.ToStore)
if peer.GetId() != pl.PeerID || peer.GetRole() != metapb.PeerRole_Voter {
if pl.ConfVerChanged(region) == 0 {
return 0
}
}
for _, dv := range cpl.DemoteVoters {
if region.GetStorePeer(dv.PeerID) != nil && !dv.ConfVerChanged(region) {
if dv.ConfVerChanged(region) == 0 {
return 0
}
}
Expand All @@ -849,11 +848,7 @@ func (cpl ChangePeerV2Leave) ConfVerChanged(region *core.RegionInfo) uint64 {
// IsFinish checks if current step is finished.
func (cpl ChangePeerV2Leave) IsFinish(region *core.RegionInfo) bool {
for _, pl := range cpl.PromoteLearners {
peer := region.GetStoreVoter(pl.ToStore)
if peer != nil && peer.GetId() != pl.PeerID {
log.Warn("obtain unexpected peer", zap.String("expect", pl.String()), zap.Uint64("obtain-voter", peer.GetId()))
}
if peer.GetId() != pl.PeerID || peer.GetRole() != metapb.PeerRole_Voter {
if !pl.IsFinish(region) {
return false
}
}
Expand Down
107 changes: 107 additions & 0 deletions server/schedule/operator/step_test.go
Expand Up @@ -293,6 +293,113 @@ func (suite *operatorStepTestSuite) TestChangePeerV2Enter() {
suite.check(cpe, desc, testCases)
}

func (suite *operatorStepTestSuite) TestChangePeerV2EnterWithSingleChange() {
cpe := ChangePeerV2Enter{
PromoteLearners: []PromoteLearner{{PeerID: 3, ToStore: 3}},
}
testCases := []testCase{
{ // before step
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Learner},
},
0,
false,
suite.NoError,
},
{ // after step
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_IncomingVoter},
},
1,
true,
suite.NoError,
},
{ // after step (direct)
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter},
},
1,
true,
suite.NoError,
},
{ // error role
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_DemotingVoter},
},
0,
false,
suite.Error,
},
}
desc := "use joint consensus, promote learner peer 3 on store 3 to voter"
suite.check(cpe, desc, testCases)

cpe = ChangePeerV2Enter{
DemoteVoters: []DemoteVoter{{PeerID: 3, ToStore: 3}},
}
testCases = []testCase{
{ // before step
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Voter},
},
0,
false,
suite.NoError,
},
{ // after step
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_DemotingVoter},
},
1,
true,
suite.NoError,
},
{ // after step (direct)
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_Learner},
},
1,
true,
suite.NoError,
},
{ // demote and remove peer
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
},
1, // correct calculation is required
false,
suite.Error,
},
{ // error role
[]*metapb.Peer{
{Id: 1, StoreId: 1, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 2, Role: metapb.PeerRole_Voter},
{Id: 3, StoreId: 3, Role: metapb.PeerRole_IncomingVoter},
},
0,
false,
suite.Error,
},
}
desc = "use joint consensus, demote voter peer 3 on store 3 to learner"
suite.check(cpe, desc, testCases)
}

func (suite *operatorStepTestSuite) TestChangePeerV2Leave() {
cpl := ChangePeerV2Leave{
PromoteLearners: []PromoteLearner{{PeerID: 3, ToStore: 3}, {PeerID: 4, ToStore: 4}},
Expand Down