Skip to content

Commit

Permalink
Merge branch 'master' into resource_manager/support_run_away
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed May 25, 2023
2 parents 1686306 + f8ca1e8 commit 53bc6ac
Show file tree
Hide file tree
Showing 41 changed files with 2,438 additions and 2,150 deletions.
8 changes: 3 additions & 5 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,17 +488,15 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
}

oldPrimary, primarySwitched, secondaryChanged :=
oldPrimary, primarySwitched, _ :=
c.keyspaceGroupSD.update(keyspaceGroup, primaryAddr, secondaryAddrs, addrs)
if primarySwitched {
log.Info("[tso] updated keyspace group service discovery info",
zap.String("keyspace-group-service", keyspaceGroup.String()))
if err := c.afterPrimarySwitched(oldPrimary, primaryAddr); err != nil {
return err
}
}
if primarySwitched || secondaryChanged {
log.Info("[tso] updated keyspace group service discovery info",
zap.String("keyspace-group-service", keyspaceGroup.String()))
}

// Even if the primary address is empty, we still updated other returned info above, including the
// keyspace group info and the secondary addresses.
Expand Down
12 changes: 10 additions & 2 deletions metrics/grafana/pd.json
Original file line number Diff line number Diff line change
Expand Up @@ -8718,12 +8718,20 @@
"steppedLine": false,
"targets": [
{
"expr": "etcd_debugging_mvcc_db_total_size_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=\"pd\"}",
"expr": "etcd_mvcc_db_total_size_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=\"pd\"}",
"format": "time_series",
"hide": false,
"intervalFactor": 1,
"legendFormat": "{{instance}}",
"legendFormat": "{{instance}}-physically-allocated",
"refId": "A"
},
{
"expr": "etcd_mvcc_db_total_size_in_use_in_bytes{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=\"pd\"}",
"format": "time_series",
"hide": false,
"intervalFactor": 1,
"legendFormat": "{{instance}}-logically-in-use",
"refId": "B"
}
],
"thresholds": [],
Expand Down
16 changes: 16 additions & 0 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,13 +649,27 @@ func (manager *Manager) allocID() (uint32, error) {
// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
var (
// Some statistics info.
start = time.Now()
patrolledKeyspaceCount uint64
assignedKeyspaceCount uint64
// The current start ID of the patrol, used for logging.
currentStartID = manager.nextPatrolStartID
// The next start ID of the patrol, used for the next patrol.
nextStartID = currentStartID
moreToPatrol = true
err error
)
defer func() {
log.Debug("[keyspace] patrol keyspace assignment finished",
zap.Duration("cost", time.Since(start)),
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
)
}()
for moreToPatrol {
err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
Expand Down Expand Up @@ -694,6 +708,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
if ks == nil {
continue
}
patrolledKeyspaceCount++
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
ks.Config = make(map[string]string, 1)
Expand All @@ -720,6 +735,7 @@ func (manager *Manager) PatrolKeyspaceAssignment() error {
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
return err
}
assignedKeyspaceCount++
}
if assigned {
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
Expand Down
44 changes: 44 additions & 0 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,47 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}

// BenchmarkPatrolKeyspaceAssignment1000 benchmarks the keyspace assignment
// patrol with 1000 pre-created keyspaces.
func BenchmarkPatrolKeyspaceAssignment1000(b *testing.B) {
	const keyspaceCount = 1000
	benchmarkPatrolKeyspaceAssignmentN(keyspaceCount, b)
}

// BenchmarkPatrolKeyspaceAssignment10000 benchmarks the keyspace assignment
// patrol with 10000 pre-created keyspaces.
func BenchmarkPatrolKeyspaceAssignment10000(b *testing.B) {
	const keyspaceCount = 10000
	benchmarkPatrolKeyspaceAssignmentN(keyspaceCount, b)
}

// BenchmarkPatrolKeyspaceAssignment100000 benchmarks the keyspace assignment
// patrol with 100000 pre-created keyspaces.
func BenchmarkPatrolKeyspaceAssignment100000(b *testing.B) {
	const keyspaceCount = 100000
	benchmarkPatrolKeyspaceAssignmentN(keyspaceCount, b)
}

// benchmarkPatrolKeyspaceAssignmentN saves n enabled keyspaces (IDs 1..n) through
// the suite's manager and then times b.N calls to PatrolKeyspaceAssignment.
//
// Suite setup and keyspace creation happen before the timer is reset, so only
// the patrol calls are measured. Teardown is deferred so that it runs on every
// exit path, including a failed assertion or a panic during setup or patrol.
func benchmarkPatrolKeyspaceAssignmentN(
	n int, b *testing.B,
) {
	suite := new(keyspaceTestSuite)
	// The suite is driven with a standalone testing.T because the benchmark only
	// receives a *testing.B.
	suite.SetT(&testing.T{})
	suite.SetupSuite()
	defer suite.TearDownSuite()
	suite.SetupTest()
	// Registered last so it runs first, mirroring the original teardown order.
	defer suite.TearDownTest()
	re := suite.Require()
	// Create some keyspaces without any keyspace group.
	for i := 1; i <= n; i++ {
		now := time.Now().Unix()
		err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
			Id:             uint32(i),
			Name:           strconv.Itoa(i),
			State:          keyspacepb.KeyspaceState_ENABLED,
			CreatedAt:      now,
			StateChangedAt: now,
		})
		re.NoError(err)
	}
	// Benchmark the keyspace assignment patrol.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		err := suite.manager.PatrolKeyspaceAssignment()
		re.NoError(err)
	}
	// Stop the clock before the deferred teardown so cleanup is not measured.
	b.StopTimer()
}
75 changes: 59 additions & 16 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/storelimit"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -63,7 +64,7 @@ type Cluster struct {

