Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "non-default keyspace groups use the same timestamp path by mistake" #6457

Merged
merged 9 commits into from
May 16, 2023
34 changes: 27 additions & 7 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@

const (
// GlobalDCLocation is the Global TSO Allocator's DC location label.
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
globalTSOAllocatorEtcdPrefix = "gta"
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
)

var (
Expand Down Expand Up @@ -273,6 +274,23 @@
go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
}

// GetTimestampPath returns the timestamp path in etcd for the given DCLocation.
func (am *AllocatorManager) GetTimestampPath(dcLocation string) string {
if am == nil {
return ""

Check warning on line 280 in pkg/tso/allocator_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/allocator_manager.go#L280

Added line #L280 was not covered by tests
}
if len(dcLocation) == 0 {
dcLocation = GlobalDCLocation
}

am.mu.RLock()
defer am.mu.RUnlock()
if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist {
return path.Join(am.rootPath, allocatorGroup.allocator.GetTimestampPath())
}
return ""

Check warning on line 291 in pkg/tso/allocator_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/allocator_manager.go#L291

Added line #L291 was not covered by tests
}

// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (am *AllocatorManager) tsoAllocatorLoop() {
defer logutil.LogPanic()
Expand Down Expand Up @@ -750,7 +768,9 @@
}
if err := ag.allocator.UpdateTSO(); err != nil {
log.Warn("failed to update allocator's timestamp",
zap.String("dc-location", ag.dcLocation), errs.ZapError(err))
zap.String("dc-location", ag.dcLocation),
zap.String("name", am.member.Name()),
errs.ZapError(err))
am.ResetAllocatorGroup(ag.dcLocation)
return
}
Expand Down
29 changes: 28 additions & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"context"
"errors"
"fmt"
"path"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -45,6 +46,14 @@
IsInitialize() bool
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
UpdateTSO() error
// GetTimestampPath returns the timestamp path in etcd, which is:
// 1. for the default keyspace group:
// a. timestamp in /pd/{cluster_id}/timestamp
// b. lta/{dc-location}/timestamp in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 2. for the non-default keyspace groups:
// a. {group}/gta/timestamp in /ms/{cluster_id}/tso/{group}/gta/timestamp
// b. {group}/lta/{dc-location}/timestamp in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
GetTimestampPath() string
// SetTSO sets the physical part with given TSO. It's mainly used for BR restore.
// Cannot set the TSO smaller than now in any case.
// if ignoreSmaller=true, if input ts is smaller than current, ignore silently, else return error
Expand Down Expand Up @@ -80,6 +89,16 @@
am *AllocatorManager,
startGlobalLeaderLoop bool,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
tsPath := ""
if am.kgID != mcsutils.DefaultKeyspaceGroupID {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix)
}

ctx, cancel := context.WithCancel(ctx)
gta := &GlobalTSOAllocator{
ctx: ctx,
Expand All @@ -89,7 +108,7 @@
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
rootPath: am.rootPath,
ltsPath: "",
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down Expand Up @@ -127,6 +146,14 @@
return syncRTT.(int64)
}

// GetTimestampPath returns the timestamp path in etcd.
func (gta *GlobalTSOAllocator) GetTimestampPath() string {
if gta == nil || gta.timestampOracle == nil {
return ""

Check warning on line 152 in pkg/tso/global_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/global_allocator.go#L152

Added line #L152 was not covered by tests
}
return gta.timestampOracle.GetTimestampPath()
}

