Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: add set handler for balancer and alloc node for default keyspace group #6342

Merged
merged 16 commits into from Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
137 changes: 111 additions & 26 deletions pkg/keyspace/tso_keyspace_group.go
Expand Up @@ -36,9 +36,10 @@
)

const (
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodeTimeout = 1 * time.Second
allocNodeInterval = 10 * time.Millisecond
defaultBalancerPolicy = balancer.PolicyRoundRobin
allocNodesForDefaultKeyspaceGroupInterval = 1 * time.Second
allocNodesTimeout = 1 * time.Second
allocNodesInterval = 10 * time.Millisecond
// TODO: move it to etcdutil
watchEtcdChangeRetryInterval = 1 * time.Second
maxRetryTimes = 25
Expand Down Expand Up @@ -95,8 +96,9 @@
client: client,
tsoServiceKey: key,
tsoServiceEndKey: clientv3.GetPrefixRangeEnd(key) + "/",
policy: defaultBalancerPolicy,
groups: groups,
nodesBalancer: balancer.GenByPolicy[string](defaultBalancerPolicy),
serviceRegistryMap: make(map[string]string),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see #6346

}
}

Expand All @@ -114,6 +116,14 @@

m.Lock()
defer m.Unlock()

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.wg.Add(2)
go m.startWatchLoop()
go m.allocDefaultNodesForKeyspaceGroup()
}

// Ignore the error if default keyspace group already exists in the storage (e.g. PD restart/recover).
err := m.saveKeyspaceGroups([]*endpoint.KeyspaceGroup{defaultKeyspaceGroup}, false)
if err != nil && err != ErrKeyspaceGroupExists {
Expand All @@ -129,14 +139,6 @@
userKind := endpoint.StringUserKind(group.UserKind)
m.groups[userKind].Put(group)
}

// If the etcd client is not nil, start the watch loop.
if m.client != nil {
m.nodesBalancer = balancer.GenByPolicy[string](m.policy)
m.serviceRegistryMap = make(map[string]string)
m.wg.Add(1)
go m.startWatchLoop()
}
return nil
}

Expand All @@ -146,6 +148,47 @@
m.wg.Wait()
}

func (m *GroupManager) allocDefaultNodesForKeyspaceGroup() {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
defer logutil.LogPanic()
defer m.wg.Done()
ticker := time.NewTicker(allocNodesForDefaultKeyspaceGroupInterval)
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
defer ticker.Stop()
for {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need timeout for it?

select {
case <-m.ctx.Done():
return
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount {
log.Info("the count of nodes is not enough to allocate the default keyspace group", zap.Int("count", countOfNodes))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this log keep flushing every second?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will adjust it

continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
if err != nil {
log.Error("failed to load the default keyspace group", zap.Error(err))
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
continue

Check warning on line 170 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L169-L170

Added lines #L169 - L170 were not covered by tests
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc default nodes for keyspace group", zap.Error(err))
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
continue

Check warning on line 179 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L177-L179

Added lines #L177 - L179 were not covered by tests
}
log.Info("alloc default nodes for keyspace group", zap.Int("count", len(nodes)))
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
group.Members = nodes
}
}
if !withError {
// all keyspace groups have equal or more than default replica count
return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continue or return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return. This will only occur when loading the keyspace group. In the future, when we support scale-out and updating the nodes in the balancer, it will always update. cc @binshi-bing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have done all the work here, then we can return. The problem is that we seem to have a severe bug: when utils.KeyspaceGroupDefaultReplicaCount is 2 and two tso nodes have just registered with a third tso node upcoming, this function will proceed to load all keyspace groups and allocate 2 nodes to each of them. After the third TSO node is registered, it won't be assigned any keyspace group. This is a common case when we create a new cluster including the api service and the tso service, where we first create API nodes and then gradually add tso nodes.

Because of this reason, I prefer to let operator manually call balance api, after all tso nodes are registered, to assign tso nodes to the keyspace groups whose member count is less than utils.KeyspaceGroupDefaultReplicaCount instead of doing this job in group manager's bootstrap.

@lhy1024, @rleungx, @JmPotato let's discuss on Monday.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhy1024, after some thought, let's keep your way, because we'll mostly set up 2 tso nodes for now and we can also use the move keyspace group api to change the distribution.

}
}
}

