*: remove opt package totally (#4535)
* remove IsFeatureSupported interface

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* remove opt package totally

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* address the comment

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* address comments

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* add lock back

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* rename and simplify

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
rleungx and ti-chi-bot committed Jan 14, 2022
1 parent 1279d55 commit 3792f07
Showing 44 changed files with 290 additions and 307 deletions.
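The hunks below repeat two mechanical substitutions across the 44 files: opt.Cluster becomes schedule.Cluster, and the AllocID() convenience method is replaced by GetAllocator(), which returns an id.Allocator. As orientation only, the following Go sketch approximates the consolidated interface from the call sites visible in this commit; the real schedule.Cluster definition lives in the server/schedule package and embeds more than is shown here.

```go
// Illustrative sketch only, not the actual definition. Method signatures are
// approximated from the call sites in this commit; the authoritative
// schedule.Cluster interface in server/schedule is larger.
package sketch

import (
	"github.com/tikv/pd/server/config"
	"github.com/tikv/pd/server/id"
)

// Cluster is roughly what schedulers and checkers see after the opt package
// is removed.
type Cluster interface {
	// GetOpts exposes the persisted scheduling options.
	GetOpts() *config.PersistOptions
	// GetAllocator replaces the former AllocID helper; callers now write
	// cluster.GetAllocator().Alloc() to obtain a fresh unique ID.
	GetAllocator() id.Allocator
	// ResumeLeaderTransfer is one of the store-level controls used by
	// schedulers such as the evict-leader example below.
	ResumeLeaderTransfer(storeID uint64)
}
```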
13 changes: 7 additions & 6 deletions pkg/mock/mockcluster/mockcluster.go
@@ -29,6 +29,7 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/id"
"github.com/tikv/pd/server/kv"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/placement"
@@ -42,7 +43,7 @@ const (
mb = (1 << 20) // 1MiB
)

// Cluster is used to mock clusterInfo for test use.
// Cluster is used to mock a cluster for test purpose.
type Cluster struct {
*core.BasicCluster
*mockid.IDAllocator
@@ -77,9 +78,9 @@ func (mc *Cluster) GetOpts() *config.PersistOptions {
return mc.PersistOptions
}

// AllocID allocs a new unique ID.
func (mc *Cluster) AllocID() (uint64, error) {
return mc.Alloc()
// GetAllocator returns the ID allocator.
func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// ScanRegions scans region with start key, until number greater than limit.
@@ -151,7 +152,7 @@ func hotRegionsFromStore(w *statistics.HotCache, storeID uint64, kind statistics

// AllocPeer allocs a new peer on a store.
func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
peerID, err := mc.AllocID()
peerID, err := mc.GetAllocator().Alloc()
if err != nil {
log.Error("failed to alloc peer", errs.ZapError(err))
return nil, err
@@ -285,7 +286,7 @@ func (mc *Cluster) AddRegionStoreWithLeader(storeID uint64, regionCount int, lea
}
mc.AddRegionStore(storeID, regionCount)
for i := 0; i < leaderCount; i++ {
id, _ := mc.AllocID()
id, _ := mc.GetAllocator().Alloc()
mc.AddLeaderRegion(id, storeID)
}
}
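For callers of the mock cluster (and of RaftCluster later in the diff), the ID allocation entry point moves from a method on the cluster to the allocator it exposes. A minimal migration sketch, assuming only the Alloc() (uint64, error) method exercised in the hunks above; the helper name is hypothetical:

```go
package example

import "github.com/tikv/pd/pkg/mock/mockcluster"

// allocPeerIDs is a hypothetical helper used only to show the new call chain.
func allocPeerIDs(mc *mockcluster.Cluster, n int) ([]uint64, error) {
	ids := make([]uint64, 0, n)
	for i := 0; i < n; i++ {
		// Before this commit: id, err := mc.AllocID()
		// After: go through the id.Allocator returned by GetAllocator().
		id, err := mc.GetAllocator().Alloc()
		if err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, nil
}
```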
11 changes: 5 additions & 6 deletions plugin/scheduler_example/evict_leader.go
@@ -29,7 +29,6 @@ import (
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/filter"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedulers"
"github.com/unrolled/render"
)
@@ -93,7 +92,7 @@ type evictLeaderSchedulerConfig struct {
mu sync.RWMutex
storage *core.Storage
StoreIDWitRanges map[uint64][]core.KeyRange `json:"store-id-ranges"`
cluster opt.Cluster
cluster schedule.Cluster
}

func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
@@ -184,7 +183,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
return schedule.EncodeConfig(s.conf)
}

func (s *evictLeaderScheduler) Prepare(cluster opt.Cluster) error {
func (s *evictLeaderScheduler) Prepare(cluster schedule.Cluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
@@ -196,23 +195,23 @@ func (s *evictLeaderScheduler) Prepare(cluster opt.Cluster) error {
return res
}

func (s *evictLeaderScheduler) Cleanup(cluster opt.Cluster) {
func (s *evictLeaderScheduler) Cleanup(cluster schedule.Cluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWitRanges {
cluster.ResumeLeaderTransfer(id)
}
}

func (s *evictLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
func (s *evictLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
allowed := s.OpController.OperatorCount(operator.OpLeader) < cluster.GetOpts().GetLeaderScheduleLimit()
if !allowed {
operator.OperatorLimitCounter.WithLabelValues(s.GetType(), operator.OpLeader.String()).Inc()
}
return allowed
}

func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
ops := make([]*operator.Operator, 0, len(s.conf.StoreIDWitRanges))
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
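For out-of-tree schedulers such as this example plugin, the migration is mechanical: drop the schedule/opt import and change every opt.Cluster parameter to schedule.Cluster. A hedged skeleton of just the methods whose signatures change, with a hypothetical type name and without the rest of the scheduler interface:

```go
package plugin

import (
	"github.com/tikv/pd/server/schedule"
	"github.com/tikv/pd/server/schedule/operator"
)

// demoScheduler is hypothetical; only the methods touched by this commit are
// sketched, not the full scheduler interface.
type demoScheduler struct{}

// All four methods now take schedule.Cluster instead of the removed opt.Cluster.
func (s *demoScheduler) Prepare(cluster schedule.Cluster) error { return nil }

func (s *demoScheduler) Cleanup(cluster schedule.Cluster) {}

func (s *demoScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
	// Typically compares running operator counts against limits from
	// cluster.GetOpts(), as the evict-leader scheduler above does.
	return true
}

func (s *demoScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
	return nil
}
```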
6 changes: 3 additions & 3 deletions server/cluster/cluster.go
@@ -1402,9 +1402,9 @@ func (c *RaftCluster) getStoresWithoutLabelLocked(region *core.RegionInfo, key,
return stores
}

// AllocID allocs ID.
func (c *RaftCluster) AllocID() (uint64, error) {
return c.id.Alloc()
// GetAllocator returns cluster's id allocator.
func (c *RaftCluster) GetAllocator() id.Allocator {
return c.id
}

// OnStoreVersionChange changes the version of the cluster when needed.
5 changes: 2 additions & 3 deletions server/cluster/coordinator_test.go
@@ -36,7 +36,6 @@ import (
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/statistics"
)
@@ -46,7 +45,7 @@ func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind oper
}

func (c *testCluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {
id, err := c.AllocID()
id, err := c.GetAllocator().Alloc()
if err != nil {
return nil, err
}
@@ -1095,7 +1094,7 @@ type mockLimitScheduler struct {
kind operator.OpKind
}

func (s *mockLimitScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
func (s *mockLimitScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {
return s.counter.OperatorCount(s.kind) < s.limit
}

10 changes: 5 additions & 5 deletions server/cluster/unsafe_recovery_controller.go
@@ -312,7 +312,7 @@ func (u *unsafeRecoveryController) generateRecoveryPlan() {
newRegion.StartKey = lastEnd
newRegion.EndKey = overlapRegion.StartKey
if _, inUse := inUseRegions[region.Id]; inUse {
newRegion.Id, _ = u.cluster.AllocID()
newRegion.Id, _ = u.cluster.GetAllocator().Alloc()
creates = append(creates, newRegion)
} else {
inUseRegions[region.Id] = true
@@ -337,7 +337,7 @@ func (u *unsafeRecoveryController) generateRecoveryPlan() {
newRegion.StartKey = lastEnd
newRegion.EndKey = region.EndKey
if _, inUse := inUseRegions[region.Id]; inUse {
newRegion.Id, _ = u.cluster.AllocID()
newRegion.Id, _ = u.cluster.GetAllocator().Alloc()
creates = append(creates, newRegion)
} else {
inUseRegions[region.Id] = true
@@ -374,7 +374,7 @@ func (u *unsafeRecoveryController) generateRecoveryPlan() {
newRegion := &metapb.Region{}
newRegion.StartKey = lastEnd
newRegion.EndKey = region.StartKey
newRegion.Id, _ = u.cluster.AllocID()
newRegion.Id, _ = u.cluster.GetAllocator().Alloc()
newRegion.RegionEpoch = &metapb.RegionEpoch{ConfVer: 1, Version: 1}
creates = append(creates, newRegion)
}
@@ -384,7 +384,7 @@ func (u *unsafeRecoveryController) generateRecoveryPlan() {
if !bytes.Equal(lastEnd, []byte("")) {
newRegion := &metapb.Region{}
newRegion.StartKey = lastEnd
newRegion.Id, _ = u.cluster.AllocID()
newRegion.Id, _ = u.cluster.GetAllocator().Alloc()
creates = append(creates, newRegion)
}
var allStores []uint64
@@ -393,7 +393,7 @@ func (u *unsafeRecoveryController) generateRecoveryPlan() {
}
for idx, create := range creates {
storeID := allStores[idx%len(allStores)]
peerID, _ := u.cluster.AllocID()
peerID, _ := u.cluster.GetAllocator().Alloc()
create.Peers = []*metapb.Peer{{Id: peerID, StoreId: storeID, Role: metapb.PeerRole_Voter}}
storeRecoveryPlan, exists := u.storeRecoveryPlans[storeID]
if !exists {
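The recovery-plan generator above now obtains every new region and peer ID through cluster.GetAllocator().Alloc(), discarding allocation errors just as the previous AllocID calls did. Purely to illustrate the call chain (this is not part of the commit), a hypothetical helper that builds one gap-filling region and propagates allocator errors could look like this:

```go
package example

import (
	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/id"
)

// newRecoveryRegion is a hypothetical helper mirroring the plan generator
// above: it fills a keyspace gap with a fresh region carrying a single voter.
// Unlike the code above, it surfaces allocator errors instead of ignoring them.
func newRecoveryRegion(alloc id.Allocator, start, end []byte, storeID uint64) (*metapb.Region, error) {
	regionID, err := alloc.Alloc()
	if err != nil {
		return nil, err
	}
	peerID, err := alloc.Alloc()
	if err != nil {
		return nil, err
	}
	return &metapb.Region{
		Id:          regionID,
		StartKey:    start,
		EndKey:      end,
		RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
		Peers:       []*metapb.Peer{{Id: peerID, StoreId: storeID, Role: metapb.PeerRole_Voter}},
	}, nil
}
```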
12 changes: 6 additions & 6 deletions server/replication/replication_mode.go
@@ -30,7 +30,7 @@ import (
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedule"
"go.uber.org/zap"
)

@@ -67,7 +67,7 @@ type ModeManager struct {
sync.RWMutex
config config.ReplicationModeConfig
storage *core.Storage
cluster opt.Cluster
cluster schedule.Cluster
fileReplicater FileReplicater
replicatedMembers []uint64

@@ -86,7 +86,7 @@
}

// NewReplicationModeManager creates the replicate mode manager.
func NewReplicationModeManager(config config.ReplicationModeConfig, storage *core.Storage, cluster opt.Cluster, fileReplicater FileReplicater) (*ModeManager, error) {
func NewReplicationModeManager(config config.ReplicationModeConfig, storage *core.Storage, cluster schedule.Cluster, fileReplicater FileReplicater) (*ModeManager, error) {
m := &ModeManager{
initTime: time.Now(),
config: config,
@@ -254,7 +254,7 @@ func (m *ModeManager) drSwitchToAsync() error {
}

func (m *ModeManager) drSwitchToAsyncWithLock() error {
id, err := m.cluster.AllocID()
id, err := m.cluster.GetAllocator().Alloc()
if err != nil {
log.Warn("failed to switch to async state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -277,7 +277,7 @@ func (m *ModeManager) drSwitchToSyncRecover() error {
}

func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
id, err := m.cluster.AllocID()
id, err := m.cluster.GetAllocator().Alloc()
if err != nil {
log.Warn("failed to switch to sync_recover state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
@@ -298,7 +298,7 @@ func (m *ModeManager) drSwitchToSyncRecoverWithLock() error {
func (m *ModeManager) drSwitchToSync() error {
m.Lock()
defer m.Unlock()
id, err := m.cluster.AllocID()
id, err := m.cluster.GetAllocator().Alloc()
if err != nil {
log.Warn("failed to switch to sync state", zap.String("replicate-mode", modeDRAutoSync), errs.ZapError(err))
return err
5 changes: 2 additions & 3 deletions server/schedule/checker/checker_controller.go
@@ -24,7 +24,6 @@ import (
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedule/placement"
)

@@ -33,7 +32,7 @@ const DefaultCacheSize = 1000

// Controller is used to manage all checkers.
type Controller struct {
cluster opt.Cluster
cluster schedule.Cluster
opts *config.PersistOptions
opController *schedule.OperatorController
learnerChecker *LearnerChecker
@@ -48,7 +47,7 @@ type Controller struct {

// NewController create a new Controller.
// TODO: isSupportMerge should be removed.
func NewController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
func NewController(ctx context.Context, cluster schedule.Cluster, ruleManager *placement.RuleManager, labeler *labeler.RegionLabeler, opController *schedule.OperatorController) *Controller {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &Controller{
cluster: cluster,
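The checker files that follow (joint state, learner, merge, priority inspector) all receive the same substitution as the Controller above: the stored cluster field and the constructor parameter switch from opt.Cluster to schedule.Cluster. A hedged sketch of that shape, with a hypothetical checker name and the checker-specific logic omitted:

```go
package checkersketch

import (
	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/schedule"
	"github.com/tikv/pd/server/schedule/operator"
)

// demoChecker is hypothetical; it only mirrors the field/constructor pattern
// shared by the checkers changed below.
type demoChecker struct {
	cluster schedule.Cluster
}

// NewDemoChecker follows the same shape as NewLearnerChecker and friends.
func NewDemoChecker(cluster schedule.Cluster) *demoChecker {
	return &demoChecker{cluster: cluster}
}

// Check would inspect a region and return an operator (or nil); the body is
// omitted because it is checker-specific.
func (c *demoChecker) Check(region *core.RegionInfo) *operator.Operator {
	_ = c.cluster // a real checker would consult cluster state here
	return nil
}
```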
6 changes: 3 additions & 3 deletions server/schedule/checker/joint_state_checker.go
@@ -18,18 +18,18 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
)

// JointStateChecker ensures region is in joint state will leave.
type JointStateChecker struct {
PauseController
cluster opt.Cluster
cluster schedule.Cluster
}

// NewJointStateChecker creates a joint state checker.
func NewJointStateChecker(cluster opt.Cluster) *JointStateChecker {
func NewJointStateChecker(cluster schedule.Cluster) *JointStateChecker {
return &JointStateChecker{
cluster: cluster,
}
6 changes: 3 additions & 3 deletions server/schedule/checker/learner_checker.go
@@ -18,18 +18,18 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
)

// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
PauseController
cluster opt.Cluster
cluster schedule.Cluster
}

// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster opt.Cluster) *LearnerChecker {
func NewLearnerChecker(cluster schedule.Cluster) *LearnerChecker {
return &LearnerChecker{
cluster: cluster,
}
9 changes: 4 additions & 5 deletions server/schedule/checker/merge_checker.go
@@ -29,7 +29,6 @@ import (
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedule/placement"
)

@@ -45,14 +44,14 @@ const (
// MergeChecker ensures region to merge with adjacent region when size is small
type MergeChecker struct {
PauseController
cluster opt.Cluster
cluster schedule.Cluster
opts *config.PersistOptions
splitCache *cache.TTLUint64
startTime time.Time // it's used to judge whether server recently start.
}

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster opt.Cluster) *MergeChecker {
func NewMergeChecker(ctx context.Context, cluster schedule.Cluster) *MergeChecker {
opts := cluster.GetOpts()
splitCache := cache.NewIDTTL(ctx, time.Minute, opts.GetSplitMergeInterval())
return &MergeChecker{
@@ -207,7 +206,7 @@ func (m *MergeChecker) checkTarget(region, adjacent *core.RegionInfo) bool {
}

// AllowMerge returns true if two regions can be merged according to the key type.
func AllowMerge(cluster opt.Cluster, region, adjacent *core.RegionInfo) bool {
func AllowMerge(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
var start, end []byte
if bytes.Equal(region.GetEndKey(), adjacent.GetStartKey()) && len(region.GetEndKey()) != 0 {
start, end = region.GetStartKey(), adjacent.GetEndKey()
@@ -263,7 +262,7 @@ func isTableIDSame(region, adjacent *core.RegionInfo) bool {
// Check whether there is a peer of the adjacent region on an offline store,
// while the source region has no peer on it. This is to prevent from bringing
// any other peer into an offline store to slow down the offline process.
func checkPeerStore(cluster opt.Cluster, region, adjacent *core.RegionInfo) bool {
func checkPeerStore(cluster schedule.Cluster, region, adjacent *core.RegionInfo) bool {
regionStoreIDs := region.GetStoreIds()
for _, peer := range adjacent.GetPeers() {
storeID := peer.GetStoreId()
6 changes: 3 additions & 3 deletions server/schedule/checker/priority_inspector.go
@@ -20,7 +20,7 @@ import (
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/placement"
)

@@ -29,13 +29,13 @@ const defaultPriorityQueueSize = 1280

// PriorityInspector ensures high priority region should run first
type PriorityInspector struct {
cluster opt.Cluster
cluster schedule.Cluster
opts *config.PersistOptions
queue *cache.PriorityQueue
}

// NewPriorityInspector creates a priority inspector.
func NewPriorityInspector(cluster opt.Cluster) *PriorityInspector {
func NewPriorityInspector(cluster schedule.Cluster) *PriorityInspector {
return &PriorityInspector{
cluster: cluster,
opts: cluster.GetOpts(),
