Skip to content

Commit

Permalink
Merge branch 'master' into sim-config
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] committed May 30, 2024
2 parents 9623d63 + c498063 commit 32e7f8a
Show file tree
Hide file tree
Showing 52 changed files with 922 additions and 1,151 deletions.
26 changes: 26 additions & 0 deletions OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- AndreMouche
- binshi-bing
- bufferflies
- CabinfeverB
- Connor1996
- disksing
- huachaohuang
- HunDunDM
- HuSharp
- JmPotato
- lhy1024
- nolouch
- overvenus
- qiuyesuifeng
- rleungx
- siddontang
- Yisaer
- zhouqiang-cl
reviewers:
- BusyJay
- howardlau1999
- Luffbee
- shafreeck
- xhebox
1 change: 1 addition & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
membersPrefix = "/pd/api/v1/members"
leaderPrefix = "/pd/api/v1/leader"
transferLeader = "/pd/api/v1/leader/transfer"
health = "/pd/api/v1/health"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand Down
15 changes: 15 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Client interface {
GetStores(context.Context) (*StoresInfo, error)
GetStore(context.Context, uint64) (*StoreInfo, error)
SetStoreLabels(context.Context, int64, map[string]string) error
GetHealthStatus(context.Context) ([]Health, error)
/* Config-related interfaces */
GetConfig(context.Context) (map[string]any, error)
SetConfig(context.Context, map[string]any, ...float64) error
Expand Down Expand Up @@ -337,6 +338,20 @@ func (c *client) SetStoreLabels(ctx context.Context, storeID int64, storeLabels
WithBody(jsonInput))
}

// GetHealthStatus gets the health status of the cluster.
func (c *client) GetHealthStatus(ctx context.Context) ([]Health, error) {
var healths []Health
err := c.request(ctx, newRequestInfo().
WithName(getHealthStatusName).
WithURI(health).
WithMethod(http.MethodGet).
WithResp(&healths))
if err != nil {
return nil, err
}
return healths, nil
}

// GetConfig gets the configurations.
func (c *client) GetConfig(ctx context.Context) (map[string]any, error) {
var config map[string]any
Expand Down
1 change: 1 addition & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
getStoresName = "GetStores"
getStoreName = "GetStore"
setStoreLabelsName = "SetStoreLabels"
getHealthStatusName = "GetHealthStatus"
getConfigName = "GetConfig"
setConfigName = "SetConfig"
getScheduleConfigName = "GetScheduleConfig"
Expand Down
9 changes: 9 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,3 +661,12 @@ func stringToKeyspaceState(str string) (keyspacepb.KeyspaceState, error) {
return keyspacepb.KeyspaceState(0), fmt.Errorf("invalid KeyspaceState string: %s", str)
}
}