// NewCluster creates a new Cluster
func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
clus := &Cluster{
c := &Cluster{
ctx: ctx,
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
Expand All @@ -74,13 +75,13 @@ func NewCluster(ctx context.Context, opts *config.PersistOptions) *Cluster {
StoreConfigManager: config.NewTestStoreConfigManager(nil),
Storage: storage.NewStorageWithMemoryBackend(),
}
if clus.PersistOptions.GetReplicationConfig().EnablePlacementRules {
clus.initRuleManager()
if c.PersistOptions.GetReplicationConfig().EnablePlacementRules {
c.initRuleManager()
}
// It should be updated to the latest feature version.
clus.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
clus.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, storage.NewStorageWithMemoryBackend(), time.Second*5)
return clus
c.PersistOptions.SetClusterVersion(versioninfo.MinSupportedVersion(versioninfo.HotScheduleWithQuery))
c.RegionLabeler, _ = labeler.NewRegionLabeler(ctx, c.Storage, time.Second*5)
return c
}

// GetStoreConfig returns the store config.
Expand All @@ -93,16 +94,35 @@ func (mc *Cluster) GetOpts() sc.Config {
return mc.PersistOptions
}

// GetStorage returns the Storage held by the mock cluster.
func (mc *Cluster) GetStorage() storage.Storage {
	return mc.Storage
}

// GetAllocator returns the mock cluster's IDAllocator as an id.Allocator.
func (mc *Cluster) GetAllocator() id.Allocator {
	return mc.IDAllocator
}

// GetStorage returns the storage.
func (mc *Cluster) GetStorage() storage.Storage {
return mc.Storage
// IsUnsafeRecovering reports whether the cluster is in unsafe recovery.
// The mock cluster always reports false.
func (mc *Cluster) IsUnsafeRecovering() bool {
	return false
}

// GetPersistOptions returns the PersistOptions held by the mock cluster.
func (mc *Cluster) GetPersistOptions() *config.PersistOptions {
	return mc.PersistOptions
}

// UpdateRegionsLabelLevelStats is the mock counterpart of updating the label
// level stats for the given regions.
func (mc *Cluster) UpdateRegionsLabelLevelStats(regions []*core.RegionInfo) {
	// Intentionally a no-op in the mock cluster.
}

// IsSchedulerExisted reports whether a scheduler with the given name exists.
// The mock cluster always answers false with a nil error.
func (mc *Cluster) IsSchedulerExisted(name string) (bool, error) {
	return false, nil
}

// IsSchedulerDisabled reports whether the scheduler with the given name is
// disabled. The mock cluster always answers false with a nil error.
func (mc *Cluster) IsSchedulerDisabled(name string) (bool, error) {
	return false, nil
}

// ScanRegions scans region with start key, until number greater than limit.
func (mc *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*core.RegionInfo {
return mc.ScanRange(startKey, endKey, limit)
Expand Down Expand Up @@ -195,7 +215,7 @@ func (mc *Cluster) AllocPeer(storeID uint64) (*metapb.Peer, error) {

func (mc *Cluster) initRuleManager() {
if mc.RuleManager == nil {
mc.RuleManager = placement.NewRuleManager(storage.NewStorageWithMemoryBackend(), mc, mc.GetOpts())
mc.RuleManager = placement.NewRuleManager(mc.GetStorage(), mc, mc.GetOpts())
mc.RuleManager.Initialize(int(mc.GetReplicationConfig().MaxReplicas), mc.GetReplicationConfig().LocationLabels)
}
}
Expand Down Expand Up @@ -259,6 +279,22 @@ func (mc *Cluster) SetStoreBusy(storeID uint64, busy bool) {
mc.PutStore(newStore)
}

// BuryStore marks a store as tombstone in cluster.
//
// It returns an error when no store is found for storeID, when the store is
// still up and forceBury is false, or when forceBury is true but the up store
// is not disconnected. Otherwise the store is replaced in the cluster by a
// clone built with core.TombstoneStore() and nil is returned.
func (mc *Cluster) BuryStore(storeID uint64, forceBury bool) error {
	store := mc.GetStore(storeID)
	if store == nil {
		// Reject unknown store IDs instead of dereferencing a nil store below.
		return errors.Errorf("store %v not found", storeID)
	}
	if store.IsUp() {
		if !forceBury {
			return errs.ErrStoreIsUp.FastGenByArgs()
		}
		// A forced bury of an up store is only allowed once it is disconnected.
		if !store.IsDisconnected() {
			return errors.Errorf("The store %v is not offline nor disconnected", storeID)
		}
	}

	mc.PutStore(store.Clone(core.TombstoneStore()))
	return nil
}

// AddLeaderStore adds store with specified count of leader.
func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes ...int64) {
stats := &pdpb.StoreStats{}
Expand All @@ -285,7 +321,13 @@ func (mc *Cluster) AddLeaderStore(storeID uint64, leaderCount int, leaderSizes .
}

// AddRegionStore adds store with specified count of region.
func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int, regionSizes ...uint64) {
var regionSize uint64
if len(regionSizes) == 0 {
regionSize = uint64(int64(regionCount) * defaultRegionSize / units.MiB)
} else {
regionSize = regionSizes[0]
}
stats := &pdpb.StoreStats{}
stats.Capacity = defaultStoreCapacity
stats.UsedSize = uint64(regionCount) * defaultRegionSize
Expand All @@ -299,8 +341,8 @@ func (mc *Cluster) AddRegionStore(storeID uint64, regionCount int) {
}},
core.SetStoreStats(stats),
core.SetRegionCount(regionCount),
core.SetRegionSize(int64(regionCount)*defaultRegionSize/units.MiB),
core.SetLastHeartbeatTS(time.Now()),
core.SetRegionSize(int64(regionSize)),
)
mc.SetStoreLimit(storeID, storelimit.AddPeer, 60)
mc.SetStoreLimit(storeID, storelimit.RemovePeer, 60)
Expand Down Expand Up @@ -522,6 +564,11 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(
return items
}

// DropCacheAllRegion clears the region cache by delegating to ResetRegionCache.
func (mc *Cluster) DropCacheAllRegion() {
	mc.ResetRegionCache()
}

// UpdateStoreLeaderWeight updates store leader weight.
func (mc *Cluster) UpdateStoreLeaderWeight(storeID uint64, weight float64) {
store := mc.GetStore(storeID)
Expand Down Expand Up @@ -822,10 +869,6 @@ func (mc *Cluster) AddSuspectRegions(ids ...uint64) {
}
}

// SetHotPendingInfluenceMetrics is a mock method; it accepts the metric labels
// and load value and does nothing with them.
func (mc *Cluster) SetHotPendingInfluenceMetrics(storeLabel, rwTy, dim string, load float64) {
	// Intentionally a no-op in the mock cluster.
}

// GetBasicCluster mock method
func (mc *Cluster) GetBasicCluster() *core.BasicCluster {
return mc.BasicCluster
Expand Down
3 changes: 1 addition & 2 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/schedule"
"github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/labeler"
Expand All @@ -34,7 +33,7 @@ import (
// DefaultCacheSize is the default length of waiting list.
const DefaultCacheSize = 1000

var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny")
var denyCheckersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("checkers", "deny")

// Controller is used to manage all checkers.
type Controller struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type Config interface {
IsDebugMetricsEnabled() bool
GetClusterVersion() *semver.Version
GetStoreLimitVersion() string
IsDiagnosticAllowed() bool
// for test purpose
SetPlacementRuleEnabled(bool)
SetSplitMergeInterval(time.Duration)
Expand Down

0 comments on commit 53bc6ac

Please sign in to comment.