Skip to content

Commit

Permalink
add light peer without considering the influence (#1554) (#1563)
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed Jun 5, 2019
1 parent c44ddf4 commit 9ae18a3
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 34 deletions.
29 changes: 28 additions & 1 deletion server/schedule/mockcluster.go
Expand Up @@ -27,7 +27,15 @@ import (
"go.uber.org/zap"
)

// MockCluster is used to mock clusterInfo for test use
// MockHeadbeatStream is used to mock HeadbeatStream for test use.
type MockHeadbeatStream struct{}

// SendMsg is used to send the message.
func (m MockHeadbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
return
}

// MockCluster is used to mock clusterInfo for test use.
type MockCluster struct {
*BasicCluster
*core.MockIDAllocator
Expand Down Expand Up @@ -396,6 +404,15 @@ func (mc *MockCluster) ApplyOperatorStep(region *core.RegionInfo, op *Operator)
StoreId: s.ToStore,
}
region = region.Clone(core.WithAddPeer(peer))
case AddLightPeer:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add peer that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
}
region = region.Clone(core.WithAddPeer(peer))
case RemovePeer:
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
Expand All @@ -414,6 +431,16 @@ func (mc *MockCluster) ApplyOperatorStep(region *core.RegionInfo, op *Operator)
IsLearner: true,
}
region = region.Clone(core.WithAddPeer(peer))
case AddLightLearner:
if region.GetStorePeer(s.ToStore) != nil {
panic("Add learner that exists")
}
peer := &metapb.Peer{
Id: s.PeerID,
StoreId: s.ToStore,
IsLearner: true,
}
region = region.Clone(core.WithAddPeer(peer))
case PromoteLearner:
if region.GetStoreLearner(s.ToStore) == nil {
panic("Promote peer that doesn't exist")
Expand Down
116 changes: 97 additions & 19 deletions server/schedule/operator.go
Expand Up @@ -60,7 +60,7 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool {
return region.GetLeader().GetStoreId() == tl.ToStore
}

// Influence calculates the store difference that current step make
// Influence calculates the store difference that current step makes.
func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
from := opInfluence.GetStoreInfluence(tl.FromStore)
to := opInfluence.GetStoreInfluence(tl.ToStore)
Expand Down Expand Up @@ -92,7 +92,7 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
return false
}

// Influence calculates the store difference that current step make
// Influence calculates the store difference that current step makes.
func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(ap.ToStore)

Expand Down Expand Up @@ -122,7 +122,7 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// Influence calculates the store difference that current step make
// Influence calculates the store difference that current step makes.
func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)

Expand Down Expand Up @@ -151,7 +151,7 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool {
return false
}

