From bab5b568f5a150b8c069250f75c2e9cae9a9c102 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Fri, 12 May 2023 16:51:15 +0800 Subject: [PATCH 1/9] Fix flaky TestGetMinTS. The tso servers are loading keyspace groups asynchronously. Make sure all keyspace groups are available for serving tso requests from corresponding keyspaces by querying IsKeyspaceServing(keyspaceID, the Desired KeyspaceGroupID). if use default keyspace group id in the query, it will always return true as the keyspace will be served by default keyspace group before the keyspace groups are loaded. Signed-off-by: Bin Shi --- tests/integrations/tso/client_test.go | 34 ++++++++++++++++----------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 861aac9acd4..9391ba4969a 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -133,27 +133,33 @@ func (suite *tsoClientTestSuite) SetupSuite() { }) } - for _, keyspaceGroup := range suite.keyspaceGroups { - suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) - } - - // Make sure all keyspace groups are available. + // The tso servers are loading keyspace groups asynchronously. Make sure all keyspace groups + // are available for serving tso requests from corresponding keyspaces by querying + // IsKeyspaceServing(keyspaceID, the Desired KeyspaceGroupID). if use default keyspace group id + // in the query, it will always return true as the keyspace will be served by default keyspace + // group before the keyspace groups are loaded. testutil.Eventually(re, func() bool { - for _, keyspaceID := range suite.keyspaceIDs { - served := false - for _, server := range suite.tsoCluster.GetServers() { - if server.IsKeyspaceServing(keyspaceID, mcsutils.DefaultKeyspaceGroupID) { - served = true - break + for _, keyspaceGroup := range suite.keyspaceGroups { + for _, keyspaceID := range keyspaceGroup.keyspaceIDs { + served := false + for _, server := range suite.tsoCluster.GetServers() { + if server.IsKeyspaceServing(keyspaceID, keyspaceGroup.keyspaceGroupID) { + served = true + break + } + } + if !served { + return false } - } - if !served { - return false } } return true }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + for _, keyspaceGroup := range suite.keyspaceGroups { + suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) + } + // Create clients and make sure they all have discovered the tso service. suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) From 4c219c305285d1191eb9987391438afbcdb40e0a Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Fri, 12 May 2023 17:50:48 +0800 Subject: [PATCH 2/9] Enable -v in go test for debugging Signed-off-by: Bin Shi --- tests/integrations/tso/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/tso/Makefile b/tests/integrations/tso/Makefile index 820bbf33ccf..4573863039c 100644 --- a/tests/integrations/tso/Makefile +++ b/tests/integrations/tso/Makefile @@ -32,7 +32,7 @@ test: failpoint-enable $(MAKE) failpoint-disable ci-test-job: - CGO_ENABLED=1 go test -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso + CGO_ENABLED=1 go test -v -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso install-tools: cd $(ROOT_PATH) && $(MAKE) install-tools From 6450fd27f6f5e903b634f82ce7cdf39b10fb281f Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 15 May 2023 08:24:04 +0800 Subject: [PATCH 3/9] Add more debugging info Signed-off-by: Bin Shi --- pkg/tso/allocator_manager.go | 4 +- pkg/tso/tso.go | 10 +++- tests/integrations/tso/client_test.go | 74 +++++++++++++++------------ tests/integrations/tso/testutil.go | 2 +- 4 files changed, 53 insertions(+), 37 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index a780e7da74e..2549f58b299 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -750,7 +750,9 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { } if err := ag.allocator.UpdateTSO(); err != nil { log.Warn("failed to update allocator's timestamp", - zap.String("dc-location", ag.dcLocation), errs.ZapError(err)) + zap.String("dc-location", ag.dcLocation), + zap.String("name", am.member.Name()), + errs.ZapError(err)) am.ResetAllocatorGroup(ag.dcLocation) return } diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 525385b42eb..fa1bfcb04f6 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -286,7 +286,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical) if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold { - log.Warn("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prevPhysical), zap.Time("now", now), zap.Duration("update-physical-interval", t.updatePhysicalInterval)) + log.Warn("clock offset", + zap.Duration("jet-lag", jetLag), + zap.Time("prev-physical", prevPhysical), + zap.Time("now", now), + zap.Duration("update-physical-interval", t.updatePhysicalInterval)) tsoCounter.WithLabelValues("slow_save", t.dcLocation).Inc() } @@ -314,6 +318,10 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= UpdateTimestampGuard { save := next.Add(t.saveInterval) if err := t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil { + log.Warn("save timestamp failed", + zap.String("dc-location", t.dcLocation), + zap.String("timestamp-path", t.getTimestampPath()), + zap.Error(err)) tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc() return err } diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 9391ba4969a..7245cb75c85 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -16,6 +16,7 @@ package tso import ( "context" + "fmt" "math" "math/rand" "strings" @@ -114,6 +115,10 @@ func (suite *tsoClientTestSuite) SetupSuite() { {2, []uint32{2}}, } + for _, keyspaceGroup := range suite.keyspaceGroups { + suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) + } + for _, param := range suite.keyspaceGroups { if param.keyspaceGroupID == 0 { // we have already created default keyspace group, so we can skip it. @@ -133,38 +138,40 @@ func (suite *tsoClientTestSuite) SetupSuite() { }) } - // The tso servers are loading keyspace groups asynchronously. Make sure all keyspace groups - // are available for serving tso requests from corresponding keyspaces by querying - // IsKeyspaceServing(keyspaceID, the Desired KeyspaceGroupID). if use default keyspace group id - // in the query, it will always return true as the keyspace will be served by default keyspace - // group before the keyspace groups are loaded. - testutil.Eventually(re, func() bool { - for _, keyspaceGroup := range suite.keyspaceGroups { - for _, keyspaceID := range keyspaceGroup.keyspaceIDs { - served := false - for _, server := range suite.tsoCluster.GetServers() { - if server.IsKeyspaceServing(keyspaceID, keyspaceGroup.keyspaceGroupID) { - served = true - break - } - } - if !served { - return false + suite.waitForAllKeyspaceGroupsInServing(re) + } + + fmt.Println("TestSuite Setup Done !!!!!!!!!!!!!!!!!!!!!") +} + +func (suite *tsoClientTestSuite) waitForAllKeyspaceGroupsInServing(re *require.Assertions) { + // The tso servers are loading keyspace groups asynchronously. Make sure all keyspace groups + // are available for serving tso requests from corresponding keyspaces by querying + // IsKeyspaceServing(keyspaceID, the Desired KeyspaceGroupID). if use default keyspace group id + // in the query, it will always return true as the keyspace will be served by default keyspace + // group before the keyspace groups are loaded. + testutil.Eventually(re, func() bool { + for _, keyspaceGroup := range suite.keyspaceGroups { + for _, keyspaceID := range keyspaceGroup.keyspaceIDs { + served := false + for _, server := range suite.tsoCluster.GetServers() { + if server.IsKeyspaceServing(keyspaceID, keyspaceGroup.keyspaceGroupID) { + served = true + break } } + if !served { + return false + } } - return true - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - - for _, keyspaceGroup := range suite.keyspaceGroups { - suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - // Create clients and make sure they all have discovered the tso service. - suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( - suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) - re.Equal(len(suite.keyspaceIDs), len(suite.clients)) - } + // Create clients and make sure they all have discovered the tso service. + suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( + suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) + re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } func (suite *tsoClientTestSuite) TearDownSuite() { @@ -251,9 +258,8 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { // TestGetMinTS tests the correctness of GetMinTS. func (suite *tsoClientTestSuite) TestGetMinTS() { - // Skip this test for the time being due to https://github.com/tikv/pd/issues/6453 - // TODO: fix it #6453 - suite.T().SkipNow() + re := suite.Require() + suite.waitForAllKeyspaceGroupsInServing(re) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) @@ -264,9 +270,9 @@ func (suite *tsoClientTestSuite) TestGetMinTS() { var lastMinTS uint64 for j := 0; j < tsoRequestRound; j++ { physical, logical, err := client.GetMinTS(suite.ctx) - suite.NoError(err) + re.NoError(err) minTS := tsoutil.ComposeTS(physical, logical) - suite.Less(lastMinTS, minTS) + re.Less(lastMinTS, minTS) lastMinTS = minTS // Now we check whether the returned ts is the minimum one @@ -274,9 +280,9 @@ func (suite *tsoClientTestSuite) TestGetMinTS() { // less than the new timestamps of all keyspace groups. for _, client := range suite.clients { physical, logical, err := client.GetTS(suite.ctx) - suite.NoError(err) + re.NoError(err) ts := tsoutil.ComposeTS(physical, logical) - suite.Less(minTS, ts) + re.Less(minTS, ts) } } }(client) diff --git a/tests/integrations/tso/testutil.go b/tests/integrations/tso/testutil.go index 55e24fd1e58..2a4e5eabd90 100644 --- a/tests/integrations/tso/testutil.go +++ b/tests/integrations/tso/testutil.go @@ -22,7 +22,7 @@ import ( const ( serverCount = 3 tsoRequestConcurrencyNumber = 5 - tsoRequestRound = 30 + tsoRequestRound = 300 tsoCount = 10 ) From b07d8b1ebd1ac1b693cfeca67d8a6717ebe0097f Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 15 May 2023 13:04:33 +0800 Subject: [PATCH 4/9] Fix the issue "non-default keyspace groups use the same timestamp path by mistake" which actually caused this random test failure. Signed-off-by: Bin Shi --- pkg/tso/allocator_manager.go | 18 +++++++++++ pkg/tso/global_allocator.go | 30 ++++++++++++++++++- pkg/tso/keyspace_group_manager.go | 7 +++-- pkg/tso/local_allocator.go | 25 ++++++++++++++-- pkg/tso/tso.go | 19 ++++++------ .../mcs/tso/keyspace_group_manager_test.go | 26 ++++++++++++++-- 6 files changed, 109 insertions(+), 16 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 2549f58b299..9f3c7a1d120 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -50,6 +50,7 @@ const ( checkStep = time.Minute patrolStep = time.Second defaultAllocatorLeaderLease = 3 + globalTSOAllocatorEtcdPrefix = "gta" localTSOAllocatorEtcdPrefix = "lta" localTSOSuffixEtcdPrefix = "lts" ) @@ -273,6 +274,23 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc go am.allocatorLeaderLoop(parentCtx, localTSOAllocator) } +// GetTimestampPath returns the timestamp path in etcd for the given DCLocation. +func (am *AllocatorManager) GetTimestampPath(dcLocation string) string { + if am == nil { + return "" + } + if len(dcLocation) == 0 { + dcLocation = GlobalDCLocation + } + + am.mu.RLock() + defer am.mu.RUnlock() + if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist { + return path.Join(am.rootPath, allocatorGroup.allocator.GetTimestampPath()) + } + return "" +} + // tsoAllocatorLoop is used to run the TSO Allocator updating daemon. func (am *AllocatorManager) tsoAllocatorLoop() { defer logutil.LogPanic() diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 0227f2c1a64..3617d413ba6 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "path" "sync" "sync/atomic" "time" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/logutil" @@ -45,6 +47,14 @@ type Allocator interface { IsInitialize() bool // UpdateTSO is used to update the TSO in memory and the time window in etcd. UpdateTSO() error + // GetTimestampPath returns the timestamp path in etcd, which is: + // 1. for the default keyspace group: + // a. timestamp in /pd/{cluster_id}/timestamp + // b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp + // 1. for the non-default keyspace groups: + // a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp + // b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp + GetTimestampPath() string // SetTSO sets the physical part with given TSO. It's mainly used for BR restore. // Cannot set the TSO smaller than now in any case. // if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error @@ -80,6 +90,16 @@ func NewGlobalTSOAllocator( am *AllocatorManager, startGlobalLeaderLoop bool, ) Allocator { + // Construct the timestampOracle path prefix, which is: + // 1. for the default keyspace group: + // "" in /pd/{cluster_id}/timestamp + // 2. for the non-default keyspace groups: + // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp + tsPath := "" + if am.kgID != utils.DefaultKeyspaceGroupID { + tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix) + } + ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ ctx: ctx, @@ -89,7 +109,7 @@ func NewGlobalTSOAllocator( timestampOracle: ×tampOracle{ client: am.member.GetLeadership().GetClient(), rootPath: am.rootPath, - ltsPath: "", + tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, @@ -127,6 +147,14 @@ func (gta *GlobalTSOAllocator) getSyncRTT() int64 { return syncRTT.(int64) } +// GetTimestampPath returns the timestamp path in etcd. +func (gta *GlobalTSOAllocator) GetTimestampPath() string { + if gta == nil || gta.timestampOracle == nil { + return "" + } + return gta.timestampOracle.GetTimestampPath() +} + func (gta *GlobalTSOAllocator) estimateMaxTS(count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) { physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(int64(count), 0) if physical == 0 { diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7038ef8e373..4d32ae92c80 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -186,9 +186,9 @@ type KeyspaceGroupManager struct { // 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary" // 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default // keyspace groups. - // Key: /ms/{cluster_id}/tso/{group}/gts/timestamp + // Key: /ms/{cluster_id}/tso/{group}/gta/timestamp // Value: ts(time.Time) - // Key: /ms/{cluster_id}/tso/{group}/lts/{dc-location}/timestamp + // Key: /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp // Value: ts(time.Time) // Note: The {group} is 5 digits integer with leading zeros. tsoSvcRootPath string @@ -425,6 +425,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro } // Initialize all kinds of maps. am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + log.Info("created allocator manager", + zap.Uint32("keyspace-group-id", group.ID), + zap.String("timestamp-path", am.GetTimestampPath(""))) kgm.Lock() group.KeyspaceLookupTable = make(map[uint32]struct{}) for _, kid := range group.Keyspaces { diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index c3bb4c02aad..6458df22cb0 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -36,6 +37,7 @@ import ( // which is only used to allocate TSO in one DC each. // One PD server may hold multiple Local TSO Allocators. type LocalTSOAllocator struct { + // kgID is the keyspace group ID allocatorManager *AllocatorManager // leadership is used to campaign the corresponding DC's Local TSO Allocator. leadership *election.Leadership @@ -43,7 +45,7 @@ type LocalTSOAllocator struct { // for election use, notice that the leadership that member holds is // the leadership for PD leader. Local TSO Allocator's leadership is for the // election of Local TSO Allocator leader among several PD servers and - // Local TSO Allocator only use member's some etcd and pbpd.Member info. + // Local TSO Allocator only use member's some etcd and pdpb.Member info. // So it's not conflicted. rootPath string allocatorLeader atomic.Value // stored as *pdpb.Member @@ -55,13 +57,24 @@ func NewLocalTSOAllocator( leadership *election.Leadership, dcLocation string, ) Allocator { + // Construct the timestampOracle path prefix, which is: + // 1. for the default keyspace group: + // lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp + // 2. for the non-default keyspace groups: + // {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp + var tsPath string + if am.kgID == utils.DefaultKeyspaceGroupID { + tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation) + } else { + tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation) + } return &LocalTSOAllocator{ allocatorManager: am, leadership: leadership, timestampOracle: ×tampOracle{ client: leadership.GetClient(), rootPath: am.rootPath, - ltsPath: path.Join(localTSOAllocatorEtcdPrefix, dcLocation), + tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, @@ -73,6 +86,14 @@ func NewLocalTSOAllocator( } } +// GetTimestampPath returns the timestamp path in etcd. +func (lta *LocalTSOAllocator) GetTimestampPath() string { + if lta == nil || lta.timestampOracle == nil { + return "" + } + return lta.timestampOracle.GetTimestampPath() +} + // GetDCLocation returns the local allocator's dc-location. func (lta *LocalTSOAllocator) GetDCLocation() string { return lta.timestampOracle.dcLocation diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index fa1bfcb04f6..aa1a424d8cd 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -62,8 +62,8 @@ type tsoObject struct { type timestampOracle struct { client *clientv3.Client rootPath string - // When ltsPath is empty, it means that it is a global timestampOracle. - ltsPath string + // When tsPath is empty, it means that it is a global timestampOracle. + tsPath string storage endpoint.TSOStorage // TODO: remove saveInterval saveInterval time.Duration @@ -141,8 +141,9 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int return rawLogical< Date: Mon, 15 May 2023 13:16:26 +0800 Subject: [PATCH 5/9] Fix fmt error Signed-off-by: Bin Shi --- pkg/tso/allocator_manager.go | 12 ++++++------ pkg/tso/global_allocator.go | 3 +-- .../mcs/tso/keyspace_group_manager_test.go | 1 - tests/integrations/tso/client_test.go | 3 --- 4 files changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 9f3c7a1d120..f2381e9e7d8 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -46,13 +46,13 @@ import ( const ( // GlobalDCLocation is the Global TSO Allocator's DC location label. - GlobalDCLocation = "global" - checkStep = time.Minute - patrolStep = time.Second - defaultAllocatorLeaderLease = 3 + GlobalDCLocation = "global" + checkStep = time.Minute + patrolStep = time.Second + defaultAllocatorLeaderLease = 3 globalTSOAllocatorEtcdPrefix = "gta" - localTSOAllocatorEtcdPrefix = "lta" - localTSOSuffixEtcdPrefix = "lts" + localTSOAllocatorEtcdPrefix = "lta" + localTSOSuffixEtcdPrefix = "lts" ) var ( diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 3617d413ba6..1d961fd1b95 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/mcs/utils" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/utils/logutil" @@ -96,7 +95,7 @@ func NewGlobalTSOAllocator( // 2. for the non-default keyspace groups: // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp tsPath := "" - if am.kgID != utils.DefaultKeyspaceGroupID { + if am.kgID != mcsutils.DefaultKeyspaceGroupID { tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix) } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 9974f3825c1..d84e3d75453 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -201,7 +201,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe } else { timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp", clusterID, param.keyspaceGroupID) - } re.Equal(timestampPath, am.GetTimestampPath("")) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 7245cb75c85..c57e3a032d1 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -16,7 +16,6 @@ package tso import ( "context" - "fmt" "math" "math/rand" "strings" @@ -140,8 +139,6 @@ func (suite *tsoClientTestSuite) SetupSuite() { suite.waitForAllKeyspaceGroupsInServing(re) } - - fmt.Println("TestSuite Setup Done !!!!!!!!!!!!!!!!!!!!!") } func (suite *tsoClientTestSuite) waitForAllKeyspaceGroupsInServing(re *require.Assertions) { From 34705ec177492f4eafa2d2f0fde98a3d5bba53db Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 15 May 2023 14:45:00 +0800 Subject: [PATCH 6/9] Fix test failures in TestTSOKeyspaceGroupSplit Signed-off-by: Bin Shi --- .../mcs/tso/keyspace_group_manager_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index d84e3d75453..8f8151b75fb 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -269,15 +269,15 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re.Equal(uint32(2), kg2.ID) re.Equal([]uint32{222, 333}, kg2.Keyspaces) re.True(kg2.IsSplitTarget()) + time.Sleep(3 * time.Second) // Check the split TSO from keyspace group 2. var splitTS pdpb.Timestamp testutil.Eventually(re, func() bool { splitTS, err = suite.requestTSO(re, 1, 222, 2) return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0 }) + splitTS, err = suite.requestTSO(re, 1, 222, 2) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) - // Finish the split. - handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) } func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( @@ -285,10 +285,11 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( count, keyspaceID, keyspaceGroupID uint32, ) (pdpb.Timestamp, error) { primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID) - tam, err := primary.GetTSOAllocatorManager(keyspaceGroupID) + kgm := primary.GetKeyspaceGroupManager() + re.NotNil(kgm) + ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, count) re.NoError(err) - re.NotNil(tam) - return tam.HandleRequest(tsopkg.GlobalDCLocation, count) + return ts, err } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() { From 68b1950baaf565c9081e491bb301743ec928e4e4 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 15 May 2023 15:02:24 +0800 Subject: [PATCH 7/9] remove -v from ci-test-job in Makefile Signed-off-by: Bin Shi --- tests/integrations/tso/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/tso/Makefile b/tests/integrations/tso/Makefile index 4573863039c..820bbf33ccf 100644 --- a/tests/integrations/tso/Makefile +++ b/tests/integrations/tso/Makefile @@ -32,7 +32,7 @@ test: failpoint-enable $(MAKE) failpoint-disable ci-test-job: - CGO_ENABLED=1 go test -v -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso + CGO_ENABLED=1 go test -tags deadlock -race -covermode=atomic -coverprofile=covprofile -coverpkg=$(ROOT_PATH)/... github.com/tikv/pd/tests/integrations/tso install-tools: cd $(ROOT_PATH) && $(MAKE) install-tools From 2a4102adaf68933ab6768ea0b38e5f15fa12da50 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Mon, 15 May 2023 16:58:25 +0800 Subject: [PATCH 8/9] Handle feedback Signed-off-by: Bin Shi --- tests/integrations/mcs/tso/keyspace_group_manager_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 8f8151b75fb..4b650bf1e25 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -269,7 +269,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { re.Equal(uint32(2), kg2.ID) re.Equal([]uint32{222, 333}, kg2.Keyspaces) re.True(kg2.IsSplitTarget()) - time.Sleep(3 * time.Second) // Check the split TSO from keyspace group 2. var splitTS pdpb.Timestamp testutil.Eventually(re, func() bool { From 52d3f883186c9a0d086ab38f4d72d81acf61536c Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Tue, 16 May 2023 09:45:28 +0800 Subject: [PATCH 9/9] Handle more feedback Signed-off-by: Bin Shi --- pkg/tso/local_allocator.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 6458df22cb0..9c2867966bc 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -37,7 +37,6 @@ import ( // which is only used to allocate TSO in one DC each. // One PD server may hold multiple Local TSO Allocators. type LocalTSOAllocator struct { - // kgID is the keyspace group ID allocatorManager *AllocatorManager // leadership is used to campaign the corresponding DC's Local TSO Allocator. leadership *election.Leadership