Skip to content

Commit

Permalink
client/http: implement more rule and batch related interfaces (#7430)
Browse files Browse the repository at this point in the history
ref #7300

- Implement more rule and batch related interfaces.
- Add more types and methods.
- Refine the tests.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Nov 27, 2023
1 parent 4e9240a commit 58e9b20
Show file tree
Hide file tree
Showing 8 changed files with 319 additions and 61 deletions.
35 changes: 22 additions & 13 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import (
// The following constants are the paths of PD HTTP APIs.
const (
// Metadata
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateScheduleInBatch = "/pd/api/v1/regions/accelerate-schedule/batch"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand All @@ -44,8 +45,11 @@ const (
// Rule
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
PlacementRulesInBatch = "/pd/api/v1/config/rules/batch"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
placementRuleGroup = "/pd/api/v1/config/rule_group"
placementRuleGroups = "/pd/api/v1/config/rule_groups"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
Expand Down Expand Up @@ -136,6 +140,11 @@ func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// PlacementRuleGroupByID returns the path of PD HTTP API to get placement rule group by ID.
func PlacementRuleGroupByID(id string) string {
return fmt.Sprintf("%s/%s", placementRuleGroup, id)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
101 changes: 89 additions & 12 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -46,25 +47,31 @@ type Client interface {
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, []byte, []byte, int) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, *KeyRange, int) (*RegionsInfo, error)
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleInBatch(context.Context, []*RuleOp) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetAllPlacementRuleGroups(context.Context) ([]*RuleGroup, error)
GetPlacementRuleGroupByID(context.Context, string) (*RuleGroup, error)
SetPlacementRuleGroup(context.Context, *RuleGroup) error
DeletePlacementRuleGroupByID(context.Context, string) error
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
AccelerateSchedule(context.Context, *KeyRange) error
AccelerateScheduleInBatch(context.Context, []*KeyRange) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

Expand Down Expand Up @@ -308,10 +315,10 @@ func (c *client) GetRegions(ctx context.Context) (*RegionsInfo, error) {
}

// GetRegionsByKeyRange gets the regions info by key range. If the limit is -1, it will return all regions within the range.
func (c *client) GetRegionsByKeyRange(ctx context.Context, startKey, endKey []byte, limit int) (*RegionsInfo, error) {
func (c *client) GetRegionsByKeyRange(ctx context.Context, keyRange *KeyRange, limit int) (*RegionsInfo, error) {
var regions RegionsInfo
err := c.requestWithRetry(ctx,
"GetRegionsByKeyRange", RegionsByKey(startKey, endKey, limit),
"GetRegionsByKeyRange", RegionsByKey(keyRange.StartKey, keyRange.EndKey, limit),
http.MethodGet, http.NoBody, &regions)
if err != nil {
return nil, err
Expand Down Expand Up @@ -356,10 +363,10 @@ func (c *client) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, e
}

// GetRegionStatusByKeyRange gets the region status by key range.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, startKey, endKey []byte) (*RegionStats, error) {
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) {
var regionStats RegionStats
err := c.requestWithRetry(ctx,
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(startKey, endKey),
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange.StartKey, keyRange.StartKey),
http.MethodGet, http.NoBody, &regionStats,
)
if err != nil {
Expand Down Expand Up @@ -427,6 +434,17 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleInBatch sets the placement rules in batch.
func (c *client) SetPlacementRuleInBatch(ctx context.Context, ruleOps []*RuleOp) error {
ruleOpsJSON, err := json.Marshal(ruleOps)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleInBatch", PlacementRulesInBatch,
http.MethodPost, bytes.NewBuffer(ruleOpsJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
Expand All @@ -446,6 +464,48 @@ func (c *client) DeletePlacementRule(ctx context.Context, group, id string) erro
http.MethodDelete, http.NoBody, nil)
}

// GetAllPlacementRuleGroups gets all placement rule groups.
func (c *client) GetAllPlacementRuleGroups(ctx context.Context) ([]*RuleGroup, error) {
var ruleGroups []*RuleGroup
err := c.requestWithRetry(ctx,
"GetAllPlacementRuleGroups", placementRuleGroups,
http.MethodGet, http.NoBody, &ruleGroups)
if err != nil {
return nil, err
}
return ruleGroups, nil
}

// GetPlacementRuleGroupByID gets the placement rule group by ID.
func (c *client) GetPlacementRuleGroupByID(ctx context.Context, id string) (*RuleGroup, error) {
var ruleGroup RuleGroup
err := c.requestWithRetry(ctx,
"GetPlacementRuleGroupByID", PlacementRuleGroupByID(id),
http.MethodGet, http.NoBody, &ruleGroup)
if err != nil {
return nil, err
}
return &ruleGroup, nil
}

// SetPlacementRuleGroup sets the placement rule group.
func (c *client) SetPlacementRuleGroup(ctx context.Context, ruleGroup *RuleGroup) error {
ruleGroupJSON, err := json.Marshal(ruleGroup)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleGroup", placementRuleGroup,
http.MethodPost, bytes.NewBuffer(ruleGroupJSON), nil)
}

// DeletePlacementRuleGroupByID deletes the placement rule group by ID.
func (c *client) DeletePlacementRuleGroupByID(ctx context.Context, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRuleGroupByID", PlacementRuleGroupByID(id),
http.MethodDelete, http.NoBody, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
Expand Down Expand Up @@ -497,17 +557,34 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
func (c *client) AccelerateSchedule(ctx context.Context, keyRange *KeyRange) error {
inputJSON, err := json.Marshal(map[string]string{
"start_key": url.QueryEscape(hex.EncodeToString(keyRange.StartKey)),
"end_key": url.QueryEscape(hex.EncodeToString(keyRange.EndKey)),
})
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// AccelerateScheduleInBatch accelerates the scheduling of the regions within the given key ranges in batch.
func (c *client) AccelerateScheduleInBatch(ctx context.Context, keyRanges []*KeyRange) error {
input := make([]map[string]string, 0, len(keyRanges))
for _, keyRange := range keyRanges {
input = append(input, map[string]string{
"start_key": url.QueryEscape(hex.EncodeToString(keyRange.StartKey)),
"end_key": url.QueryEscape(hex.EncodeToString(keyRange.EndKey)),
})
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
"AccelerateScheduleInBatch", AccelerateScheduleInBatch,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

Expand Down
61 changes: 60 additions & 1 deletion client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,16 @@

package http

import "time"
import (
"encoding/json"
"time"
)

// KeyRange defines a range of keys.
type KeyRange struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`
}

// NOTICE: the structures below are copied from the PD API definitions.
// Please make sure the consistency if any change happens to the PD API.
Expand Down Expand Up @@ -247,6 +256,56 @@ type Rule struct {
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// String returns the string representation of this rule.
func (r *Rule) String() string {
b, _ := json.Marshal(r)
return string(b)
}

// Clone returns a copy of Rule.
func (r *Rule) Clone() *Rule {
var clone Rule
json.Unmarshal([]byte(r.String()), &clone)
clone.StartKey = append(r.StartKey[:0:0], r.StartKey...)
clone.EndKey = append(r.EndKey[:0:0], r.EndKey...)
return &clone
}

// RuleOpType indicates the operation type
type RuleOpType string

const (
// RuleOpAdd a placement rule, only need to specify the field *Rule
RuleOpAdd RuleOpType = "add"
// RuleOpDel a placement rule, only need to specify the field `GroupID`, `ID`, `MatchID`
RuleOpDel RuleOpType = "del"
)

// RuleOp is for batching placement rule actions.
// The action type is distinguished by the field `Action`.
type RuleOp struct {
*Rule // information of the placement rule to add/delete the operation type
Action RuleOpType `json:"action"`
DeleteByIDPrefix bool `json:"delete_by_id_prefix"` // if action == delete, delete by the prefix of id
}

func (r RuleOp) String() string {
b, _ := json.Marshal(r)
return string(b)
}

// RuleGroup defines properties of a rule group.
type RuleGroup struct {
ID string `json:"id,omitempty"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
}

func (g *RuleGroup) String() string {
b, _ := json.Marshal(g)
return string(b)
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func (c *Controller) ClearSuspectKeyRanges() {
c.suspectKeyRanges.Clear()
}

// ClearSuspectRegions clears the suspect regions, only for unit test
func (c *Controller) ClearSuspectRegions() {
c.suspectRegions.Clear()
}

// IsPendingRegion returns true if the given region is in the pending list.
func (c *Controller) IsPendingRegion(regionID uint64) bool {
_, exist := c.ruleChecker.pendingList.Get(regionID)
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/tsoutil/tsoutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
logicalBits = (1 << physicalShiftBits) - 1
)

// TimeToTS converts a `time.Time` to an `uint64` TS.
func TimeToTS(t time.Time) uint64 {
return ComposeTS(t.UnixNano()/int64(time.Millisecond), 0)
}

// ParseTS parses the ts to (physical,logical).
func ParseTS(ts uint64) (time.Time, uint64) {
physical, logical := ParseTSUint64(ts)
Expand Down
6 changes: 4 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,9 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
return nil
}

func (c *RaftCluster) checkAndUpdateMinResolvedTS() (uint64, bool) {
// CheckAndUpdateMinResolvedTS checks and updates the min resolved ts of the cluster.
// This is exported for testing purpose.
func (c *RaftCluster) CheckAndUpdateMinResolvedTS() (uint64, bool) {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -2284,7 +2286,7 @@ func (c *RaftCluster) runMinResolvedTSJob() {
case <-ticker.C:
interval = c.opt.GetMinResolvedTSPersistenceInterval()
if interval != 0 {
if current, needPersist := c.checkAndUpdateMinResolvedTS(); needPersist {
if current, needPersist := c.CheckAndUpdateMinResolvedTS(); needPersist {
c.storage.SaveMinResolvedTS(current)
}
} else {
Expand Down
7 changes: 7 additions & 0 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ func (sc *schedulingController) ClearSuspectKeyRanges() {
sc.coordinator.GetCheckerController().ClearSuspectKeyRanges()
}

// ClearSuspectRegions clears the suspect regions, only for unit test
func (sc *schedulingController) ClearSuspectRegions() {
sc.mu.RLock()
defer sc.mu.RUnlock()
sc.coordinator.GetCheckerController().ClearSuspectRegions()
}

// AddSuspectKeyRange adds the key range with the its ruleID as the key
// The instance of each keyRange is like following format:
// [2][]byte: start key/end key
Expand Down

0 comments on commit 58e9b20

Please sign in to comment.