*: add more tests and comments #1387

Merged · 4 commits · Dec 19, 2018
120 changes: 120 additions & 0 deletions pkg/integration_test/pdctl_test.go
@@ -615,6 +615,7 @@ func (s *integrationTestSuite) TestConfig(c *C) {
leaderServer := cluster.GetServer(cluster.GetLeader())
s.bootstrapCluster(leaderServer, c)
mustPutStore(c, leaderServer.server, store.Id, store.State, store.Labels)
defer cluster.Destroy()

// config show
args := []string{"-u", pdAddr, "config", "show"}
@@ -734,6 +735,125 @@ func (s *integrationTestSuite) TestConfig(c *C) {
c.Assert(scheduleCfg.DisableLearner, Equals, leaderServer.server.GetScheduleConfig().DisableLearner)
}

func (s *integrationTestSuite) TestLog(c *C) {
c.Parallel()

cluster, err := newTestCluster(1)
c.Assert(err, IsNil)
err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
pdAddr := cluster.config.GetClientURLs()
cmd := initCommand()

store := metapb.Store{
Id: 1,
State: metapb.StoreState_Up,
}
leaderServer := cluster.GetServer(cluster.GetLeader())
s.bootstrapCluster(leaderServer, c)
mustPutStore(c, leaderServer.server, store.Id, store.State, store.Labels)
defer cluster.Destroy()

var testCases = []struct {
cmd []string
expect string
}{
// log [fatal|error|warn|info|debug]
{
cmd: []string{"-u", pdAddr, "log", "fatal"},
expect: "fatal",
},
{
cmd: []string{"-u", pdAddr, "log", "error"},
expect: "error",
},
{
cmd: []string{"-u", pdAddr, "log", "warn"},
expect: "warn",
},
{
cmd: []string{"-u", pdAddr, "log", "info"},
expect: "info",
},
{
cmd: []string{"-u", pdAddr, "log", "debug"},
expect: "debug",
},
}

for _, testCase := range testCases {
_, _, err = executeCommandC(cmd, testCase.cmd...)
c.Assert(err, IsNil)
c.Assert(leaderServer.server.GetConfig().Log.Level, Equals, testCase.expect)
}
}

func (s *integrationTestSuite) TestTableNS(c *C) {
c.Parallel()

cluster, err := newTestCluster(1)
c.Assert(err, IsNil)
err = cluster.RunInitialServers()
c.Assert(err, IsNil)
cluster.WaitLeader()
pdAddr := cluster.config.GetClientURLs()
cmd := initCommand()

store := metapb.Store{
Id: 1,
State: metapb.StoreState_Up,
}
leaderServer := cluster.GetServer(cluster.GetLeader())
s.bootstrapCluster(leaderServer, c)
mustPutStore(c, leaderServer.server, store.Id, store.State, store.Labels)
classifier := leaderServer.server.GetClassifier()
defer cluster.Destroy()

// table_ns create <namespace>
c.Assert(leaderServer.server.IsNamespaceExist("ts1"), IsFalse)
args := []string{"-u", pdAddr, "table_ns", "create", "ts1"}
_, _, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(leaderServer.server.IsNamespaceExist("ts1"), IsTrue)

// table_ns add <name> <table_id>
args = []string{"-u", pdAddr, "table_ns", "add", "ts1", "1"}
_, _, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(classifier.IsTableIDExist(1), IsTrue)

// table_ns remove <name> <table_id>
args = []string{"-u", pdAddr, "table_ns", "remove", "ts1", "1"}
_, _, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(classifier.IsTableIDExist(1), IsFalse)

// table_ns set_meta <namespace>
args = []string{"-u", pdAddr, "table_ns", "set_meta", "ts1"}
_, _, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(classifier.IsMetaExist(), IsTrue)

// table_ns rm_meta <namespace>
args = []string{"-u", pdAddr, "table_ns", "rm_meta", "ts1"}
_, _, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(classifier.IsMetaExist(), IsFalse)

// table_ns set_store <store_id> <namespace>
args = []string{"-u", pdAddr, "table_ns", "set_store", "1", "ts1"}
_, _, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(classifier.IsStoreIDExist(1), IsTrue)

// table_ns rm_store <store_id> <namespace>
args = []string{"-u", pdAddr, "table_ns", "rm_store", "1", "ts1"}
_, _, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(classifier.IsStoreIDExist(1), IsFalse)
}

func initCommand() *cobra.Command {
commandFlags := pdctl.CommandFlags{}
rootCmd := &cobra.Command{}
5 changes: 5 additions & 0 deletions server/cluster_info.go
@@ -101,6 +101,8 @@ func (c *clusterInfo) OnStoreVersionChange() {
minVersion = v
}
}
// If PD's cluster version is lower than the minimum version among all stores,
// update the cluster version to that minimum.
if clusterVersion.LessThan(*minVersion) {
c.opt.SetClusterVersion(*minVersion)
err := c.opt.persist(c.kv)
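To make the rule concrete, here is a minimal, runnable sketch of the version-follow logic — an illustration only, which assumes github.com/coreos/go-semver for the version type and elides persistence:

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// minStoreVersion returns the lowest version among all stores; the cluster
// version follows this minimum and only ever moves forward.
func minStoreVersion(storeVersions []*semver.Version) *semver.Version {
	var min *semver.Version
	for _, v := range storeVersions {
		if min == nil || v.LessThan(*min) {
			min = v
		}
	}
	return min
}

func main() {
	cluster := semver.New("2.0.0")
	stores := []*semver.Version{semver.New("2.1.0"), semver.New("2.0.5")}
	if min := minStoreVersion(stores); cluster.LessThan(*min) {
		cluster = min // the real code would also persist the new version here
	}
	fmt.Println(cluster) // 2.0.5
}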
@@ -745,10 +747,12 @@ func newPrepareChecker() *prepareChecker {
}
}

// Before starting the scheduler, take the proportion of regions on each store into consideration.
func (checker *prepareChecker) check(c *clusterInfo) bool {
if checker.isPrepared || time.Since(checker.start) > collectTimeout {
return true
}
// The number of active regions should exceed the total region count of all stores * collectFactor.
if float64(c.core.Regions.Length())*collectFactor > float64(checker.sum) {
return false
}
@@ -757,6 +761,7 @@ func (checker *prepareChecker) check(c *clusterInfo) bool {
continue
}
storeID := store.GetId()
// For each store, the number of active regions should exceed that store's total region count * collectFactor.
if float64(c.core.Regions.GetStoreRegionCount(storeID))*collectFactor > float64(checker.reactiveRegions[storeID]) {
return false
}
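As a worked illustration of the collectFactor thresholds above — a standalone sketch assuming collectFactor = 0.8, since the real constant is defined elsewhere in the package:

package main

import "fmt"

const collectFactor = 0.8 // assumed value, for illustration only

// prepared reports whether enough region heartbeats have been observed:
// active regions must reach total regions * collectFactor.
func prepared(totalRegions, activeRegions int) bool {
	return float64(totalRegions)*collectFactor <= float64(activeRegions)
}

func main() {
	fmt.Println(prepared(100, 79)) // false: 79 < 100*0.8
	fmt.Println(prepared(100, 85)) // true:  85 >= 100*0.8
}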
44 changes: 43 additions & 1 deletion server/cluster_worker_test.go
@@ -17,11 +17,14 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/pd/server/core"
)

var _ = Suite(&testClusterWorkerSuite{})

type testClusterWorkerSuite struct {
	baseCluster
}

func (s *testClusterWorkerSuite) TestReportSplit(c *C) {
var cluster RaftCluster
@@ -44,3 +47,42 @@ func (s *testClusterWorkerSuite) TestReportBatchSplit(c *C) {
_, err := cluster.handleBatchReportSplit(&pdpb.ReportBatchSplitRequest{Regions: regions})
c.Assert(err, IsNil)
}

func (s *testClusterWorkerSuite) TestValidRequestRegion(c *C) {
var err error
_, s.svr, s.cleanup, err = NewTestServer()
c.Assert(err, IsNil)
s.client = s.svr.client
mustWaitLeader(c, []*Server{s.svr})
s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
defer s.cleanup()
s.svr.bootstrapCluster(s.newBootstrapRequest(c, s.svr.clusterID, "127.0.0.1:0"))

cluster := s.svr.GetRaftCluster()
c.Assert(cluster, NotNil)

r1 := core.NewRegionInfo(&metapb.Region{
Id: 1,
StartKey: []byte(""),
EndKey: []byte("a"),
Peers: []*metapb.Peer{{
Id: 1,
StoreId: 1,
}},
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2},
}, &metapb.Peer{
Id: 1,
StoreId: 1,
})
err = cluster.HandleRegionHeartbeat(r1)
c.Assert(err, IsNil)
r2 := &metapb.Region{Id: 2, StartKey: []byte("a"), EndKey: []byte("b")}
c.Assert(cluster.validRequestRegion(r2), NotNil)
r3 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 2}}
c.Assert(cluster.validRequestRegion(r3), NotNil)
r4 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 1}}
c.Assert(cluster.validRequestRegion(r4), NotNil)
r5 := &metapb.Region{Id: 1, StartKey: []byte(""), EndKey: []byte("a"), RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2}}
c.Assert(cluster.validRequestRegion(r5), IsNil)
cluster.stop()
}
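The assertions above pin down the behaviour under test: an unknown region ID (r2) and a stale epoch in either field (r3, r4) are rejected, while a matching epoch (r5) passes. A hedged, self-contained sketch of that check — illustrative only, with simplified local types rather than the actual PD implementation:

package main

import (
	"errors"
	"fmt"
)

// epoch stands in for metapb.RegionEpoch in this illustration.
type epoch struct{ ConfVer, Version uint64 }

// validRequest sketches the staleness rule: the request must name a known
// region and carry an epoch at least as new as the cached one.
func validRequest(cached, req *epoch) error {
	if cached == nil {
		return errors.New("region not found") // r2: unknown region ID
	}
	if req == nil || req.ConfVer < cached.ConfVer || req.Version < cached.Version {
		return errors.New("stale region epoch") // r3 and r4
	}
	return nil // r5: epochs match
}

func main() {
	cached := &epoch{ConfVer: 2, Version: 2}
	fmt.Println(validRequest(nil, &epoch{ConfVer: 2, Version: 2}))    // region not found
	fmt.Println(validRequest(cached, &epoch{ConfVer: 1, Version: 2})) // stale region epoch
	fmt.Println(validRequest(cached, &epoch{ConfVer: 2, Version: 2})) // <nil>
}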
16 changes: 16 additions & 0 deletions server/namespace/classifier.go
@@ -38,6 +38,10 @@ type Classifier interface {
AllowMerge(*core.RegionInfo, *core.RegionInfo) bool
// Reload underlying namespaces
ReloadNamespaces() error
// The functions below are only for tests.
IsMetaExist() bool
IsTableIDExist(int64) bool
IsStoreIDExist(uint64) bool
}

type defaultClassifier struct{}
@@ -66,6 +70,18 @@ func (c defaultClassifier) ReloadNamespaces() error {
return nil
}

func (c defaultClassifier) IsMetaExist() bool {
return false
}

func (c defaultClassifier) IsTableIDExist(tableID int64) bool {
return false
}

func (c defaultClassifier) IsStoreIDExist(storeID uint64) bool {
return false
}

// CreateClassifierFunc is for creating namespace classifier.
type CreateClassifierFunc func(*core.KV, core.IDAllocator) (Classifier, error)
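For contrast with the always-false defaults above, a stateful classifier would back these test hooks with real lookups. A hypothetical sketch — the field names are assumptions for illustration, not the actual table-namespace classifier:

// statefulClassifier is a hypothetical implementation used only to
// illustrate the three test-only methods.
type statefulClassifier struct {
	hasMeta  bool
	tableIDs map[int64]struct{}
	storeIDs map[uint64]struct{}
}

func (c *statefulClassifier) IsMetaExist() bool { return c.hasMeta }

func (c *statefulClassifier) IsTableIDExist(tableID int64) bool {
	_, ok := c.tableIDs[tableID]
	return ok
}

func (c *statefulClassifier) IsStoreIDExist(storeID uint64) bool {
	_, ok := c.storeIDs[storeID]
	return ok
}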

12 changes: 12 additions & 0 deletions server/namespace_test.go
@@ -240,6 +240,18 @@ func (c *mapClassifer) IsNamespaceExist(name string) bool {
return false
}

func (c *mapClassifer) IsMetaExist() bool {
return false
}

func (c *mapClassifer) IsTableIDExist(tableID int64) bool {
return false
}

func (c *mapClassifer) IsStoreIDExist(storeID uint64) bool {
return false
}

func (c *mapClassifer) AllowMerge(one *core.RegionInfo, other *core.RegionInfo) bool {
return c.GetRegionNamespace(one) == c.GetRegionNamespace(other)
}
12 changes: 12 additions & 0 deletions server/region_statistics_test.go
@@ -49,6 +49,18 @@ func (c mockClassifier) ReloadNamespaces() error {
return nil
}

func (c mockClassifier) IsMetaExist() bool {
return false
}

func (c mockClassifier) IsTableIDExist(tableID int64) bool {
return false
}

func (c mockClassifier) IsStoreIDExist(storeID uint64) bool {
return false
}

var _ = Suite(&testRegionStatisticsSuite{})

type testRegionStatisticsSuite struct{}
8 changes: 5 additions & 3 deletions server/schedule/hot_cache.go
@@ -55,6 +55,7 @@ func (w *HotSpotCache) CheckWrite(region *core.RegionInfo, stores *core.StoresIn
v, isExist := w.writeFlow.Peek(region.GetID())
if isExist {
value = v.(*core.RegionStat)
// The report-interval check below is skipped when running in the simulator.
if !Simulating {
interval := time.Since(value.LastUpdateTime).Seconds()
if interval < minHotRegionReportInterval {
@@ -80,6 +81,7 @@ func (w *HotSpotCache) CheckRead(region *core.RegionInfo, stores *core.StoresInf
v, isExist := w.readFlow.Peek(region.GetID())
if isExist {
value = v.(*core.RegionStat)
// The report-interval check below is skipped when running in the simulator.
if !Simulating {
interval := time.Since(value.LastUpdateTime).Seconds()
if interval < minHotRegionReportInterval {
@@ -103,7 +105,7 @@ func (w *HotSpotCache) incMetrics(name string, kind FlowKind) {
}

func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 {
// hotRegionThreshold is used to pick hot regions:
// suppose the number of hot Regions is statCacheMaxLen,
// and divide the total bytes written during the past storeHeartBeatReportInterval seconds by that number;
// divide by 2 more because the store reports roughly twice the data that the regions record writing to RocksDB.
@@ -117,8 +119,8 @@ func calculateWriteHotThreshold(stores *core.StoresInfo) uint64 {
}

func calculateReadHotThreshold(stores *core.StoresInfo) uint64 {
// hotRegionThreshold is used to pick hot regions:
// suppose the number of hot Regions is statCacheMaxLen,
// and divide the total bytes read during the past storeHeartBeatReportInterval seconds by that number.
divisor := float64(statCacheMaxLen)
hotRegionThreshold := uint64(stores.TotalBytesReadRate() / divisor)
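A worked example of the threshold formula with hypothetical numbers — statCacheMaxLen = 100 is an assumption for illustration; the real constant is defined elsewhere in the package:

package main

import "fmt"

const statCacheMaxLen = 100 // assumed for illustration

// readHotThreshold divides the cluster-wide read rate evenly across the
// assumed number of cached hot regions.
func readHotThreshold(totalBytesReadRate float64) uint64 {
	return uint64(totalBytesReadRate / float64(statCacheMaxLen))
}

func main() {
	// With a 100 MiB/s cluster read rate, a region must exceed ~1 MiB/s to count as hot.
	fmt.Println(readHotThreshold(100 << 20)) // 1048576
}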
4 changes: 3 additions & 1 deletion server/schedulers/scheduler_test.go
@@ -186,7 +186,9 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {

// Add regions 1~4.
seq := newSequencer(numStores)
// Region 1 has the same distribution as Region 2, which is used to test selectPeerToReplace.
tc.AddLeaderRegion(1, 1, 2, 3)
for i := uint64(2); i <= numRegions; i++ {
tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next())
}

5 changes: 5 additions & 0 deletions server/server.go
@@ -479,6 +479,11 @@ func (s *Server) ClusterID() uint64 {
return s.clusterID
}

// GetClassifier returns the classifier of this server.
func (s *Server) GetClassifier() namespace.Classifier {
return s.classifier
}

// txn returns an etcd client transaction wrapper.
// The wrapper will set a request timeout to the context and log slow transactions.
func (s *Server) txn() clientv3.Txn {