func (m *GroupManager) startWatchLoop() {
defer logutil.LogPanic()
defer m.wg.Done()
Expand All @@ -156,12 +199,9 @@
revision int64
err error
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
select {
case <-ctx.Done():
return
case <-time.After(retryInterval):
}
resp, err = etcdutil.EtcdKVGet(m.client, m.tsoServiceKey, clientv3.WithRange(m.tsoServiceEndKey))
if err == nil {
revision = resp.Header.Revision
Expand All @@ -177,6 +217,11 @@
break
}
log.Warn("failed to get tso service addrs from etcd and will retry", zap.Error(err))
select {
case <-m.ctx.Done():
return
case <-ticker.C:

Check warning on line 223 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L220-L223

Added lines #L220 - L223 were not covered by tests
}
}
if err != nil || revision == 0 {
log.Warn("failed to get tso service addrs from etcd finally when loading", zap.Error(err))
Expand Down Expand Up @@ -603,18 +648,23 @@
return nil
}

// GetNodesNum returns the number of nodes.
func (m *GroupManager) GetNodesNum() int {
// GetNodesCount returns the count of nodes.
func (m *GroupManager) GetNodesCount() int {
if m.nodesBalancer == nil {
return 0

Check warning on line 654 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L654

Added line #L654 was not covered by tests
}
return m.nodesBalancer.Len()
}

// AllocNodesForKeyspaceGroup allocates nodes for the keyspace group.
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, replica int) ([]endpoint.KeyspaceGroupMember, error) {
ctx, cancel := context.WithTimeout(m.ctx, allocNodeTimeout)
func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount int) ([]endpoint.KeyspaceGroupMember, error) {
m.Lock()
defer m.Unlock()
ctx, cancel := context.WithTimeout(m.ctx, allocNodesTimeout)
defer cancel()
ticker := time.NewTicker(allocNodeInterval)
ticker := time.NewTicker(allocNodesInterval)
defer ticker.Stop()
nodes := make([]endpoint.KeyspaceGroupMember, 0, replica)
nodes := make([]endpoint.KeyspaceGroupMember, 0, desiredReplicaCount)
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
Expand All @@ -628,14 +678,17 @@
exists[member.Address] = struct{}{}
nodes = append(nodes, member)
}
for len(exists) < replica {
if len(exists) >= desiredReplicaCount {
return nil

Check warning on line 682 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L682

Added line #L682 was not covered by tests
}
for len(exists) < desiredReplicaCount {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
num := m.GetNodesNum()
if num < replica || num == 0 { // double check
countOfNodes := m.GetNodesCount()
if countOfNodes < desiredReplicaCount || countOfNodes == 0 { // double check
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need countOfNodes == 0 when countOfNodes >= desiredReplicaCount?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider meeting offline node

return ErrNoAvailableNode
}
addr := m.nodesBalancer.Next()
Expand All @@ -656,3 +709,35 @@
}
return nodes, nil
}

// SetNodesForKeyspaceGroup sets the nodes for the keyspace group.
func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error {
m.Lock()
defer m.Unlock()
return m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
kg, err := m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err

Check warning on line 720 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L720

Added line #L720 was not covered by tests
}
if kg == nil {
return ErrKeyspaceGroupNotExists

Check warning on line 723 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L723

Added line #L723 was not covered by tests
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we have more work to do here in the future, such as a sanity check and a service availability check for the nodes being switched to, before actually refreshing the member list. Please ignore this comment for now.

for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
}

// IsExistNode reports whether addr is one of the nodes currently held by the
// nodes balancer.
//
// It returns false when the balancer has not been initialized, mirroring the
// nil guard in GetNodesCount, so a lookup never dereferences a nil balancer.
func (m *GroupManager) IsExistNode(addr string) bool {
	if m.nodesBalancer == nil {
		return false
	}
	for _, node := range m.nodesBalancer.GetAll() {
		if node == addr {
			return true
		}
	}
	return false
}
3 changes: 3 additions & 0 deletions pkg/mcs/tso/server/server.go
Expand Up @@ -219,6 +219,9 @@
log.Error("failed to get election member", errs.ZapError(err))
return false
}
if member == nil {
return false

Check warning on line 223 in pkg/mcs/tso/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/tso/server/server.go#L223

Added line #L223 was not covered by tests
}
return member.IsLeader()
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/utils/constant.go
Expand Up @@ -63,4 +63,7 @@ const (
// MaxKeyspaceGroupCountInUse is a much more reasonable value of the max count in the
// foreseen future, and the former is just for extensibility in theory.
MaxKeyspaceGroupCountInUse = uint32(4096)

// KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group.
KeyspaceGroupDefaultReplicaCount = 2
)
3 changes: 3 additions & 0 deletions pkg/tso/keyspace_group_manager.go
Expand Up @@ -728,6 +728,9 @@
if err != nil {
return nil, err
}
if am == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check if it is primary after kgm.deleteKeyspaceGroup with default keyspace group

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean delete the default keyspace group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default keyspace group no longer contains this member

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it actually goes here, it will panic I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be removed after @binshi-bing has fixed it

