Skip to content

Commit

Permalink
loads initial keyspace groups from etcd
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Mar 30, 2023
1 parent 4fe7223 commit b2816a1
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 34 deletions.
6 changes: 3 additions & 3 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ const (
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates a tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
RetryTimeoutErr = "retry timeout"
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
RetryTimeoutErr = "retry timeout"
)

// client errors
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -731,11 +731,21 @@ error = '''
get allocator failed, %s
'''

["PD:tso:ErrGetAllocatorManager"]
error = '''
get allocator manager failed, %s
'''

["PD:tso:ErrGetLocalAllocator"]
error = '''
get local allocator failed, %s
'''

["PD:tso:ErrKeyspaceGroupIDInvalid"]
error = '''
the keyspace group id is invalid, %s
'''

["PD:tso:ErrLogicOverflow"]
error = '''
logic part overflow
Expand Down
6 changes: 3 additions & 3 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import "github.com/pingcap/errors"
const (
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates a tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the client side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
// NOTE(review): the "leader is changed" wording mirrors the notes above; confirm it describes how the client
// reacts to this particular error.
NotServedErr = "is not served"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package server
import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/tso"
"go.uber.org/zap"
)

Expand Down
4 changes: 3 additions & 1 deletion pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,9 @@ func (s *Server) startServer() (err error) {
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.keyspaceGroupManager.Initialize()
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}

s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.service = &Service{Server: s}
Expand Down
10 changes: 9 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@ import (
"go.etcd.io/etcd/clientv3"
)

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
type KeyspaceGroupMember struct {
// Location is serialized as "location".
// NOTE(review): presumably the address at which the member can be reached — confirm against the writer of this field.
Location string `json:"location"`
}

// KeyspaceGroup is the keyspace group.
type KeyspaceGroup struct {
// ID is the ID of the keyspace group, serialized as "id".
ID uint32 `json:"id"`
// UserKind is serialized as "user-kind".
// NOTE(review): the set of valid values is not visible here — confirm against the keyspace group producer.
UserKind string `json:"user-kind"`
// Members are the election members which campaign for the primary of the keyspace group.
Members []KeyspaceGroupMember `json:"members"`
// Keyspaces are the keyspace IDs which belong to the keyspace group.
Keyspaces []uint32 `json:"keyspaces"`
}

// KeyspaceGroupStorage is the interface for keyspace group storage.
Expand Down
83 changes: 58 additions & 25 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
Expand All @@ -38,29 +39,15 @@ import (
// primaryElectionSuffix is the suffix of the key for keyspace group primary election
const primaryElectionSuffix = "primary"

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
type KeyspaceGroupMember struct {
// Location is serialized as "location".
// NOTE(review): presumably the address at which the member can be reached — confirm against the writer of this field.
Location string `json:"location"`
}

// KeyspaceGroup is the keyspace group.
// TODO: merge with endpoint.KeyspaceGroup after #6229 is merged.
type KeyspaceGroup struct {
// ID is the ID of the keyspace group, serialized as "id".
ID uint32 `json:"id"`
// UserKind is serialized as "user-kind".
// NOTE(review): the set of valid values is not visible here — confirm against the keyspace group producer.
UserKind string `json:"user-kind"`
// Members are the members of the keyspace group, serialized as "members".
Members []KeyspaceGroupMember `json:"members"`
}

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
type KeyspaceGroupManager struct {
// mu protects isServing and the ams being concurrently updated when the service shutdown
// and online keyspace group membership/distribution change happen simultaneously.
// Accessing ams[i] doesn't need to acquire this lock.
// mu protects ams being concurrently updated when the service shutdown and online keyspace
// group membership/distribution change happen simultaneously. The former will close all
// keyspace groups, while the latter could create new keyspace groups. Accessing ams[i] doesn't
// need to acquire this lock.
mu sync.Mutex
// isClosed indicates whether the keyspace group manager is closed. If it's true,
// online keyspace group membership/distribution change won't be allowed.
isClosed bool
// ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned
// with an allocator manager managing its global/local tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
Expand Down Expand Up @@ -101,6 +88,9 @@ type KeyspaceGroupManager struct {
// Value: ts(time.Time)
// Note: The {group} is 5 digits integer with leading zeros.
tsoSvcRootPath string
// storage is the storage endpoint for tso and keyspace group membership/distribution meta.
// The root path starts with legacySvcRootPath.
storage *endpoint.StorageEndpoint
// cfg is the TSO config
cfg ServiceConfig
maxResetTSGap func() time.Duration
Expand Down Expand Up @@ -137,17 +127,60 @@ func NewKeyspaceGroupManager(
}

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize() {
kgm.initKeyspaceGroup(mcsutils.DefaultKeySpaceGroupID)
func (kgm *KeyspaceGroupManager) Initialize() error {
kgm.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
if kgm.storage == nil {
return errors.New("failed to create storage endpoint for keyspace group metadata")
}

if err := kgm.loadKeyspaceGroups(); err != nil {
return err
}

kgm.wg.Add(1)
go kgm.updateKeyspaceGroupsLoop()

return nil
}

// Close shuts down this KeyspaceGroupManager. It invokes cancel, waits for the goroutines
// tracked by wg to exit, and then closes the allocator manager of every keyspace group that
// has been initialized.
func (kgm *KeyspaceGroupManager) Close() {
	kgm.cancel()
	kgm.wg.Wait()

	kgm.mu.Lock()
	defer kgm.mu.Unlock()

	// Slots of keyspace groups which were never initialized hold nil, so each slot is checked
	// before it is closed. No keyspace group (including the default one) is assumed to exist.
	for i := range kgm.ams {
		if mgr := kgm.ams[i].Load(); mgr != nil {
			mgr.close()
		}
	}
}

// loadKeyspaceGroups loads the initial keyspace groups from etcd and initializes each of them.
//
// It returns the error reported by the storage if the keyspace groups can't be loaded. If the
// manager's context is done while the groups are being initialized, it stops early and
// returns nil, leaving the remaining groups uninitialized.
func (kgm *KeyspaceGroupManager) loadKeyspaceGroups() error {
	// Load the keyspace group membership from etcd.
	// NOTE(review): the (0, 0) arguments presumably mean "from the first group, without a
	// limit" — confirm against LoadKeyspaceGroups.
	groups, err := kgm.storage.LoadKeyspaceGroups(0, 0)
	if err != nil {
		return err
	}

	// Initialize the keyspace groups
	for _, group := range groups {
		select {
		case <-kgm.ctx.Done():
			return nil
		default:
		}

		// Scope the lock to a single iteration. A defer placed directly in the loop body
		// wouldn't run until this function returns, so the second iteration would block
		// forever trying to re-acquire the non-reentrant mutex.
		func() {
			kgm.mu.Lock()
			defer kgm.mu.Unlock()
			kgm.initKeyspaceGroup(group.ID)
		}()
	}

	return nil
}

// initKeyspaceGroup initializes the given keyspace group
Expand All @@ -164,23 +197,23 @@ func (kgm *KeyspaceGroupManager) initKeyspaceGroup(groupID uint32) {
path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", groupID)),
primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())

defaultKsgGroupStorage := endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
kgm.ams[groupID].Store(
NewAllocatorManager(
kgm.ctx, true, groupID, participant,
kgm.legacySvcRootPath, defaultKsgGroupStorage,
kgm.legacySvcRootPath, kgm.storage,
kgm.cfg.IsLocalTSOEnabled(), kgm.cfg.GetTSOSaveInterval(),
kgm.cfg.GetTSOUpdatePhysicalInterval(), kgm.cfg.GetLeaderLease(),
kgm.cfg.GetTLSConfig(), kgm.maxResetTSGap))
}

// updateKeyspaceGroupsLoop is intended to periodically check if there is any change in keyspace group
// membership/distribution. The update logic is not implemented yet, so it currently only derives a
// cancellable context from kgm.ctx and returns right away.
// It marks itself as done on kgm.wg when it returns.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroupsLoop() {
// NOTE(review): presumably logs a panic raised in this goroutine — confirm against logutil.LogPanic.
defer logutil.LogPanic()
defer kgm.wg.Done()
// The derived context is discarded for now; only its cancel function is kept so that it is released on return.
_, cancel := context.WithCancel(kgm.ctx)
defer cancel()

// TODO: dynamically update the keyspace group membership/distribution
}

// GetAllocatorManager returns the AllocatorManager of the given keyspace group
Expand Down

0 comments on commit b2816a1

Please sign in to comment.