Skip to content

Commit

Permalink
fix test suite race
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jul 11, 2023
1 parent 3cce629 commit 2304649
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/join"
"go.etcd.io/etcd/clientv3"
"go.uber.org/atomic"
)

// TestServer states.
Expand All @@ -70,7 +71,7 @@ type TestServer struct {
sync.RWMutex
server *server.Server
grpcServer *server.GrpcServer
state int32
state atomic.Int32
}

var zapLogOnce sync.Once
Expand Down Expand Up @@ -106,47 +107,49 @@ func createTestServer(ctx context.Context, cfg *config.Config, services []string
return &TestServer{
server: svr,
grpcServer: &server.GrpcServer{Server: svr},
state: Initial,
state: *atomic.NewInt32(Initial),
}, nil
}

// Run starts to run a TestServer.
func (s *TestServer) Run() error {
s.Lock()
defer s.Unlock()
if s.state != Initial && s.state != Stop {
return errors.Errorf("server(state%d) cannot run", s.state)
state := s.state.Load()
if state != Initial && state != Stop {
return errors.Errorf("server(state%d) cannot run", state)
}
if err := s.server.Run(); err != nil {
return err
}
s.state = Running
s.state.Store(Running)
return nil
}

// Stop is used to stop a TestServer.
func (s *TestServer) Stop() error {
s.Lock()
defer s.Unlock()
if s.state != Running {
return errors.Errorf("server(state%d) cannot stop", s.state)
state := s.state.Load()
if state != Running {
return errors.Errorf("server(state%d) cannot stop", state)
}
s.server.Close()
s.state = Stop
s.state.Store(Stop)
return nil
}

// Destroy is used to destroy a TestServer.
func (s *TestServer) Destroy() error {
s.Lock()
defer s.Unlock()
if s.state == Running {
if s.state.Load() == Running {
s.server.Close()
}
if err := os.RemoveAll(s.server.GetConfig().DataDir); err != nil {
return err
}
s.state = Destroy
s.state.Store(Destroy)
return nil
}

Expand All @@ -162,7 +165,7 @@ func (s *TestServer) ResignLeader() error {
func (s *TestServer) State() int32 {
s.RLock()
defer s.RUnlock()
return s.state
return s.state.Load()
}

// GetConfig returns the current TestServer's configuration.
Expand Down Expand Up @@ -628,7 +631,7 @@ func (c *TestCluster) WaitLeader(ops ...WaitOption) string {
counter := make(map[string]int)
running := 0
for _, s := range c.servers {
if s.state == Running {
if s.state.Load() == Running {
running++
}
n := s.GetLeader().GetName()
Expand Down Expand Up @@ -695,7 +698,7 @@ func (c *TestCluster) WaitAllocatorLeader(dcLocation string, ops ...WaitOption)
counter := make(map[string]int)
running := 0
for _, s := range c.servers {
if s.state == Running && s.GetTSOAllocatorManager().EnableLocalTSO() {
if s.state.Load() == Running && s.GetTSOAllocatorManager().EnableLocalTSO() {
running++
}
serverName := s.GetAllocatorLeader(dcLocation).GetName()
Expand Down

0 comments on commit 2304649

Please sign in to comment.