diff --git a/pkg/integration_test/pdctl_test.go b/pkg/integration_test/pdctl_test.go
index e9619c86e02..61af95abf4d 100644
--- a/pkg/integration_test/pdctl_test.go
+++ b/pkg/integration_test/pdctl_test.go
@@ -615,6 +615,7 @@ func (s *integrationTestSuite) TestConfig(c *C) {
 	leaderServer := cluster.GetServer(cluster.GetLeader())
 	s.bootstrapCluster(leaderServer, c)
 	mustPutStore(c, leaderServer.server, store.Id, store.State, store.Labels)
+	defer cluster.Destroy()
 
 	// config show
 	args := []string{"-u", pdAddr, "config", "show"}
@@ -734,6 +735,125 @@ func (s *integrationTestSuite) TestConfig(c *C) {
 	c.Assert(scheduleCfg.DisableLearner, Equals, leaderServer.server.GetScheduleConfig().DisableLearner)
 }
 
+func (s *integrationTestSuite) TestLog(c *C) {
+	c.Parallel()
+
+	cluster, err := newTestCluster(1)
+	c.Assert(err, IsNil)
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+	pdAddr := cluster.config.GetClientURLs()
+	cmd := initCommand()
+
+	store := metapb.Store{
+		Id:    1,
+		State: metapb.StoreState_Up,
+	}
+	leaderServer := cluster.GetServer(cluster.GetLeader())
+	s.bootstrapCluster(leaderServer, c)
+	mustPutStore(c, leaderServer.server, store.Id, store.State, store.Labels)
+	defer cluster.Destroy()
+
+	var testCases = []struct {
+		cmd    []string
+		expect string
+	}{
+		// log [fatal|error|warn|info|debug]
+		{
+			cmd:    []string{"-u", pdAddr, "log", "fatal"},
+			expect: "fatal",
+		},
+		{
+			cmd:    []string{"-u", pdAddr, "log", "error"},
+			expect: "error",
+		},
+		{
+			cmd:    []string{"-u", pdAddr, "log", "warn"},
+			expect: "warn",
+		},
+		{
+			cmd:    []string{"-u", pdAddr, "log", "info"},
+			expect: "info",
+		},
+		{
+			cmd:    []string{"-u", pdAddr, "log", "debug"},
+			expect: "debug",
+		},
+	}
+
+	for _, testCase := range testCases {
+		_, _, err = executeCommandC(cmd, testCase.cmd...)
+		c.Assert(err, IsNil)
+		c.Assert(leaderServer.server.GetConfig().Log.Level, Equals, testCase.expect)
+	}
+}
+
+func (s *integrationTestSuite) TestTableNS(c *C) {
+	c.Parallel()
+
+	cluster, err := newTestCluster(1)
+	c.Assert(err, IsNil)
+	err = cluster.RunInitialServers()
+	c.Assert(err, IsNil)
+	cluster.WaitLeader()
+	pdAddr := cluster.config.GetClientURLs()
+	cmd := initCommand()
+
+	store := metapb.Store{
+		Id:    1,
+		State: metapb.StoreState_Up,
+	}
+	leaderServer := cluster.GetServer(cluster.GetLeader())
+	s.bootstrapCluster(leaderServer, c)
+	mustPutStore(c, leaderServer.server, store.Id, store.State, store.Labels)
+	classifier := leaderServer.server.GetClassifier()
+	defer cluster.Destroy()
+
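+	// Walk through each table_ns subcommand and verify the classifier state after every step.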
+	// table_ns create
+	c.Assert(leaderServer.server.IsNamespaceExist("ts1"), IsFalse)
+	args := []string{"-u", pdAddr, "table_ns", "create", "ts1"}
+	_, _, err = executeCommandC(cmd, args...)
+	c.Assert(err, IsNil)
+	c.Assert(leaderServer.server.IsNamespaceExist("ts1"), IsTrue)
+
+	// table_ns add
+	args = []string{"-u", pdAddr, "table_ns", "add", "ts1", "1"}
+	_, _, err = executeCommandC(cmd, args...)
+	c.Assert(err, IsNil)
+	c.Assert(classifier.IsTableIDExist(1), IsTrue)
+
+	// table_ns remove
+	args = []string{"-u", pdAddr, "table_ns", "remove", "ts1", "1"}
+	_, _, err = executeCommandC(cmd, args...)
+	c.Assert(err, IsNil)
+	c.Assert(classifier.IsTableIDExist(1), IsFalse)
+
+	// table_ns set_meta
+	args = []string{"-u", pdAddr, "table_ns", "set_meta", "ts1"}
+	_, _, err = executeCommandC(cmd, args...)
+	c.Assert(err, IsNil)
+	c.Assert(classifier.IsMetaExist(), IsTrue)
+
+	// table_ns rm_meta
+	args = []string{"-u", pdAddr, "table_ns", "rm_meta", "ts1"}
+	_, _, err = executeCommandC(cmd, args...)
+	c.Assert(err, IsNil)
+	c.Assert(classifier.IsMetaExist(), IsFalse)
+
+	// table_ns set_store
+	args = []string{"-u", pdAddr, "table_ns", "set_store", "1", "ts1"}
+	_, _, err = executeCommandC(cmd, args...)
+	c.Assert(err, IsNil)
+	c.Assert(classifier.IsStoreIDExist(1), IsTrue)
+
+	// table_ns rm_store
+	args = []string{"-u", pdAddr, "table_ns", "rm_store", "1", "ts1"}
+	_, _, err = executeCommandC(cmd, args...)
+	c.Assert(err, IsNil)
+	c.Assert(classifier.IsStoreIDExist(1), IsFalse)
+}
+
 func initCommand() *cobra.Command {
 	commandFlags := pdctl.CommandFlags{}
 	rootCmd := &cobra.Command{}
diff --git a/server/cluster_info.go b/server/cluster_info.go
index 9a0de7c2f70..bf19bccc635 100644
--- a/server/cluster_info.go
+++ b/server/cluster_info.go
@@ -101,6 +101,8 @@ func (c *clusterInfo) OnStoreVersionChange() {
 			minVersion = v
 		}
 	}
+	// If the cluster version of PD is lower than the minimum version among all stores,
+	// update the cluster version.
 	if clusterVersion.LessThan(*minVersion) {
 		c.opt.SetClusterVersion(*minVersion)
 		err := c.opt.persist(c.kv)
@@ -745,10 +747,12 @@ func newPrepareChecker() *prepareChecker {
 	}
 }
 
+// Before starting up the scheduler, we need to take the proportion of the regions on each store into consideration.
 func (checker *prepareChecker) check(c *clusterInfo) bool {
 	if checker.isPrepared || time.Since(checker.start) > collectTimeout {
 		return true
 	}
+	// The number of active regions should be more than the total number of Regions of all stores * collectFactor.
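+	// For example, with collectFactor = 0.8 (an illustrative value), scheduling starts only after heartbeats have covered at least 80% of all Regions.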
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/core" ) var _ = Suite(&testClusterWorkerSuite{}) -type testClusterWorkerSuite struct{} +type testClusterWorkerSuite struct { + baseCluster +} func (s *testClusterWorkerSuite) TestReportSplit(c *C) { var cluster RaftCluster @@ -44,3 +47,42 @@ func (s *testClusterWorkerSuite) TestReportBatchSplit(c *C) { _, err := cluster.handleBatchReportSplit(&pdpb.ReportBatchSplitRequest{Regions: regions}) c.Assert(err, IsNil) } + +func (s *testClusterWorkerSuite) TestValidRequestRegion(c *C) { + var err error + _, s.svr, s.cleanup, err = NewTestServer() + c.Assert(err, IsNil) + s.client = s.svr.client + mustWaitLeader(c, []*Server{s.svr}) + s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr()) + defer s.cleanup() + s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0")) + + cluster := s.svr.GetRaftCluster() + c.Assert(cluster, NotNil) + + r1 := core.NewRegionInfo(&metapb.Region{ + Id: 1, + StartKey: []byte(""), + EndKey: []byte("a"), + Peers: []*metapb.Peer{{ + Id: 1, + StoreId: 1, + }}, + RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}, + }, &metapb.Peer{ + Id: 1, + StoreId: 1, + }) + err = cluster.HandleRegionHeartbeat(r1) + c.Assert(err, IsNil) + r2 := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")} + c.Assert(cluster.validRequestRegion(r2), NotNil) + r3 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}} + c.Assert(cluster.validRequestRegion(r3), NotNil) + r4 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}} + c.Assert(cluster.validRequestRegion(r4), NotNil) + r5 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}} + c.Assert(cluster.validRequestRegion(r5), IsNil) + cluster.stop() +} diff --git a/server/namespace/classifier.go b/server/namespace/classifier.go index fb5cb99fa3d..2c063c40cf7 100644 --- a/server/namespace/classifier.go +++ b/server/namespace/classifier.go @@ -38,6 +38,10 @@ type Classifier interface { AllowMerge(*core.RegionInfo, *core.RegionInfo) bool // Reload underlying namespaces ReloadNamespaces() error + // These function below are only for tests + IsMetaExist() bool + IsTableIDExist(int64) bool + IsStoreIDExist(uint64) bool } type defaultClassifier struct{} @@ -66,6 +70,18 @@ func (c defaultClassifier) ReloadNamespaces() error { return nil } +func (c defaultClassifier) IsMetaExist() bool { + return false +} + +func (c defaultClassifier) IsTableIDExist(tableID int64) bool { + return false +} + +func (c defaultClassifier) IsStoreIDExist(storeID uint64) bool { + return false +} + // CreateClassifierFunc is for creating namespace classifier. 
+	r2 := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")}
+	c.Assert(cluster.validRequestRegion(r2), NotNil)
+	r3 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}
+	c.Assert(cluster.validRequestRegion(r3), NotNil)
+	r4 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}}
+	c.Assert(cluster.validRequestRegion(r4), NotNil)
+	r5 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}}
+	c.Assert(cluster.validRequestRegion(r5), IsNil)
+	cluster.stop()
+}
diff --git a/server/namespace/classifier.go b/server/namespace/classifier.go
index fb5cb99fa3d..2c063c40cf7 100644
--- a/server/namespace/classifier.go
+++ b/server/namespace/classifier.go
@@ -38,6 +38,10 @@ type Classifier interface {
 	AllowMerge(*core.RegionInfo, *core.RegionInfo) bool
 	// Reload underlying namespaces
 	ReloadNamespaces() error
+	// The functions below are only used in tests.
+	IsMetaExist() bool
+	IsTableIDExist(int64) bool
+	IsStoreIDExist(uint64) bool
 }
 
 type defaultClassifier struct{}
@@ -66,6 +70,18 @@ func (c defaultClassifier) ReloadNamespaces() error {
 	return nil
 }
 
+func (c defaultClassifier) IsMetaExist() bool {
+	return false
+}
+
+func (c defaultClassifier) IsTableIDExist(tableID int64) bool {
+	return false
+}
+
+func (c defaultClassifier) IsStoreIDExist(storeID uint64) bool {
+	return false
+}
+
 // CreateClassifierFunc is for creating namespace classifier.
 type CreateClassifierFunc func(*core.KV, core.IDAllocator) (Classifier, error)
diff --git a/server/namespace_test.go b/server/namespace_test.go
index 7e61b6f2085..b8207c3883e 100644
--- a/server/namespace_test.go
+++ b/server/namespace_test.go
@@ -240,6 +240,18 @@ func (c *mapClassifer) IsNamespaceExist(name string) bool {
 	return false
 }
 
+func (c *mapClassifer) IsMetaExist() bool {
+	return false
+}
+
+func (c *mapClassifer) IsTableIDExist(tableID int64) bool {
+	return false
+}
+
+func (c *mapClassifer) IsStoreIDExist(storeID uint64) bool {
+	return false
+}
+
 func (c *mapClassifer) AllowMerge(one *core.RegionInfo, other *core.RegionInfo) bool {
 	return c.GetRegionNamespace(one) == c.GetRegionNamespace(other)
 }
diff --git a/server/region_statistics_test.go b/server/region_statistics_test.go
index 79bf18d29dc..f8bbb3e3103 100644
--- a/server/region_statistics_test.go
+++ b/server/region_statistics_test.go
@@ -49,6 +49,18 @@ func (c mockClassifier) ReloadNamespaces() error {
 	return nil
 }
 
+func (c mockClassifier) IsMetaExist() bool {
+	return false
+}
+
+func (c mockClassifier) IsTableIDExist(tableID int64) bool {
+	return false
+}
+
+func (c mockClassifier) IsStoreIDExist(storeID uint64) bool {
+	return false
+}
+
 var _ = Suite(&testRegionStatisticsSuite{})
 
 type testRegionStatisticsSuite struct{}
diff --git a/server/schedule/hot_cache.go b/server/schedule/hot_cache.go
index a278978d4ab..722264ca852 100644
--- a/server/schedule/hot_cache.go
+++ b/server/schedule/hot_cache.go
@@ -55,6 +55,7 @@ func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresIn
 	v, isExist := w.writeFlow.Peek(region.GetID())
 	if isExist {
 		value = v.(*core.RegionStat)
+		// Skip the report-interval check when running in the simulator.
 		if !Simulating {
 			interval := time.Since(value.LastUpdateTime).Seconds()
 			if interval < minHotRegionReportInterval {
@@ -80,6 +81,7 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf
 	v, isExist := w.readFlow.Peek(region.GetID())
 	if isExist {
 		value = v.(*core.RegionStat)
+		// Skip the report-interval check when running in the simulator.
 		if !Simulating {
 			interval := time.Since(value.LastUpdateTime).Seconds()
 			if interval < minHotRegionReportInterval {
@@ -103,7 +105,7 @@ func (w *HotSpotCache) incMetrics(name string, kind FlowKind) {
 }
 
 func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 {
-	// hotRegionThreshold is use to pick hot region
+	// hotRegionThreshold is used to pick hot region
 	// suppose the number of the hot Regions is statCacheMaxLen
 	// and we use total written Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions
 	// divide 2 because the store reports data about two times than the region record write to rocksdb
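+	// For example, at a total write rate of 200 MB/s with statCacheMaxLen = 1000 (illustrative numbers), hotRegionThreshold = 200e6 / (1000 * 2) = 100000 bytes.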
@@ -117,8 +119,8 @@ func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 {
 }
 
 func calculateReadHotThreshold(stores *core.StoresInfo) uint64 {
-	// hotRegionThreshold is use to pick hot region
-	// suppose the number of the hot Regions is statLRUMaxLen
+	// hotRegionThreshold is used to pick hot region
+	// suppose the number of the hot Regions is statCacheMaxLen
 	// and we use total Read Bytes past storeHeartBeatReportInterval seconds to divide the number of hot Regions
 	divisor := float64(statCacheMaxLen)
 	hotRegionThreshold := uint64(stores.TotalBytesReadRate() / divisor)
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index fb3f67114f5..a100d9b1954 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -186,7 +186,9 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
 
 	// Add regions 1~4.
 	seq := newSequencer(numStores)
-	for i := uint64(1); i <= numRegions; i++ {
+	// Region 1 has the same distribution as Region 2, which is used to test selectPeerToReplace.
+	tc.AddLeaderRegion(1, 1, 2, 3)
+	for i := uint64(2); i <= numRegions; i++ {
 		tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next())
 	}
 
diff --git a/server/server.go b/server/server.go
index ff113416875..b60df38ba6e 100644
--- a/server/server.go
+++ b/server/server.go
@@ -479,6 +479,11 @@ func (s *Server) ClusterID() uint64 {
 	return s.clusterID
 }
 
+// GetClassifier returns the classifier of this server.
+func (s *Server) GetClassifier() namespace.Classifier {
+	return s.classifier
+}
+
 // txn returns an etcd client transaction wrapper.
 // The wrapper will set a request timeout to the context and log slow transactions.
 func (s *Server) txn() clientv3.Txn {
diff --git a/table/namespace_classifier.go b/table/namespace_classifier.go
index 1527084fe52..e1c44ae9c53 100644
--- a/table/namespace_classifier.go
+++ b/table/namespace_classifier.go
@@ -169,13 +169,34 @@ func (c *tableNamespaceClassifier) GetNamespaces() []*Namespace {
 	return c.nsInfo.getNamespaces()
 }
 
-// GetNamespaceByName returns whether namespace exists
+// IsNamespaceExist returns whether namespace exists.
 func (c *tableNamespaceClassifier) IsNamespaceExist(name string) bool {
 	c.RLock()
 	defer c.RUnlock()
 	return c.nsInfo.getNamespaceByName(name) != nil
 }
 
+// IsTableIDExist returns whether table ID exists in namespacesInfo.
+func (c *tableNamespaceClassifier) IsTableIDExist(tableID int64) bool {
+	c.RLock()
+	defer c.RUnlock()
+	return c.nsInfo.isTableIDExist(tableID)
+}
+
+// IsStoreIDExist returns whether store ID exists in namespacesInfo.
+func (c *tableNamespaceClassifier) IsStoreIDExist(storeID uint64) bool {
+	c.RLock()
+	defer c.RUnlock()
+	return c.nsInfo.isStoreIDExist(storeID)
+}
+
+// IsMetaExist returns whether meta is bound to a namespace.
+func (c *tableNamespaceClassifier) IsMetaExist() bool {
+	c.RLock()
+	defer c.RUnlock()
+	return c.nsInfo.isMetaExist()
+}
+
 // CreateNamespace creates a new Namespace.
 func (c *tableNamespaceClassifier) CreateNamespace(name string) error {
 	c.Lock()
@@ -215,7 +236,7 @@ func (c *tableNamespaceClassifier) AddNamespaceTableID(name string, tableID int6
 		return errors.Errorf("invalid namespace Name %s, not found", name)
 	}
 
-	if c.nsInfo.IsTableIDExist(tableID) {
+	if c.nsInfo.isTableIDExist(tableID) {
 		return errors.New("Table ID already exists in this cluster")
 	}
 
@@ -250,7 +271,7 @@ func (c *tableNamespaceClassifier) AddMetaToNamespace(name string) error {
 	if n == nil {
 		return errors.Errorf("invalid namespace Name %s, not found", name)
 	}
-	if c.nsInfo.IsMetaExist() {
+	if c.nsInfo.isMetaExist() {
 		return errors.New("meta is already set")
 	}
 
@@ -284,7 +305,7 @@ func (c *tableNamespaceClassifier) AddNamespaceStoreID(name string, storeID uint
 		return errors.Errorf("invalid namespace Name %s, not found", name)
 	}
 
-	if c.nsInfo.IsStoreIDExist(storeID) {
+	if c.nsInfo.isStoreIDExist(storeID) {
 		return errors.New("Store ID already exists in this namespace")
 	}
 
@@ -368,8 +389,8 @@ func (namespaceInfo *namespacesInfo) getNamespaces() []*Namespace {
 	return nsList
 }
 
-// IsTableIDExist returns true if table ID exists in namespacesInfo
-func (namespaceInfo *namespacesInfo) IsTableIDExist(tableID int64) bool {
+// isTableIDExist returns true if table ID exists in namespacesInfo
+func (namespaceInfo *namespacesInfo) isTableIDExist(tableID int64) bool {
 	for _, ns := range namespaceInfo.namespaces {
 		_, ok := ns.TableIDs[tableID]
 		if ok {
@@ -379,8 +400,8 @@ func (namespaceInfo *namespacesInfo) IsTableIDExist(tableID int64) bool {
 	return false
 }
 
-// IsStoreIDExist returns true if store ID exists in namespacesInfo
-func (namespaceInfo *namespacesInfo) IsStoreIDExist(storeID uint64) bool {
+// isStoreIDExist returns true if store ID exists in namespacesInfo
+func (namespaceInfo *namespacesInfo) isStoreIDExist(storeID uint64) bool {
 	for _, ns := range namespaceInfo.namespaces {
 		_, ok := ns.StoreIDs[storeID]
 		if ok {
@@ -390,8 +411,8 @@ func (namespaceInfo *namespacesInfo) IsStoreIDExist(storeID uint64) bool {
 	return false
 }
 
-// IsMetaExist returns true if meta is binded to a namespace.
-func (namespaceInfo *namespacesInfo) IsMetaExist() bool {
+// isMetaExist returns true if meta is bound to a namespace.
+func (namespaceInfo *namespacesInfo) isMetaExist() bool {
 	for _, ns := range namespaceInfo.namespaces {
 		if ns.Meta {
 			return true
diff --git a/table/namespace_classifier_test.go b/table/namespace_classifier_test.go
index 73fcfceb1bf..049f23046f7 100644
--- a/table/namespace_classifier_test.go
+++ b/table/namespace_classifier_test.go
@@ -163,7 +163,7 @@ func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) {
 	namespaces = tableClassifier.GetNamespaces()
 	c.Assert(namespaces, HasLen, 2)
 
-	c.Assert(nsInfo.IsTableIDExist(1), IsTrue)
+	c.Assert(nsInfo.isTableIDExist(1), IsTrue)
 
 	// Add storeID
 	err = tableClassifier.AddNamespaceStoreID("test1", 456)
@@ -171,7 +171,7 @@ func (s *testTableNamespaceSuite) TestNamespaceOperation(c *C) {
 	namespaces = tableClassifier.GetNamespaces()
 	c.Assert(namespaces, HasLen, 2)
 
-	c.Assert(nsInfo.IsStoreIDExist(456), IsTrue)
+	c.Assert(nsInfo.isStoreIDExist(456), IsTrue)
 
 	// Ensure that duplicate tableID cannot exist in one namespace
 	err = tableClassifier.AddNamespaceTableID("test1", 1)
@@ -228,5 +228,4 @@ func (s *testTableNamespaceSuite) TestTableNameSpaceReloadNamespaces(c *C) {
 	ns = classifier.GetAllNamespaces()
 	sort.Strings(ns)
 	c.Assert(ns, DeepEquals, []string{"global", "ns1", "ns2"})
-
 }