
*: decouple coordinator with server #6503

Merged · 4 commits · May 25, 2023

Changes from 1 commit
65 changes: 54 additions & 11 deletions pkg/mock/mockcluster/mockcluster.go
@@ -24,6 +24,7 @@
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
@@ -79,7 +80,7 @@
}
// It should be updated to the latest feature version.
clus.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, storage.NewStorageWithMemoryBackend(), time.Second*5)
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, clus.Storage, time.Second*5)
return clus
}

@@ -93,16 +94,35 @@
return mc.PersistOptions
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
}

// GetAllocator returns the ID allocator.
func (mc *Cluster) GetAllocator() id.Allocator {
return mc.IDAllocator
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
// IsUnsafeRecovering returns whether the cluster is in the middle of unsafe recovery.
func (mc *Cluster) IsUnsafeRecovering() bool {
return false
}

// GetPersistOptions returns the persist options.
func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
return mc.PersistOptions
}

// UpdateRegionsLabelLevelStats updates the label level stats for the regions.
func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {}

[Codecov warning: added line #L118 was not covered by tests]

// IsSchedulerExisted checks whether the scheduler with the given name exists.
func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) { return false, nil }

[Codecov warning: added line #L121 was not covered by tests]

// IsSchedulerDisabled checks whether the scheduler with the given name is disabled.
func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil }

[Codecov warning: added line #L124 was not covered by tests]

// ScanRegions scans regions in the given key range, returning at most limit regions.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
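The no-op methods added in this hunk (IsUnsafeRecovering, IsSchedulerExisted, IsSchedulerDisabled, UpdateRegionsLabelLevelStats) fill out the surface the scheduling side expects from a cluster, so the mock can be handed to the coordinator and checkers directly instead of going through the server. Below is a minimal sketch of that dependency-inversion pattern; the interface name, method set, and helper are illustrative only (the real, much larger interface presumably lives under pkg/schedule/core):

package main

import "fmt"

// SchedulerCluster is an illustrative, trimmed-down version of the kind of
// cluster interface a coordinator can depend on instead of the concrete server.
type SchedulerCluster interface {
	IsUnsafeRecovering() bool
	IsSchedulerExisted(name string) (bool, error)
	IsSchedulerDisabled(name string) (bool, error)
}

// mockCluster stands in for pkg/mock/mockcluster.Cluster: it answers the
// interface with fixed values so coordinator-style logic can be unit-tested.
type mockCluster struct{}

func (mockCluster) IsUnsafeRecovering() bool                      { return false }
func (mockCluster) IsSchedulerExisted(name string) (bool, error)  { return false, nil }
func (mockCluster) IsSchedulerDisabled(name string) (bool, error) { return false, nil }

// shouldSchedule is written against the interface, so it works with the real
// cluster or the mock interchangeably.
func shouldSchedule(c SchedulerCluster, name string) bool {
	if c.IsUnsafeRecovering() {
		return false
	}
	disabled, err := c.IsSchedulerDisabled(name)
	return err == nil && !disabled
}

func main() {
	fmt.Println(shouldSchedule(mockCluster{}, "balance-leader-scheduler")) // prints: true
}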
@@ -195,7 +215,7 @@

func (mc *Cluster) initRuleManager() {
if mc.RuleManager == nil {
mc.RuleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), mc, mc.GetOpts())
mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetOpts())
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels)
}
}
@@ -259,6 +279,22 @@
mc.PutStore(newStore)
}

// BuryStore marks a store as a tombstone in the cluster.
func (mc *Cluster) BuryStore(storeID uint64, forceBury bool) error {
store := mc.GetStore(storeID)
if store.IsUp() {
if !forceBury {
return errs.ErrStoreIsUp.FastGenByArgs()

[Codecov warning: added line #L287 was not covered by tests]
} else if !store.IsDisconnected() {
return errors.Errorf("The store %v is not offline nor disconnected", storeID)

[Codecov warning: added line #L289 was not covered by tests]
}
}

newStore := store.Clone(core.TombstoneStore())
mc.PutStore(newStore)
return nil
}
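BuryStore reproduces the server-side tombstone transition for tests: an Up store is only buried when forceBury is set and the store has already gone disconnected, otherwise the call fails. A rough usage sketch, assuming an existing *mockcluster.Cluster named mc; SetStoreDisconnect is assumed here to be the mock's existing helper for simulating a stale heartbeat, and the store ID and counts are arbitrary:

// Illustrative test fragment, not part of this diff.
mc.AddRegionStore(1, 10)      // store 1 is Up with a fresh heartbeat
err := mc.BuryStore(1, false) // rejected with errs.ErrStoreIsUp: the store is still Up
err = mc.BuryStore(1, true)   // still rejected: Up but not disconnected
mc.SetStoreDisconnect(1)      // assumed helper that ages the last heartbeat
err = mc.BuryStore(1, true)   // succeeds: the store is cloned as a tombstone
_ = err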

// AddLeaderStore adds store with specified count of leader.
func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes ...int64) {
stats := &pdpb.StoreStats{}
@@ -285,7 +321,13 @@
}

// AddRegionStore adds store with specified count of region.
func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) {
var regionSize uint64
if len(regionSizes) == 0 {
regionSize = uint64(int64(regionCount) * defaultRegionSize / units.MiB)
} else {
regionSize = regionSizes[0]

[Codecov warning: added line #L329 was not covered by tests]
}
stats := &pdpb.StoreStats{}
stats.Capacity = defaultStoreCapacity
stats.UsedSize = uint64(regionCount) * defaultRegionSize
@@ -299,8 +341,8 @@
}},
core.SetStoreStats(stats),
core.SetRegionCount(regionCount),
core.SetRegionSize(int64(regionCount)*defaultRegionSize/units.MiB),
core.SetLastHeartbeatTS(time.Now()),
core.SetRegionSize(int64(regionSize)),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
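With the new variadic regionSizes parameter, callers can either let AddRegionStore derive the total region size from regionCount and defaultRegionSize as before, or pin it with a single trailing value. For example (values are arbitrary):

// Illustrative calls, not part of this diff; mc is a *mockcluster.Cluster.
mc.AddRegionStore(2, 100)        // size derived: 100 * defaultRegionSize / units.MiB
mc.AddRegionStore(3, 100, 2048)  // size pinned to 2048, regardless of the count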
@@ -522,6 +564,11 @@
return items
}

// DropCacheAllRegion removes all regions from the cache.
func (mc *Cluster) DropCacheAllRegion() {
mc.ResetRegionCache()
}

// UpdateStoreLeaderWeight updates store leader weight.
func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
store := mc.GetStore(storeID)
@@ -822,10 +869,6 @@
}
}

// SetHotPendingInfluenceMetrics mock method
func (mc *Cluster) SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
}

// GetBasicCluster mock method
func (mc *Cluster) GetBasicCluster() *core.BasicCluster {
return mc.BasicCluster
3 changes: 1 addition & 2 deletions pkg/schedule/checker/checker_controller.go
@@ -22,7 +22,6 @@ import (
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
@@ -34,7 +33,7 @@ import (
// DefaultCacheSize is the default length of waiting list.
const DefaultCacheSize = 1000

var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny")
var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny")

// Controller is used to manage all checkers.
type Controller struct {
1 change: 1 addition & 0 deletions pkg/schedule/config/config.go
@@ -81,6 +81,7 @@ type Config interface {
IsDebugMetricsEnabled() bool
GetClusterVersion() *semver.Version
GetStoreLimitVersion() string
IsDiagnosticAllowed() bool
// for test purpose
SetPlacementRuleEnabled(bool)
SetSplitMergeInterval(time.Duration)