
Commit

*: decouple coordinator with server (tikv#6503)
ref tikv#5839, ref tikv#6418

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] committed Nov 30, 2023
1 parent f701ab9 commit dfd801d
Showing 32 changed files with 2,223 additions and 2,109 deletions.
75 changes: 59 additions & 16 deletions pkg/mock/mockcluster/mockcluster.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
@@ -63,7 +64,7 @@ type Cluster struct {

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
c := &Cluster{
ctx: ctx,
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
@@ -74,13 +75,13 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
StoreConfigManager: config.NewTestStoreConfigManager(nil),
Storage: storage.NewStorageWithMemoryBackend(),
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
if c.PersistOptions.GetReplicationConfig().EnablePlacementRules {
c.initRuleManager()
}
// It should be updated to the latest feature version.
clus.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, storage.NewStorageWithMemoryBackend(), time.Second*5)
return clus
c.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
c.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, c.Storage, time.Second*5)
return c
}

// GetStoreConfig returns the store config.
@@ -93,16 +94,35 @@ func (mc *Cluster) GetOpts() sc.Config {
return mc.PersistOptions
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
}

// GetAllocator returns the ID allocator.
func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
// IsUnsafeRecovering returns if the cluster is in unsafe recovering.
func (mc *Cluster) IsUnsafeRecovering() bool {
return false
}

// GetPersistOptions returns the persist options.
func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
return mc.PersistOptions
}

// UpdateRegionsLabelLevelStats updates the label level stats for the regions.
func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}

// IsSchedulerExisted checks if the scheduler with name is existed or not.
func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false, nil }

// IsSchedulerDisabled checks if the scheduler with name is disabled or not.
func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil }

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
@@ -195,7 +215,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {

func (mc *Cluster) initRuleManager() {
if mc.RuleManager == nil {
mc.RuleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), mc, mc.GetOpts())
mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetOpts())
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels, mc.GetReplicationConfig().IsolationLevel)
}
}
@@ -259,6 +279,22 @@ func (mc *Cluster) SetStoreBusy(storeID uint64, busy bool) {
mc.PutStore(newStore)
}

// BuryStore marks a store as tombstone in cluster.
func (mc *Cluster) BuryStore(storeID uint64, forceBury bool) error {
store := mc.GetStore(storeID)
if store.IsUp() {
if !forceBury {
return errs.ErrStoreIsUp.FastGenByArgs()
} else if !store.IsDisconnected() {
return errors.Errorf("The store %v is not offline nor disconnected", storeID)
}
}

newStore := store.Clone(core.TombstoneStore())
mc.PutStore(newStore)
return nil
}

// AddLeaderStore adds store with specified count of leader.
func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes ...int64) {
stats := &pdpb.StoreStats{}
@@ -285,7 +321,13 @@ func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes .
}

// AddRegionStore adds store with specified count of region.
func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) {
var regionSize uint64
if len(regionSizes) == 0 {
regionSize = uint64(int64(regionCount) * defaultRegionSize / units.MiB)
} else {
regionSize = regionSizes[0]
}
stats := &pdpb.StoreStats{}
stats.Capacity = defaultStoreCapacity
stats.UsedSize = uint64(regionCount) * defaultRegionSize
@@ -299,8 +341,8 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
}},
core.SetStoreStats(stats),
core.SetRegionCount(regionCount),
core.SetRegionSize(int64(regionCount)*defaultRegionSize/units.MiB),
core.SetLastHeartbeatTS(time.Now()),
core.SetRegionSize(int64(regionSize)),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
@@ -522,6 +564,11 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(
return items
}

// DropCacheAllRegion removes all regions from the cache.
func (mc *Cluster) DropCacheAllRegion() {
mc.ResetRegionCache()
}

// UpdateStoreLeaderWeight updates store leader weight.
func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
store := mc.GetStore(storeID)
@@ -822,10 +869,6 @@ func (mc *Cluster) AddSuspectRegions(ids ...uint64) {
}
}

// SetHotPendingInfluenceMetrics mock method
func (mc *Cluster) SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
}

// GetBasicCluster mock method
func (mc *Cluster) GetBasicCluster() *core.BasicCluster {
return mc.BasicCluster
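For readers skimming the mockcluster changes above, here is a minimal, illustrative sketch of how a test might exercise the helpers this commit adds or extends (the variadic region size on AddRegionStore, BuryStore, DropCacheAllRegion). Only those method calls come from the diff; the package layout, the mockconfig.NewTestOptions constructor, and the test scaffolding are assumptions for illustration.

```go
package mockcluster_test

import (
	"context"
	"testing"

	// Assumed import paths; adjust to the repository layout at this commit.
	"github.com/tikv/pd/pkg/mock/mockcluster"
	"github.com/tikv/pd/pkg/mock/mockconfig"
)

func TestMockClusterSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// mockconfig.NewTestOptions is assumed to return the *config.PersistOptions
	// that NewCluster expects; swap in whatever options constructor your tree uses.
	tc := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())

	// AddRegionStore now takes an optional region size (MiB). When it is omitted,
	// the size falls back to regionCount*defaultRegionSize/units.MiB as before.
	tc.AddRegionStore(1, 100, 512)
	tc.AddRegionStore(2, 100)

	// BuryStore refuses to tombstone a store that is up and still heartbeating,
	// even with forceBury set, so an error is expected for a freshly added store.
	if err := tc.BuryStore(2, true); err == nil {
		t.Fatal("expected BuryStore to reject an up, connected store")
	}

	// DropCacheAllRegion simply resets the mock cluster's region cache.
	tc.DropCacheAllRegion()
}
```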
3 changes: 1 addition & 2 deletions pkg/schedule/checker/checker_controller.go
@@ -22,7 +22,6 @@ import (
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
@@ -34,7 +33,7 @@ import (
// DefaultCacheSize is the default length of waiting list.
const DefaultCacheSize = 1000

var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny")
var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny")

// Controller is used to manage all checkers.
type Controller struct {
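The only functional change in this file is that the deny counter now comes from the labeler package instead of the removed top-level schedule import. As a reminder of the pattern being used (pre-binding a Prometheus CounterVec to fixed label values), here is a self-contained sketch; the metric name and label names are illustrative, not PD's actual registration.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

// A stand-in for labeler.LabelerEventCounter: a CounterVec partitioned by
// two labels. The namespace, name, and label names here are illustrative only.
var labelerEventCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "pd",
		Subsystem: "schedule",
		Name:      "labeler_event_counter",
		Help:      "Counter of labeler events.",
	}, []string{"type", "event"})

// Pre-binding the label values once, as checker_controller.go does, yields a
// plain Counter so hot paths only pay for Inc().
var denyCheckersByLabelerCounter = labelerEventCounter.WithLabelValues("checkers", "deny")

func main() {
	prometheus.MustRegister(labelerEventCounter)
	denyCheckersByLabelerCounter.Inc()
}
```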
1 change: 1 addition & 0 deletions pkg/schedule/config/config.go
@@ -81,6 +81,7 @@ type Config interface {
IsDebugMetricsEnabled() bool
GetClusterVersion() *semver.Version
GetStoreLimitVersion() string
IsDiagnosticAllowed() bool
// for test purpose
SetPlacementRuleEnabled(bool)
SetSplitMergeInterval(time.Duration)
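Adding IsDiagnosticAllowed() to the Config interface means every implementation, PersistOptions as well as any test doubles, must now provide the method. A minimal sketch of the pattern with hypothetical type names (not the actual PD implementations):

```go
package main

import "fmt"

// Config is a trimmed-down stand-in for the pkg/schedule/config interface,
// showing only the method added in this commit.
type Config interface {
	IsDiagnosticAllowed() bool
}

// fakeOptions is a hypothetical test double standing in for *PersistOptions.
type fakeOptions struct {
	diagnosticEnabled bool
}

func (o *fakeOptions) IsDiagnosticAllowed() bool { return o.diagnosticEnabled }

func main() {
	var cfg Config = &fakeOptions{diagnosticEnabled: true}
	fmt.Println("diagnostic allowed:", cfg.IsDiagnosticAllowed())
}
```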
