mcs: support region http interface in scheduling server #7297

Merged · 18 commits · Nov 23, 2023
Changes from 2 commits
359 changes: 359 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api.go

Large diffs are not rendered by default.
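Since the api.go diff is collapsed above, here is a purely hypothetical sketch (not the file's actual contents) of how one of the new handler methods could be exposed over HTTP, assuming the scheduling server's gin-based API layer; the helper name and route path are illustrative:

	package apis

	import (
		"net/http"

		"github.com/gin-gonic/gin"
		"github.com/tikv/pd/pkg/schedule/handler"
	)

	// registerRegionRouters is an assumed helper name; the real route wiring may differ.
	func registerRegionRouters(router *gin.RouterGroup, h *handler.Handler) {
		// GET /regions/replicated?startKey=<hex>&endKey=<hex>
		router.GET("/regions/replicated", func(c *gin.Context) {
			state, err := h.CheckRegionsReplicated(c.Query("startKey"), c.Query("endKey"))
			if err != nil {
				c.String(http.StatusInternalServerError, err.Error())
				return
			}
			c.String(http.StatusOK, state)
		})
	}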

226 changes: 226 additions & 0 deletions pkg/schedule/handler/handler.go
@@ -16,12 +16,15 @@ package handler

import (
"bytes"
"context"
"encoding/hex"
"net/http"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
@@ -30,6 +33,7 @@ import (
"github.com/tikv/pd/pkg/schedule"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/scatter"
@@ -41,6 +45,11 @@
"go.uber.org/zap"
)

const (
defaultRegionLimit = 16
maxRegionLimit = 10240
)

// Server is the interface for handler about schedule.
// TODO: remove it after GetCluster is unified between PD server and Scheduling server.
type Server interface {
@@ -1040,3 +1049,220 @@ func (h *Handler) GetHotBuckets(regionIDs ...uint64) (HotBucketsResponse, error)
}
return ret, nil
}

// GetRegion returns the region info with the given region ID.
func (h *Handler) GetRegion(id uint64) *core.RegionInfo {
c := h.GetCluster()
if c == nil {
return nil
}
return c.GetRegion(id)
}

// GetRegionLabeler returns the region labeler.
func (h *Handler) GetRegionLabeler() (*labeler.RegionLabeler, error) {
c := h.GetCluster()
if c == nil || c.GetRegionLabeler() == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return c.GetRegionLabeler(), nil
}

// AccelerateRegionsScheduleInRange accelerates regions scheduling in a given range.
func (h *Handler) AccelerateRegionsScheduleInRange(rawStartKey, rawEndKey string, limit int) error {
startKey, err := hex.DecodeString(rawStartKey)
if err != nil {
return err
}
endKey, err := hex.DecodeString(rawEndKey)
if err != nil {
return err
}
c := h.GetCluster()
if c == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
regions := c.ScanRegions(startKey, endKey, limit)
if len(regions) > 0 {
regionsIDList := make([]uint64, 0, len(regions))
for _, region := range regions {
regionsIDList = append(regionsIDList, region.GetID())
}
co.GetCheckerController().AddSuspectRegions(regionsIDList...)
}
return nil
}

// AccelerateRegionsScheduleInRanges accelerates regions scheduling in given ranges.
func (h *Handler) AccelerateRegionsScheduleInRanges(startKeys [][]byte, endKeys [][]byte, limit int) error {
c := h.GetCluster()
if c == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
co := h.GetCoordinator()
if co == nil {
return errs.ErrNotBootstrapped.GenWithStackByArgs()
}
if len(startKeys) != len(endKeys) {
return errors.New("startKeys and endKeys should have the same length")
}
var regions []*core.RegionInfo
for i := range startKeys {
regions = append(regions, c.ScanRegions(startKeys[i], endKeys[i], limit)...)
}
if len(regions) > 0 {
regionsIDList := make([]uint64, 0, len(regions))
for _, region := range regions {
regionsIDList = append(regionsIDList, region.GetID())
}
co.GetCheckerController().AddSuspectRegions(regionsIDList...)
}
return nil
}
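For illustration, a hedged sketch of a caller that decodes hex-encoded key ranges before invoking AccelerateRegionsScheduleInRanges; the keyRange struct, field names, and helper are assumptions, not part of this PR:

	// assumes imports: "encoding/hex", "github.com/tikv/pd/pkg/schedule/handler"
	type keyRange struct {
		StartKey string `json:"start_key"` // hex-encoded
		EndKey   string `json:"end_key"`   // hex-encoded
	}

	func accelerateRanges(h *handler.Handler, ranges []keyRange, limit int) error {
		startKeys := make([][]byte, 0, len(ranges))
		endKeys := make([][]byte, 0, len(ranges))
		for _, r := range ranges {
			sk, err := hex.DecodeString(r.StartKey)
			if err != nil {
				return err
			}
			ek, err := hex.DecodeString(r.EndKey)
			if err != nil {
				return err
			}
			startKeys = append(startKeys, sk)
			endKeys = append(endKeys, ek)
		}
		return h.AccelerateRegionsScheduleInRanges(startKeys, endKeys, limit)
	}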

// AdjustLimit adjusts the limit of regions to schedule.
func (h *Handler) AdjustLimit(limitStr string, defaultLimits ...int) (int, error) {
limit := defaultRegionLimit
if len(defaultLimits) > 0 {
limit = defaultLimits[0]
}
if limitStr != "" {
var err error
limit, err = strconv.Atoi(limitStr)
if err != nil {
return 0, err
}
}
if limit > maxRegionLimit {
limit = maxRegionLimit
}
return limit, nil
}
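A brief, hedged illustration of how AdjustLimit resolves a raw "limit" query parameter; the values follow the constants declared above, and resolveLimit is an illustrative helper, not part of this PR:

	// assumes import: "github.com/tikv/pd/pkg/schedule/handler"
	// Illustrative behavior:
	//   h.AdjustLimit("")        -> 16, nil    // falls back to defaultRegionLimit
	//   h.AdjustLimit("", 256)   -> 256, nil   // caller-supplied default wins when limitStr is empty
	//   h.AdjustLimit("100000")  -> 10240, nil // clamped to maxRegionLimit
	//   h.AdjustLimit("abc")     -> 0, err     // strconv.Atoi error is returned
	func resolveLimit(h *handler.Handler, limitQuery string) (int, error) {
		return h.AdjustLimit(limitQuery)
	}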

// ScatterRegionsResponse is the response for scatter regions.
type ScatterRegionsResponse struct {
ProcessedPercentage int `json:"processed-percentage"`
}

// BuildScatterRegionsResp builds ScatterRegionsResponse.
func (h *Handler) BuildScatterRegionsResp(opsCount int, failures map[uint64]error) *ScatterRegionsResponse {
// If any operator failed to be added to the operator controller, count its region as unprocessed and lower the processed percentage accordingly.
percentage := 100
if len(failures) > 0 {
percentage = 100 - 100*len(failures)/(opsCount+len(failures))
log.Debug("scatter regions", zap.Errors("failures", func() []error {
r := make([]error, 0, len(failures))
for _, err := range failures {
r = append(r, err)
}
return r
}()))
}
return &ScatterRegionsResponse{
ProcessedPercentage: percentage,
}
}
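For example, opsCount = 3 with one failure gives 100 - 100*1/(3+1) = 75; since the division is integer division, opsCount = 2 with one failure gives 100 - 100/3 = 67 rather than 66.7.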

// ScatterRegionsByRange scatters regions by range.
func (h *Handler) ScatterRegionsByRange(rawStartKey, rawEndKey string, group string, retryLimit int) (int, map[uint64]error, error) {
startKey, err := hex.DecodeString(rawStartKey)
if err != nil {
return 0, nil, err
}
endKey, err := hex.DecodeString(rawEndKey)
if err != nil {
return 0, nil, err
}
co := h.GetCoordinator()
if co == nil {
return 0, nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return co.GetRegionScatterer().ScatterRegionsByRange(startKey, endKey, group, retryLimit)
}

// ScatterRegionsByID scatters regions by id.
func (h *Handler) ScatterRegionsByID(ids []uint64, group string, retryLimit int, skipStoreLimit bool) (int, map[uint64]error, error) {
co := h.GetCoordinator()
if co == nil {
return 0, nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
return co.GetRegionScatterer().ScatterRegionsByID(ids, group, retryLimit, skipStoreLimit)
}

// SplitRegionsResponse is the response for split regions.
type SplitRegionsResponse struct {
ProcessedPercentage int `json:"processed-percentage"`
NewRegionsID []uint64 `json:"regions-id"`
}

// SplitRegions splits regions by split keys.
func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []string, retryLimit int) (*SplitRegionsResponse, error) {
co := h.GetCoordinator()
if co == nil {
return nil, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
splitKeys := make([][]byte, 0, len(rawSplitKeys))
for _, rawKey := range rawSplitKeys {
key, err := hex.DecodeString(rawKey)
if err != nil {
return nil, err
}
splitKeys = append(splitKeys, key)
}

percentage, newRegionsID := co.GetRegionSplitter().SplitRegions(ctx, splitKeys, retryLimit)
s := &SplitRegionsResponse{
ProcessedPercentage: percentage,
NewRegionsID: newRegionsID,
}
failpoint.Inject("splitResponses", func(val failpoint.Value) {
rawID, ok := val.(int)
if ok {
s.ProcessedPercentage = 100
s.NewRegionsID = []uint64{uint64(rawID)}
}
})
return s, nil
}
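A hedged usage sketch for SplitRegions; the hex keys, retry limit, and helper name below are illustrative assumptions:

	// assumes imports: "context", "errors", "github.com/tikv/pd/pkg/schedule/handler"
	func splitAtKeys(ctx context.Context, h *handler.Handler, rawHexKeys []string) ([]uint64, error) {
		// rawHexKeys are hex-encoded split keys, e.g. "7480000000000000ff5f".
		resp, err := h.SplitRegions(ctx, rawHexKeys, 5 /* retryLimit, illustrative */)
		if err != nil {
			return nil, err
		}
		if resp.ProcessedPercentage < 100 {
			return resp.NewRegionsID, errors.New("some split keys were not processed")
		}
		return resp.NewRegionsID, nil
	}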

// CheckRegionsReplicated checks if regions are replicated.
func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string, error) {
startKey, err := hex.DecodeString(rawStartKey)
if err != nil {
return "", err
}
endKey, err := hex.DecodeString(rawEndKey)
if err != nil {
return "", err
}
c := h.GetCluster()
if c == nil {
return "", errs.ErrNotBootstrapped.GenWithStackByArgs()
}
co := h.GetCoordinator()
if co == nil {
return "", errs.ErrNotBootstrapped.GenWithStackByArgs()
}
regions := c.ScanRegions(startKey, endKey, -1)
state := "REPLICATED"
for _, region := range regions {
if !filter.IsRegionReplicated(c, region) {
state = "INPROGRESS"
if co.IsPendingRegion(region.GetID()) {
state = "PENDING"
break
}
}
}
failpoint.Inject("mockPending", func(val failpoint.Value) {
aok, ok := val.(bool)
if ok && aok {
state = "PENDING"
}
})
return state, nil
}
23 changes: 16 additions & 7 deletions pkg/utils/apiutil/serverapi/middleware.go
@@ -79,6 +79,7 @@ type microserviceRedirectRule struct {
targetPath string
targetServiceName string
matchMethods []string
filter func(*http.Request) bool
}

// NewRedirector redirects request to the leader if needs to be handled in the leader.
@@ -94,14 +95,19 @@ func NewRedirector(s *server.Server, opts ...RedirectorOption) negroni.Handler {
type RedirectorOption func(*redirector)

// MicroserviceRedirectRule new a microservice redirect rule option
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, methods []string) RedirectorOption {
func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
methods []string, filters ...func(*http.Request) bool) RedirectorOption {
return func(s *redirector) {
s.microserviceRedirectRules = append(s.microserviceRedirectRules, &microserviceRedirectRule{
matchPath,
targetPath,
targetServiceName,
methods,
})
rule := &microserviceRedirectRule{
matchPath: matchPath,
targetPath: targetPath,
targetServiceName: targetServiceName,
matchMethods: methods,
}
if len(filters) > 0 {
rule.filter = filters[0]
}
s.microserviceRedirectRules = append(s.microserviceRedirectRules, rule)
}
}
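For context, a hedged sketch of registering a filtered rule through NewRedirector; the paths, service name, and filter predicate are illustrative assumptions, not the registration this PR actually adds elsewhere:

	// assumes imports: "net/http", "github.com/tikv/pd/pkg/utils/apiutil/serverapi",
	// "github.com/tikv/pd/server", "github.com/urfave/negroni"
	func newRegionRedirector(svr *server.Server) negroni.Handler {
		return serverapi.NewRedirector(svr,
			serverapi.MicroserviceRedirectRule(
				"/pd/api/v1/regions",         // matchPath (illustrative)
				"/scheduling/api/v1/regions", // targetPath (illustrative)
				"scheduling",                 // targetServiceName (illustrative)
				[]string{http.MethodGet},
				func(r *http.Request) bool {
					// Optional filter: skip the redirect unless the request carries a key range.
					return r.URL.Query().Get("startKey") != ""
				},
			),
		)
	}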

@@ -117,6 +123,9 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
r.URL.Path = strings.TrimRight(r.URL.Path, "/")
for _, rule := range h.microserviceRedirectRules {
if strings.HasPrefix(r.URL.Path, rule.matchPath) && slice.Contains(rule.matchMethods, r.Method) {
if rule.filter != nil && !rule.filter(r) {
continue
}
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
2 changes: 1 addition & 1 deletion pkg/utils/testutil/api_check.go
@@ -88,7 +88,7 @@ func ReadGetJSON(re *require.Assertions, client *http.Client, url string, data i
}

// ReadGetJSONWithBody is used to do get request with input and check whether given data can be extracted successfully.
func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}) error {
func ReadGetJSONWithBody(re *require.Assertions, client *http.Client, url string, input []byte, data interface{}, checkOpts ...func([]byte, int, http.Header)) error {
resp, err := apiutil.GetJSON(client, url, input)
if err != nil {
return err
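A hedged usage sketch for the new variadic checkOpts parameter (re, client, url, body, and resp are assumed to exist in the surrounding test; the assertion is illustrative):

	// The callback receives the response body, status code, and header, matching the new signature.
	err := testutil.ReadGetJSONWithBody(re, client, url, body, &resp,
		func(_ []byte, code int, _ http.Header) {
			re.Equal(http.StatusOK, code)
		})
	re.NoError(err)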