Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add set handler for balancer and alloc node for default keyspace group #6342

Merged
merged 16 commits into from Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
142 changes: 117 additions & 25 deletions pkg/keyspace/tso_keyspace_group.go
Expand Up @@ -36,9 +36,10 @@
)

const (
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodeTimeout = 1 * time.Second
allocNodeInterval = 10 * time.Millisecond
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodesForDefaultKeyspaceGroupInterval = 1 * time.Second
allocNodesTimeout = 1 * time.Second
allocNodesInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
Expand Down Expand Up @@ -114,6 +115,15 @@

m.Lock()
defer m.Unlock()

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop()
}

// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
Expand All @@ -126,17 +136,16 @@
return err
}
for _, group := range groups {
if group.ID == utils.DefaultKeyspaceGroupID {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that this logic is not specific to the default keyspace group; it applies to any keyspace group. That is, if any keyspace group's member/replica count is below the desired replica count, nodes should be allocated to it. The desired replica count is specified as one of the parameters when creating a keyspace group, and its default value is 2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to check the params for the set-nodes and alloc-nodes interfaces?

if len(group.Members) == 0 && m.client != nil {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
// The default keyspace group should have one replica at least.
m.wg.Add(1)
go m.allocNodesForDefaultKeyspaceGroup()
}
}
userKind := endpoint.StringUserKind(group.UserKind)
m.groups[userKind].Put(group)
}

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop()
}
return nil
}

Expand All @@ -146,6 +155,31 @@
m.wg.Wait()
}

func (m *GroupManager) allocNodesForDefaultKeyspaceGroup() {
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesForDefaultKeyspaceGroupInterval)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()
for {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a timeout for it?

select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
kg, err := m.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
nodes := m.nodesBalancer.GetAll()
if err == nil && kg != nil && equalMembers(kg.Members, nodes) {
continue
}
err = m.SetNodesForKeyspaceGroup(utils.DefaultKeyspaceGroupID, nodes)
if err == nil {
log.Info("alloc nodes for default keyspace group", zap.Reflect("nodes", nodes))
} else {
log.Warn("failed to alloc nodes for default keyspace group", zap.Error(err))

Check warning on line 178 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L178

Added line #L178 was not covered by tests
}
}
}

func (m *GroupManager) startWatchLoop() {
defer logutil.LogPanic()
defer m.wg.Done()
Expand All @@ -156,12 +190,9 @@
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision
Expand All @@ -177,6 +208,11 @@
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:

Check warning on line 214 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L211-L214

Added lines #L211 - L214 were not covered by tests
}
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
Expand Down Expand Up @@ -603,18 +639,23 @@
return nil
}

// GetNodesNum returns the number of nodes.
func (m *GroupManager) GetNodesNum() int {
// GetNodesCount returns the count of nodes.
func (m *GroupManager) GetNodesCount() int {
if m.nodesBalancer == nil {
return 0

Check warning on line 645 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L645

Added line #L645 was not covered by tests
}
return m.nodesBalancer.Len()
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) {
ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout)
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
defer cancel()
ticker := time.NewTicker(allocNodeInterval)
ticker := time.NewTicker(allocNodesInterval)
defer ticker.Stop()
nodes := make([]endpoint.KeyspaceGroupMember, 0, replica)
nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount)
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
Expand All @@ -628,14 +669,17 @@
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
for len(exists) < replica {
if len(exists) >= desiredReplicaCount {
return nil

Check warning on line 673 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L673

Added line #L673 was not covered by tests
}
for len(exists) < desiredReplicaCount {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
num := m.GetNodesNum()
if num < replica || num == 0 { // double check
countOfNodes := m.GetNodesCount()
if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need countOfNodes == 0 when countOfNodes >= desiredReplicaCount?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the case of meeting an offline node.

return ErrNoAvailableNode
}
addr := m.nodesBalancer.Next()
Expand All @@ -656,3 +700,51 @@
}
return nodes, nil
}

// SetNodesForKeyspaceGroup sets the nodes for the keyspace group.
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err

Check warning on line 711 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L711

