diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index a780e7da74e..f2381e9e7d8 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -46,12 +46,13 @@ import ( const ( // GlobalDCLocation is the Global TSO Allocator's DC location label. - GlobalDCLocation = "global" - checkStep = time.Minute - patrolStep = time.Second - defaultAllocatorLeaderLease = 3 - localTSOAllocatorEtcdPrefix = "lta" - localTSOSuffixEtcdPrefix = "lts" + GlobalDCLocation = "global" + checkStep = time.Minute + patrolStep = time.Second + defaultAllocatorLeaderLease = 3 + globalTSOAllocatorEtcdPrefix = "gta" + localTSOAllocatorEtcdPrefix = "lta" + localTSOSuffixEtcdPrefix = "lts" ) var ( @@ -273,6 +274,23 @@ func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLoc go am.allocatorLeaderLoop(parentCtx, localTSOAllocator) } +// GetTimestampPath returns the timestamp path in etcd for the given DCLocation. +func (am *AllocatorManager) GetTimestampPath(dcLocation string) string { + if am == nil { + return "" + } + if len(dcLocation) == 0 { + dcLocation = GlobalDCLocation + } + + am.mu.RLock() + defer am.mu.RUnlock() + if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist { + return path.Join(am.rootPath, allocatorGroup.allocator.GetTimestampPath()) + } + return "" +} + // tsoAllocatorLoop is used to run the TSO Allocator updating daemon. func (am *AllocatorManager) tsoAllocatorLoop() { defer logutil.LogPanic() @@ -750,7 +768,9 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { } if err := ag.allocator.UpdateTSO(); err != nil { log.Warn("failed to update allocator's timestamp", - zap.String("dc-location", ag.dcLocation), errs.ZapError(err)) + zap.String("dc-location", ag.dcLocation), + zap.String("name", am.member.Name()), + errs.ZapError(err)) am.ResetAllocatorGroup(ag.dcLocation) return } diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 0227f2c1a64..1d961fd1b95 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "path" "sync" "sync/atomic" "time" @@ -45,6 +46,14 @@ type Allocator interface { IsInitialize() bool // UpdateTSO is used to update the TSO in memory and the time window in etcd. UpdateTSO() error + // GetTimestampPath returns the timestamp path in etcd, which is: + // 1. for the default keyspace group: + // a. timestamp in /pd/{cluster_id}/timestamp + // b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp + // 1. for the non-default keyspace groups: + // a. {group}/gts/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp + // b. {group}/lts/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp + GetTimestampPath() string // SetTSO sets the physical part with given TSO. It's mainly used for BR restore. // Cannot set the TSO smaller than now in any case. // if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error @@ -80,6 +89,16 @@ func NewGlobalTSOAllocator( am *AllocatorManager, startGlobalLeaderLoop bool, ) Allocator { + // Construct the timestampOracle path prefix, which is: + // 1. for the default keyspace group: + // "" in /pd/{cluster_id}/timestamp + // 2. for the non-default keyspace groups: + // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp + tsPath := "" + if am.kgID != mcsutils.DefaultKeyspaceGroupID { + tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix) + } + ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ ctx: ctx, @@ -89,7 +108,7 @@ func NewGlobalTSOAllocator( timestampOracle: ×tampOracle{ client: am.member.GetLeadership().GetClient(), rootPath: am.rootPath, - ltsPath: "", + tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, @@ -127,6 +146,14 @@ func (gta *GlobalTSOAllocator) getSyncRTT() int64 { return syncRTT.(int64) } +// GetTimestampPath returns the timestamp path in etcd. +func (gta *GlobalTSOAllocator) GetTimestampPath() string { + if gta == nil || gta.timestampOracle == nil { + return "" + } + return gta.timestampOracle.GetTimestampPath() +} + func (gta *GlobalTSOAllocator) estimateMaxTS(count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) { physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(int64(count), 0) if physical == 0 { diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 7038ef8e373..4d32ae92c80 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -186,9 +186,9 @@ type KeyspaceGroupManager struct { // 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary" // 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default // keyspace groups. - // Key: /ms/{cluster_id}/tso/{group}/gts/timestamp + // Key: /ms/{cluster_id}/tso/{group}/gta/timestamp // Value: ts(time.Time) - // Key: /ms/{cluster_id}/tso/{group}/lts/{dc-location}/timestamp + // Key: /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp // Value: ts(time.Time) // Note: The {group} is 5 digits integer with leading zeros. tsoSvcRootPath string @@ -425,6 +425,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro } // Initialize all kinds of maps. am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true) + log.Info("created allocator manager", + zap.Uint32("keyspace-group-id", group.ID), + zap.String("timestamp-path", am.GetTimestampPath(""))) kgm.Lock() group.KeyspaceLookupTable = make(map[uint32]struct{}) for _, kid := range group.Keyspaces { diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index c3bb4c02aad..9c2867966bc 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/log" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -43,7 +44,7 @@ type LocalTSOAllocator struct { // for election use, notice that the leadership that member holds is // the leadership for PD leader. Local TSO Allocator's leadership is for the // election of Local TSO Allocator leader among several PD servers and - // Local TSO Allocator only use member's some etcd and pbpd.Member info. + // Local TSO Allocator only use member's some etcd and pdpb.Member info. // So it's not conflicted. rootPath string allocatorLeader atomic.Value // stored as *pdpb.Member @@ -55,13 +56,24 @@ func NewLocalTSOAllocator( leadership *election.Leadership, dcLocation string, ) Allocator { + // Construct the timestampOracle path prefix, which is: + // 1. for the default keyspace group: + // lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp + // 2. for the non-default keyspace groups: + // {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp + var tsPath string + if am.kgID == utils.DefaultKeyspaceGroupID { + tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation) + } else { + tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation) + } return &LocalTSOAllocator{ allocatorManager: am, leadership: leadership, timestampOracle: ×tampOracle{ client: leadership.GetClient(), rootPath: am.rootPath, - ltsPath: path.Join(localTSOAllocatorEtcdPrefix, dcLocation), + tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, @@ -73,6 +85,14 @@ func NewLocalTSOAllocator( } } +// GetTimestampPath returns the timestamp path in etcd. +func (lta *LocalTSOAllocator) GetTimestampPath() string { + if lta == nil || lta.timestampOracle == nil { + return "" + } + return lta.timestampOracle.GetTimestampPath() +} + // GetDCLocation returns the local allocator's dc-location. func (lta *LocalTSOAllocator) GetDCLocation() string { return lta.timestampOracle.dcLocation diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 525385b42eb..aa1a424d8cd 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -62,8 +62,8 @@ type tsoObject struct { type timestampOracle struct { client *clientv3.Client rootPath string - // When ltsPath is empty, it means that it is a global timestampOracle. - ltsPath string + // When tsPath is empty, it means that it is a global timestampOracle. + tsPath string storage endpoint.TSOStorage // TODO: remove saveInterval saveInterval time.Duration @@ -141,8 +141,9 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int return rawLogical< 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold { - log.Warn("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prevPhysical), zap.Time("now", now), zap.Duration("update-physical-interval", t.updatePhysicalInterval)) + log.Warn("clock offset", + zap.Duration("jet-lag", jetLag), + zap.Time("prev-physical", prevPhysical), + zap.Time("now", now), + zap.Duration("update-physical-interval", t.updatePhysicalInterval)) tsoCounter.WithLabelValues("slow_save", t.dcLocation).Inc() } @@ -313,7 +318,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error // The time window needs to be updated and saved to etcd. if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= UpdateTimestampGuard { save := next.Add(t.saveInterval) - if err := t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil { + if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil { + log.Warn("save timestamp failed", + zap.String("dc-location", t.dcLocation), + zap.String("timestamp-path", t.GetTimestampPath()), + zap.Error(err)) tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc() return err } diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 799fccd42e4..4b650bf1e25 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -16,6 +16,8 @@ package tso import ( "context" + "fmt" + "strconv" "strings" "sync" "testing" @@ -134,6 +136,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp } } + // Create a client for each keyspace and make sure they can successfully discover the service + // provided by the default keyspace group. keyspaceIDs := []uint32{0, 1, 2, 3, 1000} clients := mcs.WaitForMultiKeyspacesTSOAvailable( suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()}) @@ -148,6 +152,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe // on a tso server. re := suite.Require() + // Create keyspace groups. params := []struct { keyspaceGroupID uint32 keyspaceIDs []uint32 @@ -176,15 +181,29 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe }) } + // Wait until all keyspace groups are ready. testutil.Eventually(re, func() bool { for _, param := range params { for _, keyspaceID := range param.keyspaceIDs { served := false for _, server := range suite.tsoCluster.GetServers() { if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) { - tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID) + am, err := server.GetTSOAllocatorManager(param.keyspaceGroupID) re.NoError(err) - re.NotNil(tam) + re.NotNil(am) + + // Make sure every keyspace group is using the right timestamp path + // for loading/saving timestamp from/to etcd. + var timestampPath string + clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10) + if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID { + timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID) + } else { + timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp", + clusterID, param.keyspaceGroupID) + } + re.Equal(timestampPath, am.GetTimestampPath("")) + served = true } } @@ -196,6 +215,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe return true }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + // Create a client for each keyspace and make sure they can successfully discover the service + // provided by the corresponding keyspace group. keyspaceIDs := make([]uint32, 0) for _, param := range params { keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...) @@ -254,9 +275,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { splitTS, err = suite.requestTSO(re, 1, 222, 2) return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0 }) + splitTS, err = suite.requestTSO(re, 1, 222, 2) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) - // Finish the split. - handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) } func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( @@ -264,10 +284,11 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( count, keyspaceID, keyspaceGroupID uint32, ) (pdpb.Timestamp, error) { primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID) - tam, err := primary.GetTSOAllocatorManager(keyspaceGroupID) + kgm := primary.GetKeyspaceGroupManager() + re.NotNil(kgm) + ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, count) re.NoError(err) - re.NotNil(tam) - return tam.HandleRequest(tsopkg.GlobalDCLocation, count) + return ts, err } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() { diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 861aac9acd4..c57e3a032d1 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -114,6 +114,10 @@ func (suite *tsoClientTestSuite) SetupSuite() { {2, []uint32{2}}, } + for _, keyspaceGroup := range suite.keyspaceGroups { + suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) + } + for _, param := range suite.keyspaceGroups { if param.keyspaceGroupID == 0 { // we have already created default keyspace group, so we can skip it. @@ -133,16 +137,22 @@ func (suite *tsoClientTestSuite) SetupSuite() { }) } - for _, keyspaceGroup := range suite.keyspaceGroups { - suite.keyspaceIDs = append(suite.keyspaceIDs, keyspaceGroup.keyspaceIDs...) - } + suite.waitForAllKeyspaceGroupsInServing(re) + } +} - // Make sure all keyspace groups are available. - testutil.Eventually(re, func() bool { - for _, keyspaceID := range suite.keyspaceIDs { +func (suite *tsoClientTestSuite) waitForAllKeyspaceGroupsInServing(re *require.Assertions) { + // The tso servers are loading keyspace groups asynchronously. Make sure all keyspace groups + // are available for serving tso requests from corresponding keyspaces by querying + // IsKeyspaceServing(keyspaceID, the Desired KeyspaceGroupID). if use default keyspace group id + // in the query, it will always return true as the keyspace will be served by default keyspace + // group before the keyspace groups are loaded. + testutil.Eventually(re, func() bool { + for _, keyspaceGroup := range suite.keyspaceGroups { + for _, keyspaceID := range keyspaceGroup.keyspaceIDs { served := false for _, server := range suite.tsoCluster.GetServers() { - if server.IsKeyspaceServing(keyspaceID, mcsutils.DefaultKeyspaceGroupID) { + if server.IsKeyspaceServing(keyspaceID, keyspaceGroup.keyspaceGroupID) { served = true break } @@ -151,14 +161,14 @@ func (suite *tsoClientTestSuite) SetupSuite() { return false } } - return true - }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) - // Create clients and make sure they all have discovered the tso service. - suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( - suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) - re.Equal(len(suite.keyspaceIDs), len(suite.clients)) - } + // Create clients and make sure they all have discovered the tso service. + suite.clients = mcs.WaitForMultiKeyspacesTSOAvailable( + suite.ctx, re, suite.keyspaceIDs, strings.Split(suite.backendEndpoints, ",")) + re.Equal(len(suite.keyspaceIDs), len(suite.clients)) } func (suite *tsoClientTestSuite) TearDownSuite() { @@ -245,9 +255,8 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() { // TestGetMinTS tests the correctness of GetMinTS. func (suite *tsoClientTestSuite) TestGetMinTS() { - // Skip this test for the time being due to https://github.com/tikv/pd/issues/6453 - // TODO: fix it #6453 - suite.T().SkipNow() + re := suite.Require() + suite.waitForAllKeyspaceGroupsInServing(re) var wg sync.WaitGroup wg.Add(tsoRequestConcurrencyNumber * len(suite.clients)) @@ -258,9 +267,9 @@ func (suite *tsoClientTestSuite) TestGetMinTS() { var lastMinTS uint64 for j := 0; j < tsoRequestRound; j++ { physical, logical, err := client.GetMinTS(suite.ctx) - suite.NoError(err) + re.NoError(err) minTS := tsoutil.ComposeTS(physical, logical) - suite.Less(lastMinTS, minTS) + re.Less(lastMinTS, minTS) lastMinTS = minTS // Now we check whether the returned ts is the minimum one @@ -268,9 +277,9 @@ func (suite *tsoClientTestSuite) TestGetMinTS() { // less than the new timestamps of all keyspace groups. for _, client := range suite.clients { physical, logical, err := client.GetTS(suite.ctx) - suite.NoError(err) + re.NoError(err) ts := tsoutil.ComposeTS(physical, logical) - suite.Less(minTS, ts) + re.Less(minTS, ts) } } }(client) diff --git a/tests/integrations/tso/testutil.go b/tests/integrations/tso/testutil.go index 55e24fd1e58..2a4e5eabd90 100644 --- a/tests/integrations/tso/testutil.go +++ b/tests/integrations/tso/testutil.go @@ -22,7 +22,7 @@ import ( const ( serverCount = 3 tsoRequestConcurrencyNumber = 5 - tsoRequestRound = 30 + tsoRequestRound = 300 tsoCount = 10 )