Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: add priority of tso node for the keyspace group #6602

Merged
merged 6 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount {
if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount {
continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
Expand All @@ -187,8 +187,8 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount)
if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
Expand Down Expand Up @@ -531,7 +531,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
return ErrKeyspaceGroupInMerging
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount {
if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
}
// Check if the new keyspace group already exists.
Expand Down Expand Up @@ -702,7 +702,10 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount
continue
}
exists[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{Address: addr})
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = nodes
return m.store.SaveKeyspaceGroup(txn, kg)
Expand Down Expand Up @@ -737,7 +740,52 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
members = append(members, endpoint.KeyspaceGroupMember{
Address: node,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
if err != nil {
return err
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
return nil
}

// SetPriorityForKeyspaceGroup sets the priority of node for the keyspace group.
func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, priority int) error {
m.Lock()
defer m.Unlock()
var kg *endpoint.KeyspaceGroup
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err
}
if kg == nil {
return ErrKeyspaceGroupNotExists
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging
}
inKeyspaceGroup := false
members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members))
for _, member := range kg.Members {
if member.Address == node {
inKeyspaceGroup = true
member.Priority = priority
}
members = append(members, member)
}
if !inKeyspaceGroup {
return ErrNodeNotInKeyspaceGroup
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
Expand Down Expand Up @@ -330,7 +330,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
},
{
ID: uint32(3),
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
// ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group.
ErrNodeNotInKeyspaceGroup = errors.New("the tso node is not in this keyspace group")
// ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group.
ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group")
// ErrModifyDefaultKeyspaceGroup is used to indicate that default keyspace group cannot be modified.
Expand Down
7 changes: 5 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const (
// foreseen future, and the former is just for extensibility in theory.
MaxKeyspaceGroupCountInUse = uint32(4096)

// KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group.
KeyspaceGroupDefaultReplicaCount = 2
// DefaultKeyspaceGroupReplicaCount is the default replica count of keyspace group.
DefaultKeyspaceGroupReplicaCount = 2

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
DefaultKeyspaceGroupReplicaPriority = 0
)
6 changes: 5 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ func IsUserKindValid(kind string) bool {
}

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
// Its `Priority` is used in keyspace group primary weighted-election to balance primaries' distribution.
// Among multiple replicas of a keyspace group, the higher the priority, the more likely
// the replica is to be elected as primary.
type KeyspaceGroupMember struct {
Address string `json:"address"`
Address string `json:"address"`
Priority int `json:"priority"`
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
}

// SplitState defines the split state of a keyspace group.
Expand Down
23 changes: 16 additions & 7 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,11 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
if !defaultKGConfigured {
log.Info("initializing default keyspace group")
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
Expand Down Expand Up @@ -400,7 +403,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
// If the default keyspace group isn't assigned to any tso node/pod, assign it to everyone.
if group.ID == mcsutils.DefaultKeyspaceGroupID && len(group.Members) == 0 {
// TODO: fill members with all tso nodes/pods.
group.Members = []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}
group.Members = []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}}
}

if !kgm.isAssignedToMe(group) {
Expand Down Expand Up @@ -493,12 +499,12 @@ func validateSplit(
// could not be modified during the split process, so we can only check the
// member count of the source group here.
memberCount := len(sourceGroup.Members)
if memberCount < mcsutils.KeyspaceGroupDefaultReplicaCount {
if memberCount < mcsutils.DefaultKeyspaceGroupReplicaCount {
log.Error("the split source keyspace group does not have enough members",
zap.Uint32("target", targetGroup.ID),
zap.Uint32("source", splitSourceID),
zap.Int("member-count", memberCount),
zap.Int("replica-count", mcsutils.KeyspaceGroupDefaultReplicaCount))
zap.Int("replica-count", mcsutils.DefaultKeyspaceGroupReplicaCount))
return false
}
return true
Expand Down Expand Up @@ -611,8 +617,11 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
log.Info("removed default keyspace group meta config from the storage. " +
"now every tso node/pod will initialize it")
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
Expand Down
55 changes: 52 additions & 3 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/apiv2/middlewares"
Expand All @@ -40,6 +41,7 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) {
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/priority", SetPriorityForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
router.POST("/:id/merge", MergeKeyspaceGroups)
Expand Down Expand Up @@ -325,7 +327,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount {
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.DefaultKeyspaceGroupReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]")
return
}
Expand All @@ -347,7 +349,7 @@ func AllocNodesForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace group.
// Notes: it should be used carefully.
type SetNodesForKeyspaceGroupParams struct {
Nodes []string `json:"nodes"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also support set priorities in SetNodesForKeyspaceGroup API? if priorities []int is null, then set default priority for every node in the Nodes.

Suggested change
Nodes []string `json:"nodes"`
Nodes []string `json:"nodes"`
Priorities []int `json:"priorities"`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to support it in the next pr, it's ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now we can support "set-node <keyspace_group_id> <tso_node_addr> [<tso_node_addr>...]" and "set-priority <keyspace_group_id> <tso_node_addr> " in pd-ctl, if only run "set-node", it means to set default priority

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we merge these interfaces, it will be not convenient in pd-ctl

Expand Down Expand Up @@ -379,7 +381,7 @@ func SetNodesForKeyspaceGroup(c *gin.Context) {
return
}
// check if nodes is less than default replica count
if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount {
if len(setParams.Nodes) < utils.DefaultKeyspaceGroupReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes")
return
}
Expand All @@ -399,6 +401,53 @@ func SetNodesForKeyspaceGroup(c *gin.Context) {
c.JSON(http.StatusOK, nil)
}

// SetPriorityForKeyspaceGroupParams defines the params for setting priority of tso node for the keyspace group.
type SetPriorityForKeyspaceGroupParams struct {
Node string `json:"node"`
Priority int `json:"priority"`
}

// SetPriorityForKeyspaceGroup sets priority of tso node for the keyspace group.
func SetPriorityForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
return
}
setParams := &SetPriorityForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
// check if keyspace group exists
kg, err := manager.GetKeyspaceGroupByID(id)
if err != nil || kg == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return
}
// check if node exists
members := kg.Members
if slice.NoneOf(members, func(i int) bool {
return members[i].Address == setParams.Node
}) {
c.AbortWithStatusJSON(http.StatusBadRequest, "tso node does not exist in the keyspace group")
}
// set priority
err = manager.SetPriorityForKeyspaceGroup(id, setParams.Node, setParams.Priority)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down
Loading