Skip to content

Commit

Permalink
tests/tso: add IDAllocator to make keyspace test stable (#8202)
Browse files Browse the repository at this point in the history
close #8099

Signed-off-by: husharp <jinhao.hu@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
HuSharp and ti-chi-bot[bot] committed May 21, 2024
1 parent 58e7580 commit 0e73b7a
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,7 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() {
defer kgm.wg.Done()
patrolInterval := groupPatrolInterval
failpoint.Inject("fastGroupSplitPatroller", func() {
patrolInterval = time.Second
patrolInterval = 3 * time.Second
})
ticker := time.NewTicker(patrolInterval)
defer ticker.Stop()
Expand Down
158 changes: 89 additions & 69 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
tsopkg "github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/etcdutil"
Expand All @@ -56,6 +57,13 @@ type tsoKeyspaceGroupManagerTestSuite struct {
pdLeaderServer *tests.TestServer
// tsoCluster is the TSO service cluster.
tsoCluster *tests.TestTSOCluster

allocator *mockid.IDAllocator
}

func (suite *tsoKeyspaceGroupManagerTestSuite) allocID() uint32 {
id, _ := suite.allocator.Alloc()
return uint32(id)
}

func TestTSOKeyspaceGroupManager(t *testing.T) {
Expand All @@ -77,6 +85,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {
re.NoError(suite.pdLeaderServer.BootstrapCluster())
suite.tsoCluster, err = tests.NewTestTSOCluster(suite.ctx, 2, suite.pdLeaderServer.GetAddr())
re.NoError(err)
suite.allocator = mockid.NewIDAllocator()
suite.allocator.SetBase(uint64(time.Now().Second()))
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() {
Expand Down Expand Up @@ -166,9 +176,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
keyspaceGroupID uint32
keyspaceIDs []uint32
}{
{0, []uint32{0, 10}},
{1, []uint32{1, 11}},
{2, []uint32{2, 12}},
{suite.allocID(), []uint32{0, 10}},
{suite.allocID(), []uint32{1, 11}},
{suite.allocID(), []uint32{2, 12}},
}

for _, param := range params {
Expand Down Expand Up @@ -242,51 +252,53 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
re := suite.Require()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
// Create the keyspace group `oldID` with keyspaces [111, 222, 333].
oldID := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: oldID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID)
re.Equal(oldID, kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Get a TSO from the keyspace group 1.
// Get a TSO from the keyspace group `oldID`.
var (
ts pdpb.Timestamp
err error
)
testutil.Eventually(re, func() bool {
ts, err = suite.requestTSO(re, 222, 1)
ts, err = suite.requestTSO(re, 222, oldID)
return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
// Set the TSO of the keyspace group `oldID` to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, oldID).ResetTS(tsoutil.GenerateTS(&ts), false, true, oldID)
re.NoError(err)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
// Split the keyspace group `oldID` to `newID`.
newID := suite.allocID()
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{
NewID: newID,
Keyspaces: []uint32{222, 333},
})
// Wait for the split to complete automatically even there is no TSO request from the outside.
testutil.Eventually(re, func() bool {
kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
kg2, code := handlersutil.TryLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID)
if code != http.StatusOK {
return false
}
re.Equal(uint32(2), kg2.ID)
re.Equal(newID, kg2.ID)
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
return !kg2.IsSplitting()
})
// Check the split TSO from keyspace group 2 now.
splitTS, err := suite.requestTSO(re, 222, 2)
// Check the split TSO from keyspace group `newID` now.
splitTS, err := suite.requestTSO(re, 222, newID)
re.NoError(err)
re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0)
}
Expand All @@ -304,60 +316,62 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO(

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() {
re := suite.Require()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
// Create the keyspace group `oldID` with keyspaces [111, 222, 333].
oldID := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: oldID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID)
re.Equal(oldID, kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
// Split the keyspace group `oldID` to `newID`.
newID := suite.allocID()
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{
NewID: newID,
Keyspaces: []uint32{222, 333},
})
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2)
re.Equal(uint32(2), kg2.ID)
kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, newID)
re.Equal(newID, kg2.ID)
re.Equal([]uint32{222, 333}, kg2.Keyspaces)
re.True(kg2.IsSplitTarget())
// Check the leadership.
member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, 1).GetMember(111, 1)
member1, err := suite.tsoCluster.WaitForPrimaryServing(re, 111, oldID).GetMember(111, oldID)
re.NoError(err)
re.NotNil(member1)
member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 2).GetMember(222, 2)
member2, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, newID).GetMember(222, newID)
re.NoError(err)
re.NotNil(member2)
// Wait for the leader of the keyspace group 1 and 2 to be elected.
// Wait for the leader of the keyspace group `oldID` and `newID` to be elected.
testutil.Eventually(re, func() bool {
return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0
})
// Check if the leader of the keyspace group 1 and 2 are the same.
// Check if the leader of the keyspace group `oldID` and `newID` are the same.
re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls())
// Resign and block the leader of the keyspace group 1 from being elected.
// Resign and block the leader of the keyspace group `oldID` from being elected.
member1.(*member.Participant).SetCampaignChecker(func(*election.Leadership) bool {
return false
})
member1.ResetLeader()
// The leader of the keyspace group 2 should be resigned also.
// The leader of the keyspace group `newID` should be resigned also.
testutil.Eventually(re, func() bool {
return member2.IsLeader() == false
})
// Check if the leader of the keyspace group 1 and 2 are the same again.
// Check if the leader of the keyspace group `oldID` and `newID` are the same again.
member1.(*member.Participant).SetCampaignChecker(nil)
testutil.Eventually(re, func() bool {
return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0
})
re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls())
// Wait for the keyspace groups to finish the split.
waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333})
waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{111}, []uint32{222, 333})
}

