Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keyspace: add priority of tso node for the keyspace group #6602

Merged
merged 6 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 54 additions & 6 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
case <-ticker.C:
}
countOfNodes := m.GetNodesCount()
if countOfNodes < utils.KeyspaceGroupDefaultReplicaCount {
if countOfNodes < utils.DefaultKeyspaceGroupReplicaCount {
continue
}
groups, err := m.store.LoadKeyspaceGroups(utils.DefaultKeyspaceGroupID, 0)
Expand All @@ -187,8 +187,8 @@
}
withError := false
for _, group := range groups {
if len(group.Members) < utils.KeyspaceGroupDefaultReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.KeyspaceGroupDefaultReplicaCount)
if len(group.Members) < utils.DefaultKeyspaceGroupReplicaCount {
nodes, err := m.AllocNodesForKeyspaceGroup(group.ID, utils.DefaultKeyspaceGroupReplicaCount)
if err != nil {
withError = true
log.Error("failed to alloc nodes for keyspace group", zap.Uint32("keyspace-group-id", group.ID), zap.Error(err))
Expand Down Expand Up @@ -531,7 +531,7 @@
return ErrKeyspaceGroupInMerging
}
// Check if the source keyspace group has enough replicas.
if len(splitSourceKg.Members) < utils.KeyspaceGroupDefaultReplicaCount {
if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount {
return ErrKeyspaceGroupNotEnoughReplicas
}
// Check if the new keyspace group already exists.
Expand Down Expand Up @@ -702,7 +702,10 @@
continue
}
exists[addr] = struct{}{}
nodes = append(nodes, endpoint.KeyspaceGroupMember{Address: addr})
nodes = append(nodes, endpoint.KeyspaceGroupMember{
Address: addr,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = nodes
return m.store.SaveKeyspaceGroup(txn, kg)
Expand Down Expand Up @@ -737,7 +740,52 @@
}
members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes))
for _, node := range nodes {
members = append(members, endpoint.KeyspaceGroupMember{Address: node})
members = append(members, endpoint.KeyspaceGroupMember{
Address: node,
Priority: utils.DefaultKeyspaceGroupReplicaPriority,
})
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
})
if err != nil {
return err

Check warning on line 752 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L752

Added line #L752 was not covered by tests
}
m.groups[endpoint.StringUserKind(kg.UserKind)].Put(kg)
return nil
}

