Skip to content

Commit

Permalink
pd-ctl, tests: impl the merge all keyspace groups command (#6782)
Browse files Browse the repository at this point in the history
close #6756

- Impl the merge all keyspace groups command.
- Further reuse of TSO cluster-related test code.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jul 11, 2023
1 parent 61dfad3 commit e18b9be
Show file tree
Hide file tree
Showing 16 changed files with 291 additions and 233 deletions.
33 changes: 0 additions & 33 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
Expand Down Expand Up @@ -86,38 +85,6 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err
return createTestServer(ctx, cfg, []string{utils.APIServiceName})
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tsoserver.Server, func(), error) {
cfg := tsoserver.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := tsoserver.GenerateConfig(cfg)
re.NoError(err)
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return nil, nil, err
}
zapLogOnce.Do(func() {
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
})
re.NoError(err)
return NewTSOTestServer(ctx, cfg)
}

// NewTSOTestServer creates a tso server with given config for testing.
func NewTSOTestServer(ctx context.Context, cfg *tsoserver.Config) (*tsoserver.Server, testutil.CleanupFunc, error) {
s := tsoserver.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) {
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions tests/integrations/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
"go.uber.org/goleak"
)

Expand Down Expand Up @@ -97,7 +96,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
re.Equal(addr, returnedEntry.ServiceAddr)

// test primary when only one server
expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s})
expectedPrimary := tests.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s})
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(primary, expectedPrimary)
Expand Down Expand Up @@ -131,15 +130,15 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin
serverMap[s.GetAddr()] = s
}

expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), serverMap)
expectedPrimary := tests.WaitForPrimaryServing(suite.Require(), serverMap)
primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(expectedPrimary, primary)
// close old primary
serverMap[primary].Close()
delete(serverMap, primary)

expectedPrimary = mcs.WaitForPrimaryServing(suite.Require(), serverMap)
expectedPrimary = tests.WaitForPrimaryServing(suite.Require(), serverMap)
// test API server discovery
client := suite.pdLeader.GetEtcdClient()
endpoints, err := discovery.Discover(client, suite.clusterID, serviceName)
Expand All @@ -156,9 +155,9 @@ func (suite *serverRegisterTestSuite) addServer(serviceName string) (bs.Server,
re := suite.Require()
switch serviceName {
case utils.TSOServiceName:
return mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
return tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
case utils.ResourceManagerServiceName:
return mcs.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
return tests.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
default:
return nil, nil
}
Expand Down
21 changes: 10 additions & 11 deletions tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

const (
Expand Down Expand Up @@ -86,11 +85,11 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
// add three nodes.
nodes := make(map[string]bs.Server)
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
Expand Down Expand Up @@ -135,11 +134,11 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
func (suite *keyspaceGroupTestSuite) TestAllocReplica() {
nodes := make(map[string]bs.Server)
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// miss replica.
id := 1
Expand Down Expand Up @@ -188,10 +187,10 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() {
suite.Equal(http.StatusBadRequest, code)

// the keyspace group is exist, the new replica is more than the old replica.
s2, cleanup2 := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s2, cleanup2 := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup2()
nodes[s2.GetAddr()] = s2
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)
params = &handlers.AllocNodesForKeyspaceGroupParams{
Replica: utils.DefaultKeyspaceGroupReplicaCount + 1,
}
Expand Down Expand Up @@ -228,12 +227,12 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() {
nodes := make(map[string]bs.Server)
nodesList := []string{}
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
nodesList = append(nodesList, s.GetAddr())
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// the keyspace group is not exist.
id := 1
Expand Down Expand Up @@ -288,11 +287,11 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() {
func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() {
nodes := make(map[string]bs.Server)
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// the default keyspace group is exist.
var kg *endpoint.KeyspaceGroup
Expand Down
3 changes: 1 addition & 2 deletions tests/integrations/mcs/resourcemanager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

func TestResourceManagerServer(t *testing.T) {
Expand All @@ -45,7 +44,7 @@ func TestResourceManagerServer(t *testing.T) {
leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)

s, cleanup := mcs.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc())
s, cleanup := tests.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc())
addr := s.GetAddr()
defer cleanup()

Expand Down
94 changes: 0 additions & 94 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,14 @@ package mcs

import (
"context"
"os"
"sync"
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
)

var once sync.Once

// InitLogger initializes the logger for test.
func InitLogger(cfg *tso.Config) (err error) {
once.Do(func() {
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return
}
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries.
log.Sync()
})
return err
}

// SetupClientWithAPIContext creates a TSO client with api context name for test.
func SetupClientWithAPIContext(
ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...pd.ClientOption,
Expand All @@ -67,76 +43,6 @@ func SetupClientWithKeyspaceID(
return cli
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
cfg := rm.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := rm.GenerateConfig(cfg)
re.NoError(err)

s, cleanup, err := rm.NewTestServer(ctx, re, cfg)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return s, cleanup
}

// StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing.
func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) {
cfg := tso.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := tso.GenerateConfig(cfg)
re.NoError(err)
// Setup the logger.
err = InitLogger(cfg)
re.NoError(err)
return NewTSOTestServer(ctx, cfg)
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) {
s, cleanup, err := StartSingleTSOTestServerWithoutCheck(ctx, re, backendEndpoints, listenAddrs)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return s, cleanup
}

// NewTSOTestServer creates a tso server with given config for testing.
func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) {
s := tso.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader
func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string {
var primary string
testutil.Eventually(re, func() bool {
for name, s := range serverMap {
if s.IsServing() {
primary = name
return true
}
}
return false
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return primary
}

// WaitForTSOServiceAvailable waits for the pd client being served by the tso server side
func WaitForTSOServiceAvailable(
ctx context.Context, re *require.Assertions, client pd.Client,
Expand Down
9 changes: 4 additions & 5 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

const (
Expand All @@ -51,7 +50,7 @@ type tsoAPITestSuite struct {
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
tsoCluster *tests.TestTSOCluster
}

func TestTSOAPI(t *testing.T) {
Expand All @@ -70,7 +69,7 @@ func (suite *tsoAPITestSuite) SetupTest() {
leaderName := suite.pdCluster.WaitLeader()
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
re.NoError(err)
}

Expand Down Expand Up @@ -124,10 +123,10 @@ func TestTSOServerStartFirst(t *testing.T) {
addr := apiCluster.GetConfig().GetClientURL()
ch := make(chan struct{})
defer close(ch)
clusterCh := make(chan *mcs.TestTSOCluster)
clusterCh := make(chan *tests.TestTSOCluster)
defer close(clusterCh)
go func() {
tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, addr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, addr)
re.NoError(err)
primary := tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
Expand Down
8 changes: 4 additions & 4 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type tsoKeyspaceGroupManagerTestSuite struct {
// pdLeaderServer is the leader server of the PD cluster.
pdLeaderServer *tests.TestServer
// tsoCluster is the TSO service cluster.
tsoCluster *mcs.TestTSOCluster
tsoCluster *tests.TestTSOCluster
}

func TestTSOKeyspaceGroupManager(t *testing.T) {
Expand All @@ -74,7 +74,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
re.NoError(suite.pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
re.NoError(err)
}

Expand Down Expand Up @@ -515,7 +515,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestGetTSOImmediately(t *testing.T) {
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
Expand Down

0 comments on commit e18b9be

Please sign in to comment.