Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load initial assignment and dynamically watch/apply keyspace groups' membership/distribution change #6247

Merged
merged 2 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,9 @@ func IsLeaderChange(err error) bool {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) || strings.Contains(errMsg, errs.MismatchLeaderErr)
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

func trimHTTPPrefix(str string) string {
Expand Down
14 changes: 11 additions & 3 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ import (
)

const (
// NotLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
RetryTimeoutErr = "retry timeout"
// NotServedErr indicates a TSO node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
RetryTimeoutErr = "retry timeout"
)

// client errors
Expand Down
20 changes: 20 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace groups terminated
'''

["ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace groups timeout
'''

["PD:ErrEncryptionKMS"]
error = '''
KMS error
Expand Down Expand Up @@ -731,11 +741,21 @@ error = '''
get allocator failed, %s
'''

["PD:tso:ErrGetAllocatorManager"]
error = '''
get allocator manager failed, %s
'''

["PD:tso:ErrGetLocalAllocator"]
error = '''
get local allocator failed, %s
'''

["PD:tso:ErrKeyspaceGroupIDInvalid"]
error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/pprof v0.0.0-20211122183932-1daafda22083 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
32 changes: 22 additions & 10 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,18 @@ package errs
import "github.com/pingcap/errors"

const (
// NotLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the the non-leader member received the requests which should be received by leader.
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates a TSO node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
)

// common error in multiple packages
Expand All @@ -31,14 +39,18 @@ var (

// tso errors
var (
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace groups timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace groups terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
)

// member errors
Expand Down
9 changes: 8 additions & 1 deletion pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)
Expand All @@ -31,12 +32,18 @@ func newHandler(s *Server) *Handler {
}

// ResetTS resets the ts with specified tso.
// TODO: Support multiple keyspace groups.
func (h *Handler) ResetTS(ts uint64, ignoreSmaller, skipUpperBoundCheck bool) error {
log.Info("reset-ts",
zap.Uint64("new-ts", ts),
zap.Bool("ignore-smaller", ignoreSmaller),
zap.Bool("skip-upper-bound-check", skipUpperBoundCheck))
tsoAllocator, err := h.s.GetTSOAllocatorManager().GetAllocator(tso.GlobalDCLocation)
tsoAllocatorManager, err := h.s.GetTSOAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we judge it firstly?

if err != nil {
log.Error("failed to get allocator manager", errs.ZapError(err))
return err
}
tsoAllocator, err := tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
if err != nil {
return err
}
Expand Down
43 changes: 33 additions & 10 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ type Server struct {

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
startCallbacks []func()

// for service registry
serviceID *discovery.ServiceRegistryEntry
serviceRegister *discovery.ServiceRegister
}

Expand Down Expand Up @@ -199,14 +202,30 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing implements basicserver. It returns whether the server is the leader
// if there is embedded etcd, or the primary otherwise.
// TODO: support multiple keyspace groups
func (s *Server) IsServing() bool {
return atomic.LoadInt64(&s.isRunning) == 1 && s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).IsLeader()
if atomic.LoadInt64(&s.isRunning) == 0 {
return false
}

member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return false
}
return member.IsLeader()
}

// GetLeaderListenUrls gets service endpoints from the leader in election group.
// The entry at the index 0 is the primary's service endpoint.
func (s *Server) GetLeaderListenUrls() []string {
return s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID).GetLeaderListenUrls()
member, err := s.keyspaceGroupManager.GetElectionMember(mcsutils.DefaultKeySpaceGroupID)
if err != nil {
log.Error("failed to get election member", errs.ZapError(err))
return nil
}

return member.GetLeaderListenUrls()
}

// AddServiceReadyCallback implements basicserver.
Expand All @@ -229,8 +248,8 @@ func (s *Server) IsClosed() bool {
}

// GetTSOAllocatorManager returns the manager of TSO Allocator.
func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager {
return s.keyspaceGroupManager.GetAllocatorManager(mcsutils.DefaultKeySpaceGroupID)
func (s *Server) GetTSOAllocatorManager(keyspaceGroupID uint32) (*tso.AllocatorManager, error) {
return s.keyspaceGroupManager.GetAllocatorManager(keyspaceGroupID)
}

// IsLocalRequest checks if the forwarded host is the current host
Expand Down Expand Up @@ -416,11 +435,16 @@ func (s *Server) startServer() (err error) {
}

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
defaultKsgStorageTSRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.etcdClient, s.listenURL.Host, defaultKsgStorageTSRootPath, tsoSvcRootPath, s.cfg)
s.keyspaceGroupManager.Initialize()
s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
// The param `false` means that we don't initialize the keyspace group manager
// by loading the keyspace group meta from etcd.
if err := s.keyspaceGroupManager.Initialize(false); err != nil {
return err
}

s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.service = &Service{Server: s}
Expand Down Expand Up @@ -448,8 +472,7 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := entry.Serialize()
serializedEntry, err := s.serviceID.Serialize()
if err != nil {
return err
}
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package endpoint
import (
"fmt"
"path"
"regexp"
"strconv"
"strings"

Expand Down Expand Up @@ -239,6 +240,22 @@ func KeyspaceGroupIDPath(id uint32) string {
return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupMembershipKey, encodeKeyspaceGroupID(id))
}

// keyspaceGroupIDPathRegexp matches a path that ends with the keyspace group
// membership prefix followed by "/" and exactly five digits, which are captured.
// It is compiled once at package initialization instead of on every call.
// NOTE(review): assumes KeyspaceGroupIDPrefix() depends only on package-level
// constants, so it is safe to call during package initialization — confirm.
var keyspaceGroupIDPathRegexp = regexp.MustCompile(
	strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/"))

// ExtractKeyspaceGroupIDFromPath extracts keyspace group id from the given path, which contains
// the pattern of `tso/keyspace_groups/membership/(\d{5})$`.
//
// It returns an error if the path does not end with that pattern, or if the
// captured digits cannot be parsed as an unsigned 32-bit integer. The parse
// error is wrapped, so callers can inspect it with errors.Is/errors.As.
func ExtractKeyspaceGroupIDFromPath(path string) (uint32, error) {
	match := keyspaceGroupIDPathRegexp.FindStringSubmatch(path)
	if match == nil {
		return 0, fmt.Errorf("invalid keyspace group id path: %s", path)
	}
	// match[1] is the five-digit capture group.
	id, err := strconv.ParseUint(match[1], 10, 32)
	if err != nil {
		return 0, fmt.Errorf("failed to parse keyspace group ID: %w", err)
	}
	return uint32(id), nil
}

// encodeKeyspaceGroupID from uint32 to string.
func encodeKeyspaceGroupID(groupID uint32) string {
return fmt.Sprintf("%05d", groupID)
Expand Down
48 changes: 48 additions & 0 deletions pkg/storage/endpoint/key_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,51 @@ func BenchmarkRegionPath(b *testing.B) {
_ = RegionPath(uint64(i))
}
}

// TestExtractKeyspaceGroupIDFromPath verifies that ExtractKeyspaceGroupIDFromPath
// accepts paths ending with `tso/keyspace_groups/membership/<5 digits>` (with or
// without a leading cluster prefix) and rejects everything else.
func TestExtractKeyspaceGroupIDFromPath(t *testing.T) {
	re := require.New(t)

	rightCases := []struct {
		path string
		id   uint32
	}{
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00000", id: 0},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/00001", id: 1},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345", id: 12345},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/99999", id: 99999},
		{path: "tso/keyspace_groups/membership/00000", id: 0},
		{path: "tso/keyspace_groups/membership/00001", id: 1},
		{path: "tso/keyspace_groups/membership/12345", id: 12345},
		{path: "tso/keyspace_groups/membership/99999", id: 99999},
	}

	for _, tt := range rightCases {
		id, err := ExtractKeyspaceGroupIDFromPath(tt.path)
		// Check the error first: on failure the returned id is 0, which would
		// otherwise surface as a misleading id mismatch (or be masked entirely
		// when the expected id is 0).
		re.NoError(err, tt.path)
		re.Equal(tt.id, id, tt.path)
	}

	wrongCases := []struct {
		path string
	}{
		{path: ""},
		{path: "00001"},
		{path: "xxx/keyspace_groups/membership/00001"},
		{path: "tso/xxxxxxxxxxxxxxx/membership/00001"},
		{path: "tso/keyspace_groups/xxxxxxxxxx/00001"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/xxxxxxxxxx/00001"},
		{path: "/pd/{cluster_id}/xxx/keyspace_groups/membership/00001"},
		{path: "/pd/{cluster_id}/tso/xxxxxxxxxxxxxxx/membership/00001"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/0001"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/123456"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/1234a"},
		{path: "/pd/{cluster_id}/tso/keyspace_groups/membership/12345a"},
	}

	for _, tt := range wrongCases {
		_, err := ExtractKeyspaceGroupIDFromPath(tt.path)
		re.Error(err, tt.path)
	}
}
10 changes: 9 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@ import (
"go.etcd.io/etcd/clientv3"
)

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
// It is serialized to JSON as part of KeyspaceGroup.
type KeyspaceGroupMember struct {
// Address identifies the member; serialized under the JSON key "address".
// NOTE(review): presumably the service address of the TSO node/pod — confirm against the writer side.
Address string `json:"address"`
}

// KeyspaceGroup is the keyspace group.
// It carries the group's identity, its election members and the keyspaces assigned to it,
// and is serialized to JSON using the field tags below.
type KeyspaceGroup struct {
// ID is the keyspace group ID; serialized under the JSON key "id".
ID uint32 `json:"id"`
// UserKind is serialized under the JSON key "user-kind".
// NOTE(review): the set of valid values is not visible here — confirm against the definition of user kinds.
UserKind string `json:"user-kind"`
// Members are the election members which campaign for the primary of the keyspace group.
Members []KeyspaceGroupMember `json:"members"`
// Keyspaces are the keyspace IDs which belong to the keyspace group.
Keyspaces []uint32 `json:"keyspaces"`
}

// KeyspaceGroupStorage is the interface for keyspace group storage.
Expand Down
5 changes: 3 additions & 2 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,8 @@ func (am *AllocatorManager) close() {
log.Info("closed the allocator manager")
}

func (am *AllocatorManager) getMember() *ElectionMember {
return &am.member
func (am *AllocatorManager) getMember() ElectionMember {
return am.member
}

// SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location
Expand Down Expand Up @@ -1072,6 +1072,7 @@ func (am *AllocatorManager) HandleRequest(dcLocation string, count uint32) (pdpb
err := errs.ErrGetAllocator.FastGenByArgs(fmt.Sprintf("%s allocator not found, generate timestamp failed", dcLocation))
return pdpb.Timestamp{}, err
}

return allocatorGroup.allocator.GenerateTSO(count)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)

// Have dc-locations configured in the cluster, use the Global TSO generation way.
// (with synchronization with other Local TSO Allocators)
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(gta.ctx)
defer cancel()
for i := 0; i < maxRetryCount; i++ {
var (
Expand Down Expand Up @@ -237,7 +237,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
skipCheck = true
goto SETTING_PHASE
}
// Is skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valide.
// If skipCheck is false and globalTSOResp remains the same, it means the estimatedTSO is valid.
if !skipCheck && tsoutil.CompareTimestamp(&globalTSOResp, estimatedMaxTSO) == 0 {
tsoCounter.WithLabelValues("global_tso_estimate", gta.timestampOracle.dcLocation).Inc()
}
Expand Down Expand Up @@ -309,7 +309,7 @@ type syncResp struct {

// SyncMaxTS is used to sync MaxTS with all Local TSO Allocator leaders in dcLocationMap.
// If maxTSO is the biggest TSO among all Local TSO Allocators, it will be written into
// each allocator and remines the same after the synchronization.
// each allocator and remains the same after the synchronization.
// If not, it will be replaced with the new max Local TSO and return.
func (gta *GlobalTSOAllocator) SyncMaxTS(
ctx context.Context,
Expand Down
Loading