Added line #L711 was not covered by tests
}
if kg == nil {
return ErrKeyspaceGroupNotExists

Check warning on line 714 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L714

Added line #L714 was not covered by tests
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we have more work to do here in the future, such as a sanity check and a service availability check for the nodes being switched to, before actually refreshing the member list. Please ignore this comment for now.

for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
}

// IsExistNode reports whether addr is one of the nodes currently returned by
// the nodes balancer.
//
// The balancer is only created when an etcd client is configured, so a nil
// balancer is treated as "no nodes known" (the same guard GetNodesCount uses)
// instead of panicking on the nil value.
func (m *GroupManager) IsExistNode(addr string) bool {
	if m.nodesBalancer == nil {
		return false
	}
	for _, node := range m.nodesBalancer.GetAll() {
		if node == addr {
			return true
		}
	}
	return false
}

func equalMembers(a []endpoint.KeyspaceGroupMember, b []string) bool {
if len(a) != len(b) {
return false
}
isExist := make(map[string]struct{}, len(b))
for _, node := range b {
isExist[node] = struct{}{}
}
for _, member := range a {
if _, ok := isExist[member.Address]; !ok {
return false

Check warning on line 746 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L746

Added line #L746 was not covered by tests
}
}
return true
}
3 changes: 3 additions & 0 deletions pkg/mcs/tso/server/server.go
Expand Up @@ -219,6 +219,9 @@
log.Error("failed to get election member", errs.ZapError(err))
return false
}
if member == nil {
return false

Check warning on line 223 in pkg/mcs/tso/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/server.go#L223

Added line #L223 was not covered by tests
}
return member.IsLeader()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Expand Up @@ -728,6 +728,9 @@
if err != nil {
return nil, err
}
if am == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It happens when checking whether it is the primary after kgm.deleteKeyspaceGroup is called with the default keyspace group.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean delete the default keyspace group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default keyspace group no longer contains this member.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it actually goes here, it will panic I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be removed after @binshi-bing has fixed it.

return nil, nil

Check warning on line 732 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L732

Added line #L732 was not covered by tests
}
return am.GetMember(), nil
}

Expand Down
63 changes: 56 additions & 7 deletions server/apiv2/handlers/tso_keyspace_group.go
Expand Up @@ -35,7 +35,8 @@
router.GET("", GetKeyspaceGroups)
router.GET("/:id", GetKeyspaceGroupByID)
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodeForKeyspaceGroup)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
}
Expand Down Expand Up @@ -190,27 +191,27 @@
c.JSON(http.StatusOK, nil)
}

// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodeForKeyspaceGroupParams struct {
// AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodesForKeyspaceGroupParams struct {
Replica int `json:"replica"`
}

// AllocNodeForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodeForKeyspaceGroup(c *gin.Context) {
// AllocNodesForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
allocParams := &AllocNodeForKeyspaceGroupParams{}
allocParams := &AllocNodesForKeyspaceGroupParams{}
err = c.BindJSON(allocParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesNum() < allocParams.Replica || allocParams.Replica < 1 {
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < 1 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [1, nodes_num]")
return
}
Expand All @@ -232,6 +233,54 @@
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the request body for setting the nodes of a keyspace group.
// Note: use it carefully, because the given nodes replace the keyspace group's current member list.
type SetNodesForKeyspaceGroupParams struct {
// Nodes lists the addresses of the nodes to assign to the keyspace group as its members.
Nodes []string `json:"nodes"`
}

// SetNodesForKeyspaceGroup sets nodes for keyspace group.
func SetNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return

Check warning on line 247 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L246-L247

Added lines #L246 - L247 were not covered by tests
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
setParams := &SetNodesForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return

Check warning on line 255 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L254-L255

Added lines #L254 - L255 were not covered by tests
}
// check whether the keyspace group exists
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
if err != nil || keyspaceGroup == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check whether the node list is empty
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
if len(setParams.Nodes) == 0 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty nodes")
return
}
// check whether every node exists
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
}
// set nodes
err = manager.SetNodesForKeyspaceGroup(id, setParams.Nodes)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return

Check warning on line 279 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L278-L279

Added lines #L278 - L279 were not covered by tests
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down