Skip to content

Commit

Permalink
*: simplify WaitUntil usage (#4613)
Browse files Browse the repository at this point in the history
* *: simplify WaitUntil usage

ref #4399

Signed-off-by: disksing <i@disksing.com>

* minor fix

Signed-off-by: disksing <i@disksing.com>
  • Loading branch information
disksing committed Jan 26, 2022
1 parent 4f5a78a commit c1d1ecd
Show file tree
Hide file tree
Showing 18 changed files with 48 additions and 48 deletions.
2 changes: 1 addition & 1 deletion client/option_test.go
Expand Up @@ -47,7 +47,7 @@ func (s *testClientSuite) TestDynamicOptionChange(c *C) {
expectBool := true
o.setEnableTSOFollowerProxy(expectBool)
// Check the value changing notification.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
<-o.enableTSOFollowerProxyCh
return true
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/mock/mockhbstream/mockhbstream_test.go
Expand Up @@ -66,13 +66,13 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {

// Active stream is stream1.
hbs.BindStream(1, stream1)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() != nil && stream2.Recv() == nil
})
// Rebind to stream2.
hbs.BindStream(1, stream2)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() == nil && stream2.Recv() != nil
})
Expand All @@ -83,7 +83,7 @@ func (s *testHeartbeatStreamSuite) TestActivity(c *C) {
c.Assert(res.GetHeader().GetError(), NotNil)
// Switch back to 1 again.
hbs.BindStream(1, stream1)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
hbs.SendMsg(region, proto.Clone(msg).(*pdpb.RegionHeartbeatResponse))
return stream1.Recv() != nil && stream2.Recv() == nil
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/testutil/testutil.go
Expand Up @@ -31,7 +31,7 @@ const (

// CheckFunc is a condition checker that passed to WaitUntil. Its implementation
// may call c.Fatal() to abort the test, or c.Log() to add more information.
type CheckFunc func(c *check.C) bool
type CheckFunc func() bool

// WaitOp represents available options when execute WaitUntil
type WaitOp struct {
Expand Down Expand Up @@ -63,7 +63,7 @@ func WaitUntil(c *check.C, f CheckFunc, opts ...WaitOption) {
opt(option)
}
for i := 0; i < option.retryTimes; i++ {
if f(c) {
if f() {
return
}
time.Sleep(option.sleepInterval)
Expand Down
2 changes: 1 addition & 1 deletion server/api/server_test.go
Expand Up @@ -123,7 +123,7 @@ func mustNewCluster(c *C, num int, opts ...func(cfg *config.Config)) ([]*config.
}

func mustWaitLeader(c *C, svrs []*server.Server) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
var leader *pdpb.Member
for _, svr := range svrs {
l := svr.GetLeader()
Expand Down
2 changes: 1 addition & 1 deletion server/api/tso_test.go
Expand Up @@ -48,7 +48,7 @@ func (s *testTsoSuite) TearDownSuite(c *C) {
}

func (s *testTsoSuite) TestTransferAllocator(c *C) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
s.svr.GetTSOAllocatorManager().ClusterDCLocationChecker()
_, err := s.svr.GetTSOAllocatorManager().GetAllocator("dc-1")
return err == nil
Expand Down
12 changes: 6 additions & 6 deletions server/cluster/coordinator_test.go
Expand Up @@ -894,7 +894,7 @@ func BenchmarkPatrolRegion(b *testing.B) {
}

func waitOperator(c *C, co *coordinator, regionID uint64) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return co.opController.GetOperator(regionID) != nil
})
}
Expand Down Expand Up @@ -1206,7 +1206,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {

func waitAddLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
return res.GetRegionId() == region.GetID() &&
res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddLearnerNode &&
Expand All @@ -1222,7 +1222,7 @@ func waitAddLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.Regi

func waitPromoteLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
return res.GetRegionId() == region.GetID() &&
res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_AddNode &&
Expand All @@ -1239,7 +1239,7 @@ func waitPromoteLearner(c *C, stream mockhbstream.HeartbeatStream, region *core.

func waitRemovePeer(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
return res.GetRegionId() == region.GetID() &&
res.GetChangePeer().GetChangeType() == eraftpb.ConfChangeType_RemoveNode &&
Expand All @@ -1255,7 +1255,7 @@ func waitRemovePeer(c *C, stream mockhbstream.HeartbeatStream, region *core.Regi

func waitTransferLeader(c *C, stream mockhbstream.HeartbeatStream, region *core.RegionInfo, storeID uint64) *core.RegionInfo {
var res *pdpb.RegionHeartbeatResponse
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
if res = stream.Recv(); res != nil {
if res.GetRegionId() == region.GetID() {
for _, peer := range append(res.GetTransferLeader().GetPeers(), res.GetTransferLeader().GetPeer()) {
Expand All @@ -1273,7 +1273,7 @@ func waitTransferLeader(c *C, stream mockhbstream.HeartbeatStream, region *core.
}

func waitNoResponse(c *C, stream mockhbstream.HeartbeatStream) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res := stream.Recv()
return res == nil
})
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go
Expand Up @@ -43,7 +43,7 @@ func TestMain(m *testing.M) {

func mustWaitLeader(c *C, svrs []*Server) *Server {
var leader *Server
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
for _, s := range svrs {
if !s.IsClosed() && s.member.IsLeader() {
leader = s
Expand Down
24 changes: 12 additions & 12 deletions tests/client/client_test.go
Expand Up @@ -90,7 +90,7 @@ func (s *clientTestSuite) TestClientLeaderChange(c *C) {
cli := setupCli(c, s.ctx, endpoints)

var ts1, ts2 uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
p1, l1, err := cli.GetTS(context.TODO())
if err == nil {
ts1 = tsoutil.ComposeTS(p1, l1)
Expand All @@ -111,7 +111,7 @@ func (s *clientTestSuite) TestClientLeaderChange(c *C) {
waitLeader(c, cli.(client), cluster.GetServer(leader).GetConfig().ClientUrls)

// Check TS won't fall back after leader changed.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
p2, l2, err := cli.GetTS(context.TODO())
if err == nil {
ts2 = tsoutil.ComposeTS(p2, l2)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (s *clientTestSuite) TestLeaderTransfer(c *C) {
cli := setupCli(c, s.ctx, endpoints)

var lastTS uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s *clientTestSuite) TestTSOAllocatorLeader(c *C) {
var allocatorLeaderMap = make(map[string]string)
for _, dcLocation := range dcLocationConfig {
var pdName string
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
pdName = cluster.WaitAllocatorLeader(dcLocation)
return len(pdName) > 0
})
Expand Down Expand Up @@ -423,7 +423,7 @@ func (s *clientTestSuite) TestGetTsoFromFollowerClient1(c *C) {

c.Assert(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"), IsNil)
var lastTS uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -451,7 +451,7 @@ func (s *clientTestSuite) TestGetTsoFromFollowerClient2(c *C) {

c.Assert(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)"), IsNil)
var lastTS uint64
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
physical, logical, err := cli.GetTS(context.TODO())
if err == nil {
lastTS = tsoutil.ComposeTS(physical, logical)
Expand Down Expand Up @@ -506,7 +506,7 @@ func setupCli(c *C, ctx context.Context, endpoints []string, opts ...pd.ClientOp
}

func waitLeader(c *C, cli client, leader string) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
cli.ScheduleCheckLeader()
return cli.GetLeaderAddr() == leader
})
Expand Down Expand Up @@ -721,7 +721,7 @@ func (s *testClientSuite) TestGetRegion(c *C) {
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)

testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
r, err := s.client.GetRegion(context.Background(), []byte("a"))
c.Assert(err, IsNil)
if r == nil {
Expand Down Expand Up @@ -759,7 +759,7 @@ func (s *testClientSuite) TestGetPrevRegion(c *C) {
}
time.Sleep(500 * time.Millisecond)
for i := 0; i < 20; i++ {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
r, err := s.client.GetPrevRegion(context.Background(), []byte{byte(i)})
c.Assert(err, IsNil)
if i > 0 && i < regionLen {
Expand Down Expand Up @@ -798,7 +798,7 @@ func (s *testClientSuite) TestScanRegions(c *C) {
}

// Wait for region heartbeats.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
scanRegions, err := s.client.ScanRegions(context.Background(), []byte{0}, nil, 10)
return err == nil && len(scanRegions) == 10
})
Expand Down Expand Up @@ -865,7 +865,7 @@ func (s *testClientSuite) TestGetRegionByID(c *C) {
err := s.regionHeartbeat.Send(req)
c.Assert(err, IsNil)

testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
r, err := s.client.GetRegionByID(context.Background(), regionID)
c.Assert(err, IsNil)
if r == nil {
Expand Down Expand Up @@ -1146,7 +1146,7 @@ func (s *testClientSuite) TestScatterRegion(c *C) {
err := s.regionHeartbeat.Send(req)
regionsID := []uint64{regionID}
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
scatterResp, err := s.client.ScatterRegions(context.Background(), regionsID, pd.WithGroup("test"), pd.WithRetry(1))
if c.Check(err, NotNil) {
return false
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster.go
Expand Up @@ -611,7 +611,7 @@ func (c *TestCluster) WaitAllLeaders(testC *check.C, dcLocations map[string]stri
for _, dcLocation := range dcLocations {
wg.Add(1)
go func(dc string) {
testutil.WaitUntil(testC, func(testC *check.C) bool {
testutil.WaitUntil(testC, func() bool {
leaderName := c.WaitAllocatorLeader(dc)
return leaderName != ""
})
Expand Down
4 changes: 2 additions & 2 deletions tests/pdctl/member/member_test.go
Expand Up @@ -74,7 +74,7 @@ func (s *memberTestSuite) TestMember(c *C) {
args = []string{"-u", pdAddr, "member", "leader", "transfer", "pd2"}
_, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return c.Check("pd2", Equals, svr.GetLeader().GetName())
})

Expand All @@ -84,7 +84,7 @@ func (s *memberTestSuite) TestMember(c *C) {
output, err = pdctl.ExecuteCommand(cmd, args...)
c.Assert(strings.Contains(string(output), "Success"), IsTrue)
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return c.Check("pd2", Not(Equals), svr.GetLeader().GetName())
})

Expand Down
4 changes: 2 additions & 2 deletions tests/server/api/api_test.go
Expand Up @@ -87,7 +87,7 @@ func (s *serverTestSuite) TestReconnect(c *C) {
// Make sure they proxy requests to the new leader.
for name, s := range cluster.GetServers() {
if name != leader {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res, e := http.Get(s.GetConfig().AdvertiseClientUrls + "/pd/api/v1/version")
c.Assert(e, IsNil)
defer res.Body.Close()
Expand All @@ -103,7 +103,7 @@ func (s *serverTestSuite) TestReconnect(c *C) {
// Request will fail with no leader.
for name, s := range cluster.GetServers() {
if name != leader && name != newLeader {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res, err := http.Get(s.GetConfig().AdvertiseClientUrls + "/pd/api/v1/version")
c.Assert(err, IsNil)
defer res.Body.Close()
Expand Down
14 changes: 7 additions & 7 deletions tests/server/member/member_test.go
Expand Up @@ -111,7 +111,7 @@ func (s *memberTestSuite) TestMemberDelete(c *C) {
httpClient := &http.Client{Timeout: 15 * time.Second}
for _, t := range table {
c.Log(time.Now(), "try to delete:", t.path)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
addr := leader.GetConfig().ClientUrls + "/pd/api/v1/members/" + t.path
req, err := http.NewRequest(http.MethodDelete, addr, nil)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -187,21 +187,21 @@ func (s *memberTestSuite) TestLeaderPriority(c *C) {
server1 := cluster.GetServer(leader1)
addr := server1.GetConfig().ClientUrls
// PD leader should sync with etcd leader.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return cluster.GetLeader() == leader1
})
// Bind a lower priority to current leader.
s.post(c, addr+"/pd/api/v1/members/name/"+leader1, `{"leader-priority": -1}`)
// Wait etcd leader change.
leader2 := s.waitEtcdLeaderChange(c, server1, leader1)
// PD leader should sync with etcd leader again.
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
return cluster.GetLeader() == leader2
})
}

func (s *memberTestSuite) post(c *C, url string, body string) {
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
res, err := http.Post(url, "", bytes.NewBufferString(body)) // #nosec
c.Assert(err, IsNil)
b, err := io.ReadAll(res.Body)
Expand All @@ -214,7 +214,7 @@ func (s *memberTestSuite) post(c *C, url string, body string) {

func (s *memberTestSuite) waitEtcdLeaderChange(c *C, server *tests.TestServer, old string) string {
var leader string
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
var err error
leader, err = server.GetEtcdLeader()
if err != nil {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s *memberTestSuite) TestLeaderResignWithBlock(c *C) {

func (s *memberTestSuite) waitLeaderChange(c *C, cluster *tests.TestCluster, old string) string {
var leader string
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leader = cluster.GetLeader()
if leader == old || leader == "" {
return false
Expand Down Expand Up @@ -383,7 +383,7 @@ func (s *leaderTestSuite) sendRequest(c *C, addr string) {

func mustWaitLeader(c *C, svrs []*server.Server) *server.Server {
var leader *server.Server
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
for _, s := range svrs {
if !s.IsClosed() && s.GetMember().IsLeader() {
leader = s
Expand Down
2 changes: 1 addition & 1 deletion tests/server/region_syncer/region_syncer_test.go
Expand Up @@ -140,7 +140,7 @@ func (s *regionSyncerTestSuite) TestRegionSyncer(c *C) {
c.Assert(followerServer, NotNil)
cacheRegions := leaderServer.GetServer().GetBasicCluster().GetRegions()
c.Assert(cacheRegions, HasLen, regionLen)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
for _, region := range cacheRegions {
r := followerServer.GetServer().GetBasicCluster().GetRegion(region.GetID())
if !(c.Check(r.GetMeta(), DeepEquals, region.GetMeta()) &&
Expand Down
2 changes: 1 addition & 1 deletion tests/server/server_test.go
Expand Up @@ -138,7 +138,7 @@ func (s *serverTestSuite) TestLeader(c *C) {

err = cluster.GetServer(leader1).Stop()
c.Assert(err, IsNil)
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leader := cluster.GetLeader()
return leader != leader1
})
Expand Down
6 changes: 3 additions & 3 deletions tests/server/tso/allocator_test.go
Expand Up @@ -156,7 +156,7 @@ func (s *testAllocatorSuite) TestPriorityAndDifferentLocalTSO(c *C) {
c.Assert(err, IsNil)
dcLocationConfig["pd4"] = "dc-4"
cluster.CheckClusterDCLocation()
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leaderName := cluster.WaitAllocatorLeader("dc-4")
return leaderName != ""
})
Expand All @@ -180,7 +180,7 @@ func (s *testAllocatorSuite) TestPriorityAndDifferentLocalTSO(c *C) {
for serverName, dcLocation := range dcLocationConfig {
go func(serName, dc string) {
defer wg.Done()
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
leaderName := cluster.WaitAllocatorLeader(dc)
return leaderName == serName
}, testutil.WithRetryTimes(12), testutil.WithSleepInterval(5*time.Second))
Expand Down Expand Up @@ -231,7 +231,7 @@ func (s *testAllocatorSuite) testTSOSuffix(c *C, cluster *tests.TestCluster, am
allocator, err := am.GetAllocator(dcLocation)
c.Assert(err, IsNil)
var tso pdpb.Timestamp
testutil.WaitUntil(c, func(c *C) bool {
testutil.WaitUntil(c, func() bool {
tso, err = allocator.GenerateTSO(1)
c.Assert(err, IsNil)
return tso.GetPhysical() != 0
Expand Down

0 comments on commit c1d1ecd

Please sign in to comment.