Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http: encapsulate rule-related PD HTTP interfaces #7397

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
accelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
Expand All @@ -45,7 +45,10 @@ const (
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
// Scheduler
Schedulers = "/pd/api/v1/schedulers"
scatterRangeScheduler = "/pd/api/v1/schedulers/scatter-range-"
Expand Down Expand Up @@ -123,6 +126,16 @@ func PlacementRuleByGroupAndID(group, id string) string {
return fmt.Sprintf("%s/%s/%s", PlacementRule, group, id)
}

// PlacementRuleBundleByGroup returns the path of PD HTTP API to get placement rule bundle by group.
func PlacementRuleBundleByGroup(group string) string {
return fmt.Sprintf("%s/%s", PlacementRuleBundle, group)
}

// PlacementRuleBundleWithPartialParameter returns the path of PD HTTP API to get placement rule bundle with partial parameter.
func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
148 changes: 132 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (

// Client is a PD (Placement Driver) HTTP client.
type Client interface {
/* Meta-related interfaces */
GetRegionByID(context.Context, uint64) (*RegionInfo, error)
GetRegionByKey(context.Context, []byte) (*RegionInfo, error)
GetRegions(context.Context) (*RegionsInfo, error)
Expand All @@ -51,11 +52,24 @@ type Client interface {
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetRegionStatusByKeyRange(context.Context, []byte, []byte) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Rule-related interfaces */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two groups of API for the placement rule, maybe we can reduce it later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, after replacing the existing TiDB code, we can reduce the exposed interfaces here.

GetAllPlacementRuleBundles(context.Context) ([]*GroupBundle, error)
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
WithRespHandler(func(resp *http.Response) error) Client
Close()
}

Expand All @@ -66,6 +80,8 @@ type client struct {
tlsConf *tls.Config
cli *http.Client

respHandler func(resp *http.Response) error

requestCounter *prometheus.CounterVec
executionDuration *prometheus.HistogramVec
}
Expand Down Expand Up @@ -143,6 +159,14 @@ func (c *client) Close() {
log.Info("[pd] http client closed")
}

// WithRespHandler sets and returns a new client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
func (c *client) WithRespHandler(handler func(resp *http.Response) error) Client {
newClient := *c
newClient.respHandler = handler
return &newClient
}

func (c *client) reqCounter(name, status string) {
if c.requestCounter == nil {
return
Expand Down Expand Up @@ -204,6 +228,12 @@ func (c *client) request(
}
c.execDuration(name, time.Since(start))
c.reqCounter(name, resp.Status)

// Give away the response handling to the caller if the handler is set.
if c.respHandler != nil {
return c.respHandler(resp)
}

defer func() {
err = resp.Body.Close()
if err != nil {
Expand Down Expand Up @@ -345,6 +375,30 @@ func (c *client) GetStores(ctx context.Context) (*StoresInfo, error) {
return &stores, nil
}

// GetAllPlacementRuleBundles gets all placement rules bundles.
func (c *client) GetAllPlacementRuleBundles(ctx context.Context) ([]*GroupBundle, error) {
var bundles []*GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundle", PlacementRuleBundle,
http.MethodGet, nil, &bundles)
if err != nil {
return nil, err
}
return bundles, nil
}

// GetPlacementRuleBundleByGroup gets the placement rules bundle by group.
func (c *client) GetPlacementRuleBundleByGroup(ctx context.Context, group string) (*GroupBundle, error) {
var bundle GroupBundle
err := c.requestWithRetry(ctx,
"GetPlacementRuleBundleByGroup", PlacementRuleBundleByGroup(group),
http.MethodGet, nil, &bundle)
if err != nil {
return nil, err
}
return &bundle, nil
}

// GetPlacementRulesByGroup gets the placement rules by group.
func (c *client) GetPlacementRulesByGroup(ctx context.Context, group string) ([]*Rule, error) {
var rules []*Rule
Expand All @@ -368,13 +422,90 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
bundlesJSON, err := json.Marshal(bundles)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleBundles", PlacementRuleBundleWithPartialParameter(partial),
http.MethodPost, bytes.NewBuffer(bundlesJSON), nil)
}

// DeletePlacementRule deletes the placement rule.
func (c *client) DeletePlacementRule(ctx context.Context, group, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRule", PlacementRuleByGroupAndID(group, id),
http.MethodDelete, nil, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
err := c.requestWithRetry(ctx,
"GetAllRegionLabelRules", RegionLabelRules,
http.MethodGet, nil, &labelRules)
if err != nil {
return nil, err
}
return labelRules, nil
}

// GetRegionLabelRulesByIDs gets the region label rules by IDs.
func (c *client) GetRegionLabelRulesByIDs(ctx context.Context, ruleIDs []string) ([]*LabelRule, error) {
idsJSON, err := json.Marshal(ruleIDs)
if err != nil {
return nil, errors.Trace(err)
}
var labelRules []*LabelRule
err = c.requestWithRetry(ctx,
"GetRegionLabelRulesByIDs", RegionLabelRulesByIDs,
http.MethodGet, bytes.NewBuffer(idsJSON), &labelRules)
if err != nil {
return nil, err
}
return labelRules, nil
}

// SetRegionLabelRule sets the region label rule.
func (c *client) SetRegionLabelRule(ctx context.Context, labelRule *LabelRule) error {
labelRuleJSON, err := json.Marshal(labelRule)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetRegionLabelRule", RegionLabelRule,
http.MethodPost, bytes.NewBuffer(labelRuleJSON), nil)
}

// PatchRegionLabelRules patches the region label rules.
func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *LabelRulePatch) error {
labelRulePatchJSON, err := json.Marshal(labelRulePatch)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"PatchRegionLabelRules", RegionLabelRules,
http.MethodPatch, bytes.NewBuffer(labelRulePatchJSON), nil)
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", AccelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand Down Expand Up @@ -406,18 +537,3 @@ func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uin
}
return resp.MinResolvedTS, resp.StoresMinResolvedTS, nil
}

// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateSchedule", accelerateSchedule,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}
31 changes: 31 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,3 +246,34 @@ type Rule struct {
Version uint64 `json:"version,omitempty"` // only set at runtime, add 1 each time rules updated, begin from 0.
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Index int `json:"group_index"`
Override bool `json:"group_override"`
Rules []*Rule `json:"rules"`
}

// RegionLabel is the label of a region.
type RegionLabel struct {
Key string `json:"key"`
Value string `json:"value"`
TTL string `json:"ttl,omitempty"`
StartAt string `json:"start_at,omitempty"`
}

// LabelRule is the rule to assign labels to a region.
type LabelRule struct {
ID string `json:"id"`
Index int `json:"index"`
Labels []RegionLabel `json:"labels"`
RuleType string `json:"rule_type"`
Data interface{} `json:"data"`
}

// LabelRulePatch is the patch to update the label rules.
type LabelRulePatch struct {
SetRules []*LabelRule `json:"sets"`
DeleteRules []string `json:"deletes"`
}
Loading
Loading