func (gta *GlobalTSOAllocator) estimateMaxTS(count uint32, suffixBits int) (*pdpb.Timestamp, bool, error) {
physical, logical, lastUpdateTime := gta.timestampOracle.generateTSO(int64(count), 0)
if physical == 0 {
Expand Down
7 changes: 5 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ type KeyspaceGroupManager struct {
// 1. The path for keyspace group primary election. Format: "/ms/{cluster_id}/tso/{group}/primary"
// 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default
// keyspace groups.
// Key: /ms/{cluster_id}/tso/{group}/gts/timestamp
// Key: /ms/{cluster_id}/tso/{group}/gta/timestamp
// Value: ts(time.Time)
// Key: /ms/{cluster_id}/tso/{group}/lts/{dc-location}/timestamp
// Key: /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
// Value: ts(time.Time)
// Note: The {group} is 5 digits integer with leading zeros.
tsoSvcRootPath string
Expand Down Expand Up @@ -425,6 +425,9 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
}
// Initialize all kinds of maps.
am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg, true)
log.Info("created allocator manager",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("timestamp-path", am.GetTimestampPath("")))
kgm.Lock()
group.KeyspaceLookupTable = make(map[uint32]struct{})
for _, kid := range group.Keyspaces {
Expand Down
24 changes: 22 additions & 2 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand All @@ -43,7 +44,7 @@
// for election use, notice that the leadership that member holds is
// the leadership for PD leader. Local TSO Allocator's leadership is for the
// election of Local TSO Allocator leader among several PD servers and
// Local TSO Allocator only use member's some etcd and pbpd.Member info.
// Local TSO Allocator only uses some of the member's etcd and pdpb.Member info.
// So it's not conflicted.
rootPath string
allocatorLeader atomic.Value // stored as *pdpb.Member
Expand All @@ -55,13 +56,24 @@
leadership *election.Leadership,
dcLocation string,
) Allocator {
// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// lta/{dc-location} in /pd/{cluster_id}/lta/{dc-location}/timestamp
// 2. for the non-default keyspace groups:
// {group}/lta/{dc-location} in /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp
var tsPath string
if am.kgID == utils.DefaultKeyspaceGroupID {
tsPath = path.Join(localTSOAllocatorEtcdPrefix, dcLocation)
} else {
tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), localTSOAllocatorEtcdPrefix, dcLocation)

Check warning on line 68 in pkg/tso/local_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/local_allocator.go#L68

Added line #L68 was not covered by tests
}
return &LocalTSOAllocator{
allocatorManager: am,
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: am.rootPath,
ltsPath: path.Join(localTSOAllocatorEtcdPrefix, dcLocation),
tsPath: tsPath,
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand All @@ -73,6 +85,14 @@
}
}

// GetTimestampPath returns the timestamp path in etcd.
func (lta *LocalTSOAllocator) GetTimestampPath() string {
if lta == nil || lta.timestampOracle == nil {
return ""

Check warning on line 91 in pkg/tso/local_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/local_allocator.go#L90-L91

Added lines #L90 - L91 were not covered by tests
binshi-bing marked this conversation as resolved.
Show resolved Hide resolved
}
return lta.timestampOracle.GetTimestampPath()

Check warning on line 93 in pkg/tso/local_allocator.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/local_allocator.go#L93

Added line #L93 was not covered by tests
}

// GetDCLocation returns the local allocator's dc-location.
func (lta *LocalTSOAllocator) GetDCLocation() string {
return lta.timestampOracle.dcLocation
Expand Down
27 changes: 18 additions & 9 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type tsoObject struct {
type timestampOracle struct {
client *clientv3.Client
rootPath string
// When ltsPath is empty, it means that it is a global timestampOracle.
ltsPath string
// When tsPath is empty, it means that it is the global timestampOracle of the default keyspace group.
tsPath string
storage endpoint.TSOStorage
// TODO: remove saveInterval
saveInterval time.Duration
Expand Down Expand Up @@ -141,8 +141,9 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int
return rawLogical<<suffixBits + int64(t.suffix)
}

func (t *timestampOracle) getTimestampPath() string {
return path.Join(t.ltsPath, timestampKey)
// GetTimestampPath returns the timestamp path in etcd.
func (t *timestampOracle) GetTimestampPath() string {
return path.Join(t.tsPath, timestampKey)
}

// SyncTimestamp is used to synchronize the timestamp.
Expand All @@ -153,7 +154,7 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
time.Sleep(time.Second)
})

last, err := t.storage.LoadTimestamp(t.ltsPath)
last, err := t.storage.LoadTimestamp(t.tsPath)
if err != nil {
return err
}
Expand All @@ -174,7 +175,7 @@ func (t *timestampOracle) SyncTimestamp(leadership *election.Leadership) error {
}

save := next.Add(t.saveInterval)
if err = t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil {
if err = t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
tsoCounter.WithLabelValues("err_save_sync_ts", t.dcLocation).Inc()
return err
}
Expand Down Expand Up @@ -241,7 +242,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi
// save into etcd only if nextPhysical is close to lastSavedTime
if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), nextPhysical) <= UpdateTimestampGuard {
save := nextPhysical.Add(t.saveInterval)
if err := t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil {
if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
tsoCounter.WithLabelValues("err_save_reset_ts", t.dcLocation).Inc()
return err
}
Expand Down Expand Up @@ -286,7 +287,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error

jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical)
if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold {
log.Warn("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prevPhysical), zap.Time("now", now), zap.Duration("update-physical-interval", t.updatePhysicalInterval))
log.Warn("clock offset",
zap.Duration("jet-lag", jetLag),
zap.Time("prev-physical", prevPhysical),
zap.Time("now", now),
zap.Duration("update-physical-interval", t.updatePhysicalInterval))
tsoCounter.WithLabelValues("slow_save", t.dcLocation).Inc()
}

Expand All @@ -313,7 +318,11 @@ func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error
// The time window needs to be updated and saved to etcd.
if typeutil.SubRealTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= UpdateTimestampGuard {
save := next.Add(t.saveInterval)
if err := t.storage.SaveTimestamp(t.getTimestampPath(), save); err != nil {
if err := t.storage.SaveTimestamp(t.GetTimestampPath(), save); err != nil {
log.Warn("save timestamp failed",
zap.String("dc-location", t.dcLocation),
zap.String("timestamp-path", t.GetTimestampPath()),
zap.Error(err))
tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc()
return err
}
Expand Down
35 changes: 28 additions & 7 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package tso

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -134,6 +136,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByDefaultKeysp
}
}

// Create a client for each keyspace and make sure they can successfully discover the service
// provided by the default keyspace group.
keyspaceIDs := []uint32{0, 1, 2, 3, 1000}
clients := mcs.WaitForMultiKeyspacesTSOAvailable(
suite.ctx, re, keyspaceIDs, []string{suite.pdLeaderServer.GetAddr()})
Expand All @@ -148,6 +152,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
// on a tso server.
re := suite.Require()

// Create keyspace groups.
params := []struct {
keyspaceGroupID uint32
keyspaceIDs []uint32
Expand Down Expand Up @@ -176,15 +181,29 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
})
}

