Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pd-ctl, tests: impl the merge all keyspace groups command #6782

Merged
merged 7 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
tsoserver "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
Expand Down Expand Up @@ -86,38 +85,6 @@ func NewTestAPIServer(ctx context.Context, cfg *config.Config) (*TestServer, err
return createTestServer(ctx, cfg, []string{utils.APIServiceName})
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tsoserver.Server, func(), error) {
cfg := tsoserver.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := tsoserver.GenerateConfig(cfg)
re.NoError(err)
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return nil, nil, err
}
zapLogOnce.Do(func() {
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
})
re.NoError(err)
return NewTSOTestServer(ctx, cfg)
}

// NewTSOTestServer creates a tso server with given config for testing.
func NewTSOTestServer(ctx context.Context, cfg *tsoserver.Config) (*tsoserver.Server, testutil.CleanupFunc, error) {
s := tsoserver.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

func createTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) {
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions tests/integrations/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
"go.uber.org/goleak"
)

Expand Down Expand Up @@ -97,7 +96,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
re.Equal(addr, returnedEntry.ServiceAddr)

// test primary when only one server
expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s})
expectedPrimary := tests.WaitForPrimaryServing(suite.Require(), map[string]bs.Server{addr: s})
primary, exist := suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(primary, expectedPrimary)
Expand Down Expand Up @@ -131,15 +130,15 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin
serverMap[s.GetAddr()] = s
}

expectedPrimary := mcs.WaitForPrimaryServing(suite.Require(), serverMap)
expectedPrimary := tests.WaitForPrimaryServing(suite.Require(), serverMap)
primary, exist = suite.pdLeader.GetServer().GetServicePrimaryAddr(suite.ctx, serviceName)
re.True(exist)
re.Equal(expectedPrimary, primary)
// close old primary
serverMap[primary].Close()
delete(serverMap, primary)

