Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: fix unstable test TestMinResolvedTS #5669

Merged
merged 2 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 13 additions & 13 deletions server/api/min_resolved_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package api

import (
"fmt"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -72,7 +73,6 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
// case2: stop run job
zero := typeutil.Duration{Duration: 0}
suite.setMinResolvedTSPersistenceInterval(zero)
time.Sleep(interval.Duration) // wait sync
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: 0,
IsRealTime: false,
Expand All @@ -81,8 +81,9 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
// case3: start run job
interval = typeutil.Duration{Duration: suite.defaultInterval}
suite.setMinResolvedTSPersistenceInterval(interval)
suite.Equal(interval, suite.svr.GetRaftCluster().GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval)
time.Sleep(suite.defaultInterval) // wait sync
suite.Eventually(func() bool {
return interval == suite.svr.GetRaftCluster().GetOpts().GetPDServerConfig().MinResolvedTSPersistenceInterval
}, time.Second*10, time.Millisecond*20)
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: 0,
IsRealTime: true,
Expand All @@ -92,7 +93,6 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
rc := suite.svr.GetRaftCluster()
ts := uint64(233)
rc.SetMinResolvedTS(1, ts)
time.Sleep(suite.defaultInterval) // wait sync
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: ts,
IsRealTime: true,
Expand All @@ -101,14 +101,12 @@ func (suite *minResolvedTSTestSuite) TestMinResolvedTS() {
// case5: stop persist and return last persist value when interval is 0
interval = typeutil.Duration{Duration: 0}
suite.setMinResolvedTSPersistenceInterval(interval)
time.Sleep(suite.defaultInterval) // wait sync
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: ts,
IsRealTime: false,
PersistInterval: interval,
})
rc.SetMinResolvedTS(1, ts+1)
time.Sleep(suite.defaultInterval) // wait sync
suite.checkMinResolvedTS(&minResolvedTS{
MinResolvedTS: ts, // last persist value
IsRealTime: false,
Expand All @@ -123,11 +121,13 @@ func (suite *minResolvedTSTestSuite) setMinResolvedTSPersistenceInterval(duratio
}

func (suite *minResolvedTSTestSuite) checkMinResolvedTS(expect *minResolvedTS) {
res, err := testDialClient.Get(suite.url)
suite.NoError(err)
defer res.Body.Close()
listResp := &minResolvedTS{}
err = apiutil.ReadJSON(res.Body, listResp)
suite.NoError(err)
suite.Equal(expect, listResp)
suite.Eventually(func() bool {
res, err := testDialClient.Get(suite.url)
suite.NoError(err)
defer res.Body.Close()
listResp := &minResolvedTS{}
err = apiutil.ReadJSON(res.Body, listResp)
suite.NoError(err)
return reflect.DeepEqual(expect, listResp)
}, time.Second*10, time.Millisecond*20)
}
44 changes: 21 additions & 23 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,25 +1252,26 @@ func putRegionWithLeader(re *require.Assertions, rc *cluster.RaftCluster, id id.
re.Equal(3, rc.GetStore(storeID).GetLeaderCount())
}

func checkMinResolvedTS(re *require.Assertions, rc *cluster.RaftCluster, expect uint64, interval time.Duration) {
time.Sleep(interval)
ts := rc.GetMinResolvedTS()
re.Equal(expect, ts)
func checkMinResolvedTS(re *require.Assertions, rc *cluster.RaftCluster, expect uint64) {
re.Eventually(func() bool {
ts := rc.GetMinResolvedTS()
return expect == ts
}, time.Second*10, time.Millisecond*50)
}

func checkMinResolvedTSFromStorage(re *require.Assertions, rc *cluster.RaftCluster, expect uint64, interval time.Duration) {
time.Sleep(interval)
ts2, err := rc.GetStorage().LoadMinResolvedTS()
re.NoError(err)
re.Equal(expect, ts2)
func checkMinResolvedTSFromStorage(re *require.Assertions, rc *cluster.RaftCluster, expect uint64) {
re.Eventually(func() bool {
ts2, err := rc.GetStorage().LoadMinResolvedTS()
re.NoError(err)
return expect == ts2
}, time.Second*10, time.Millisecond*50)
}

func setMinResolvedTSPersistenceInterval(re *require.Assertions, rc *cluster.RaftCluster, svr *server.Server, interval time.Duration) {
cfg := rc.GetOpts().GetPDServerConfig().Clone()
cfg.MinResolvedTSPersistenceInterval = typeutil.NewDuration(interval)
err := svr.SetPDServerConfig(*cfg)
re.NoError(err)
time.Sleep(time.Millisecond + interval)
}

func TestMinResolvedTS(t *testing.T) {
Expand Down Expand Up @@ -1321,7 +1322,6 @@ func TestMinResolvedTS(t *testing.T) {
// default run job
re.NotEqual(rc.GetOpts().GetMinResolvedTSPersistenceInterval(), 0)
setMinResolvedTSPersistenceInterval(re, rc, svr, 0)
time.Sleep(config.DefaultMinResolvedTSPersistenceInterval) // wait sync
re.Equal(time.Duration(0), rc.GetOpts().GetMinResolvedTSPersistenceInterval())

// case1: cluster is no initialized
Expand All @@ -1335,14 +1335,13 @@ func TestMinResolvedTS(t *testing.T) {
// case2: add leader peer to store1 but no run job
// min resolved ts should be zero
putRegionWithLeader(re, rc, id, store1)
checkMinResolvedTS(re, rc, 0, cluster.DefaultMinResolvedTSPersistenceInterval)
checkMinResolvedTS(re, rc, 0)

// case3: add leader peer to store1 and run job
// min resolved ts should be store1TS
interval := time.Millisecond
setMinResolvedTSPersistenceInterval(re, rc, svr, interval)
checkMinResolvedTS(re, rc, store1TS, interval)
checkMinResolvedTSFromStorage(re, rc, store1TS, interval)
setMinResolvedTSPersistenceInterval(re, rc, svr, time.Millisecond)
checkMinResolvedTS(re, rc, store1TS)
checkMinResolvedTSFromStorage(re, rc, store1TS)

// case4: add tiflash store
// min resolved ts should no change
Expand All @@ -1357,16 +1356,15 @@ func TestMinResolvedTS(t *testing.T) {
// case6: set store1 to tombstone
// min resolved ts should change to store 3
resetStoreState(re, rc, store1, metapb.StoreState_Tombstone)
time.Sleep(interval) // wait sync
checkMinResolvedTS(re, rc, store3TS, interval)
checkMinResolvedTSFromStorage(re, rc, store3TS, interval)
checkMinResolvedTS(re, rc, store3TS)
checkMinResolvedTSFromStorage(re, rc, store3TS)

// case7: add a store with leader peer but no report min resolved ts
// min resolved ts should be no change
store4 := addStoreAndCheckMinResolvedTS(re, false /* not tiflash */, 0, store3TS)
putRegionWithLeader(re, rc, id, store4)
checkMinResolvedTS(re, rc, store3TS, interval)
checkMinResolvedTSFromStorage(re, rc, store3TS, interval)
checkMinResolvedTS(re, rc, store3TS)
checkMinResolvedTSFromStorage(re, rc, store3TS)
resetStoreState(re, rc, store4, metapb.StoreState_Tombstone)

// case8: set min resolved ts persist interval to zero
Expand All @@ -1376,9 +1374,9 @@ func TestMinResolvedTS(t *testing.T) {
store5 := addStoreAndCheckMinResolvedTS(re, false /* not tiflash */, store5TS, store3TS)
resetStoreState(re, rc, store3, metapb.StoreState_Tombstone)
putRegionWithLeader(re, rc, id, store5)
checkMinResolvedTS(re, rc, store3TS, interval)
checkMinResolvedTS(re, rc, store3TS)
setMinResolvedTSPersistenceInterval(re, rc, svr, time.Millisecond)
checkMinResolvedTS(re, rc, store5TS, interval)
checkMinResolvedTS(re, rc, store5TS)
}

// See https://github.com/tikv/pd/issues/4941
Expand Down