Skip to content

Commit

Permalink
*: support time function for calibrate resource (#44965)
Browse files Browse the repository at this point in the history
close #44908
  • Loading branch information
CabinfeverB committed Jul 11, 2023
1 parent 3aa21d0 commit e048e9e
Show file tree
Hide file tree
Showing 9 changed files with 5,291 additions and 5,019 deletions.
6 changes: 1 addition & 5 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"bind.go",
"brie.go",
"builder.go",
"calibrate_resource.go",
"change.go",
"checksum.go",
"compact_table.go",
Expand Down Expand Up @@ -121,6 +120,7 @@ go_library(
"//executor/asyncloaddata",
"//executor/importer",
"//executor/internal/builder",
"//executor/internal/calibrateresource",
"//executor/internal/exec",
"//executor/internal/mpp",
"//executor/internal/pdhelper",
Expand All @@ -139,7 +139,6 @@ go_library(
"//parser/ast",
"//parser/auth",
"//parser/charset",
"//parser/duration",
"//parser/format",
"//parser/model",
"//parser/mysql",
Expand Down Expand Up @@ -290,7 +289,6 @@ go_test(
"batch_point_get_test.go",
"benchmark_test.go",
"brie_test.go",
"calibrate_resource_test.go",
"charset_test.go",
"chunk_size_control_test.go",
"cluster_table_test.go",
Expand Down Expand Up @@ -476,8 +474,6 @@ go_test(
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@org_golang_google_grpc//:grpc",
"@org_golang_x_exp//slices",
"@org_uber_go_atomic//:atomic",
Expand Down
7 changes: 4 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/internal/builder"
"github.com/pingcap/tidb/executor/internal/calibrateresource"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/executor/internal/pdhelper"
executor_metrics "github.com/pingcap/tidb/executor/metrics"
Expand Down Expand Up @@ -907,10 +908,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) exec.Executor {
}
}
case *ast.CalibrateResourceStmt:
return &calibrateResourceExec{
return &calibrateresource.Executor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), 0),
workloadType: s.Tp,
optionList: s.DynamicCalibrateResourceOptionList,
WorkloadType: s.Tp,
OptionList: s.DynamicCalibrateResourceOptionList,
}
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
Expand Down
45 changes: 45 additions & 0 deletions executor/internal/calibrateresource/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "calibrateresource",
srcs = ["calibrate_resource.go"],
importpath = "github.com/pingcap/tidb/executor/internal/calibrateresource",
visibility = ["//executor:__subpackages__"],
deps = [
"//domain",
"//executor/internal/exec",
"//infoschema",
"//kv",
"//parser/ast",
"//parser/duration",
"//parser/model",
"//sessionctx",
"//sessionctx/variable",
"//sessiontxn/staleread",
"//util/chunk",
"//util/mathutil",
"//util/sqlexec",
"@com_github_docker_go_units//:go-units",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//oracle",
],
)

