diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index 28a16afeb74..ec17e70cb8b 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -36,9 +36,10 @@ import ( ) const ( - defaultBalancerPolicy = balancer.PolicyRoundRobin - allocNodeTimeout = 1 * time.Second - allocNodeInterval = 10 * time.Millisecond + defaultBalancerPolicy = balancer.PolicyRoundRobin + allocNodesToKeyspaceGroupsInterval = 1 * time.Second + allocNodesTimeout = 1 * time.Second + allocNodesInterval = 10 * time.Millisecond // TODO: move it to etcdutil watchEtcdChangeRetryInterval = 1 * time.Second maxRetryTimes = 25 @@ -71,8 +72,6 @@ type GroupManager struct { // tsoServiceEndKey is the end key of TSO service in etcd. tsoServiceEndKey string - policy balancer.Policy - // TODO: add user kind with different balancer // when we ensure where the correspondence between tso node and user kind will be found nodesBalancer balancer.Balancer[string] @@ -89,14 +88,15 @@ func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupSt groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse)) } return &GroupManager{ - ctx: ctx, - cancel: cancel, - store: store, - client: client, - tsoServiceKey: key, - tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/", - policy: defaultBalancerPolicy, - groups: groups, + ctx: ctx, + cancel: cancel, + store: store, + client: client, + tsoServiceKey: key, + tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/", + groups: groups, + nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy), + serviceRegistryMap: make(map[string]string), } } @@ -114,6 +114,14 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error { m.Lock() defer m.Unlock() + + // If the etcd client is not nil, start the watch loop. + if m.client != nil { + m.wg.Add(2) + go m.startWatchLoop(ctx) + go m.allocNodesToAllKeyspaceGroups() + } + // Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover). err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false) if err != nil && err != ErrKeyspaceGroupExists { @@ -130,13 +138,6 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error { m.groups[userKind].Put(group) } - // If the etcd client is not nil, start the watch loop. - if m.client != nil { - m.nodesBalancer = balancer.GenByPolicy[string](m.policy) - m.serviceRegistryMap = make(map[string]string) - m.wg.Add(1) - go m.startWatchLoop(ctx) - } return nil } @@ -146,6 +147,45 @@ func (m *GroupManager) Close() { m.wg.Wait() } +func (m *GroupManager) allocNodesToAllKeyspaceGroups() { + defer logutil.LogPanic() + defer m.wg.Done() + ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval) + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + } + countOfNodes := m.GetNodesCount() + if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount { + continue + } + groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0) + if err != nil { + log.Error("failed to load the all keyspace group", zap.Error(err)) + continue + } + withError := false + for _, group := range groups { + if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount { + nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount) + if err != nil { + withError = true + log.Error("failed to alloc nodes for keyspace group", zap.Error(err)) + continue + } + group.Members = nodes + } + } + if !withError { + // all keyspace groups have equal or more than default replica count + return + } + } +} + func (m *GroupManager) startWatchLoop(parentCtx context.Context) { defer logutil.LogPanic() defer m.wg.Done() @@ -156,12 +196,9 @@ func (m *GroupManager) startWatchLoop(parentCtx context.Context) { revision int64 err error ) + ticker := time.NewTicker(retryInterval) + defer ticker.Stop() for i := 0; i < maxRetryTimes; i++ { - select { - case <-ctx.Done(): - return - case <-time.After(retryInterval): - } resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey)) if err == nil { revision = resp.Header.Revision + 1 @@ -177,6 +214,11 @@ func (m *GroupManager) startWatchLoop(parentCtx context.Context) { break } log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err)) + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + } } if err != nil || revision == 0 { log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err)) @@ -603,18 +645,23 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { return nil } -// GetNodesNum returns the number of nodes. -func (m *GroupManager) GetNodesNum() int { +// GetNodesCount returns the count of nodes. +func (m *GroupManager) GetNodesCount() int { + if m.nodesBalancer == nil { + return 0 + } return m.nodesBalancer.Len() } // AllocNodesForKeyspaceGroup allocates nodes for the keyspace group. -func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) { - ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout) +func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) { + m.Lock() + defer m.Unlock() + ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout) defer cancel() - ticker := time.NewTicker(allocNodeInterval) + ticker := time.NewTicker(allocNodesInterval) defer ticker.Stop() - nodes := make([]endpoint.KeyspaceGroupMember, 0, replica) + nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount) err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { kg, err := m.store.LoadKeyspaceGroup(txn, id) if err != nil { @@ -628,14 +675,17 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end exists[member.Address] = struct{}{} nodes = append(nodes, member) } - for len(exists) < replica { + if len(exists) >= desiredReplicaCount { + return nil + } + for len(exists) < desiredReplicaCount { select { case <-ctx.Done(): return nil case <-ticker.C: } - num := m.GetNodesNum() - if num < replica || num == 0 { // double check + countOfNodes := m.GetNodesCount() + if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check return ErrNoAvailableNode } addr := m.nodesBalancer.Next() @@ -654,5 +704,38 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end if err != nil { return nil, err } + log.Info("alloc nodes for keyspace group", zap.Uint32("id", id), zap.Reflect("nodes", nodes)) return nodes, nil } + +// SetNodesForKeyspaceGroup sets the nodes for the keyspace group. +func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error { + m.Lock() + defer m.Unlock() + return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error { + kg, err := m.store.LoadKeyspaceGroup(txn, id) + if err != nil { + return err + } + if kg == nil { + return ErrKeyspaceGroupNotExists + } + members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes)) + for _, node := range nodes { + members = append(members, endpoint.KeyspaceGroupMember{Address: node}) + } + kg.Members = members + return m.store.SaveKeyspaceGroup(txn, kg) + }) +} + +// IsExistNode checks if the node exists. +func (m *GroupManager) IsExistNode(addr string) bool { + nodes := m.nodesBalancer.GetAll() + for _, node := range nodes { + if node == addr { + return true + } + } + return false +} diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index aa81425bc9f..e29aa6a5008 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -63,4 +63,7 @@ const ( // MaxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the // foreseen future, and the former is just for extensibility in theory. MaxKeyspaceGroupCountInUse = uint32(4096) + + // KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group. + KeyspaceGroupDefaultReplicaCount = 2 ) diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 8db553e765a..a9f7d9d1395 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -35,7 +35,8 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { router.GET("", GetKeyspaceGroups) router.GET("/:id", GetKeyspaceGroupByID) router.DELETE("/:id", DeleteKeyspaceGroupByID) - router.POST("/:id/alloc", AllocNodeForKeyspaceGroup) + router.POST("/:id/alloc", AllocNodesForKeyspaceGroup) + router.POST("/:id/nodes", SetNodesForKeyspaceGroup) router.POST("/:id/split", SplitKeyspaceGroupByID) router.DELETE("/:id/split", FinishSplitKeyspaceByID) } @@ -190,13 +191,13 @@ func FinishSplitKeyspaceByID(c *gin.Context) { c.JSON(http.StatusOK, nil) } -// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups. -type AllocNodeForKeyspaceGroupParams struct { +// AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups. +type AllocNodesForKeyspaceGroupParams struct { Replica int `json:"replica"` } -// AllocNodeForKeyspaceGroup allocates nodes for keyspace group. -func AllocNodeForKeyspaceGroup(c *gin.Context) { +// AllocNodesForKeyspaceGroup allocates nodes for keyspace group. +func AllocNodesForKeyspaceGroup(c *gin.Context) { id, err := validateKeyspaceGroupID(c) if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") @@ -204,14 +205,14 @@ func AllocNodeForKeyspaceGroup(c *gin.Context) { } svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) manager := svr.GetKeyspaceGroupManager() - allocParams := &AllocNodeForKeyspaceGroupParams{} + allocParams := &AllocNodesForKeyspaceGroupParams{} err = c.BindJSON(allocParams) if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) return } - if manager.GetNodesNum() < allocParams.Replica || allocParams.Replica < 1 { - c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [1, nodes_num]") + if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]") return } keyspaceGroup, err := manager.GetKeyspaceGroupByID(id) @@ -232,6 +233,54 @@ func AllocNodeForKeyspaceGroup(c *gin.Context) { c.JSON(http.StatusOK, nodes) } +// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups. +// Notes: it should be used carefully. +type SetNodesForKeyspaceGroupParams struct { + Nodes []string `json:"nodes"` +} + +// SetNodesForKeyspaceGroup sets nodes for keyspace group. +func SetNodesForKeyspaceGroup(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + setParams := &SetNodesForKeyspaceGroupParams{} + err = c.BindJSON(setParams) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) + return + } + // check if keyspace group exists + keyspaceGroup, err := manager.GetKeyspaceGroupByID(id) + if err != nil || keyspaceGroup == nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist") + return + } + // check if nodes is less than default replica count + if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes") + return + } + // check if node exists + for _, node := range setParams.Nodes { + if !manager.IsExistNode(node) { + c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist") + return + } + } + // set nodes + err = manager.SetNodesForKeyspaceGroup(id, setParams.Nodes) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + func validateKeyspaceGroupID(c *gin.Context) (uint32, error) { id, err := strconv.ParseUint(c.Param("id"), 10, 64) if err != nil { diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 7b0c09c2a7b..91186ca8211 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -22,9 +22,11 @@ import ( "io" "net/http" "testing" + "time" "github.com/stretchr/testify/suite" bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" @@ -80,7 +82,7 @@ func (suite *keyspaceGroupTestSuite) TearDownTest() { func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { // add three nodes. nodes := make(map[string]bs.Server) - for i := 0; i < 3; i++ { + for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount+2; i++ { s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup() nodes[s.GetAddr()] = s @@ -99,55 +101,65 @@ func (suite *keyspaceGroupTestSuite) TestAllocNodesUpdate() { // alloc nodes for the keyspace group. id := 1 - params := &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 1, + params := &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount, } - code, got := suite.tryAllocNodesForKeyspaceGroup(id, params) + got, code := suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) - suite.Equal(1, len(got)) - suite.Contains(nodes, got[0].Address) - oldNode := got[0].Address + suite.Equal(utils.KeyspaceGroupDefaultReplicaCount, len(got)) + oldMembers := make(map[string]struct{}) + for _, member := range got { + suite.Contains(nodes, member.Address) + oldMembers[member.Address] = struct{}{} + } // alloc node update to 2. - params.Replica = 2 - code, got = suite.tryAllocNodesForKeyspaceGroup(id, params) + params.Replica = utils.KeyspaceGroupDefaultReplicaCount + 1 + got, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) - suite.Equal(2, len(got)) - suite.Contains(nodes, got[0].Address) - suite.Contains(nodes, got[1].Address) - suite.True(oldNode == got[0].Address || oldNode == got[1].Address) // the old node is also in the new result. - suite.NotEqual(got[0].Address, got[1].Address) // the two nodes are different. + suite.Equal(params.Replica, len(got)) + newMembers := make(map[string]struct{}) + for _, member := range got { + suite.Contains(nodes, member.Address) + newMembers[member.Address] = struct{}{} + } + for member := range oldMembers { + // old members should be in new members. + suite.Contains(newMembers, member) + } } -func (suite *keyspaceGroupTestSuite) TestReplica() { +func (suite *keyspaceGroupTestSuite) TestAllocReplica() { nodes := make(map[string]bs.Server) - s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) - defer cleanup() - nodes[s.GetAddr()] = s + for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount; i++ { + s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + } mcs.WaitForPrimaryServing(suite.Require(), nodes) // miss replica. id := 1 - params := &handlers.AllocNodeForKeyspaceGroupParams{} - code, got := suite.tryAllocNodesForKeyspaceGroup(id, params) + params := &handlers.AllocNodesForKeyspaceGroupParams{} + got, code := suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) suite.Empty(got) - // replica is negative. - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: -1, + // replica is less than default replica. + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount - 1, } - code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) // there is no any keyspace group. - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 1, + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount, } - code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) - // the keyspace group is exist. + // create a keyspace group. kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ { ID: uint32(id), @@ -156,55 +168,141 @@ func (suite *keyspaceGroupTestSuite) TestReplica() { }} code = suite.tryCreateKeyspaceGroup(kgs) suite.Equal(http.StatusOK, code) - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 1, + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount, } - code, got = suite.tryAllocNodesForKeyspaceGroup(id, params) + got, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) - suite.True(checkNodes(got, nodes)) + for _, member := range got { + suite.Contains(nodes, member.Address) + } // the keyspace group is exist, but the replica is more than the num of nodes. - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 2, + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount + 1, } - code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) + // the keyspace group is exist, the new replica is more than the old replica. s2, cleanup2 := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) defer cleanup2() nodes[s2.GetAddr()] = s2 mcs.WaitForPrimaryServing(suite.Require(), nodes) - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 2, + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount + 1, } - code, got = suite.tryAllocNodesForKeyspaceGroup(id, params) + got, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusOK, code) - suite.True(checkNodes(got, nodes)) + for _, member := range got { + suite.Contains(nodes, member.Address) + } // the keyspace group is exist, the new replica is equal to the old replica. - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 2, + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount + 1, } - code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) // the keyspace group is exist, the new replica is less than the old replica. - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 1, + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount, } - code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) // the keyspace group is not exist. id = 2 - params = &handlers.AllocNodeForKeyspaceGroupParams{ - Replica: 1, + params = &handlers.AllocNodesForKeyspaceGroupParams{ + Replica: utils.KeyspaceGroupDefaultReplicaCount, } - code, _ = suite.tryAllocNodesForKeyspaceGroup(id, params) + _, code = suite.tryAllocNodesForKeyspaceGroup(id, params) suite.Equal(http.StatusBadRequest, code) } -func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(id int, request *handlers.AllocNodeForKeyspaceGroupParams) (int, []endpoint.KeyspaceGroupMember) { +func (suite *keyspaceGroupTestSuite) TestSetNodes() { + nodes := make(map[string]bs.Server) + nodesList := []string{} + for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount; i++ { + s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + nodesList = append(nodesList, s.GetAddr()) + } + mcs.WaitForPrimaryServing(suite.Require(), nodes) + + // the keyspace group is not exist. + id := 1 + params := &handlers.SetNodesForKeyspaceGroupParams{ + Nodes: nodesList, + } + _, code := suite.trySetNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + + // the keyspace group is exist. + kgs := &handlers.CreateKeyspaceGroupParams{KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: uint32(id), + UserKind: endpoint.Standard.String(), + }, + }} + code = suite.tryCreateKeyspaceGroup(kgs) + suite.Equal(http.StatusOK, code) + params = &handlers.SetNodesForKeyspaceGroupParams{ + Nodes: nodesList, + } + kg, code := suite.trySetNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusOK, code) + suite.Len(kg.Members, 2) + for _, member := range kg.Members { + suite.Contains(nodes, member.Address) + } + + // the keyspace group is exist, but the nodes is not exist. + params = &handlers.SetNodesForKeyspaceGroupParams{ + Nodes: append(nodesList, "pingcap.com:2379"), + } + _, code = suite.trySetNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + + // the keyspace group is exist, but the count of nodes is less than the default replica. + params = &handlers.SetNodesForKeyspaceGroupParams{ + Nodes: []string{nodesList[0]}, + } + _, code = suite.trySetNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) + + // the keyspace group is not exist. + id = 2 + params = &handlers.SetNodesForKeyspaceGroupParams{ + Nodes: nodesList, + } + _, code = suite.trySetNodesForKeyspaceGroup(id, params) + suite.Equal(http.StatusBadRequest, code) +} + +func (suite *keyspaceGroupTestSuite) TestDefaultKeyspaceGroup() { + nodes := make(map[string]bs.Server) + for i := 0; i < utils.KeyspaceGroupDefaultReplicaCount; i++ { + s, cleanup := mcs.StartSingleTSOTestServer(suite.ctx, suite.Require(), suite.backendEndpoints, tempurl.Alloc()) + defer cleanup() + nodes[s.GetAddr()] = s + } + mcs.WaitForPrimaryServing(suite.Require(), nodes) + + // the default keyspace group is exist. + time.Sleep(2 * time.Second) + kg, code := suite.tryGetKeyspaceGroup(utils.DefaultKeyspaceGroupID) + suite.Equal(http.StatusOK, code) + suite.Equal(utils.DefaultKeyspaceGroupID, kg.ID) + suite.Len(kg.Members, utils.KeyspaceGroupDefaultReplicaCount) + for _, member := range kg.Members { + suite.Contains(nodes, member.Address) + } +} + +func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(id int, request *handlers.AllocNodesForKeyspaceGroupParams) ([]endpoint.KeyspaceGroupMember, int) { data, err := json.Marshal(request) suite.NoError(err) httpReq, err := http.NewRequest(http.MethodPost, suite.server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/alloc", id), bytes.NewBuffer(data)) @@ -218,7 +316,7 @@ func (suite *keyspaceGroupTestSuite) tryAllocNodesForKeyspaceGroup(id int, reque suite.NoError(err) suite.NoError(json.Unmarshal(bodyBytes, &nodes)) } - return resp.StatusCode, nodes + return nodes, resp.StatusCode } func (suite *keyspaceGroupTestSuite) tryCreateKeyspaceGroup(request *handlers.CreateKeyspaceGroupParams) int { @@ -232,14 +330,31 @@ func (suite *keyspaceGroupTestSuite) tryCreateKeyspaceGroup(request *handlers.Cr return resp.StatusCode } -func checkNodes(nodes []endpoint.KeyspaceGroupMember, servers map[string]bs.Server) bool { - if len(nodes) != len(servers) { - return false +func (suite *keyspaceGroupTestSuite) tryGetKeyspaceGroup(id uint32) (*endpoint.KeyspaceGroup, int) { + httpReq, err := http.NewRequest(http.MethodGet, suite.server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d", id), nil) + suite.NoError(err) + resp, err := suite.dialClient.Do(httpReq) + suite.NoError(err) + defer resp.Body.Close() + kg := &endpoint.KeyspaceGroup{} + if resp.StatusCode == http.StatusOK { + bodyBytes, err := io.ReadAll(resp.Body) + suite.NoError(err) + suite.NoError(json.Unmarshal(bodyBytes, kg)) } - for _, node := range nodes { - if _, ok := servers[node.Address]; !ok { - return false - } + return kg, resp.StatusCode +} + +func (suite *keyspaceGroupTestSuite) trySetNodesForKeyspaceGroup(id int, request *handlers.SetNodesForKeyspaceGroupParams) (*endpoint.KeyspaceGroup, int) { + data, err := json.Marshal(request) + suite.NoError(err) + httpReq, err := http.NewRequest(http.MethodPost, suite.server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/nodes", id), bytes.NewBuffer(data)) + suite.NoError(err) + resp, err := suite.dialClient.Do(httpReq) + suite.NoError(err) + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, resp.StatusCode } - return true + return suite.tryGetKeyspaceGroup(uint32(id)) } diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 64c59685d51..1b8bcc64f68 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -271,7 +271,7 @@ func (suite *APIServerForwardTestSuite) TestForwardTSORelated() { func (suite *APIServerForwardTestSuite) TestForwardTSOWhenPrimaryChanged() { re := suite.Require() - tc, err := mcs.NewTestTSOCluster(suite.ctx, 3, suite.backendEndpoints) + tc, err := mcs.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) defer tc.Destroy() tc.WaitForDefaultPrimaryServing(re)