mcs: add set handler for balancer and alloc node for default keyspace group (tikv#6342)

ref tikv#6233

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
2 people authored and rleungx committed Aug 2, 2023
1 parent f40e7f8 commit 671633d
Showing 5 changed files with 351 additions and 101 deletions.
151 changes: 117 additions & 34 deletions pkg/keyspace/tso_keyspace_group.go
@@ -36,9 +36,10 @@ import (
)

const (
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodeTimeout = 1 * time.Second
allocNodeInterval = 10 * time.Millisecond
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodesToKeyspaceGroupsInterval = 1 * time.Second
allocNodesTimeout = 1 * time.Second
allocNodesInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
@@ -71,8 +72,6 @@ type GroupManager struct {
// tsoServiceEndKey is the end key of TSO service in etcd.
tsoServiceEndKey string

policy balancer.Policy

// TODO: add user kind with different balancer
// when we ensure where the correspondence between tso node and user kind will be found
nodesBalancer balancer.Balancer[string]
@@ -89,14 +88,15 @@ func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupSt
groups[endpoint.UserKind(i)] = newIndexedHeap(int(utils.MaxKeyspaceGroupCountInUse))
}
return &GroupManager{
ctx: ctx,
cancel: cancel,
store: store,
client: client,
tsoServiceKey: key,
tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/",
policy: defaultBalancerPolicy,
groups: groups,
ctx: ctx,
cancel: cancel,
store: store,
client: client,
tsoServiceKey: key,
tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/",
groups: groups,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
}
}

@@ -114,6 +114,14 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error {

m.Lock()
defer m.Unlock()

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.wg.Add(2)
go m.startWatchLoop(ctx)
go m.allocNodesToAllKeyspaceGroups()
}

// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
@@ -130,13 +138,6 @@ func (m *GroupManager) Bootstrap(ctx context.Context) error {
m.groups[userKind].Put(group)
}

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop(ctx)
}
return nil
}

@@ -146,6 +147,45 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesToKeyspaceGroupsInterval)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount {
continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
log.Error("failed to load the all keyspace group", zap.Error(err))
continue
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Error(err))
continue
}
group.Members = nodes
}
}
if !withError {
// all keyspace groups have at least the default replica count
return
}
}
}

func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
defer logutil.LogPanic()
defer m.wg.Done()
@@ -156,12 +196,9 @@ func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision + 1
@@ -177,6 +214,11 @@ func (m *GroupManager) startWatchLoop(parentCtx context.Context) {
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
@@ -603,18 +645,23 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error {
return nil
}

// GetNodesNum returns the number of nodes.
func (m *GroupManager) GetNodesNum() int {
// GetNodesCount returns the count of nodes.
func (m *GroupManager) GetNodesCount() int {
if m.nodesBalancer == nil {
return 0
}
return m.nodesBalancer.Len()
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) {
ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout)
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
defer cancel()
ticker := time.NewTicker(allocNodeInterval)
ticker := time.NewTicker(allocNodesInterval)
defer ticker.Stop()
nodes := make([]endpoint.KeyspaceGroupMember, 0, replica)
nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount)
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
@@ -628,14 +675,17 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
for len(exists) < replica {
if len(exists) >= desiredReplicaCount {
return nil
}
for len(exists) < desiredReplicaCount {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
num := m.GetNodesNum()
if num < replica || num == 0 { // double check
countOfNodes := m.GetNodesCount()
if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check
return ErrNoAvailableNode
}
addr := m.nodesBalancer.Next()
@@ -654,5 +704,38 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]end
if err != nil {
return nil, err
}
log.Info("alloc nodes for keyspace group", zap.Uint32("id", id), zap.Reflect("nodes", nodes))
return nodes, nil
}

// SetNodesForKeyspaceGroup sets the nodes for the keyspace group.
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
}

// IsExistNode checks if the node exists.
func (m *GroupManager) IsExistNode(addr string) bool {
nodes := m.nodesBalancer.GetAll()
for _, node := range nodes {
if node == addr {
return true
}
}
return false
}
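
The core of `AllocNodesForKeyspaceGroup` above is a dedupe-and-pick loop: existing members are recorded in a set, and the balancer's `Next()` is polled until the group reaches the desired replica count, with a "double check" that enough nodes are registered. A minimal standalone sketch of that loop follows; the toy `roundRobin` type and `allocNodes` helper are illustrative only, not the real `pkg/balancer` API or the storage transaction used in the actual code.

```go
// Standalone sketch of the dedupe-and-pick loop; names are illustrative.
package main

import "fmt"

// roundRobin is a toy stand-in for balancer.Balancer[string]; it assumes
// the registered node addresses are distinct.
type roundRobin struct {
	nodes []string
	next  int
}

// Next returns the next address in round-robin order.
func (r *roundRobin) Next() string {
	if len(r.nodes) == 0 {
		return ""
	}
	addr := r.nodes[r.next%len(r.nodes)]
	r.next++
	return addr
}

// allocNodes keeps drawing addresses from the balancer until the group has
// `replica` distinct members, skipping addresses that are already members.
func allocNodes(existing []string, rr *roundRobin, replica int) []string {
	if len(rr.nodes) < replica {
		return existing // mirror the "double check": not enough nodes registered
	}
	exists := make(map[string]struct{}, len(existing))
	members := append([]string(nil), existing...)
	for _, addr := range existing {
		exists[addr] = struct{}{}
	}
	for len(exists) < replica {
		addr := rr.Next()
		if _, ok := exists[addr]; ok {
			continue // already a member, try the next candidate
		}
		exists[addr] = struct{}{}
		members = append(members, addr)
	}
	return members
}

func main() {
	rr := &roundRobin{nodes: []string{"tso-0:2379", "tso-1:2379", "tso-2:2379"}}
	// The keyspace group already has tso-0; top it up to the default replica count of 2.
	fmt.Println(allocNodes([]string{"tso-0:2379"}, rr, 2))
}
```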
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
@@ -63,4 +63,7 @@ const (
// MaxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the
// foreseen future, and the former is just for extensibility in theory.
MaxKeyspaceGroupCountInUse = uint32(4096)

// KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group.
KeyspaceGroupDefaultReplicaCount = 2
)
65 changes: 57 additions & 8 deletions server/apiv2/handlers/tso_keyspace_group.go
@@ -35,7 +35,8 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
router.GET("", GetKeyspaceGroups)
router.GET("/:id", GetKeyspaceGroupByID)
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodeForKeyspaceGroup)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
}
@@ -190,28 +191,28 @@ func FinishSplitKeyspaceByID(c *gin.Context) {
c.JSON(http.StatusOK, nil)
}

// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodeForKeyspaceGroupParams struct {
// AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodesForKeyspaceGroupParams struct {
Replica int `json:"replica"`
}

// AllocNodeForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodeForKeyspaceGroup(c *gin.Context) {
// AllocNodesForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
allocParams := &AllocNodeForKeyspaceGroupParams{}
allocParams := &AllocNodesForKeyspaceGroupParams{}
err = c.BindJSON(allocParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesNum() < allocParams.Replica || allocParams.Replica < 1 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [1, nodes_num]")
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]")
return
}
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
@@ -232,6 +233,54 @@ func AllocNodeForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// Notes: it should be used carefully.
type SetNodesForKeyspaceGroupParams struct {
Nodes []string `json:"nodes"`
}

// SetNodesForKeyspaceGroup sets nodes for keyspace group.
func SetNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
setParams := &SetNodesForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
// check if keyspace group exists
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
if err != nil || keyspaceGroup == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check if the number of nodes is less than the default replica count
if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes")
return
}
// check if node exists
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
}
// set nodes
err = manager.SetNodesForKeyspaceGroup(id, setParams.Nodes)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
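
For reference, the two new routes take small JSON bodies: `POST …/{id}/alloc` expects `{"replica": N}` with N between the default replica count (2) and the number of registered TSO nodes, and `POST …/{id}/nodes` expects `{"nodes": [...]}` where every address must already be registered with the balancer. A minimal client sketch in Go; the host and the `/pd/api/v2/tso/keyspace-groups` prefix are assumptions, since only the relative routes are registered in this diff.

```go
// Sketch of exercising the two new routes. Only ":id/alloc" and ":id/nodes"
// are defined in this change; the host and route prefix below are assumptions.
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// post sends a JSON body and prints the status plus response body.
func post(url, body string) error {
	resp, err := http.Post(url, "application/json", bytes.NewBufferString(body))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(out))
	return nil
}

func main() {
	base := "http://127.0.0.1:2379/pd/api/v2/tso/keyspace-groups" // assumed prefix
	// Let PD pick 2 TSO nodes for keyspace group 1 (replica must be >= 2).
	_ = post(base+"/1/alloc", `{"replica": 2}`)
	// Pin keyspace group 1 to an explicit node list; every address must be a
	// registered TSO node and at least 2 must be given.
	_ = post(base+"/1/nodes", `{"nodes": ["tso-0:2379", "tso-1:2379"]}`)
}
```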
