Merge branch 'master' into region_filter
CabinfeverB committed Jul 14, 2022
2 parents 20937fc + 6aaff24 commit 987a1fa
Showing 26 changed files with 855 additions and 190 deletions.
11 changes: 11 additions & 0 deletions server/api/admin.go
@@ -58,6 +58,17 @@ func (h *adminHandler) DeleteRegionCache(w http.ResponseWriter, r *http.Request)
h.rd.JSON(w, http.StatusOK, "The region is removed from server cache.")
}

// @Tags admin
// @Summary Drop all regions from cache.
// @Produce json
// @Success 200 {string} string "All regions are removed from server cache."
// @Router /admin/cache/regions [delete]
func (h *adminHandler) DeleteAllRegionCache(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
rc.DropCacheAllRegion()
h.rd.JSON(w, http.StatusOK, "All regions are removed from server cache.")
}

// FIXME: details of input json body params
// @Tags admin
// @Summary Reset the ts.
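Note: the DeleteAllRegionCache handler added above delegates to RaftCluster.DropCacheAllRegion, which is not part of this diff. A minimal sketch of what such a method might look like, assuming the cached regions live in the core cluster (the ResetRegionCache helper is an assumption, not confirmed by this commit):

// Sketch only: not taken from this commit; names on c.core are illustrative.
func (c *RaftCluster) DropCacheAllRegion() {
	c.Lock()
	defer c.Unlock()
	// Assumed helper that clears every cached region in the core cluster.
	c.core.ResetRegionCache()
}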
68 changes: 68 additions & 0 deletions server/api/admin_test.go
@@ -94,6 +94,74 @@ func (suite *adminTestSuite) TestDropRegion() {
suite.Equal(uint64(50), region.GetRegionEpoch().Version)
}

func (suite *adminTestSuite) TestDropRegions() {
cluster := suite.svr.GetRaftCluster()

n := uint64(10000)
np := uint64(3)

regions := make([]*core.RegionInfo, 0, n)
for i := uint64(0); i < n; i++ {
peers := make([]*metapb.Peer, 0, np)
for j := uint64(0); j < np; j++ {
peer := &metapb.Peer{
Id: i*np + j,
}
peer.StoreId = (i + j) % n
peers = append(peers, peer)
}
// initialize region's epoch to (100, 100).
region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i))).Clone(
core.SetPeers(peers),
core.SetRegionConfVer(100),
core.SetRegionVersion(100),
)
regions = append(regions, region)

err := cluster.HandleRegionHeartbeat(region)
suite.NoError(err)
}

// Region epoch cannot decrease.
for i := uint64(0); i < n; i++ {
region := regions[i].Clone(
core.SetRegionConfVer(50),
core.SetRegionVersion(50),
)
regions[i] = region
err := cluster.HandleRegionHeartbeat(region)
suite.Error(err)
}

for i := uint64(0); i < n; i++ {
region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i)))

suite.Equal(uint64(100), region.GetRegionEpoch().ConfVer)
suite.Equal(uint64(100), region.GetRegionEpoch().Version)
}

// After dropping all regions from the cache, a lower version is accepted.
url := fmt.Sprintf("%s/admin/cache/regions", suite.urlPrefix)
req, err := http.NewRequest(http.MethodDelete, url, nil)
suite.NoError(err)
res, err := testDialClient.Do(req)
suite.NoError(err)
suite.Equal(http.StatusOK, res.StatusCode)
res.Body.Close()

for _, region := range regions {
err := cluster.HandleRegionHeartbeat(region)
suite.NoError(err)
}

for i := uint64(0); i < n; i++ {
region := cluster.GetRegionByKey([]byte(fmt.Sprintf("%d", i)))

suite.Equal(uint64(50), region.GetRegionEpoch().ConfVer)
suite.Equal(uint64(50), region.GetRegionEpoch().Version)
}
}

func (suite *adminTestSuite) TestPersistFile() {
data := []byte("#!/bin/sh\nrm -rf /")
re := suite.Require()
1 change: 1 addition & 0 deletions server/api/router.go
@@ -286,6 +286,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {

adminHandler := newAdminHandler(svr, rd)
registerFunc(clusterRouter, "/admin/cache/region/{id}", adminHandler.DeleteRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog))
registerFunc(clusterRouter, "/admin/cache/regions", adminHandler.DeleteAllRegionCache, setMethods(http.MethodDelete), setAuditBackend(localLog))
registerFunc(clusterRouter, "/admin/reset-ts", adminHandler.ResetTS, setMethods(http.MethodPost), setAuditBackend(localLog))
registerFunc(apiRouter, "/admin/persist-file/{file_name}", adminHandler.SavePersistFile, setMethods(http.MethodPost), setAuditBackend(localLog))

2 changes: 1 addition & 1 deletion server/cluster/cluster.go
@@ -357,7 +357,7 @@ func (c *RaftCluster) LoadClusterInfo() (*RaftCluster, error) {
start = time.Now()

// used to load regions from kv storage into cache storage.
if err := c.storage.LoadRegionsOnce(c.ctx, c.core.CheckAndPutRegion); err != nil {
if err := storage.TryLoadRegionsOnce(c.ctx, c.storage, c.core.CheckAndPutRegion); err != nil {
return nil, err
}
log.Info("load regions",
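The hunk above swaps the method call c.storage.LoadRegionsOnce for the package-level helper storage.TryLoadRegionsOnce. A rough sketch of what such a helper might do, assuming it only uses the once-only loader when the backend supports it (the interface assertion and the LoadRegions fallback are assumptions, not taken from this diff):

// Sketch only: the real helper lives in server/storage and may differ.
func TryLoadRegionsOnce(ctx context.Context, s Storage, f func(*core.RegionInfo) []*core.RegionInfo) error {
	rs, ok := s.(interface {
		LoadRegionsOnce(context.Context, func(*core.RegionInfo) []*core.RegionInfo) error
	})
	if !ok {
		// Fall back to a plain load when the backend has no once-only support.
		return s.LoadRegions(ctx, f)
	}
	return rs.LoadRegionsOnce(ctx, f)
}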
65 changes: 64 additions & 1 deletion server/cluster/coordinator.go
@@ -36,6 +36,7 @@ import (
"github.com/tikv/pd/server/schedule/checker"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/plan"
"github.com/tikv/pd/server/statistics"
"github.com/tikv/pd/server/storage"
"go.uber.org/zap"
@@ -72,12 +73,14 @@ type coordinator struct {
opController *schedule.OperatorController
hbStreams *hbstream.HeartbeatStreams
pluginInterface *schedule.PluginInterface
diagnosis *diagnosisManager
}

// newCoordinator creates a new coordinator.
func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstream.HeartbeatStreams) *coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := schedule.NewOperatorController(ctx, cluster, hbStreams)
schedulers := make(map[string]*scheduleController)
return &coordinator{
ctx: ctx,
cancel: cancel,
@@ -86,10 +89,11 @@ func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstre
checkers: checker.NewController(ctx, cluster, cluster.ruleManager, cluster.regionLabeler, opController),
regionScatterer: schedule.NewRegionScatterer(ctx, cluster),
regionSplitter: schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)),
schedulers: make(map[string]*scheduleController),
schedulers: schedulers,
opController: opController,
hbStreams: hbStreams,
pluginInterface: schedule.NewPluginInterface(),
diagnosis: newDiagnosisManager(cluster, schedulers),
}
}

@@ -901,6 +905,11 @@ func (s *scheduleController) Schedule() []*operator.Operator {
return nil
}

func (s *scheduleController) DiagnoseDryRun() ([]*operator.Operator, []plan.Plan) {
cacheCluster := newCacheCluster(s.cluster)
return s.Scheduler.Schedule(cacheCluster, true)
}

// GetInterval returns the interval of scheduling for a scheduler.
func (s *scheduleController) GetInterval() time.Duration {
return s.nextInterval
@@ -933,6 +942,60 @@ func (s *scheduleController) GetDelayUntil() int64 {
return 0
}

const maxDiagnosisResultNum = 6

// diagnosisManager manages the diagnosis mechanism; it shares the actual schedulers with the coordinator.
type diagnosisManager struct {
cluster *RaftCluster
schedulers map[string]*scheduleController
dryRunResult map[string]*cache.FIFO
}

func newDiagnosisManager(cluster *RaftCluster, schedulerControllers map[string]*scheduleController) *diagnosisManager {
return &diagnosisManager{
cluster: cluster,
schedulers: schedulerControllers,
dryRunResult: make(map[string]*cache.FIFO),
}
}

func (d *diagnosisManager) diagnosisDryRun(name string) error {
if _, ok := d.schedulers[name]; !ok {
return errs.ErrSchedulerNotFound.FastGenByArgs()
}
ops, plans := d.schedulers[name].DiagnoseDryRun()
result := newDiagnosisResult(ops, plans)
if _, ok := d.dryRunResult[name]; !ok {
d.dryRunResult[name] = cache.NewFIFO(maxDiagnosisResultNum)
}
queue := d.dryRunResult[name]
queue.Put(result.timestamp, result)
return nil
}

type diagnosisResult struct {
timestamp uint64
unschedulablePlans []plan.Plan
schedulablePlans []plan.Plan
}

func newDiagnosisResult(ops []*operator.Operator, result []plan.Plan) *diagnosisResult {
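	// Each operator normally corresponds to one schedulable plan; a merge
	// schedule emits two operators (source and target) per plan, so the count
	// is halved below to recover the number of schedulable plans.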
index := len(ops)
if len(ops) > 0 {
if ops[0].Kind()&operator.OpMerge != 0 {
index /= 2
}
}
if index > len(result) {
return nil
}
return &diagnosisResult{
timestamp: uint64(time.Now().Unix()),
unschedulablePlans: result[index:],
schedulablePlans: result[:index],
}
}

func (c *coordinator) getPausedSchedulerDelayAt(name string) (int64, error) {
c.RLock()
defer c.RUnlock()
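Dry-run results are kept per scheduler in a bounded FIFO (maxDiagnosisResultNum entries), but nothing in this commit reads them back yet. A later consumer (for example an HTTP handler) might drain one scheduler's queue roughly as follows; the Elems accessor on cache.FIFO is assumed here, not shown in this diff:

// Sketch only: illustrates how the stored dry-run results could be read back.
func (d *diagnosisManager) latestResults(name string) []*diagnosisResult {
	queue, ok := d.dryRunResult[name]
	if !ok {
		return nil
	}
	results := make([]*diagnosisResult, 0, maxDiagnosisResultNum)
	for _, item := range queue.Elems() {
		results = append(results, item.Value.(*diagnosisResult))
	}
	return results
}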
11 changes: 11 additions & 0 deletions server/cluster/coordinator_test.go
@@ -301,6 +301,17 @@ func checkRegionAndOperator(re *require.Assertions, tc *testCluster, co *coordin
}
}

func TestDiagnosisDryRun(t *testing.T) {
re := require.New(t)

_, co, cleanup := prepare(nil, nil, func(co *coordinator) { co.run() }, re)
defer cleanup()
err := co.diagnosis.diagnosisDryRun(schedulers.EvictLeaderName)
re.Error(err)
err = co.diagnosis.diagnosisDryRun(schedulers.BalanceRegionName)
re.NoError(err)
}

func TestCheckRegion(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
11 changes: 8 additions & 3 deletions server/core/store.go
@@ -19,7 +19,6 @@ import (
"strings"
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
@@ -79,11 +78,17 @@ func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
return storeInfo
}

func (s *StoreInfo) cloneMetaStore() *metapb.Store {
b, _ := s.meta.Marshal()
store := &metapb.Store{}
store.Unmarshal(b)
return store
}

// Clone creates a copy of current StoreInfo.
func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
meta := proto.Clone(s.meta).(*metapb.Store)
store := &StoreInfo{
meta: meta,
meta: s.cloneMetaStore(),
storeStats: s.storeStats,
pauseLeaderTransfer: s.pauseLeaderTransfer,
slowStoreEvicted: s.slowStoreEvicted,
17 changes: 8 additions & 9 deletions server/core/store_option.go
@@ -17,7 +17,6 @@ package core
import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/server/core/storelimit"
@@ -29,7 +28,7 @@ type StoreCreateOption func(region *StoreInfo)
// SetStoreAddress sets the address for the store.
func SetStoreAddress(address, statusAddress, peerAddress string) StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.Address = address
meta.StatusAddress = statusAddress
meta.PeerAddress = peerAddress
@@ -40,7 +39,7 @@ func SetStoreAddress(address, statusAddress, peerAddress string) StoreCreateOpti
// SetStoreLabels sets the labels for the store.
func SetStoreLabels(labels []*metapb.StoreLabel) StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.Labels = labels
store.meta = meta
}
@@ -49,7 +48,7 @@ func SetStoreLabels(labels []*metapb.StoreLabel) StoreCreateOption {
// SetStoreStartTime sets the start timestamp for the store.
func SetStoreStartTime(startTS int64) StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.StartTimestamp = startTS
store.meta = meta
}
@@ -58,7 +57,7 @@ func SetStoreStartTime(startTS int64) StoreCreateOption {
// SetStoreVersion sets the version for the store.
func SetStoreVersion(githash, version string) StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.Version = version
meta.GitHash = githash
store.meta = meta
@@ -68,7 +67,7 @@ func SetStoreVersion(githash, version string) StoreCreateOption {
// SetStoreDeployPath sets the deploy path for the store.
func SetStoreDeployPath(deployPath string) StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.DeployPath = deployPath
store.meta = meta
}
@@ -77,7 +76,7 @@ func SetStoreDeployPath(deployPath string) StoreCreateOption {
// OfflineStore sets a store to offline state.
func OfflineStore(physicallyDestroyed bool) StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed
@@ -88,7 +87,7 @@ func OfflineStore(physicallyDestroyed bool) StoreCreateOption {
// UpStore sets a store to up state.
func UpStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
store.meta = meta
@@ -98,7 +97,7 @@ func UpStore() StoreCreateOption {
// TombstoneStore set a store to tombstone.
func TombstoneStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := proto.Clone(store.meta).(*metapb.Store)
meta := store.cloneMetaStore()
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
store.meta = meta
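Every option in this file follows the same clone-modify-replace pattern, now built on cloneMetaStore instead of proto.Clone. A hypothetical new option written against that pattern would look like this (SetStorePhysicallyDestroyed is illustrative only and not part of this commit):

// Illustrative only: demonstrates the clone-modify-replace pattern used above.
func SetStorePhysicallyDestroyed(destroyed bool) StoreCreateOption {
	return func(store *StoreInfo) {
		meta := store.cloneMetaStore()
		meta.PhysicallyDestroyed = destroyed
		store.meta = meta
	}
}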
10 changes: 10 additions & 0 deletions server/core/store_stats.go
@@ -67,6 +67,16 @@ func (ss *storeStats) GetStoreStats() *pdpb.StoreStats {
return ss.rawStats
}

// CloneStoreStats returns the statistics information cloned from the store.
func (ss *storeStats) CloneStoreStats() *pdpb.StoreStats {
ss.mu.RLock()
b, _ := ss.rawStats.Marshal()
ss.mu.RUnlock()
stats := &pdpb.StoreStats{}
stats.Unmarshal(b)
return stats
}

// GetCapacity returns the capacity size of the store.
func (ss *storeStats) GetCapacity() uint64 {
ss.mu.RLock()
20 changes: 20 additions & 0 deletions server/core/store_test.go
@@ -90,6 +90,26 @@ func TestCloneStore(t *testing.T) {
wg.Wait()
}

func TestCloneMetaStore(t *testing.T) {
re := require.New(t)
store := &metapb.Store{Id: 1, Address: "mock://tikv-1", Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "host", Value: "h1"}}}
store2 := NewStoreInfo(store).cloneMetaStore()
re.Equal(store2.Labels, store.Labels)
store2.Labels[0].Value = "changed value"
re.NotEqual(store2.Labels, store.Labels)
}

func BenchmarkStoreClone(b *testing.B) {
meta := &metapb.Store{Id: 1,
Address: "mock://tikv-1",
Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "host", Value: "h1"}}}
store := NewStoreInfo(meta)
b.ResetTimer()
for t := 0; t < b.N; t++ {
store.Clone(SetLeaderCount(t))
}
}

func TestRegionScore(t *testing.T) {
re := require.New(t)
stats := &pdpb.StoreStats{}