// Wait until all keyspace groups are ready.
testutil.Eventually(re, func() bool {
for _, param := range params {
for _, keyspaceID := range param.keyspaceIDs {
served := false
for _, server := range suite.tsoCluster.GetServers() {
if server.IsKeyspaceServing(keyspaceID, param.keyspaceGroupID) {
tam, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
am, err := server.GetTSOAllocatorManager(param.keyspaceGroupID)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we test the correct timestamp paths are used for all keyspace groups.

re.NoError(err)
re.NotNil(tam)
re.NotNil(am)

// Make sure every keyspace group is using the right timestamp path
// for loading/saving timestamp from/to etcd.
var timestampPath string
clusterID := strconv.FormatUint(suite.pdLeaderServer.GetClusterID(), 10)
if param.keyspaceGroupID == mcsutils.DefaultKeyspaceGroupID {
timestampPath = fmt.Sprintf("/pd/%s/timestamp", clusterID)
} else {
timestampPath = fmt.Sprintf("/ms/%s/tso/%05d/gta/timestamp",
clusterID, param.keyspaceGroupID)
}
re.Equal(timestampPath, am.GetTimestampPath(""))

served = true
}
}
Expand All @@ -196,6 +215,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestKeyspacesServedByNonDefaultKe
return true
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

// Create a client for each keyspace and make sure they can successfully discover the service
// provided by the corresponding keyspace group.
keyspaceIDs := make([]uint32, 0)
for _, param := range params {
keyspaceIDs = append(keyspaceIDs, param.keyspaceIDs...)
Expand Down Expand Up @@ -254,20 +275,20 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() {
splitTS, err = suite.requestTSO(re, 1, 222, 2)
return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0
})
splitTS, err = suite.requestTSO(re, 1, 222, 2)
re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0)
// Finish the split.
handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to explicitly call this line, because the keyspace group manager will automatically finish the split the first time it serves a TSO request for the split target group.

}

func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO(
re *require.Assertions,
count, keyspaceID, keyspaceGroupID uint32,
) (pdpb.Timestamp, error) {
primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID)
tam, err := primary.GetTSOAllocatorManager(keyspaceGroupID)
kgm := primary.GetKeyspaceGroupManager()
re.NotNil(kgm)
ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, count)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call keyspace group manager's HandleTSORequest() instead of primary.GetTSOAllocatorManager(keyspaceGroupID) to trigger split process.

re.NoError(err)
re.NotNil(tam)
return tam.HandleRequest(tsopkg.GlobalDCLocation, count)
return ts, err
}

func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection() {
Expand Down
Loading
Loading