Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: move operator into schedule directory. #725

Merged
merged 1 commit into from
Sep 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions server/api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/server/schedule"
"github.com/unrolled/render"
)

Expand Down Expand Up @@ -54,8 +55,8 @@ func (h *operatorHandler) Get(w http.ResponseWriter, r *http.Request) {

func (h *operatorHandler) List(w http.ResponseWriter, r *http.Request) {
var (
results []server.Operator
ops []server.Operator
results []schedule.Operator
ops []schedule.Operator
err error
)

Expand Down
16 changes: 8 additions & 8 deletions server/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (l *balanceLeaderScheduler) Prepare(cluster *clusterInfo) error { return ni

func (l *balanceLeaderScheduler) Cleanup(cluster *clusterInfo) {}

func (l *balanceLeaderScheduler) Schedule(cluster *clusterInfo) Operator {
func (l *balanceLeaderScheduler) Schedule(cluster *clusterInfo) schedule.Operator {
schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc()
region, newLeader := scheduleTransferLeader(cluster, l.GetName(), l.selector)
if region == nil {
Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *balanceRegionScheduler) Prepare(cluster *clusterInfo) error { return ni

func (s *balanceRegionScheduler) Cleanup(cluster *clusterInfo) {}

func (s *balanceRegionScheduler) Schedule(cluster *clusterInfo) Operator {
func (s *balanceRegionScheduler) Schedule(cluster *clusterInfo) schedule.Operator {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
// Select a peer from the store with most regions.
region, oldPeer := scheduleRemovePeer(cluster, s.GetName(), s.selector)
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *balanceRegionScheduler) Schedule(cluster *clusterInfo) Operator {
return op
}

func (s *balanceRegionScheduler) transferPeer(cluster *clusterInfo, region *core.RegionInfo, oldPeer *metapb.Peer) Operator {
func (s *balanceRegionScheduler) transferPeer(cluster *clusterInfo, region *core.RegionInfo, oldPeer *metapb.Peer) schedule.Operator {
// scoreGuard guarantees that the distinct score will not decrease.
stores := cluster.getRegionStores(region)
source := cluster.getStore(oldPeer.GetStoreId())
Expand Down Expand Up @@ -254,7 +254,7 @@ func newReplicaChecker(opt *scheduleOption, cluster *clusterInfo) *replicaChecke
}
}

func (r *replicaChecker) Check(region *core.RegionInfo) Operator {
func (r *replicaChecker) Check(region *core.RegionInfo) schedule.Operator {
if op := r.checkDownPeer(region); op != nil {
return op
}
Expand Down Expand Up @@ -332,7 +332,7 @@ func (r *replicaChecker) selectBestReplacement(region *core.RegionInfo, peer *me
return r.SelectBestStoreToAddReplica(newRegion, schedule.NewExcludedFilter(nil, region.GetStoreIds()))
}

func (r *replicaChecker) checkDownPeer(region *core.RegionInfo) Operator {
func (r *replicaChecker) checkDownPeer(region *core.RegionInfo) schedule.Operator {
for _, stats := range region.DownPeers {
peer := stats.GetPeer()
if peer == nil {
Expand All @@ -354,7 +354,7 @@ func (r *replicaChecker) checkDownPeer(region *core.RegionInfo) Operator {
return nil
}

func (r *replicaChecker) checkOfflinePeer(region *core.RegionInfo) Operator {
func (r *replicaChecker) checkOfflinePeer(region *core.RegionInfo) schedule.Operator {
for _, peer := range region.GetPeers() {
store := r.cluster.getStore(peer.GetStoreId())
if store == nil {
Expand All @@ -380,7 +380,7 @@ func (r *replicaChecker) checkOfflinePeer(region *core.RegionInfo) Operator {
return nil
}

func (r *replicaChecker) checkBestReplacement(region *core.RegionInfo) Operator {
func (r *replicaChecker) checkBestReplacement(region *core.RegionInfo) schedule.Operator {
oldPeer, oldScore := r.selectWorstPeer(region)
if oldPeer == nil {
return nil
Expand Down Expand Up @@ -467,7 +467,7 @@ func (h *balanceHotRegionScheduler) Prepare(cluster *clusterInfo) error { return

func (h *balanceHotRegionScheduler) Cleanup(cluster *clusterInfo) {}

func (h *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) Operator {
func (h *balanceHotRegionScheduler) Schedule(cluster *clusterInfo) schedule.Operator {
schedulerCounter.WithLabelValues(h.GetName(), "schedule").Inc()
h.calcScore(cluster)

Expand Down
57 changes: 29 additions & 28 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
)

type testClusterInfo struct {
Expand Down Expand Up @@ -271,7 +272,7 @@ func (s *testBalanceLeaderSchedulerSuite) SetUpTest(c *C) {
s.lb = newBalanceLeaderScheduler(opt)
}

func (s *testBalanceLeaderSchedulerSuite) schedule() Operator {
func (s *testBalanceLeaderSchedulerSuite) schedule() schedule.Operator {
return s.lb.Schedule(s.cluster)
}

Expand Down Expand Up @@ -805,80 +806,80 @@ func (s *testReplicaCheckerSuite) TestDistinctScore2(c *C) {
c.Assert(rc.Check(region), IsNil)
}

func checkAddPeer(c *C, bop Operator, storeID uint64) {
var op *changePeerOperator
func checkAddPeer(c *C, bop schedule.Operator, storeID uint64) {
var op *schedule.ChangePeerOperator
switch t := bop.(type) {
case *changePeerOperator:
case *schedule.ChangePeerOperator:
op = t
case *regionOperator:
op = t.Ops[0].(*changePeerOperator)
case *schedule.RegionOperator:
op = t.Ops[0].(*schedule.ChangePeerOperator)
}
c.Assert(op.ChangePeer.GetChangeType(), Equals, pdpb.ConfChangeType_AddNode)
c.Assert(op.ChangePeer.GetPeer().GetStoreId(), Equals, storeID)
}

func checkRemovePeer(c *C, bop Operator, storeID uint64) {
var op *changePeerOperator
func checkRemovePeer(c *C, bop schedule.Operator, storeID uint64) {
var op *schedule.ChangePeerOperator
switch t := bop.(type) {
case *changePeerOperator:
case *schedule.ChangePeerOperator:
op = t
case *regionOperator:
case *schedule.RegionOperator:
if len(t.Ops) == 1 {
op = t.Ops[0].(*changePeerOperator)
op = t.Ops[0].(*schedule.ChangePeerOperator)
} else {
c.Assert(t.Ops, HasLen, 2)
transferLeader := t.Ops[0].(*transferLeaderOperator)
transferLeader := t.Ops[0].(*schedule.TransferLeaderOperator)
c.Assert(transferLeader.OldLeader.GetStoreId(), Equals, storeID)
op = t.Ops[1].(*changePeerOperator)
op = t.Ops[1].(*schedule.ChangePeerOperator)
}
}
c.Assert(op, NotNil)
c.Assert(op.ChangePeer.GetChangeType(), Equals, pdpb.ConfChangeType_RemoveNode)
c.Assert(op.ChangePeer.GetPeer().GetStoreId(), Equals, storeID)
}

func checkTransferPeer(c *C, bop Operator, sourceID, targetID uint64) {
op := bop.(*regionOperator)
func checkTransferPeer(c *C, bop schedule.Operator, sourceID, targetID uint64) {
op := bop.(*schedule.RegionOperator)
c.Assert(op, NotNil)
if len(op.Ops) == 2 {
checkAddPeer(c, op.Ops[0], targetID)
checkRemovePeer(c, op.Ops[1], sourceID)
} else {
c.Assert(op.Ops, HasLen, 3)
checkAddPeer(c, op.Ops[0], targetID)
transferLeader := op.Ops[1].(*transferLeaderOperator)
transferLeader := op.Ops[1].(*schedule.TransferLeaderOperator)
c.Assert(transferLeader.OldLeader.GetStoreId(), Equals, sourceID)
checkRemovePeer(c, op.Ops[2], sourceID)
}
}

func checkTransferPeerWithLeaderTransfer(c *C, bop Operator, sourceID, targetID uint64) {
op := bop.(*regionOperator)
func checkTransferPeerWithLeaderTransfer(c *C, bop schedule.Operator, sourceID, targetID uint64) {
op := bop.(*schedule.RegionOperator)
c.Assert(op, NotNil)
c.Assert(op.Ops, HasLen, 3)
checkTransferPeer(c, bop, sourceID, targetID)
}

func checkTransferLeader(c *C, bop Operator, sourceID, targetID uint64) {
var op *transferLeaderOperator
func checkTransferLeader(c *C, bop schedule.Operator, sourceID, targetID uint64) {
var op *schedule.TransferLeaderOperator
switch t := bop.(type) {
case *transferLeaderOperator:
case *schedule.TransferLeaderOperator:
op = t
case *regionOperator:
op = t.Ops[0].(*transferLeaderOperator)
case *schedule.RegionOperator:
op = t.Ops[0].(*schedule.TransferLeaderOperator)
}
c.Assert(op, NotNil)
c.Assert(op.OldLeader.GetStoreId(), Equals, sourceID)
c.Assert(op.NewLeader.GetStoreId(), Equals, targetID)
}

func checkTransferLeaderFrom(c *C, bop Operator, sourceID uint64) {
var op *transferLeaderOperator
func checkTransferLeaderFrom(c *C, bop schedule.Operator, sourceID uint64) {
var op *schedule.TransferLeaderOperator
switch t := bop.(type) {
case *transferLeaderOperator:
case *schedule.TransferLeaderOperator:
op = t
case *regionOperator:
op = t.Ops[0].(*transferLeaderOperator)
case *schedule.RegionOperator:
op = t.Ops[0].(*schedule.TransferLeaderOperator)
}
c.Assert(op, NotNil)
c.Assert(op.OldLeader.GetStoreId(), Equals, sourceID)
Expand Down
43 changes: 22 additions & 21 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/pd/server/cache"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/schedule"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -66,7 +67,7 @@ type coordinator struct {
opt *scheduleOption
limiter *scheduleLimiter
checker *replicaChecker
operators map[uint64]Operator
operators map[uint64]schedule.Operator
schedulers map[string]*scheduleController

histories *cache.LRU
Expand All @@ -84,7 +85,7 @@ func newCoordinator(cluster *clusterInfo, opt *scheduleOption, hbStreams *heartb
opt: opt,
limiter: newScheduleLimiter(),
checker: newReplicaChecker(opt, cluster),
operators: make(map[uint64]Operator),
operators: make(map[uint64]schedule.Operator),
schedulers: make(map[string]*scheduleController),
histories: cache.NewLRU(historiesCacheSize),
events: cache.NewFIFO(eventsCacheSize),
Expand Down Expand Up @@ -266,7 +267,7 @@ func (c *coordinator) runScheduler(s *scheduleController) {
}
}

func (c *coordinator) addOperator(op Operator) bool {
func (c *coordinator) addOperator(op schedule.Operator) bool {
c.Lock()
defer c.Unlock()
regionID := op.GetRegionID()
Expand All @@ -279,7 +280,7 @@ func (c *coordinator) addOperator(op Operator) bool {
return false
}
log.Infof("[region %v] replace old operator: %+v", regionID, old)
old.SetState(OperatorReplaced)
old.SetState(schedule.OperatorReplaced)
c.removeOperatorLocked(old)
}

Expand All @@ -297,7 +298,7 @@ func (c *coordinator) addOperator(op Operator) bool {
return true
}

func isHigherPriorityOperator(new Operator, old Operator) bool {
func isHigherPriorityOperator(new, old schedule.Operator) bool {
if new.GetResourceKind() == core.AdminKind {
return true
}
Expand All @@ -307,13 +308,13 @@ func isHigherPriorityOperator(new Operator, old Operator) bool {
return false
}

func (c *coordinator) removeOperator(op Operator) {
func (c *coordinator) removeOperator(op schedule.Operator) {
c.Lock()
defer c.Unlock()
c.removeOperatorLocked(op)
}

func (c *coordinator) removeOperatorLocked(op Operator) {
func (c *coordinator) removeOperatorLocked(op schedule.Operator) {
regionID := op.GetRegionID()
c.limiter.removeOperator(op)
delete(c.operators, regionID)
Expand All @@ -322,43 +323,43 @@ func (c *coordinator) removeOperatorLocked(op Operator) {
collectOperatorCounterMetrics(op)
}

func (c *coordinator) getOperator(regionID uint64) Operator {
func (c *coordinator) getOperator(regionID uint64) schedule.Operator {
c.RLock()
defer c.RUnlock()
return c.operators[regionID]
}

func (c *coordinator) getOperators() []Operator {
func (c *coordinator) getOperators() []schedule.Operator {
c.RLock()
defer c.RUnlock()

var operators []Operator
var operators []schedule.Operator
for _, op := range c.operators {
operators = append(operators, op)
}

return operators
}

func (c *coordinator) getHistories() []Operator {
func (c *coordinator) getHistories() []schedule.Operator {
c.RLock()
defer c.RUnlock()

var operators []Operator
var operators []schedule.Operator
for _, elem := range c.histories.Elems() {
operators = append(operators, elem.Value.(Operator))
operators = append(operators, elem.Value.(schedule.Operator))
}

return operators
}

func (c *coordinator) getHistoriesOfKind(kind core.ResourceKind) []Operator {
func (c *coordinator) getHistoriesOfKind(kind core.ResourceKind) []schedule.Operator {
c.RLock()
defer c.RUnlock()

var operators []Operator
var operators []schedule.Operator
for _, elem := range c.histories.Elems() {
op := elem.Value.(Operator)
op := elem.Value.(schedule.Operator)
if op.GetResourceKind() == kind {
operators = append(operators, op)
}
Expand All @@ -378,13 +379,13 @@ func newScheduleLimiter() *scheduleLimiter {
}
}

func (l *scheduleLimiter) addOperator(op Operator) {
func (l *scheduleLimiter) addOperator(op schedule.Operator) {
l.Lock()
defer l.Unlock()
l.counts[op.GetResourceKind()]++
}

func (l *scheduleLimiter) removeOperator(op Operator) {
func (l *scheduleLimiter) removeOperator(op schedule.Operator) {
l.Lock()
defer l.Unlock()
l.counts[op.GetResourceKind()]--
Expand Down Expand Up @@ -427,7 +428,7 @@ func (s *scheduleController) Stop() {
s.cancel()
}

func (s *scheduleController) Schedule(cluster *clusterInfo) Operator {
func (s *scheduleController) Schedule(cluster *clusterInfo) schedule.Operator {
for i := 0; i < maxScheduleRetries; i++ {
// If we have schedule, reset interval to the minimal interval.
if op := s.Scheduler.Schedule(cluster); op != nil {
Expand All @@ -450,8 +451,8 @@ func (s *scheduleController) AllowSchedule() bool {
return s.limiter.operatorCount(s.GetResourceKind()) < s.GetResourceLimit()
}

func collectOperatorCounterMetrics(op Operator) {
regionOp, ok := op.(*regionOperator)
func collectOperatorCounterMetrics(op schedule.Operator) {
regionOp, ok := op.(*schedule.RegionOperator)
if !ok {
return
}
Expand Down
Loading