go_test(
name = "calibrateresource_test",
timeout = "short",
srcs = ["calibrate_resource_test.go"],
flaky = True,
deps = [
"//domain",
"//parser/mysql",
"//testkit",
"//types",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package executor
package calibrateresource

import (
"context"
Expand All @@ -23,12 +23,14 @@ import (

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/internal/exec"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
Expand Down Expand Up @@ -113,48 +115,106 @@ const (
minDuration = time.Minute * 10
)

type calibrateResourceExec struct {
// Executor is used as executor of calibrate resource.
type Executor struct {
exec.BaseExecutor
optionList []*ast.DynamicCalibrateResourceOption
workloadType ast.CalibrateResourceType
OptionList []*ast.DynamicCalibrateResourceOption
WorkloadType ast.CalibrateResourceType
done bool
}

func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) {
func (e *Executor) parseTsExpr(ctx context.Context, tsExpr ast.ExprNode) (time.Time, error) {
ts, err := staleread.CalculateAsOfTsExpr(ctx, e.Ctx(), tsExpr)
if err != nil {
return time.Time{}, err
}
return oracle.GetTimeFromTS(ts), nil
}

func (e *Executor) parseCalibrateDuration(ctx context.Context) (startTime time.Time, endTime time.Time, err error) {
var dur time.Duration
var ts uint64
for _, op := range e.optionList {
// startTimeExpr and endTimeExpr are used to calc endTime by FuncCallExpr when duration begin with `interval`.
var startTimeExpr ast.ExprNode
var endTimeExpr ast.ExprNode
for _, op := range e.OptionList {
switch op.Tp {
case ast.CalibrateStartTime:
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.Ctx(), op.Ts)
startTimeExpr = op.Ts
startTime, err = e.parseTsExpr(ctx, startTimeExpr)
if err != nil {
return
}
startTime = oracle.GetTimeFromTS(ts)
case ast.CalibrateEndTime:
ts, err = staleread.CalculateAsOfTsExpr(ctx, e.Ctx(), op.Ts)
endTimeExpr = op.Ts
endTime, err = e.parseTsExpr(ctx, op.Ts)
if err != nil {
return
}
endTime = oracle.GetTimeFromTS(ts)
case ast.CalibrateDuration:
}
}
for _, op := range e.OptionList {
if op.Tp != ast.CalibrateDuration {
continue
}
// string duration
if len(op.StrValue) > 0 {
dur, err = duration.ParseDuration(op.StrValue)
if err != nil {
return
}
// If startTime is not set, startTime will be now() - duration.
if startTime.IsZero() {
toTime := endTime
if toTime.IsZero() {
toTime = time.Now()
}
startTime = toTime.Add(-dur)
}
// If endTime is set, duration will be ignored.
if endTime.IsZero() {
endTime = startTime.Add(dur)
}
continue
}
// interval duration
// If startTime is not set, startTime will be now() - duration.
if startTimeExpr == nil {
toTimeExpr := endTimeExpr
if endTime.IsZero() {
toTimeExpr = &ast.FuncCallExpr{FnName: model.NewCIStr("CURRENT_TIMESTAMP")}
}
startTimeExpr = &ast.FuncCallExpr{
FnName: model.NewCIStr("DATE_SUB"),
Args: []ast.ExprNode{
toTimeExpr,
op.Ts,
&ast.TimeUnitExpr{Unit: op.Unit}},
}
startTime, err = e.parseTsExpr(ctx, startTimeExpr)
if err != nil {
return
}
}
// If endTime is set, duration will be ignored.
if endTime.IsZero() {
endTime, err = e.parseTsExpr(ctx, &ast.FuncCallExpr{
FnName: model.NewCIStr("DATE_ADD"),
Args: []ast.ExprNode{startTimeExpr,
op.Ts,
&ast.TimeUnitExpr{Unit: op.Unit}},
})
if err != nil {
return
}
}
}

if startTime.IsZero() {
err = errors.Errorf("start time should not be 0")
return
}
// If endTime is set, duration will be ignored.
if endTime.IsZero() {
if dur != time.Duration(0) {
endTime = startTime.Add(dur)
} else {
endTime = time.Now()
}
endTime = time.Now()
}
// check the duration
dur = endTime.Sub(startTime)
Expand All @@ -165,11 +225,11 @@ func (e *calibrateResourceExec) parseCalibrateDuration(ctx context.Context) (sta
if dur < minDuration {
err = errors.Errorf("the duration of calibration is too short, which could lead to inaccurate output. Please make the duration between %s and %s", minDuration.String(), maxDuration.String())
}

return
}

func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error {
// Next implements the interface of Executor.
func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
Expand All @@ -178,7 +238,7 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro

exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.optionList) > 0 {
if len(e.OptionList) > 0 {
return e.dynamicCalibrate(ctx, req, exec)
}
return e.staticCalibrate(ctx, req, exec)
Expand All @@ -189,7 +249,7 @@ var (
errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
)

func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
startTs, endTs, err := e.parseCalibrateDuration(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -217,6 +277,32 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk
if err != nil {
return err
}
failpoint.Inject("mockMetricsDataFilter", func() {
ret := make([]*timePointValue, 0)
for _, point := range tikvCPUs.vals {
if point.tp.After(endTs) || point.tp.Before(startTs) {
continue
}
ret = append(ret, point)
}
tikvCPUs.vals = ret
ret = make([]*timePointValue, 0)
for _, point := range tidbCPUs.vals {
if point.tp.After(endTs) || point.tp.Before(startTs) {
continue
}
ret = append(ret, point)
}
tidbCPUs.vals = ret
ret = make([]*timePointValue, 0)
for _, point := range rus.vals {
if point.tp.After(endTs) || point.tp.Before(startTs) {
continue
}
ret = append(ret, point)
}
rus.vals = ret
})
quotas := make([]float64, 0)
lowCount := 0
for {
Expand Down Expand Up @@ -268,7 +354,7 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk
return nil
}

func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
Expand All @@ -288,12 +374,12 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.
}

// The default workload to calculate the RU capacity.
if e.workloadType == ast.WorkloadNone {
e.workloadType = ast.TPCC
if e.WorkloadType == ast.WorkloadNone {
e.WorkloadType = ast.TPCC
}
baseCost, ok := workloadBaseRUCostMap[e.workloadType]
baseCost, ok := workloadBaseRUCostMap[e.WorkloadType]
if !ok {
return errors.Errorf("unknown workload '%T'", e.workloadType)
return errors.Errorf("unknown workload '%T'", e.WorkloadType)
}

if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
Expand Down Expand Up @@ -359,12 +445,12 @@ func (t *timeSeriesValues) advance(target time.Time) bool {

func getRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) {
query := fmt.Sprintf("SELECT time, value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time asc", startTime, endTime)
return getValuesFromMetrics(ctx, sctx, exec, query, "resource_manager_resource_unit")
return getValuesFromMetrics(ctx, sctx, exec, query)
}

func getComponentCPUUsagePerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) (*timeSeriesValues, error) {
query := fmt.Sprintf("SELECT time, sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time asc", startTime, endTime, component)
return getValuesFromMetrics(ctx, sctx, exec, query, "process_cpu_usage")
return getValuesFromMetrics(ctx, sctx, exec, query)
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
Expand All @@ -379,7 +465,7 @@ func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return rows[0].GetFloat64(0), nil
}

func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (*timeSeriesValues, error) {
func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query string) (*timeSeriesValues, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit e048e9e

Please sign in to comment.