
Commit

fix the conflicts
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Oct 22, 2019
1 parent a01903d commit 058ba9f
Showing 20 changed files with 100 additions and 93 deletions.
15 changes: 8 additions & 7 deletions server/coordinator_test.go
@@ -688,13 +688,14 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
co.run()
storage = tc.RaftCluster.storage
c.Assert(co.schedulers, HasLen, 3)
-bls, err := schedule.CreateScheduler("balance-leader", oc, storage, nil)
+bls, err := schedule.CreateScheduler("balance-leader", oc, storage, schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(co.addScheduler(bls), IsNil)
-brs, err := schedule.CreateScheduler("balance-region", oc, storage, nil)
+brs, err := schedule.CreateScheduler("balance-region", oc, storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)
c.Assert(co.addScheduler(brs), IsNil)
c.Assert(co.schedulers, HasLen, 5)
+
// the scheduler option should contain 7 items
// the `hot scheduler` and `label scheduler` are disabled
c.Assert(co.cluster.opt.GetSchedulers(), HasLen, 7)
@@ -705,7 +706,7 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
c.Assert(co.cluster.opt.Persist(co.cluster.storage), IsNil)
co.stop()
co.wg.Wait()
-
+fmt.Println("------------------------------")
_, newOpt, err = newTestScheduleConfig()
c.Assert(err, IsNil)
c.Assert(newOpt.Reload(co.cluster.storage), IsNil)
@@ -872,7 +873,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
defer cleanup()
defer hbStreams.Close()
oc := schedule.NewOperatorController(tc.RaftCluster, hbStreams)
-lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil)
+lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

c.Assert(tc.addRegionStore(4, 40), IsNil)
@@ -907,7 +908,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
defer cleanup()
defer hbStreams.Close()
oc := schedule.NewOperatorController(tc.RaftCluster, hbStreams)
-lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil)
+lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

c.Assert(tc.addRegionStore(4, 40), IsNil)
@@ -957,7 +958,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {

co := newCoordinator(tc.RaftCluster, hbStreams, namespace.DefaultClassifier)
oc := co.opController
-scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
lb := &mockLimitScheduler{
Scheduler: scheduler,
@@ -1032,7 +1033,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
defer hbStreams.Close()

co := newCoordinator(tc.RaftCluster, hbStreams, namespace.DefaultClassifier)
-lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), nil)
+lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
sc := newScheduleController(co, lb)

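The recurring change in this file, and in the test files below, is the fourth argument of `schedule.CreateScheduler`: `nil` becomes `schedule.ConfigSliceDecoder(name, []string{"", ""})`. Below is a self-contained toy model of that contract, with all bodies assumed rather than copied from PD; it shows what the decoder is for, namely filling `conf.Name` and `conf.Ranges` the way the `init()` registrations later in this commit expect.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for PD's schedule package; the names follow the diff, but the
// bodies are illustrative assumptions.
type schedulerConfig struct {
	Name   string
	Ranges [][2]string // start/end key pairs
}

type configDecoder func(cfg *schedulerConfig) error

// configSliceDecoder mimics schedule.ConfigSliceDecoder: CLI-style args are
// start/end key pairs, and two empty strings denote the full key space.
func configSliceDecoder(name string, args []string) configDecoder {
	return func(cfg *schedulerConfig) error {
		if len(args)%2 != 0 {
			return errors.New("key ranges need start/end pairs")
		}
		cfg.Name = name + "-scheduler" // e.g. "balance-leader-scheduler"
		for i := 0; i < len(args); i += 2 {
			cfg.Ranges = append(cfg.Ranges, [2]string{args[i], args[i+1]})
		}
		return nil
	}
}

// createScheduler sketches the new call shape: the decoder, not nil, is what
// populates the scheduler's persisted config.
func createScheduler(typ string, decoder configDecoder) (*schedulerConfig, error) {
	cfg := &schedulerConfig{}
	if err := decoder(cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	cfg, err := createScheduler("balance-leader",
		configSliceDecoder("balance-leader", []string{"", ""}))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s covers %d key range(s)\n", cfg.Name, len(cfg.Ranges))
}
```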
8 changes: 4 additions & 4 deletions server/core/basic_cluster.go
@@ -339,13 +339,13 @@ type StoreSetController interface {
// KeyRange is a key range.
type KeyRange struct {
StartKey []byte `json:"start-key"`
-EndKey []byte `json:"end-key"`
+EndKey   []byte `json:"end-key"`
}

// NewKeyRange create a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
-return KeyRange {
+return KeyRange{
StartKey: []byte(startKey),
-EndKey: []byte(endKey),
+EndKey:   []byte(endKey),
}
}
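The `KeyRange` helper being reformatted here is small enough to run standalone. This copy keeps the struct and constructor as they read after the change, plus a hypothetical `main` that builds the full-key-space range the tests request via `NewKeyRange("", "")`:

```go
package main

import "fmt"

// KeyRange is a key range (copied from server/core/basic_cluster.go as of
// this commit).
type KeyRange struct {
	StartKey []byte `json:"start-key"`
	EndKey   []byte `json:"end-key"`
}

// NewKeyRange creates a KeyRange with the given start key and end key.
func NewKeyRange(startKey, endKey string) KeyRange {
	return KeyRange{
		StartKey: []byte(startKey),
		EndKey:   []byte(endKey),
	}
}

func main() {
	// Empty start and end keys cover the whole key space; this is the
	// []string{"", ""} default handed to ConfigSliceDecoder above.
	full := NewKeyRange("", "")
	fmt.Printf("start=%q end=%q\n", full.StartKey, full.EndKey)
}
```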
8 changes: 4 additions & 4 deletions server/core/region.go
@@ -17,9 +17,9 @@ import (
"bytes"
"encoding/hex"
"fmt"
"math/rand"
"reflect"
"strings"
"math/rand"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -758,7 +758,7 @@ func (r *RegionsInfo) RandPendingRegion(storeID uint64, ranges []KeyRange, opts

// RandLeaderRegion randomly gets a store's leader region.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
-return randRegion(r.leaders[storeID],ranges, opts...)
+return randRegion(r.leaders[storeID], ranges, opts...)
}

// RandFollowerRegion randomly gets a store's follower region.
@@ -830,8 +830,8 @@ type RegionsContainer interface {

func randRegion(regions RegionsContainer, ranges []KeyRange, opts ...RegionOption) *RegionInfo {
for i := 0; i < randomRegionMaxRetry; i++ {
-idx:=rand.Intn(len(ranges))
-r:=ranges[idx]
+idx := rand.Intn(len(ranges))
+r := ranges[idx]
region := regions.RandomRegion(r.StartKey, r.EndKey)
if region == nil {
return nil
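After the gofmt fixes, `randRegion`'s sampling logic is easy to restate: each attempt picks one of the caller's key ranges at random and asks the container for a random region inside it. Here is a simplified, runnable re-implementation of the visible part of that loop; the `opts ...RegionOption` filtering is elided (the diff truncates before it) and the value of `randomRegionMaxRetry` is an assumption:

```go
package main

import (
	"fmt"
	"math/rand"
)

// Minimal stand-ins for the server/core types the loop touches.
type RegionInfo struct{ ID uint64 }

type KeyRange struct{ StartKey, EndKey []byte }

// RegionsContainer matches the interface named in the diff.
type RegionsContainer interface {
	RandomRegion(startKey, endKey []byte) *RegionInfo
}

// Assumed value; the constant's definition is outside this diff.
const randomRegionMaxRetry = 10

func randRegion(regions RegionsContainer, ranges []KeyRange) *RegionInfo {
	for i := 0; i < randomRegionMaxRetry; i++ {
		idx := rand.Intn(len(ranges)) // pick one of the caller's ranges
		r := ranges[idx]
		region := regions.RandomRegion(r.StartKey, r.EndKey)
		if region == nil {
			return nil
		}
		// The real function checks RegionOption filters here and retries
		// when the sampled region is rejected; that part is truncated in
		// the diff, so this sketch simply returns.
		return region
	}
	return nil
}

// staticContainer always returns the same region, enough to exercise the loop.
type staticContainer struct{ region *RegionInfo }

func (s staticContainer) RandomRegion(_, _ []byte) *RegionInfo { return s.region }

func main() {
	c := staticContainer{region: &RegionInfo{ID: 42}}
	got := randRegion(c, []KeyRange{{[]byte(""), []byte("")}})
	fmt.Println(got.ID)
}
```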
2 changes: 1 addition & 1 deletion server/core/region_test.go
@@ -148,7 +148,7 @@ func BenchmarkRandomRegion(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
-regions.RandRegion([]KeyRange{NewKeyRange("","")})
+regions.RandRegion([]KeyRange{NewKeyRange("", "")})
}
}

4 changes: 2 additions & 2 deletions server/namespace_test.go
@@ -133,7 +133,7 @@ func (s *testNamespaceSuite) TestSchedulerBalanceRegion(c *C) {
s.opt.SetMaxReplicas(1)

oc := schedule.NewOperatorController(nil, nil)
-sched, _ := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sched, _ := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))

// Balance is limited within a namespace.
c.Assert(s.tc.addLeaderRegion(1, 2), IsNil)
@@ -174,7 +174,7 @@ func (s *testNamespaceSuite) TestSchedulerBalanceLeader(c *C) {
s.classifier.setStore(4, "ns2")

oc := schedule.NewOperatorController(nil, nil)
-sched, _ := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sched, _ := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))

// Balance is limited within a namespace.
c.Assert(s.tc.addLeaderRegion(1, 2, 1), IsNil)
2 changes: 1 addition & 1 deletion server/schedule/range_cluster.go
@@ -107,7 +107,7 @@ func (r *RangeCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange
}

// RandLeaderRegion returns a random region that has leader on the store.
-func (r *RangeCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
+func (r *RangeCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandLeaderRegion(storeID, ranges, opts...)
}

4 changes: 2 additions & 2 deletions server/schedulers/adjacent_region.go
@@ -60,7 +60,7 @@ func init() {
}
conf.LeaderLimit = defaultAdjacentLeaderLimit
conf.PeerLimit = defaultAdjacentPeerLimit
-conf.Name=balanceAdjacentRegionName
+conf.Name = balanceAdjacentRegionName
return nil
}
})
@@ -78,7 +78,7 @@ func init() {
}

type balanceAdjacentRegionConfig struct {
-Name string `json:"name"`
+Name        string `json:"name"`
LeaderLimit uint64 `json:"leader-limit"`
PeerLimit uint64 `json:"peer-limit"`
}
24 changes: 15 additions & 9 deletions server/schedulers/balance_leader.go
@@ -24,9 +24,15 @@ import (
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pingcap/pd/server/schedule/opt"
"github.com/pingcap/pd/server/schedule/selector"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"github.com/pkg/errors"
)

+const (
+balanceLeaderName = "balance-leader-scheduler"
+// balanceLeaderRetryLimit is the limit to retry schedule for selected source store and target store.
+balanceLeaderRetryLimit = 10
+)
+
func init() {
@@ -36,12 +42,12 @@ func init() {
if !ok {
return ErrScheduleConfigNotExist
}
-ranges, err := getKeyRanges(args)
+ranges, err := getKeyRanges(args)
if err != nil {
return errors.WithStack(err)
}
conf.Ranges = ranges
-conf.Name=balanceRegionName
+conf.Name = balanceLeaderName
return nil
}
})
@@ -54,16 +60,13 @@ func init() {
}

type balanceLeaderSchedulerConfig struct {
-Name string `json:"name"`
+Name   string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
}

-// balanceLeaderRetryLimit is the limit to retry schedule for selected source store and target store.
-const balanceLeaderRetryLimit = 10
-
type balanceLeaderScheduler struct {
*baseScheduler
-conf *balanceLeaderSchedulerConfig
+conf         *balanceLeaderSchedulerConfig
selector *selector.BalanceSelector
taintStores *cache.TTLUint64
opController *schedule.OperatorController
@@ -78,7 +81,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *

s := &balanceLeaderScheduler{
baseScheduler: base,
-conf: conf,
+conf:          conf,
taintStores: taintStores,
opController: opController,
counter: balanceLeaderCounter,
@@ -113,6 +116,9 @@ func WithBalanceLeaderName(name string) BalanceLeaderCreateOption {
}

func (l *balanceLeaderScheduler) GetName() string {
+if l.conf.Name == "" {
+return balanceLeaderName
+}
return l.conf.Name
}

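The new `GetName` guard gives a scheduler loaded with an empty persisted name a sensible default. A self-contained illustration built from the constant and types visible in this diff:

```go
package main

import "fmt"

const balanceLeaderName = "balance-leader-scheduler"

type balanceLeaderSchedulerConfig struct {
	Name string `json:"name"`
}

type balanceLeaderScheduler struct {
	conf *balanceLeaderSchedulerConfig
}

// GetName falls back to the default constant when the persisted config
// carries no name (the behavior added in this commit).
func (l *balanceLeaderScheduler) GetName() string {
	if l.conf.Name == "" {
		return balanceLeaderName
	}
	return l.conf.Name
}

func main() {
	unnamed := &balanceLeaderScheduler{conf: &balanceLeaderSchedulerConfig{}}
	named := &balanceLeaderScheduler{conf: &balanceLeaderSchedulerConfig{Name: "my-leader-balancer"}}
	fmt.Println(unnamed.GetName()) // balance-leader-scheduler
	fmt.Println(named.GetName())   // my-leader-balancer
}
```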
15 changes: 9 additions & 6 deletions server/schedulers/balance_region.go
@@ -27,9 +27,9 @@ import (
"github.com/pingcap/pd/server/schedule/operator"
"github.com/pingcap/pd/server/schedule/opt"
"github.com/pingcap/pd/server/schedule/selector"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"github.com/pkg/errors"
)

func init() {
@@ -39,12 +39,12 @@ func init() {
if !ok {
return ErrScheduleConfigNotExist
}
-ranges, err := getKeyRanges(args)
+ranges, err := getKeyRanges(args)
if err != nil {
return errors.WithStack(err)
}
conf.Ranges = ranges
-conf.Name=balanceRegionName
+conf.Name = balanceRegionName
return nil
}
})
@@ -70,13 +70,13 @@ const (
)

type balanceRegionSchedulerConfig struct {
-Name string `json:"name"`
+Name   string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
}

type balanceRegionScheduler struct {
*baseScheduler
-conf *balanceRegionSchedulerConfig
+conf         *balanceRegionSchedulerConfig
selector *selector.BalanceSelector
opController *schedule.OperatorController
hitsCounter *hitsStoreBuilder
@@ -89,7 +89,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
base := newBaseScheduler(opController)
s := &balanceRegionScheduler{
baseScheduler: base,
-conf: conf,
+conf:          conf,
opController: opController,
hitsCounter: newHitsStoreBuilder(hitsStoreTTL, hitsStoreCountThreshold),
counter: balanceRegionCounter,
@@ -123,6 +123,9 @@ func WithBalanceRegionName(name string) BalanceRegionCreateOption {
}

func (s *balanceRegionScheduler) GetName() string {
+if s.conf.Name == "" {
+return balanceRegionName
+}
return s.conf.Name
}

16 changes: 8 additions & 8 deletions server/schedulers/balance_test.go
@@ -163,7 +163,7 @@ func (s *testBalanceLeaderSchedulerSuite) SetUpTest(c *C) {
opt := mockoption.NewScheduleOptions()
s.tc = mockcluster.NewCluster(opt)
s.oc = schedule.NewOperatorController(nil, nil)
-lb, err := schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), nil)
+lb, err := schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""}))
c.Assert(err, IsNil)
s.lb = lb
}
@@ -421,7 +421,7 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) {
tc := mockcluster.NewCluster(opt)
oc := schedule.NewOperatorController(nil, nil)

-sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

opt.SetMaxReplicas(1)
@@ -456,7 +456,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) {

newTestReplication(opt, 3, "zone", "rack", "host")

-sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

// Store 1 has the largest region score, so the balancer try to replace peer in store 1.
@@ -525,7 +525,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) {

newTestReplication(opt, 5, "zone", "rack", "host")

-sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

tc.AddLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"})
@@ -580,7 +580,7 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance1(c *C) {

opt.TolerantSizeRatio = 1

-sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

tc.AddRegionStore(1, 11)
@@ -620,7 +620,7 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) {
tc := mockcluster.NewCluster(opt)
oc := schedule.NewOperatorController(nil, nil)

-sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)
opt.SetMaxReplicas(1)

@@ -647,7 +647,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) {

newTestReplication(opt, 3, "zone", "rack", "host")

-sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""}))
c.Assert(err, IsNil)

// Store 1 has the largest region score, so the balancer try to replace peer in store 1.
@@ -1004,7 +1004,7 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) {
hb := mockhbstream.NewHeartbeatStreams(tc.ID)
oc := schedule.NewOperatorController(tc, hb)

-mb, err := schedule.CreateScheduler("random-merge", oc, core.NewStorage(kv.NewMemoryKV()), nil)
+mb, err := schedule.CreateScheduler("random-merge", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("random-merge", []string{"", ""}))
c.Assert(err, IsNil)

tc.AddRegionStore(1, 4)
12 changes: 6 additions & 6 deletions server/schedulers/evict_leader.go
@@ -41,14 +41,14 @@ func init() {
if err != nil {
return errors.WithStack(err)
}
-ranges, err:=getKeyRanges(args[1:])
+ranges, err := getKeyRanges(args[1:])
if err != nil {
-return errors.WithStack(err)
+return errors.WithStack(err)
}
name := fmt.Sprintf("evict-leader-scheduler-%d", id)
conf.StoreID = id
conf.Name = name
-conf.Ranges=ranges
+conf.Ranges = ranges
return nil

}
@@ -62,9 +62,9 @@ func init() {
}

type evictLeaderSchedulerConfig struct {
-Name string `json:"name"`
-StoreID uint64 `json:"store-id"`
-Ranges []core.KeyRange `json:"ranges"`
+Name    string          `json:"name"`
+StoreID uint64          `json:"store-id"`
+Ranges  []core.KeyRange `json:"ranges"`
}

type evictLeaderScheduler struct {
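The evict-leader decoder shows the args convention end to end: `args[0]` is the target store ID, and the remaining args go to `getKeyRanges` as key-range endpoints. A runnable sketch of that decoding follows; `getKeyRanges`'s pairing behavior is an assumption, since its body is not part of this commit:

```go
package main

import (
	"fmt"
	"strconv"
)

type KeyRange struct{ StartKey, EndKey []byte }

// getKeyRanges pairs endpoint args into KeyRanges. Assumed behavior: the
// real helper lives outside this diff.
func getKeyRanges(args []string) ([]KeyRange, error) {
	if len(args)%2 != 0 {
		return nil, fmt.Errorf("key ranges need an even number of endpoints, got %d", len(args))
	}
	var ranges []KeyRange
	for i := 0; i < len(args); i += 2 {
		ranges = append(ranges, KeyRange{[]byte(args[i]), []byte(args[i+1])})
	}
	return ranges, nil
}

type evictLeaderSchedulerConfig struct {
	Name    string     `json:"name"`
	StoreID uint64     `json:"store-id"`
	Ranges  []KeyRange `json:"ranges"`
}

func main() {
	args := []string{"1", "", ""} // store 1, full key space

	id, err := strconv.ParseUint(args[0], 10, 64)
	if err != nil {
		panic(err)
	}
	ranges, err := getKeyRanges(args[1:])
	if err != nil {
		panic(err)
	}
	conf := evictLeaderSchedulerConfig{
		Name:    fmt.Sprintf("evict-leader-scheduler-%d", id), // name format from the diff
		StoreID: id,
		Ranges:  ranges,
	}
	fmt.Printf("%+v\n", conf)
}
```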
