
Add TestUpgradingAPIandTSOClusters
Add TestUpgradingAPIandTSOClusters to verify that after we restart the API cluster
and then restart the TSO cluster, the TSO service can still serve TSO requests normally.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed May 29, 2023
1 parent a432973 commit 3a74c2a
Showing 3 changed files with 163 additions and 3 deletions.
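At a glance, the commit adds two restart helpers — tests.RestartTestAPICluster and mcs.RestartTestTSOCluster — plus a test that drives them. The intended flow, condensed from TestUpgradingAPIandTSOClusters in client_test.go below (setup and teardown omitted):

// Restart the API cluster first, then the TSO cluster, verifying after
// each step that the TSO service still serves requests.
apiCluster, err = tests.RestartTestAPICluster(ctx, apiCluster)
re.NoError(err)
mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)

tsoCluster, err = mcs.RestartTestTSOCluster(ctx, tsoCluster)
re.NoError(err)
mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)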
68 changes: 65 additions & 3 deletions tests/cluster.go
@@ -452,8 +452,8 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService
	schedulers.Register()
	config := newClusterConfig(initialServerCount)
	servers := make(map[string]*TestServer)
-	for _, conf := range config.InitialServers {
-		serverConf, err := conf.Generate(opts...)
+	for _, cfg := range config.InitialServers {
+		serverConf, err := cfg.Generate(opts...)
		if err != nil {
			return nil, err
		}
@@ -466,7 +466,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService
		if err != nil {
			return nil, err
		}
-		servers[conf.Name] = s
+		servers[cfg.Name] = s
	}
	return &TestCluster{
		config: config,
@@ -480,6 +480,68 @@ func createTestCluster(ctx context.Context, initialServerCount int, isAPIService
	}, nil
}

// RestartTestAPICluster restarts the API test cluster.
func RestartTestAPICluster(ctx context.Context, cluster *TestCluster) (*TestCluster, error) {
	return restartTestCluster(ctx, cluster, true)
}

func restartTestCluster(
	ctx context.Context, cluster *TestCluster, isAPIServiceMode bool,
) (newTestCluster *TestCluster, err error) {
	schedulers.Register()
	newTestCluster = &TestCluster{
		config:  cluster.config,
		servers: make(map[string]*TestServer, len(cluster.servers)),
		tsPool: struct {
			sync.Mutex
			pool map[uint64]struct{}
		}{
			pool: make(map[uint64]struct{}),
		},
	}

	// Destroy and recreate every server of the old cluster concurrently,
	// collecting the results in sync.Maps keyed by server name.
	var serverMap sync.Map
	var errorMap sync.Map
	wg := sync.WaitGroup{}
	for serverName, server := range cluster.servers {
		serverCfg := server.GetConfig()
		wg.Add(1)
		go func(serverName string, server *TestServer) {
			defer wg.Done()
			server.Destroy()
			var (
				newServer *TestServer
				serverErr error
			)
			if isAPIServiceMode {
				newServer, serverErr = NewTestAPIServer(ctx, serverCfg)
			} else {
				newServer, serverErr = NewTestServer(ctx, serverCfg)
			}
			serverMap.Store(serverName, newServer)
			errorMap.Store(serverName, serverErr)
		}(serverName, server)
	}
	wg.Wait()

	errorMap.Range(func(key, value interface{}) bool {
		if value != nil {
			err = value.(error)
			return false
		}
		serverName := key.(string)
		newServer, _ := serverMap.Load(serverName)
		newTestCluster.servers[serverName] = newServer.(*TestServer)
		return true
	})

	if err != nil {
		return nil, errors.New("failed to restart cluster. " + err.Error())
	}

	return newTestCluster, nil
}
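The restart path above fans out one goroutine per server and funnels results back through sync.Maps, since goroutines cannot return values directly and a plain map would race. A standalone sketch of that pattern under the same assumptions (doRestart is a hypothetical stand-in for NewTestServer/NewTestAPIServer):

package main

import (
	"fmt"
	"sync"
)

// doRestart is a hypothetical stand-in for NewTestServer / NewTestAPIServer.
func doRestart(name string) (string, error) {
	return "restarted-" + name, nil
}

func main() {
	names := []string{"pd1", "pd2", "pd3"}
	var serverMap, errorMap sync.Map
	var wg sync.WaitGroup
	for _, name := range names {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			s, err := doRestart(name)
			serverMap.Store(name, s)
			errorMap.Store(name, err) // a nil error stores as a nil interface value
		}(name)
	}
	wg.Wait()

	// Surface the first error, if any, exactly as restartTestCluster does.
	var firstErr error
	errorMap.Range(func(_, v interface{}) bool {
		if v != nil {
			firstErr = v.(error)
			return false
		}
		return true
	})
	fmt.Println("first error:", firstErr)
}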

// RunServer starts to run TestServer.
func (c *TestCluster) RunServer(server *TestServer) <-chan error {
	resC := make(chan error)
51 changes: 51 additions & 0 deletions tests/integrations/mcs/cluster.go
@@ -17,6 +17,7 @@ package mcs
import (
	"context"
	"fmt"
+	"sync"
	"time"

	"github.com/stretchr/testify/require"
@@ -53,6 +54,56 @@ func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpo
	return tc, nil
}

// RestartTestTSOCluster restarts the TSO test cluster.
func RestartTestTSOCluster(
	ctx context.Context, cluster *TestTSOCluster,
) (newCluster *TestTSOCluster, err error) {
	newCluster = &TestTSOCluster{
		ctx:              ctx,
		backendEndpoints: cluster.backendEndpoints,
		servers:          make(map[string]*tso.Server, len(cluster.servers)),
		cleanupFuncs:     make(map[string]testutil.CleanupFunc, len(cluster.servers)),
	}
	var (
		serverMap  sync.Map
		cleanupMap sync.Map
		errorMap   sync.Map
	)
	wg := sync.WaitGroup{}
	for addr, cleanup := range cluster.cleanupFuncs {
		wg.Add(1)
		go func(addr string, clean testutil.CleanupFunc) {
			defer wg.Done()
			clean()
			serverCfg := cluster.servers[addr].GetConfig()
			newServer, newCleanup, err := NewTSOTestServer(newCluster.ctx, serverCfg)
			serverMap.Store(addr, newServer)
			cleanupMap.Store(addr, newCleanup)
			errorMap.Store(addr, err)
		}(addr, cleanup)
	}
	wg.Wait()

	errorMap.Range(func(key, value interface{}) bool {
		if value != nil {
			err = value.(error)
			return false
		}
		addr := key.(string)
		newServer, _ := serverMap.Load(addr)
		newCleanup, _ := cleanupMap.Load(addr)
		newCluster.servers[addr] = newServer.(*tso.Server)
		newCluster.cleanupFuncs[addr] = newCleanup.(testutil.CleanupFunc)
		return true
	})

	if err != nil {
		return nil, fmt.Errorf("failed to restart the cluster: %w", err)
	}

	return newCluster, nil
}
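A design note rather than part of the change: the same wait-for-all, surface-first-error semantics could be written with golang.org/x/sync/errgroup, trading the sync.Map bookkeeping for a mutex-guarded map. A hedged alternative sketch (restartOne is hypothetical; PD's test utilities may have reasons to prefer the explicit maps):

// Alternative shape using errgroup; g.Wait() still waits for every
// goroutine and returns the first non-nil error.
var g errgroup.Group
var mu sync.Mutex
newServers := make(map[string]*tso.Server, len(cluster.servers))
for addr := range cluster.cleanupFuncs {
	addr := addr // capture the loop variable for the goroutine
	g.Go(func() error {
		s, err := restartOne(addr) // hypothetical helper
		if err != nil {
			return err
		}
		mu.Lock()
		newServers[addr] = s
		mu.Unlock()
		return nil
	})
}
if err := g.Wait(); err != nil {
	return nil, err
}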

// AddServer adds a new TSO server to the test cluster.
func (tc *TestTSOCluster) AddServer(addr string) error {
	cfg := tso.NewConfig()
47 changes: 47 additions & 0 deletions tests/integrations/tso/client_test.go
Expand Up @@ -451,12 +451,59 @@ func TestMixedTSODeployment(t *testing.T) {
wg.Wait()
}

// TestUpgradingAPIandTSOClusters tests the scenario where, after we restart the API cluster
// and then the TSO cluster, the TSO service can still serve TSO requests normally.
func TestUpgradingAPIandTSOClusters(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())

	// Create an API cluster with 3 servers
	apiCluster, err := tests.NewTestAPICluster(ctx, 3)
	re.NoError(err)
	err = apiCluster.RunInitialServers()
	re.NoError(err)
	leaderName := apiCluster.WaitLeader()
	pdLeader := apiCluster.GetServer(leaderName)
	backendEndpoints := pdLeader.GetAddr()

	// Create a pd client in PD service mode so that the API leader forwards requests to the TSO cluster.
	re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)"))
	pdClient, err := pd.NewClientWithContext(context.Background(),
		[]string{backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1))
	re.NoError(err)

	// Create a TSO cluster with 2 servers
	tsoCluster, err := mcs.NewTestTSOCluster(ctx, 2, backendEndpoints)
	re.NoError(err)
	tsoCluster.WaitForDefaultPrimaryServing(re)
	// The TSO service should be eventually healthy
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)

	// Restart the API cluster
	apiCluster, err = tests.RestartTestAPICluster(ctx, apiCluster)
	re.NoError(err)
	// The TSO service should be eventually healthy
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)

	// Restart the TSO cluster
	tsoCluster, err = mcs.RestartTestTSOCluster(ctx, tsoCluster)
	re.NoError(err)
	// The TSO service should be eventually healthy
	mcs.WaitForTSOServiceAvailable(ctx, re, pdClient)

	tsoCluster.Destroy()
	apiCluster.Destroy()
	cancel()
	re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode"))
}

func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) {
	wg.Add(tsoRequestConcurrencyNumber)
	for i := 0; i < tsoRequestConcurrencyNumber; i++ {
		go func() {
			defer wg.Done()
			cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ","))
			defer cli.Close()
			var ts, lastTS uint64
			for {
				select {
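The body of checkTSO is truncated by the diff view above. A typical shape for such a loop requests timestamps until the context is cancelled and asserts that they grow monotonically; a hypothetical sketch, not the elided code (tsoutil.ComposeTS packs the physical and logical parts into a single uint64):

// Hypothetical continuation of the loop; the actual elided body may differ.
for {
	select {
	case <-ctx.Done():
		return
	default:
	}
	physical, logical, err := cli.GetTS(ctx)
	if err != nil {
		continue // transient errors are expected while clusters restart
	}
	ts = tsoutil.ComposeTS(physical, logical)
	re.Less(lastTS, ts) // timestamps must strictly increase
	lastTS = ts
}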
