Skip to content

Commit

Permalink
keyspace: patrol keyspace assignment before the first split (tikv#6388)
Browse files Browse the repository at this point in the history
ref tikv#6232

Patrol the keyspace assignment before the first split to make sure every keyspace has its group assignment.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Aug 2, 2023
1 parent 4a374c6 commit a1d541e
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 11 deletions.
84 changes: 73 additions & 11 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,20 @@ type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// the lock for the groups

sync.RWMutex
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap
// patrolKeyspaceAssignmentOnce is used to indicate whether we have patrolled all keyspaces
// and assign them to the keyspace groups.
patrolKeyspaceAssignmentOnce bool

// store is the storage for keyspace group related information.
store endpoint.KeyspaceGroupStorage
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
}

client *clientv3.Client

Expand All @@ -80,7 +86,15 @@ type GroupManager struct {
}

// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage, client *clientv3.Client, clusterID uint64) *GroupManager {
func NewKeyspaceGroupManager(
ctx context.Context,
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
},
client *clientv3.Client,
clusterID uint64,
) *GroupManager {
ctx, cancel := context.WithCancel(ctx)
key := discovery.TSOPath(clusterID)
groups := make(map[endpoint.UserKind]*indexedHeap)
Expand Down Expand Up @@ -156,6 +170,38 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

// patrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (m *GroupManager) patrolKeyspaceAssignment() error {
m.Lock()
defer m.Unlock()
if m.patrolKeyspaceAssignmentOnce {
return nil
}
keyspaces, err := m.store.LoadRangeKeyspace(utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
config, err := m.getKeyspaceConfigByKindLocked(endpoint.Basic)
if err != nil {
return err
}
for _, ks := range keyspaces {
if ks == nil {
continue
}
groupID, err := strconv.ParseUint(config[TSOKeyspaceGroupIDKey], 10, 64)
if err != nil {
return err
}
err = m.updateKeyspaceForGroupLocked(endpoint.Basic, groupID, ks.GetId(), opAdd)
if err != nil {
return err
}
}
m.patrolKeyspaceAssignmentOnce = true
return nil
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
defer logutil.LogPanic()
defer m.wg.Done()
Expand Down Expand Up @@ -426,11 +472,18 @@ func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[
}
m.RLock()
defer m.RUnlock()
return m.getKeyspaceConfigByKindLocked(userKind)
}

func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind) (map[string]string, error) {
groups, ok := m.groups[userKind]
if !ok {
return map[string]string{}, errors.Errorf("user kind %s not found", userKind)
}
kg := groups.Top()
if kg == nil {
return map[string]string{}, errors.Errorf("no keyspace group for user kind %s", userKind)
}
id := strconv.FormatUint(uint64(kg.ID), 10)
config := map[string]string{
UserKindKey: userKind.String(),
Expand All @@ -452,9 +505,13 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI

m.Lock()
defer m.Unlock()
kg := m.groups[userKind].Get(uint32(id))
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
}

func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, groupID uint64, keyspaceID uint32, mutation int) error {
kg := m.groups[userKind].Get(uint32(groupID))
if kg == nil {
return errors.Errorf("keyspace group %d not found", id)
return errors.Errorf("keyspace group %d not found", groupID)
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
Expand Down Expand Up @@ -535,11 +592,14 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
err := m.patrolKeyspaceAssignment()
if err != nil {
return err
}
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
// TODO: avoid to split when the keyspaces is empty.
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
if err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the old keyspace group first.
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID)
if err != nil {
Expand All @@ -564,13 +624,15 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitTargetKg != nil {
return ErrKeyspaceGroupExists
}
keyspaceNum := len(keyspaces)
sourceKeyspaceNum := len(splitSourceKg.Keyspaces)
// Check if the keyspaces are all in the old keyspace group.
if len(keyspaces) > len(splitSourceKg.Keyspaces) {
if keyspaceNum == 0 || keyspaceNum > sourceKeyspaceNum {
return ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, len(splitSourceKg.Keyspaces))
newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces))
oldKeyspaceMap = make(map[uint32]struct{}, sourceKeyspaceNum)
newKeyspaceMap = make(map[uint32]struct{}, keyspaceNum)
)
for _, keyspace := range splitSourceKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
Expand All @@ -582,7 +644,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, len(splitSourceKg.Keyspaces)-len(keyspaces))
splitKeyspaces := make([]uint32, 0, sourceKeyspaceNum-keyspaceNum)
for _, keyspace := range splitSourceKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
Expand Down
32 changes: 32 additions & 0 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockcluster"
Expand Down Expand Up @@ -251,6 +252,9 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
// split the keyspace group 1 to 4
err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{333})
re.ErrorIs(err, ErrKeyspaceGroupNotEnoughReplicas)
// split the keyspace group 2 to 4 without giving any keyspace
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
// split the keyspace group 2 to 4
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333})
re.NoError(err)
Expand Down Expand Up @@ -317,3 +321,31 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
}

func (suite *keyspaceGroupTestSuite) TestPatrolKeyspaceAssignment() {
re := suite.Require()
// Force the patrol to run once.
suite.kgm.patrolKeyspaceAssignmentOnce = false
// Create a keyspace group without any keyspace.
err := suite.kgm.CreateKeyspaceGroups([]*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Members: make([]endpoint.KeyspaceGroupMember, 2),
},
})
re.NoError(err)
// Create a keyspace without any keyspace group.
now := time.Now().Unix()
err = suite.kg.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: 111,
Name: "111",
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
// Split to see if the keyspace is attached to the group.
err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{111})
re.NoError(err)
}
4 changes: 4 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {

func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
// Do not delete default keyspace group.
if group.ID == mcsutils.DefaultKeyspaceGroupID {
continue
}
handlersutil.MustDeleteKeyspaceGroup(re, server, group.ID)
}
}
Expand Down

0 comments on commit a1d541e

Please sign in to comment.