// SetPriorityForKeyspaceGroup sets the priority of node for the keyspace group.
func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, priority int) error {
m.Lock()
defer m.Unlock()
var kg *endpoint.KeyspaceGroup
err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) error {
var err error
kg, err = m.store.LoadKeyspaceGroup(txn, id)
if err != nil {
return err

Check warning on line 767 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L767

Added line #L767 was not covered by tests
}
if kg == nil {
return ErrKeyspaceGroupNotExists

Check warning on line 770 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L770

Added line #L770 was not covered by tests
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit

Check warning on line 773 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L773

Added line #L773 was not covered by tests
}
if kg.IsMerging() {
return ErrKeyspaceGroupInMerging

Check warning on line 776 in pkg/keyspace/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/keyspace/tso_keyspace_group.go#L776

Added line #L776 was not covered by tests
}
inKeyspaceGroup := false
members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members))
for _, member := range kg.Members {
if member.Address == node {
inKeyspaceGroup = true
member.Priority = priority
}
members = append(members, member)
}
if !inKeyspaceGroup {
return ErrNodeNotInKeyspaceGroup
}
kg.Members = members
return m.store.SaveKeyspaceGroup(txn, kg)
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
ID: uint32(2),
UserKind: endpoint.Standard.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
},
}
err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups)
Expand Down Expand Up @@ -330,7 +330,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Keyspaces: []uint32{111, 222, 333},
Members: make([]endpoint.KeyspaceGroupMember, utils.KeyspaceGroupDefaultReplicaCount),
Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount),
},
{
ID: uint32(3),
Expand Down
2 changes: 2 additions & 0 deletions pkg/keyspace/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var (
ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state")
// ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group.
ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group")
// ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group.
ErrNodeNotInKeyspaceGroup = errors.New("the tso node is not in this keyspace group")
// ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group.
ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group")
// ErrNoAvailableNode is used to indicate no available node in the keyspace group.
Expand Down
7 changes: 5 additions & 2 deletions pkg/mcs/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const (
// foreseen future, and the former is just for extensibility in theory.
MaxKeyspaceGroupCountInUse = uint32(4096)

// KeyspaceGroupDefaultReplicaCount is the default replica count of keyspace group.
KeyspaceGroupDefaultReplicaCount = 2
// DefaultKeyspaceGroupReplicaCount is the default replica count of keyspace group.
DefaultKeyspaceGroupReplicaCount = 2

// DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica.
DefaultKeyspaceGroupReplicaPriority = 100
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use 0 as the default priority? If we don't set the priority, it is empty which is 0 as the value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
6 changes: 5 additions & 1 deletion pkg/storage/endpoint/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ func IsUserKindValid(kind string) bool {
}

// KeyspaceGroupMember defines an election member which campaigns for the primary of the keyspace group.
// Its `Priority` is used in keyspace group primary weighted-election to balance primaries' distribution.
// Among multiple replicas of a keyspace group, the higher the priority, the more likely
// the replica is to be elected as primary.
type KeyspaceGroupMember struct {
Address string `json:"address"`
Address string `json:"address"`
Priority int `json:"priority"`
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
}

// SplitState defines the split state of a keyspace group.
Expand Down
23 changes: 16 additions & 7 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,11 @@
if !defaultKGConfigured {
log.Info("initializing default keyspace group")
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
Expand Down Expand Up @@ -400,7 +403,10 @@
// If the default keyspace group isn't assigned to any tso node/pod, assign it to everyone.
if group.ID == mcsutils.DefaultKeyspaceGroupID && len(group.Members) == 0 {
// TODO: fill members with all tso nodes/pods.
group.Members = []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}}
group.Members = []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}}
}

if !kgm.isAssignedToMe(group) {
Expand Down Expand Up @@ -493,12 +499,12 @@
// could not be modified during the split process, so we can only check the
// member count of the source group here.
memberCount := len(sourceGroup.Members)
if memberCount < mcsutils.KeyspaceGroupDefaultReplicaCount {
if memberCount < mcsutils.DefaultKeyspaceGroupReplicaCount {
log.Error("the split source keyspace group does not have enough members",
zap.Uint32("target", targetGroup.ID),
zap.Uint32("source", splitSourceID),
zap.Int("member-count", memberCount),
zap.Int("replica-count", mcsutils.KeyspaceGroupDefaultReplicaCount))
zap.Int("replica-count", mcsutils.DefaultKeyspaceGroupReplicaCount))

Check warning on line 507 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L507

Added line #L507 was not covered by tests
return false
}
return true
Expand Down Expand Up @@ -611,8 +617,11 @@
log.Info("removed default keyspace group meta config from the storage. " +
"now every tso node/pod will initialize it")
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{Address: kgm.tsoServiceID.ServiceAddr}},
ID: mcsutils.DefaultKeyspaceGroupID,
Members: []endpoint.KeyspaceGroupMember{{
Address: kgm.tsoServiceID.ServiceAddr,
Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority,
}},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.updateKeyspaceGroup(group)
Expand Down
55 changes: 52 additions & 3 deletions server/apiv2/handlers/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"github.com/pkg/errors"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/apiv2/middlewares"
Expand All @@ -40,6 +41,7 @@
router.DELETE("/:id", DeleteKeyspaceGroupByID)
router.POST("/:id/alloc", AllocNodesForKeyspaceGroup)
router.POST("/:id/nodes", SetNodesForKeyspaceGroup)
router.POST("/:id/priority", SetPriorityForKeyspaceGroup)
router.POST("/:id/split", SplitKeyspaceGroupByID)
router.DELETE("/:id/split", FinishSplitKeyspaceByID)
router.POST("/:id/merge", MergeKeyspaceGroups)
Expand Down Expand Up @@ -325,7 +327,7 @@
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return
}
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.KeyspaceGroupDefaultReplicaCount {
if manager.GetNodesCount() < allocParams.Replica || allocParams.Replica < utils.DefaultKeyspaceGroupReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid replica, should be in [2, nodes_num]")
return
}
Expand All @@ -347,7 +349,7 @@
c.JSON(http.StatusOK, nodes)
}

// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace groups.
// SetNodesForKeyspaceGroupParams defines the params for setting nodes for keyspace group.
// Notes: it should be used carefully.
type SetNodesForKeyspaceGroupParams struct {
Nodes []string `json:"nodes"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also support set priorities in SetNodesForKeyspaceGroup API? if priorities []int is null, then set default priority for every node in the Nodes.

Suggested change
Nodes []string `json:"nodes"`
Nodes []string `json:"nodes"`
Priorities []int `json:"priorities"`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to support it in the next pr, it's ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now we can support "set-node <keyspace_group_id> <tso_node_addr> [<tso_node_addr>...]" and "set-priority <keyspace_group_id> <tso_node_addr> " in pd-ctl, if only run "set-node", it means to set default priority

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we merge these interfaces, it will be not convenient in pd-ctl

Expand Down Expand Up @@ -379,7 +381,7 @@
return
}
// check if nodes is less than default replica count
if len(setParams.Nodes) < utils.KeyspaceGroupDefaultReplicaCount {
if len(setParams.Nodes) < utils.DefaultKeyspaceGroupReplicaCount {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid num of nodes")
return
}
Expand All @@ -399,6 +401,53 @@
c.JSON(http.StatusOK, nil)
}

// SetPriorityForKeyspaceGroupParams defines the params for setting priority of tso node for the keyspace group.
type SetPriorityForKeyspaceGroupParams struct {
Node string `json:"node"`
Priority int `json:"priority"`
}

// SetPriorityForKeyspaceGroup sets priority of tso node for the keyspace group.
func SetPriorityForKeyspaceGroup(c *gin.Context) {
id, err := validateKeyspaceGroupID(c)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id")
return

Check warning on line 415 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L414-L415

Added lines #L414 - L415 were not covered by tests
}
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
manager := svr.GetKeyspaceGroupManager()
if manager == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr)
return

Check warning on line 421 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L420-L421

Added lines #L420 - L421 were not covered by tests
}
setParams := &SetPriorityForKeyspaceGroupParams{}
err = c.BindJSON(setParams)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause())
return

Check warning on line 427 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L426-L427

Added lines #L426 - L427 were not covered by tests
}
// check if keyspace group exists
kg, err := manager.GetKeyspaceGroupByID(id)
if err != nil || kg == nil {
c.AbortWithStatusJSON(http.StatusBadRequest, "keyspace group does not exist")
return

Check warning on line 433 in server/apiv2/handlers/tso_keyspace_group.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/tso_keyspace_group.go#L432-L433

Added lines #L432 - L433 were not covered by tests
}
// check if node exists
members := kg.Members
if slice.NoneOf(members, func(i int) bool {
return members[i].Address == setParams.Node
}) {
c.AbortWithStatusJSON(http.StatusBadRequest, "tso node does not exist in the keyspace group")
}
// set priority
err = manager.SetPriorityForKeyspaceGroup(id, setParams.Node, setParams.Priority)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.JSON(http.StatusOK, nil)
}

func validateKeyspaceGroupID(c *gin.Context) (uint32, error) {
id, err := strconv.ParseUint(c.Param("id"), 10, 64)
if err != nil {
Expand Down
Loading
Loading