Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests, tso: add more TSO split tests #6338

Merged
merged 6 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ error = '''
init file log error, %s
'''

["PD:member:ErrCheckCampaign"]
error = '''
check campaign failed
'''

["PD:member:ErrEtcdLeaderNotFound"]
error = '''
etcd leader not found
Expand All @@ -491,11 +496,6 @@ error = '''
marshal leader failed
'''

["PD:member:ErrPreCheckCampaign"]
error = '''
pre-check campaign failed
'''

["PD:netstat:ErrNetstatTCPSocks"]
error = '''
TCP socks error
Expand Down
2 changes: 1 addition & 1 deletion pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var (
var (
ErrEtcdLeaderNotFound = errors.Normalize("etcd leader not found", errors.RFCCodeText("PD:member:ErrEtcdLeaderNotFound"))
ErrMarshalLeader = errors.Normalize("marshal leader failed", errors.RFCCodeText("PD:member:ErrMarshalLeader"))
ErrPreCheckCampaign = errors.Normalize("pre-check campaign failed", errors.RFCCodeText("PD:member:ErrPreCheckCampaign"))
ErrCheckCampaign = errors.Normalize("check campaign failed", errors.RFCCodeText("PD:member:ErrCheckCampaign"))
)

// core errors
Expand Down
9 changes: 9 additions & 0 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,15 @@ func (s *Server) GetLeaderListenUrls() []string {
return member.GetLeaderListenUrls()
}

// GetMember returns the election member of the given keyspace and keyspace group.
func (s *Server) GetMember(keyspaceID, keyspaceGroupID uint32) (tso.ElectionMember, error) {
member, err := s.keyspaceGroupManager.GetElectionMember(keyspaceID, keyspaceGroupID)
if err != nil {
return nil, err
}
return member, nil
}

// AddServiceReadyCallback implements basicserver.
// It adds callbacks when it's ready for providing tso service.
func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
Expand Down
30 changes: 21 additions & 9 deletions pkg/member/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ type Participant struct {
// leader key when this participant is successfully elected as the leader of
// the group. Every write will use it to check the leadership.
memberValue string
// preCampaignChecker is called before the campaign. If it returns false, the
// campaign will be skipped.
preCampaignChecker leadershipCheckFunc
// campaignChecker is used to check whether the additional constraints for a
// campaign are satisfied. If it returns false, the campaign will fail.
campaignChecker atomic.Value // Store as leadershipCheckFunc
}

// NewParticipant create a new Participant.
Expand Down Expand Up @@ -109,7 +109,7 @@ func (m *Participant) Client() *clientv3.Client {
// IsLeader returns whether the participant is the leader or not by checking its leadership's
// lease and leader info.
func (m *Participant) IsLeader() bool {
return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId()
return m.leadership.Check() && m.GetLeader().GetId() == m.member.GetId() && m.campaignCheck()
}

// IsLeaderElected returns true if the leader exists; otherwise false
Expand Down Expand Up @@ -167,8 +167,8 @@ func (m *Participant) GetLeadership() *election.Leadership {

// CampaignLeader is used to campaign the leadership and make it become a leader.
func (m *Participant) CampaignLeader(leaseTimeout int64) error {
if m.preCampaignChecker != nil && !m.preCampaignChecker(m.leadership) {
return errs.ErrPreCheckCampaign
if !m.campaignCheck() {
return errs.ErrCheckCampaign
}
return m.leadership.Campaign(leaseTimeout, m.MemberValue())
}
Expand Down Expand Up @@ -337,7 +337,19 @@ func (m *Participant) GetLeaderPriority(id uint64) (int, error) {
return int(priority), nil
}

// SetPreCampaignChecker sets the pre-campaign checker.
func (m *Participant) SetPreCampaignChecker(checker leadershipCheckFunc) {
m.preCampaignChecker = checker
func (m *Participant) campaignCheck() bool {
checker := m.campaignChecker.Load()
if checker == nil {
return true
}
checkerFunc, ok := checker.(leadershipCheckFunc)
if !ok || checkerFunc == nil {
return true
}
return checkerFunc(m.leadership)
}

// SetCampaignChecker sets the pre-campaign checker.
func (m *Participant) SetCampaignChecker(checker leadershipCheckFunc) {
m.campaignChecker.Store(checker)
}
3 changes: 2 additions & 1 deletion pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ func (am *AllocatorManager) close() {
log.Info("closed the allocator manager")
}

func (am *AllocatorManager) getMember() ElectionMember {
// GetMember returns the ElectionMember of this AllocatorManager.
func (am *AllocatorManager) GetMember() ElectionMember {
return am.member
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ func (gta *GlobalTSOAllocator) campaignLeader() {
if errors.Is(err, errs.ErrEtcdTxnConflict) {
log.Info("campaign tso primary meets error due to txn conflict, another tso server may campaign successfully",
zap.String("campaign-tso-primary-name", gta.member.Name()))
} else if errors.Is(err, errs.ErrPreCheckCampaign) {
} else if errors.Is(err, errs.ErrCheckCampaign) {
log.Info("campaign tso primary meets error due to pre-check campaign failed, the tso keyspace group may be in split",
zap.String("campaign-tso-primary-name", gta.member.Name()))
} else {
Expand Down
8 changes: 4 additions & 4 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
zap.Uint32("source", splitSource))
return
}
participant.SetPreCampaignChecker(func(leadership *election.Leadership) bool {
return splitSourceAM.getMember().IsLeader()
participant.SetCampaignChecker(func(leadership *election.Leadership) bool {
return splitSourceAM.GetMember().IsLeader()
})
}
// Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp.
Expand Down Expand Up @@ -672,7 +672,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership(
}
// Check if the split is completed.
if oldGroup.IsSplitTarget() && !newGroup.IsSplitting() {
kgm.ams[groupID].getMember().(*member.Participant).SetPreCampaignChecker(nil)
kgm.ams[groupID].GetMember().(*member.Participant).SetCampaignChecker(nil)
}
kgm.kgs[groupID] = newGroup
}
Expand Down Expand Up @@ -727,7 +727,7 @@ func (kgm *KeyspaceGroupManager) GetElectionMember(
if err != nil {
return nil, err
}
return am.getMember(), nil
return am.GetMember(), nil
}

// HandleTSORequest forwards TSO allocation requests to correct TSO Allocators of the given keyspace group.
Expand Down
43 changes: 27 additions & 16 deletions tests/integrations/mcs/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import (
"github.com/stretchr/testify/require"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/tempurl"
"github.com/tikv/pd/pkg/utils/testutil"
)

// TestCluster is a test cluster for TSO.
type TestCluster struct {
// TestTSOCluster is a test cluster for TSO.
type TestTSOCluster struct {
ctx context.Context

backendEndpoints string
Expand All @@ -35,8 +36,8 @@ type TestCluster struct {
}

// NewTestTSOCluster creates a new TSO test cluster.
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestCluster, err error) {
tc = &TestCluster{
func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpoints string) (tc *TestTSOCluster, err error) {
tc = &TestTSOCluster{
ctx: ctx,
backendEndpoints: backendEndpoints,
servers: make(map[string]*tso.Server, initialServerCount),
Expand All @@ -52,7 +53,7 @@ func NewTestTSOCluster(ctx context.Context, initialServerCount int, backendEndpo
}

// AddServer adds a new TSO server to the test cluster.
func (tc *TestCluster) AddServer(addr string) error {
func (tc *TestTSOCluster) AddServer(addr string) error {
cfg := tso.NewConfig()
cfg.BackendEndpoints = tc.backendEndpoints
cfg.ListenAddr = addr
Expand All @@ -75,7 +76,7 @@ func (tc *TestCluster) AddServer(addr string) error {
}

// Destroy stops and destroy the test cluster.
func (tc *TestCluster) Destroy() {
func (tc *TestTSOCluster) Destroy() {
for _, cleanup := range tc.cleanupFuncs {
cleanup()
}
Expand All @@ -84,14 +85,14 @@ func (tc *TestCluster) Destroy() {
}

// DestroyServer stops and destroy the test server by the given address.
func (tc *TestCluster) DestroyServer(addr string) {
func (tc *TestTSOCluster) DestroyServer(addr string) {
tc.cleanupFuncs[addr]()
delete(tc.cleanupFuncs, addr)
delete(tc.servers, addr)
}

// GetPrimary returns the primary TSO server.
func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
func (tc *TestTSOCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Server {
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
return server
Expand All @@ -101,12 +102,12 @@ func (tc *TestCluster) GetPrimary(keyspaceID, keyspaceGroupID uint32) *tso.Serve
}

// WaitForPrimaryServing waits for one of servers being elected to be the primary/leader of the given keyspace.
func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) string {
var primary string
func (tc *TestTSOCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID, keyspaceGroupID uint32) *tso.Server {
var primary *tso.Server
testutil.Eventually(re, func() bool {
for name, s := range tc.servers {
if s.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = name
for _, server := range tc.servers {
if server.IsKeyspaceServing(keyspaceID, keyspaceGroupID) {
primary = server
return true
}
}
Expand All @@ -117,12 +118,12 @@ func (tc *TestCluster) WaitForPrimaryServing(re *require.Assertions, keyspaceID,
}

// WaitForDefaultPrimaryServing waits for one of servers being elected to be the primary/leader of the default keyspace.
func (tc *TestCluster) WaitForDefaultPrimaryServing(re *require.Assertions) string {
func (tc *TestTSOCluster) WaitForDefaultPrimaryServing(re *require.Assertions) *tso.Server {
return tc.WaitForPrimaryServing(re, mcsutils.DefaultKeyspaceID, mcsutils.DefaultKeyspaceGroupID)
}

// GetServer returns the TSO server by the given address.
func (tc *TestCluster) GetServer(addr string) *tso.Server {
func (tc *TestTSOCluster) GetServer(addr string) *tso.Server {
for srvAddr, server := range tc.servers {
if srvAddr == addr {
return server
Expand All @@ -132,6 +133,16 @@ func (tc *TestCluster) GetServer(addr string) *tso.Server {
}

// GetServers returns all TSO servers.
func (tc *TestCluster) GetServers() map[string]*tso.Server {
func (tc *TestTSOCluster) GetServers() map[string]*tso.Server {
return tc.servers
}

// GetKeyspaceGroupMember converts the TSO servers to KeyspaceGroupMember and returns.
func (tc *TestTSOCluster) GetKeyspaceGroupMember() (members []endpoint.KeyspaceGroupMember) {
for _, server := range tc.servers {
members = append(members, endpoint.KeyspaceGroupMember{
Address: server.GetAddr(),
})
}
return
}
Loading