Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: make TestSplitKeyspaceGroup stable #6584

Merged
merged 8 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
allocator := mockid.NewIDAllocator()
kgm := NewKeyspaceGroupManager(suite.ctx, store, nil, 0)
suite.manager = NewKeyspaceManager(suite.ctx, store, nil, allocator, &mockConfig{}, kgm)
suite.NoError(kgm.Bootstrap())
suite.NoError(kgm.Bootstrap(suite.ctx))
suite.NoError(suite.manager.Bootstrap())
}

Expand Down
56 changes: 38 additions & 18 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -51,9 +52,11 @@ const (

// GroupManager is the manager of keyspace group related data.
type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
client *clientv3.Client
clusterID uint64

sync.RWMutex
// groups is the cache of keyspace group related information.
Expand Down Expand Up @@ -90,24 +93,24 @@ func NewKeyspaceGroupManager(
cancel: cancel,
store: store,
groups: groups,
client: client,
clusterID: clusterID,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}

// If the etcd client is not nil, start the watch loop for the registered tso servers.
// The PD(TSO) Client relies on this info to discover tso servers.
if client != nil {
m.initTSONodesWatcher(client, clusterID)
m.wg.Add(2)
if m.client != nil {
m.initTSONodesWatcher(m.client, m.clusterID)
m.wg.Add(1)
go m.tsoNodesWatcher.StartWatchLoop()
go m.allocNodesToAllKeyspaceGroups()
}

return m
}

// Bootstrap saves default keyspace group info and init group mapping in the memory.
func (m *GroupManager) Bootstrap() error {
func (m *GroupManager) Bootstrap(ctx context.Context) error {
// Force the membership restriction that the default keyspace must belong to default keyspace group.
// Have no information to specify the distribution of the default keyspace group replicas, so just
// leave the replica/member list empty. The TSO service will assign the default keyspace group replica
Expand Down Expand Up @@ -137,6 +140,11 @@ func (m *GroupManager) Bootstrap() error {
m.groups[userKind].Put(group)
}

// It will only alloc node when the group manager is on API leader.
if m.client != nil {
m.wg.Add(1)
go m.allocNodesToAllKeyspaceGroups(ctx)
}
return nil
}

Expand All @@ -146,7 +154,7 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
Expand All @@ -158,6 +166,9 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
log.Info("start to alloc nodes to all keyspace groups")
for {
select {
case <-ctx.Done():
log.Info("stop to alloc nodes to all keyspace groups")
return
case <-m.ctx.Done():
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("stop to alloc nodes to all keyspace groups")
return
Expand Down Expand Up @@ -338,11 +349,6 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro
Members: keyspaceGroup.Members,
Keyspaces: keyspaceGroup.Keyspaces,
}
if oldKG.IsSplitting() {
newKG.SplitState = &endpoint.SplitState{
SplitSource: oldKG.SplitState.SplitSource,
}
}
err = m.store.SaveKeyspaceGroup(txn, newKG)
if err != nil {
return err
Expand Down Expand Up @@ -380,6 +386,8 @@ func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind)
return config, nil
}

var failpointOnce sync.Once

// UpdateKeyspaceForGroup updates the keyspace field for the keyspace group.
func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupID string, keyspaceID uint32, mutation int) error {
// when server is not in API mode, we don't need to update the keyspace for keyspace group
Expand All @@ -391,6 +399,12 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI
return err
}

failpoint.Inject("externalAllocNode", func(val failpoint.Value) {
failpointOnce.Do(func() {
addrs := val.(string)
m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, strings.Split(addrs, ","))
})
})
m.Lock()
defer m.Unlock()
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
Expand Down Expand Up @@ -425,7 +439,6 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind,
if err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{kg}, true); err != nil {
return err
}

m.groups[userKind].Put(kg)
}
return nil
Expand Down Expand Up @@ -696,8 +709,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
var kg *endpoint.KeyspaceGroup
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
Expand All @@ -714,6 +729,11 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
if err != nil {
return err
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
return nil
}

// IsExistNode checks if the node exists.
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
idAllocator := mockid.NewIDAllocator()
cluster := mockcluster.NewCluster(suite.ctx, mockconfig.NewTestOptions())
suite.kg = NewKeyspaceManager(suite.ctx, store, cluster, idAllocator, &mockConfig{}, suite.kgm)
suite.NoError(suite.kgm.Bootstrap())
suite.NoError(suite.kgm.Bootstrap(suite.ctx))
}

func (suite *keyspaceGroupTestSuite) TearDownTest() {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (c *RaftCluster) Start(s Server) error {
}

if s.IsAPIServiceMode() {
err = c.keyspaceGroupManager.Bootstrap()
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}
Expand Down
49 changes: 47 additions & 2 deletions tests/pdctl/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestSplitKeyspaceGroup(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
keyspaces := make([]string, 0)
for i := 0; i < 500; i++ {
for i := 0; i < 129; i++ { // 128 is the default max txn ops limit in etcd.
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, serverName string) {
Expand Down Expand Up @@ -126,8 +126,53 @@ func TestSplitKeyspaceGroup(t *testing.T) {
output, err := pdctl.ExecuteCommand(cmd, args...)
re.NoError(err)
return strings.Contains(string(output), "Success")
}, testutil.WithWaitFor(20*time.Second))
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}

func TestExternalAllocNodeWhenStart(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// external alloc node for keyspace group, when keyspace manager update keyspace info to keyspace group
// we hope the keyspace group can be updated correctly.
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/externalAllocNode", `return("127.0.0.1:2379,127.0.0.1:2380")`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
keyspaces := make([]string, 0)
for i := 0; i < 10; i++ {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
err = tc.RunInitialServers()
re.NoError(err)
pdAddr := tc.GetConfig().GetClientURL()

cmd := pdctlCmd.GetRootCmd()

time.Sleep(2 * time.Second)
tc.WaitLeader()
leaderServer := tc.GetServer(tc.GetLeader())
re.NoError(leaderServer.BootstrapCluster())

// check keyspace group information.
defaultKeyspaceGroupID := fmt.Sprintf("%d", utils.DefaultKeyspaceGroupID)
args := []string{"-u", pdAddr, "keyspace-group"}
testutil.Eventually(re, func() bool {
output, err := pdctl.ExecuteCommand(cmd, append(args, defaultKeyspaceGroupID)...)
re.NoError(err)
var keyspaceGroup endpoint.KeyspaceGroup
err = json.Unmarshal(output, &keyspaceGroup)
re.NoError(err)
return len(keyspaceGroup.Keyspaces) == len(keyspaces)+1 && len(keyspaceGroup.Members) == 2
})

re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/externalAllocNode"))
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
}
8 changes: 2 additions & 6 deletions tests/pdctl/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,8 @@ func TestScheduler(t *testing.T) {
result := make(map[string]interface{})
testutil.Eventually(re, func() bool {
mightExec([]string{"-u", pdAddr, "scheduler", "describe", "balance-leader-scheduler"}, &result)
return len(result) != 0
}, testutil.WithTickInterval(50*time.Millisecond))

testutil.Eventually(re, func() bool {
return result["status"] == "paused" && result["summary"] == ""
}, testutil.WithTickInterval(50*time.Millisecond))
return len(result) != 0 && result["status"] == "paused" && result["summary"] == ""
}, testutil.WithWaitFor(30*time.Second))

mustUsage([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler", "60"})
mustExec([]string{"-u", pdAddr, "scheduler", "resume", "balance-leader-scheduler"}, nil)
Expand Down