// Influence calculates the store difference that current step make
// Influence calculates the store difference that current step makes.
func (pl PromoteLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {}

// RemovePeer is an OperatorStep that removes a region peer.
Expand All @@ -168,7 +168,7 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool {
return region.GetStorePeer(rp.FromStore) == nil
}

// Influence calculates the store difference that current step make
// Influence calculates the store difference that current step makes.
func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
from := opInfluence.GetStoreInfluence(rp.FromStore)

Expand All @@ -183,25 +183,25 @@ type MergeRegion struct {
// there are two regions involved in merge process,
// so to keep them from other scheduler,
// both of them should add MerRegion operatorStep.
// But actually, tikv just need the region want to be merged to get the merge request,
// But actually, TiKV just needs the region want to be merged to get the merge request,
// thus use a IsPassive mark to indicate that
// this region doesn't need to send merge request to tikv.
// this region doesn't need to send merge request to TiKV.
IsPassive bool
}

func (mr MergeRegion) String() string {
return fmt.Sprintf("merge region %v into region %v", mr.FromRegion.GetId(), mr.ToRegion.GetId())
}

// IsFinish checks if current step is finished
// IsFinish checks if current step is finished.
func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool {
if mr.IsPassive {
return !bytes.Equal(region.GetStartKey(), mr.ToRegion.StartKey) || !bytes.Equal(region.GetEndKey(), mr.ToRegion.EndKey)
}
return false
}

// Influence calculates the store difference that current step make
// Influence calculates the store difference that current step makes.
func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
if mr.IsPassive {
for _, p := range region.GetPeers() {
Expand Down Expand Up @@ -229,7 +229,7 @@ func (sr SplitRegion) IsFinish(region *core.RegionInfo) bool {
return !bytes.Equal(region.GetStartKey(), sr.StartKey) || !bytes.Equal(region.GetEndKey(), sr.EndKey)
}

// Influence calculates the store difference that current step make.
// Influence calculates the store difference that current step makes.
func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
for _, p := range region.GetPeers() {
inf := opInfluence.GetStoreInfluence(p.GetStoreId())
Expand All @@ -240,6 +240,64 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo
}
}

// AddLightPeer is an OperatorStep that adds a region peer without considering the influence.
type AddLightPeer struct {
ToStore, PeerID uint64
}

func (ap AddLightPeer) String() string {
return fmt.Sprintf("add peer %v on store %v", ap.PeerID, ap.ToStore)
}

// IsFinish checks if current step is finished.
func (ap AddLightPeer) IsFinish(region *core.RegionInfo) bool {
if p := region.GetStoreVoter(ap.ToStore); p != nil {
if p.GetId() != ap.PeerID {
log.Warn("obtain unexpected peer", zap.String("expect", ap.String()), zap.Uint64("obtain-voter", p.GetId()))
return false
}
return region.GetPendingVoter(p.GetId()) == nil
}
return false
}

// Influence calculates the store difference that current step makes.
func (ap AddLightPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(ap.ToStore)

to.RegionSize += region.GetApproximateSize()
to.RegionCount++
}

// AddLightLearner is an OperatorStep that adds a region learner peer without considering the influence.
type AddLightLearner struct {
ToStore, PeerID uint64
}

func (al AddLightLearner) String() string {
return fmt.Sprintf("add learner peer %v on store %v", al.PeerID, al.ToStore)
}

// IsFinish checks if current step is finished.
func (al AddLightLearner) IsFinish(region *core.RegionInfo) bool {
if p := region.GetStoreLearner(al.ToStore); p != nil {
if p.GetId() != al.PeerID {
log.Warn("obtain unexpected peer", zap.String("expect", al.String()), zap.Uint64("obtain-learner", p.GetId()))
return false
}
return region.GetPendingLearner(p.GetId()) == nil
}
return false
}

// Influence calculates the store difference that current step makes.
func (al AddLightLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(al.ToStore)

to.RegionSize += region.GetApproximateSize()
to.RegionCount++
}

// Operator contains execution steps generated by scheduler.
type Operator struct {
desc string
Expand Down Expand Up @@ -278,7 +336,7 @@ func (o *Operator) String() string {
return s
}

// MarshalJSON serialize custom types to JSON
// MarshalJSON serializes custom types to JSON.
func (o *Operator) MarshalJSON() ([]byte, error) {
return []byte(`"` + o.String() + `"`), nil
}
Expand Down Expand Up @@ -347,12 +405,12 @@ func (o *Operator) Check(region *core.RegionInfo) OperatorStep {
return nil
}

// SetPriorityLevel set the priority level for operator
// SetPriorityLevel sets the priority level for operator.
func (o *Operator) SetPriorityLevel(level core.PriorityLevel) {
o.level = level
}

// GetPriorityLevel get the priority level
// GetPriorityLevel gets the priority level.
func (o *Operator) GetPriorityLevel() core.PriorityLevel {
return o.level
}
Expand All @@ -379,7 +437,7 @@ func (o *Operator) IsTimeout() bool {
return false
}

// UnfinishedInfluence calculates the store difference which unfinished operator steps make
// UnfinishedInfluence calculates the store difference which unfinished operator steps make.
func (o *Operator) UnfinishedInfluence(opInfluence OpInfluence, region *core.RegionInfo) {
for step := atomic.LoadInt32(&o.currentStep); int(step) < len(o.steps); step++ {
if !o.steps[int(step)].IsFinish(region) {
Expand All @@ -388,7 +446,7 @@ func (o *Operator) UnfinishedInfluence(opInfluence OpInfluence, region *core.Reg
}
}

// TotalInfluence calculates the store difference which whole operator steps make
// TotalInfluence calculates the store difference which whole operator steps make.
func (o *Operator) TotalInfluence(opInfluence OpInfluence, region *core.RegionInfo) {
for step := 0; step < len(o.steps); step++ {
o.steps[int(step)].Influence(opInfluence, region)
Expand Down Expand Up @@ -418,8 +476,12 @@ func (o *Operator) History() []OperatorHistory {
})
case AddPeer:
addPeerStores = append(addPeerStores, s.ToStore)
case AddLightPeer:
addPeerStores = append(addPeerStores, s.ToStore)
case AddLearner:
addPeerStores = append(addPeerStores, s.ToStore)
case AddLightLearner:
addPeerStores = append(addPeerStores, s.ToStore)
case RemovePeer:
removePeerStores = append(removePeerStores, s.FromStore)
}
Expand All @@ -437,7 +499,7 @@ func (o *Operator) History() []OperatorHistory {
return histories
}

// CreateRemovePeerOperator creates an Operator that removes a peer from region.
// CreateRemovePeerOperator creates an operator that removes a peer from region.
func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, storeID, getRegionFollowerIDs(region))
if err != nil {
Expand All @@ -446,7 +508,7 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
}

// CreateAddPeerSteps creates an OperatorStep list that add a new Peer.
// CreateAddPeerSteps creates an OperatorStep list that add a new peer.
func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep {
var st []OperatorStep
if cluster.IsRaftLearnerEnabled() {
Expand All @@ -462,7 +524,23 @@ func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []Opera
return st
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
// CreateAddLightPeerSteps creates an OperatorStep list that add a new peer without considering the influence.
func CreateAddLightPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep {
var st []OperatorStep
if cluster.IsRaftLearnerEnabled() {
st = []OperatorStep{
AddLightLearner{ToStore: newStore, PeerID: peerID},
PromoteLearner{ToStore: newStore, PeerID: peerID},
}
} else {
st = []OperatorStep{
AddLightPeer{ToStore: newStore, PeerID: peerID},
}
}
return st
}

// CreateMovePeerOperator creates an operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
if err != nil {
Expand Down Expand Up @@ -503,7 +581,7 @@ func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64, f
return
}

// CreateMergeRegionOperator creates an Operator that merge two region into one
// CreateMergeRegionOperator creates an operator that merge two region into one.
func CreateMergeRegionOperator(desc string, cluster Cluster, source *core.RegionInfo, target *core.RegionInfo, kind OperatorKind) ([]*Operator, error) {
steps, kinds, err := matchPeerSteps(cluster, source, target)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions server/schedule/operator_controller.go
Expand Up @@ -338,6 +338,21 @@ func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step
},
}
oc.hbStreams.SendMsg(region, cmd)
case AddLightPeer:
if region.GetStorePeer(st.ToStore) != nil {
// The newly added peer is pending.
return
}
cmd := &pdpb.RegionHeartbeatResponse{
ChangePeer: &pdpb.ChangePeer{
ChangeType: eraftpb.ConfChangeType_AddNode,
Peer: &metapb.Peer{
Id: st.PeerID,
StoreId: st.ToStore,
},
},
}
oc.hbStreams.SendMsg(region, cmd)
case AddLearner:
if region.GetStorePeer(st.ToStore) != nil {
// The newly added peer is pending.
Expand All @@ -354,6 +369,22 @@ func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step
},
}
oc.hbStreams.SendMsg(region, cmd)
case AddLightLearner:
if region.GetStorePeer(st.ToStore) != nil {
// The newly added peer is pending.
return
}
cmd := &pdpb.RegionHeartbeatResponse{
ChangePeer: &pdpb.ChangePeer{
ChangeType: eraftpb.ConfChangeType_AddLearnerNode,
Peer: &metapb.Peer{
Id: st.PeerID,
StoreId: st.ToStore,
IsLearner: true,
},
},
}
oc.hbStreams.SendMsg(region, cmd)
case PromoteLearner:
cmd := &pdpb.RegionHeartbeatResponse{
ChangePeer: &pdpb.ChangePeer{
Expand Down
13 changes: 2 additions & 11 deletions server/schedule/operator_controller_test.go
Expand Up @@ -20,7 +20,6 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testOperatorControllerSuite{})
Expand Down Expand Up @@ -56,17 +55,10 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) {
c.Assert(oc.GetOperator(2), NotNil)
}

type mockHeadbeatStream struct{}

func (m mockHeadbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
return
}

func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
opt := NewMockSchedulerOptions()
tc := NewMockCluster(opt)
oc := NewOperatorController(tc, nil)
oc.hbStreams = mockHeadbeatStream{}
oc := NewOperatorController(tc, MockHeadbeatStream{})
tc.AddLeaderStore(1, 2)
tc.AddLeaderStore(2, 0)
tc.AddLeaderRegion(1, 1, 2)
Expand Down Expand Up @@ -98,8 +90,7 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) {
func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) {
opt := NewMockSchedulerOptions()
tc := NewMockCluster(opt)
oc := NewOperatorController(tc, nil)
oc.hbStreams = mockHeadbeatStream{}
oc := NewOperatorController(tc, MockHeadbeatStream{})
tc.AddLeaderStore(1, 2)
tc.AddLeaderStore(2, 0)
tc.AddLeaderRegion(1, 1, 2)
Expand Down

0 comments on commit 9ae18a3

Please sign in to comment.