func waitFinishSplit(
Expand Down Expand Up @@ -390,30 +404,32 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient()
re := suite.Require()
// Enable the failpoint to slow down the system time to test whether the TSO is monotonic.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`))
// Create the keyspace group 1 with keyspaces [444, 555, 666].
// Create the keyspace group `oldID` with keyspaces [444, 555, 666].
oldID := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: oldID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{444, 555, 666},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, oldID)
re.Equal(oldID, kg1.ID)
re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces)
re.False(kg1.IsSplitting())
// Request the TSO for keyspace 555 concurrently via client.
cancel := suite.dispatchClient(re, 555, 1)
// Split the keyspace group 1 to 2.
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{
NewID: 2,
cancel := suite.dispatchClient(re, 555, oldID)
// Split the keyspace group `oldID` to `newID`.
newID := suite.allocID()
handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, oldID, &handlers.SplitKeyspaceGroupByIDParams{
NewID: newID,
Keyspaces: []uint32{555, 666},
})
// Wait for the keyspace groups to finish the split.
waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666})
waitFinishSplit(re, suite.pdLeaderServer, oldID, newID, []uint32{444}, []uint32{555, 666})
// Stop the client.
cancel()
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow"))
Expand Down Expand Up @@ -569,48 +585,49 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() {
re := suite.Require()
// Create the keyspace group 1 and 2 with keyspaces [111, 222] and [333].
// Create the keyspace group `firstID` and `secondID` with keyspaces [111, 222] and [333].
firstID, secondID := suite.allocID(), suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: firstID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222},
},
{
ID: 2,
ID: secondID,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{333},
},
},
})
// Get a TSO from the keyspace group 1.
// Get a TSO from the keyspace group `firstID`.
var (
ts pdpb.Timestamp
err error
)
testutil.Eventually(re, func() bool {
ts, err = suite.requestTSO(re, 222, 1)
ts, err = suite.requestTSO(re, 222, firstID)
return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0
})
ts.Physical += time.Hour.Milliseconds()
// Set the TSO of the keyspace group 1 to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, 1).ResetTS(tsoutil.GenerateTS(&ts), false, true, 1)
// Set the TSO of the keyspace group `firstID` to a large value.
err = suite.tsoCluster.GetPrimaryServer(222, firstID).ResetTS(tsoutil.GenerateTS(&ts), false, true, firstID)
re.NoError(err)
// Merge the keyspace group 1 and 2 to the default keyspace group.
// Merge the keyspace group `firstID` and `secondID` to the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeList: []uint32{1, 2},
MergeList: []uint32{firstID, secondID},
})
// Check the keyspace group 1 and 2 are merged to the default keyspace group.
// Check the keyspace group `firstID` and `secondID` are merged to the default keyspace group.
kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID)
for _, keyspaceID := range []uint32{111, 222, 333} {
re.Contains(kg.Keyspaces, keyspaceID)
}
re.True(kg.IsMergeTarget())
// Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group 1.
// Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group`firstID`.
var mergedTS pdpb.Timestamp
testutil.Eventually(re, func() bool {
mergedTS, err = suite.requestTSO(re, 333, mcsutils.DefaultKeyspaceGroupID)
Expand All @@ -624,26 +641,27 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() {

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() {
re := suite.Require()
// Create the keyspace group 1 with keyspaces [111, 222, 333].
// Create the keyspace group `id` with keyspaces [111, 222, 333].
id := suite.allocID()
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: id,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1)
re.Equal(uint32(1), kg1.ID)
kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, id)
re.Equal(id, kg1.ID)
re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces)
re.False(kg1.IsMerging())
// Request the TSO for keyspace 222 concurrently via client.
cancel := suite.dispatchClient(re, 222, 1)
cancel := suite.dispatchClient(re, 222, id)
// Merge the keyspace group 1 to the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeList: []uint32{1},
MergeList: []uint32{id},
})
// Wait for the default keyspace group to finish the merge.
waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333})
Expand Down Expand Up @@ -671,24 +689,25 @@ func waitFinishMerge(

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeBeforeInitTSO() {
re := suite.Require()
// Make sure the TSO of keyspace group 1 won't be initialized before it's merged.
// Make sure the TSO of keyspace group `id` won't be initialized before it's merged.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/failedToSaveTimestamp", `return(true)`))
// Request the TSO for the default keyspace concurrently via client.
id := suite.allocID()
cancel := suite.dispatchClient(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
// Create the keyspace group 1 with keyspaces [111, 222, 333].
handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{
KeyspaceGroups: []*endpoint.KeyspaceGroup{
{
ID: 1,
ID: id,
UserKind: endpoint.Standard.String(),
Members: suite.tsoCluster.GetKeyspaceGroupMember(),
Keyspaces: []uint32{111, 222, 333},
},
},
})
// Merge the keyspace group 1 to the default keyspace group.
// Merge the keyspace group `id` to the default keyspace group.
handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
MergeList: []uint32{1},
MergeList: []uint32{id},
})
// Wait for the default keyspace group to finish the merge.
waitFinishMerge(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, []uint32{111, 222, 333})
Expand Down Expand Up @@ -775,12 +794,13 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault
keyspaces = make([]uint32, 0, keyspaceGroupNum)
)
for i := 1; i <= keyspaceGroupNum; i++ {
id := suite.allocID()
keyspaceGroups = append(keyspaceGroups, &endpoint.KeyspaceGroup{
ID: uint32(i),
ID: id,
UserKind: endpoint.UserKind(rand.Intn(int(endpoint.UserKindCount))).String(),
Keyspaces: []uint32{uint32(i)},
Keyspaces: []uint32{id},
})
keyspaces = append(keyspaces, uint32(i))
keyspaces = append(keyspaces, id)
if i != keyspaceGroupNum {
continue
}
Expand All @@ -797,7 +817,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspaceGroupMergeIntoDefault
re.NotNil(svr)
for i := 1; i < keyspaceGroupNum; i++ {
// Check if the keyspace group is served.
svr = suite.tsoCluster.WaitForPrimaryServing(re, uint32(i), uint32(i))
svr = suite.tsoCluster.WaitForPrimaryServing(re, keyspaceGroups[i].ID, keyspaceGroups[i].ID)
re.NotNil(svr)
}
// Merge all the keyspace groups into the default keyspace group.
Expand Down

0 comments on commit 0e73b7a

Please sign in to comment.