expectedPrimary = mcs.WaitForPrimaryServing(suite.Require(), serverMap)
expectedPrimary = tests.WaitForPrimaryServing(suite.Require(), serverMap)
// test API server discovery
client := suite.pdLeader.GetEtcdClient()
endpoints, err := discovery.Discover(client, suite.clusterID, serviceName)
Expand All @@ -156,9 +155,9 @@ func (suite *serverRegisterTestSuite) addServer(serviceName string) (bs.Server,
re := suite.Require()
switch serviceName {
case utils.TSOServiceName:
return mcs.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
return tests.StartSingleTSOTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
case utils.ResourceManagerServiceName:
return mcs.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
return tests.StartSingleResourceManagerTestServer(suite.ctx, re, suite.backendEndpoints, tempurl.Alloc())
default:
return nil, nil
}
Expand Down
21 changes: 10 additions & 11 deletions tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/apiv2/handlers"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

const (
Expand Down Expand Up @@ -86,11 +85,11 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
// add three nodes.
nodes := make(map[string]bs.Server)
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount+1; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// create a keyspace group.
kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{
Expand Down Expand Up @@ -135,11 +134,11 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() {
func (suite *keyspaceGroupTestSuite) TestAllocReplica() {
nodes := make(map[string]bs.Server)
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// miss replica.
id := 1
Expand Down Expand Up @@ -188,10 +187,10 @@ func (suite *keyspaceGroupTestSuite) TestAllocReplica() {
suite.Equal(http.StatusBadRequest, code)

// the keyspace group is exist, the new replica is more than the old replica.
s2, cleanup2 := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s2, cleanup2 := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup2()
nodes[s2.GetAddr()] = s2
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)
params = &handlers.AllocNodesForKeyspaceGroupParams{
Replica: utils.DefaultKeyspaceGroupReplicaCount + 1,
}
Expand Down Expand Up @@ -228,12 +227,12 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() {
nodes := make(map[string]bs.Server)
nodesList := []string{}
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
nodesList = append(nodesList, s.GetAddr())
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// the keyspace group is not exist.
id := 1
Expand Down Expand Up @@ -288,11 +287,11 @@ func (suite *keyspaceGroupTestSuite) TestSetNodes() {
func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() {
nodes := make(map[string]bs.Server)
for i := 0; i < utils.DefaultKeyspaceGroupReplicaCount; i++ {
s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
s, cleanup := tests.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc())
defer cleanup()
nodes[s.GetAddr()] = s
}
mcs.WaitForPrimaryServing(suite.Require(), nodes)
tests.WaitForPrimaryServing(suite.Require(), nodes)

// the default keyspace group is exist.
var kg *endpoint.KeyspaceGroup
Expand Down
3 changes: 1 addition & 2 deletions tests/integrations/mcs/resourcemanager/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

func TestResourceManagerServer(t *testing.T) {
Expand All @@ -45,7 +44,7 @@ func TestResourceManagerServer(t *testing.T) {
leaderName := cluster.WaitLeader()
leader := cluster.GetServer(leaderName)

s, cleanup := mcs.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc())
s, cleanup := tests.StartSingleResourceManagerTestServer(ctx, re, leader.GetAddr(), tempurl.Alloc())
addr := s.GetAddr()
defer cleanup()

Expand Down
94 changes: 0 additions & 94 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,14 @@ package mcs

import (
"context"
"os"
"sync"
"time"

"github.com/pingcap/log"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
bs "github.com/tikv/pd/pkg/basicserver"
rm "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
)

var once sync.Once

// InitLogger initializes the logger for test.
func InitLogger(cfg *tso.Config) (err error) {
once.Do(func() {
// Setup the logger.
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err != nil {
return
}
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries.
log.Sync()
})
return err
}

// SetupClientWithAPIContext creates a TSO client with api context name for test.
func SetupClientWithAPIContext(
ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...pd.ClientOption,
Expand All @@ -67,76 +43,6 @@ func SetupClientWithKeyspaceID(
return cli
}

// StartSingleResourceManagerTestServer creates and starts a resource manager server with default config for testing.
func StartSingleResourceManagerTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*rm.Server, func()) {
cfg := rm.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := rm.GenerateConfig(cfg)
re.NoError(err)

s, cleanup, err := rm.NewTestServer(ctx, re, cfg)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return s, cleanup
}

// StartSingleTSOTestServerWithoutCheck creates and starts a tso server with default config for testing.
func StartSingleTSOTestServerWithoutCheck(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func(), error) {
cfg := tso.NewConfig()
cfg.BackendEndpoints = backendEndpoints
cfg.ListenAddr = listenAddrs
cfg, err := tso.GenerateConfig(cfg)
re.NoError(err)
// Setup the logger.
err = InitLogger(cfg)
re.NoError(err)
return NewTSOTestServer(ctx, cfg)
}

// StartSingleTSOTestServer creates and starts a tso server with default config for testing.
func StartSingleTSOTestServer(ctx context.Context, re *require.Assertions, backendEndpoints, listenAddrs string) (*tso.Server, func()) {
s, cleanup, err := StartSingleTSOTestServerWithoutCheck(ctx, re, backendEndpoints, listenAddrs)
re.NoError(err)
testutil.Eventually(re, func() bool {
return !s.IsClosed()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return s, cleanup
}

// NewTSOTestServer creates a tso server with given config for testing.
func NewTSOTestServer(ctx context.Context, cfg *tso.Config) (*tso.Server, testutil.CleanupFunc, error) {
s := tso.CreateServer(ctx, cfg)
if err := s.Run(); err != nil {
return nil, nil, err
}
cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader
func WaitForPrimaryServing(re *require.Assertions, serverMap map[string]bs.Server) string {
var primary string
testutil.Eventually(re, func() bool {
for name, s := range serverMap {
if s.IsServing() {
primary = name
return true
}
}
return false
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

return primary
}

// WaitForTSOServiceAvailable waits for the pd client being served by the tso server side
func WaitForTSOServiceAvailable(
ctx context.Context, re *require.Assertions, client pd.Client,
Expand Down
9 changes: 4 additions & 5 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
)

const (
Expand All @@ -51,7 +50,7 @@ type tsoAPITestSuite struct {
ctx context.Context
cancel context.CancelFunc
pdCluster *tests.TestCluster
tsoCluster *mcs.TestTSOCluster
tsoCluster *tests.TestTSOCluster
}

func TestTSOAPI(t *testing.T) {
Expand All @@ -70,7 +69,7 @@ func (suite *tsoAPITestSuite) SetupTest() {
leaderName := suite.pdCluster.WaitLeader()
pdLeaderServer := suite.pdCluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 1, pdLeaderServer.GetAddr())
re.NoError(err)
}

Expand Down Expand Up @@ -124,10 +123,10 @@ func TestTSOServerStartFirst(t *testing.T) {
addr := apiCluster.GetConfig().GetClientURL()
ch := make(chan struct{})
defer close(ch)
clusterCh := make(chan *mcs.TestTSOCluster)
clusterCh := make(chan *tests.TestTSOCluster)
defer close(clusterCh)
go func() {
tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, addr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, addr)
re.NoError(err)
primary := tsoCluster.WaitForDefaultPrimaryServing(re)
re.NotNil(primary)
Expand Down
8 changes: 4 additions & 4 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type tsoKeyspaceGroupManagerTestSuite struct {
// pdLeaderServer is the leader server of the PD cluster.
pdLeaderServer *tests.TestServer
// tsoCluster is the TSO service cluster.
tsoCluster *mcs.TestTSOCluster
tsoCluster *tests.TestTSOCluster
}

func TestTSOKeyspaceGroupManager(t *testing.T) {
Expand All @@ -74,7 +74,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
leaderName := suite.cluster.WaitLeader()
suite.pdLeaderServer = suite.cluster.GetServer(leaderName)
re.NoError(suite.pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = mcs.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
re.NoError(err)
}

Expand Down Expand Up @@ -515,7 +515,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestGetTSOImmediately(t *testing.T) {
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, pdAddr)
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, pdAddr)
re.NoError(err)
defer tsoCluster.Destroy()
tsoCluster.WaitForDefaultPrimaryServing(re)
Expand Down
Loading