return nil, nil

Check warning on line 732 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L732

Added line #L732 was not covered by tests
}
return am.GetMember(), nil
}

Expand Down
65 changes: 57 additions & 8 deletions server/apiv2/handlers/tso_keyspace_group.go
Expand Up @@ -35,7 +35,8 @@
router.GET("", GetKeyspaceGroups)
router.GET("/:id", GetKeyspaceGroupByID)
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodeForKeyspaceGroup)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
}
Expand Down Expand Up @@ -190,28 +191,28 @@
c.JSON(http.StatusOK, nil)
}

// AllocNodeForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodeForKeyspaceGroupParams struct {
// AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups.
type AllocNodesForKeyspaceGroupParams struct {
Replica int `json:"replica"`
}

// AllocNodeForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodeForKeyspaceGroup(c *gin.Context) {
// AllocNodesForKeyspaceGroup allocates nodes for keyspace group.
func AllocNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
allocParams := &AllocNodeForKeyspaceGroupParams{}
allocParams := &AllocNodesForKeyspaceGroupParams{}
err = c.BindJSON(allocParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesNum() < allocParams.Replica || allocParams.Replica < 1 {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [1, nodes_num]")
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]")
return
}
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
Expand All @@ -232,6 +233,54 @@
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// Notes: it should be used carefully, because the given list replaces the
// keyspace group's current members instead of being added to them.
type SetNodesForKeyspaceGroupParams struct {
// Nodes holds the addresses of the nodes to assign to the keyspace group;
// it is bound from the "nodes" field of the JSON request body.
Nodes []string `json:"nodes"`
}

// SetNodesForKeyspaceGroup sets nodes for keyspace group.
func SetNodesForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return

Check warning on line 247 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L246-L247

Added lines #L246 - L247 were not covered by tests
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
setParams := &SetNodesForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return

Check warning on line 255 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L254-L255

Added lines #L254 - L255 were not covered by tests
}
// check if keyspace group exists
keyspaceGroup, err := manager.GetKeyspaceGroupByID(id)
if err != nil || keyspaceGroup == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check if nodes is less than default replica count
if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes")
return
}
// check if node exists
for _, node := range setParams.Nodes {
if !manager.IsExistNode(node) {
c.AbortWithStatusJSON(http.StatusBadRequest, "node does not exist")
return
}
}
// set nodes
err = manager.SetNodesForKeyspaceGroup(id, setParams.Nodes)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return

Check warning on line 279 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L278-L279

Added lines #L278 - L279 were not covered by tests
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down