Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add EMA cpu monitor #39526

Merged
merged 10 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,14 @@ def go_deps():
sum = "h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=",
version = "v0.3.4",
)
go_repository(
name = "com_github_cloudfoundry_gosigar",
build_file_proto_mode = "disable",
importpath = "github.com/cloudfoundry/gosigar",
sum = "h1:T3MoGdugg1vdHn8Az7wDn7cZ4+QCjZph+eXf2CjSjo4=",
version = "v1.3.4",
)

go_repository(
name = "com_github_cloudykit_fastprinter",
build_file_proto_mode = "disable_global",
Expand Down Expand Up @@ -4438,8 +4446,8 @@ def go_deps():
name = "org_golang_x_sys",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/sys",
sum = "h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=",
version = "v0.2.0",
sum = "h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=",
version = "v0.3.0",
)
go_repository(
name = "org_golang_x_term",
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/charithe/durationcheck v0.0.9
github.com/cheggaaa/pb/v3 v3.0.8
github.com/cheynewallace/tabby v1.1.1
github.com/cloudfoundry/gosigar v1.3.4
github.com/cockroachdb/errors v1.8.1
github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5
github.com/coocood/freecache v1.2.1
Expand Down Expand Up @@ -110,7 +111,7 @@ require (
golang.org/x/net v0.2.0
golang.org/x/oauth2 v0.2.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.2.0
golang.org/x/sys v0.3.0
golang.org/x/term v0.2.0
golang.org/x/text v0.4.0
golang.org/x/time v0.2.0
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudfoundry/gosigar v1.3.4 h1:T3MoGdugg1vdHn8Az7wDn7cZ4+QCjZph+eXf2CjSjo4=
github.com/cloudfoundry/gosigar v1.3.4/go.mod h1:g9r7ETZ1tpvJCT9TpqxO53+5BUZiM2FDSFSENzjK5Z8=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
Expand Down Expand Up @@ -1314,8 +1316,8 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220909162455-aba9fc2a8ff2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0 h1:z85xZCsEl7bi/KwbNADeBYoOP0++7W1ipu+aGnpwzRM=
Expand Down
5 changes: 4 additions & 1 deletion util/cgroup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ go_library(

go_test(
name = "cgroup_test",
srcs = ["cgroup_mock_test.go"],
srcs = [
"cgroup_cpu_test.go",
"cgroup_mock_test.go",
],
embed = [":cgroup"],
flaky = True,
deps = ["@com_github_stretchr_testify//require"],
Expand Down
9 changes: 9 additions & 0 deletions util/cgroup/cgroup_cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,12 @@ func getCgroupCPU(root string) (CPUUsage, error) {

return res, nil
}

// CPUShares returns the number of CPUs this cgroup can be expected to
// max out. If there's no limit, NumCPU is returned.
func (c CPUUsage) CPUShares() float64 {
if c.Period <= 0 || c.Quota <= 0 {
return float64(c.NumCPU)
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
}
return float64(c.Quota) / float64(c.Period)
}
9 changes: 0 additions & 9 deletions util/cgroup/cgroup_cpu_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,6 @@ import (
"strings"
)

// CPUShares returns the number of CPUs this cgroup can be expected to
// max out. If there's no limit, NumCPU is returned.
func (c CPUUsage) CPUShares() float64 {
if c.Period <= 0 || c.Quota <= 0 {
return float64(c.NumCPU)
}
return float64(c.Quota) / float64(c.Period)
}

// GetCgroupCPU returns the CPU usage and quota for the current cgroup.
func GetCgroupCPU() (CPUUsage, error) {
cpuusage, err := getCgroupCPU("/")
Expand Down
50 changes: 50 additions & 0 deletions util/cgroup/cgroup_cpu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux

package cgroup

import (
"runtime"
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestGetCgroupCPU(t *testing.T) {
exit := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-exit:
return
default:
runtime.Gosched()
}
}
}()
}
cpu, err := GetCgroupCPU()
require.NoError(t, err)
require.NotZero(t, cpu.Period)
require.Less(t, int64(1), cpu.Period)
close(exit)
wg.Wait()
}
23 changes: 23 additions & 0 deletions util/cpu/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "cpu",
srcs = ["cpu.go"],
importpath = "github.com/pingcap/tidb/util/cpu",
visibility = ["//visibility:public"],
deps = [
"//util/cgroup",
"//util/mathutil",
"@com_github_cloudfoundry_gosigar//:gosigar",
"@com_github_pingcap_log//:log",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "cpu_test",
srcs = ["cpu_test.go"],
embed = [":cpu"],
deps = ["@com_github_stretchr_testify//require"],
)
109 changes: 109 additions & 0 deletions util/cpu/cpu.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cpu

import (
"os"
"sync"
"time"

"github.com/cloudfoundry/gosigar"
"github.com/pingcap/log"
"github.com/pingcap/tidb/util/cgroup"
"github.com/pingcap/tidb/util/mathutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)

var cpuUsage atomic.Float64

// GetCPUUsage returns the cpu usage of the current process.
func GetCPUUsage() float64 {
return cpuUsage.Load()
}

// Observer is used to observe the cpu usage of the current process.
type Observer struct {
utime int64
stime int64
now int64
exit chan struct{}
Benjamin2037 marked this conversation as resolved.
Show resolved Hide resolved
cpu mathutil.ExponentialMovingAverage
wg sync.WaitGroup
}

// NewCPUObserver returns a cpu observer.
func NewCPUObserver() *Observer {
return &Observer{
exit: make(chan struct{}),
now: time.Now().UnixNano(),
cpu: *mathutil.NewExponentialMovingAverage(0.95, 10),
}
}

// Start starts the cpu observer.
func (c *Observer) Start() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case <-ticker.C:
curr := c.observe()
c.cpu.Add(curr)
cpuUsage.Store(c.cpu.Get())
case <-c.exit:
return
}
}
}()
}

// Stop stops the cpu observer.
func (c *Observer) Stop() {
close(c.exit)
c.wg.Wait()
}

func (c *Observer) observe() float64 {
user, sys, err := getCPUTime()
if err != nil {
log.Error("getCPUTime", zap.Error(err))
}
cgroupCPU, _ := cgroup.GetCgroupCPU()
cpuShare := cgroupCPU.CPUShares()
now := time.Now().UnixNano()
dur := float64(now - c.now)
utime := user * 1e6
stime := sys * 1e6
urate := float64(utime-c.utime) / dur
srate := float64(stime-c.stime) / dur
c.now = now
c.utime = utime
c.stime = stime
return (srate + urate) / cpuShare
}

// getCPUTime returns the cumulative user/system time (in ms) since the process start.
func getCPUTime() (userTimeMillis, sysTimeMillis int64, err error) {
pid := os.Getpid()
cpuTime := sigar.ProcTime{}
if err := cpuTime.Get(pid); err != nil {
return 0, 0, err
}
return int64(cpuTime.User), int64(cpuTime.Sys), nil
}
51 changes: 51 additions & 0 deletions util/cpu/cpu_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cpu

import (
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestCPUValue(t *testing.T) {
Observer := NewCPUObserver()
Observer.Start()
exit := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-exit:
return
default:
runtime.Gosched()
}
}
}()
}
time.Sleep(30 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this too long for test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not the main problem. because this is a test only for this part. so this test cache can be used for a long time.

require.Greater(t, Observer.observe(), 0.0)
require.Less(t, Observer.observe(), 1.0)
Observer.Stop()
close(exit)
wg.Wait()
}