// Health reflects the cluster's health.
// NOTE: This type is moved from `server/api/health.go`, maybe move them to the same place later.
type Health struct {
Name string `json:"name"`
MemberID uint64 `json:"member_id"`
ClientUrls []string `json:"client_urls"`
Health bool `json:"health"`
}
71 changes: 45 additions & 26 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
request := gc.collectRequestAndConsumption(typ)
if request != nil {
c.run.currentRequests = append(c.run.currentRequests, request)
gc.tokenRequestCounter.Inc()
gc.metrics.tokenRequestCounter.Inc()
}
return true
})
Expand Down Expand Up @@ -632,13 +632,9 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
// metrics
metrics *groupMetricsCollection
mu struct {
sync.Mutex
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
Expand Down Expand Up @@ -685,6 +681,30 @@ type groupCostController struct {
tombstone bool
}

type groupMetricsCollection struct {
successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounterWithOthers prometheus.Counter
failedRequestCounterWithThrottled prometheus.Counter
tokenRequestCounter prometheus.Counter
}

func initMetrics(oldName, name string) *groupMetricsCollection {
const (
otherType = "others"
throttledType = "throttled"
)
return &groupMetricsCollection{
successfulRequestDuration: successfulRequestDuration.WithLabelValues(oldName, name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(oldName, name),
failedRequestCounterWithOthers: failedRequestCounter.WithLabelValues(oldName, name, otherType),
failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(oldName, name, throttledType),
requestRetryCounter: requestRetryCounter.WithLabelValues(oldName, name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
}
}

type tokenCounter struct {
getTokenBucketFunc func() *rmpb.TokenBucket

Expand Down Expand Up @@ -725,16 +745,13 @@ func newGroupCostController(
default:
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
ms := initMetrics(group.Name, group.Name)
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name, group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name, group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name, group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name, group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name, group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
metrics: ms,
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -789,7 +806,7 @@ func (gc *groupCostController) initRunState() {
case rmpb.GroupMode_RUMode:
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
Expand All @@ -803,7 +820,7 @@ func (gc *groupCostController) initRunState() {
case rmpb.GroupMode_RawMode:
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
Expand Down Expand Up @@ -1233,7 +1250,7 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
Expand All @@ -1243,18 +1260,20 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.requestRetryCounter.Inc()
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
gc.mu.Lock()
sub(gc.mu.consumption, delta)
Expand All @@ -1264,7 +1283,7 @@ func (gc *groupCostController) onRequestWait(
})
return nil, nil, waitDuration, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

Expand Down
15 changes: 15 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
)

func createTestGroupCostController(re *require.Assertions) *groupCostController {
Expand Down Expand Up @@ -117,3 +118,17 @@ func TestRequestAndResponseConsumption(t *testing.T) {
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}

func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
req := &TestRequestInfo{
isWrite: true,
writeBytes: 10000000,
}
// The group is throttled
_, _, _, _, err := gc.onRequestWait(context.TODO(), req)
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}
27 changes: 26 additions & 1 deletion client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -81,6 +82,15 @@ type Limiter struct {
isLowProcess bool
// remainingNotifyTimes is used to limit notify when the speed limit is already set.
remainingNotifyTimes int
name string

// metrics
metrics *limiterMetricsCollection
}

// limiterMetricsCollection is a collection of metrics for a limiter.
type limiterMetricsCollection struct {
lowTokenNotifyCounter prometheus.Counter
}

// Limit returns the maximum overall event rate.
Expand All @@ -106,15 +116,19 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify

// NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
lim := &Limiter{
name: name,
limit: Limit(cfg.NewRate),
last: now,
tokens: cfg.NewTokens,
burst: cfg.NewBurst,
notifyThreshold: cfg.NotifyThreshold,
lowTokensNotifyChan: lowTokensNotifyChan,
}
lim.metrics = &limiterMetricsCollection{
lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name),
}
log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
return lim
}
Expand Down Expand Up @@ -224,6 +238,14 @@ func (lim *Limiter) SetupNotificationThreshold(threshold float64) {
lim.notifyThreshold = threshold
}

// SetName sets the name of the limiter.
func (lim *Limiter) SetName(name string) *Limiter {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.name = name
return lim
}

// notify tries to send a non-blocking notification on notifyCh and disables
// further notifications (until the next Reconfigure or StartNotification).
func (lim *Limiter) notify() {
Expand All @@ -234,6 +256,9 @@ func (lim *Limiter) notify() {
lim.isLowProcess = true
select {
case lim.lowTokensNotifyChan <- struct{}{}:
if lim.metrics != nil {
lim.metrics.lowTokenNotifyCounter.Inc()
}
default:
}
}
Expand Down
18 changes: 15 additions & 3 deletions client/resource_group/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
// TODO: remove old label in 8.x
resourceGroupNameLabel = "name"
newResourceGroupNameLabel = "resource_group"

errType = "type"
)

var (
Expand All @@ -40,7 +42,7 @@ var (
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "success",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600}, // 0.0005 ~ 1h
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

Expand All @@ -49,7 +51,7 @@ var (
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "limit_reserve_time_failed",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600, 86400}, // 0.0005 ~ 24h
Help: "Bucketed histogram of wait duration of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

Expand All @@ -59,7 +61,7 @@ var (
Subsystem: requestSubsystem,
Name: "fail",
Help: "Counter of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType})

requestRetryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand All @@ -73,6 +75,7 @@ var (
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 8s
Name: "duration",
Help: "Bucketed histogram of latency(s) of token request.",
}, []string{"type"})
Expand All @@ -84,6 +87,14 @@ var (
Name: "resource_group",
Help: "Counter of token request by every resource group.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

lowTokenRequestNotifyCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Name: "low_token_notified",
Help: "Counter of low token request.",
}, []string{newResourceGroupNameLabel})
)

var (
Expand All @@ -100,4 +111,5 @@ func init() {
prometheus.MustRegister(requestRetryCounter)
prometheus.MustRegister(tokenRequestDuration)
prometheus.MustRegister(resourceGroupTokenRequestCounter)
prometheus.MustRegister(lowTokenRequestNotifyCounter)
}
Loading

0 comments on commit 32e7f8a

Please sign in to comment.