Skip to content

Commit

Permalink
koordlet: fix can not collect psi metrics
Browse files Browse the repository at this point in the history
Signed-off-by: lucming <2876757716@qq.com>
  • Loading branch information
lucming committed Jun 21, 2023
1 parent 382f7a0 commit 6c5ebad
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 36 deletions.
34 changes: 0 additions & 34 deletions pkg/koordlet/koordlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@ limitations under the License.
package agent

import (
"context"
"fmt"
"os"
"time"

topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -91,37 +88,6 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {
}

statesInformer := statesinformerimpl.NewStatesInformer(config.StatesInformerConf, kubeClient, crdClient, topologyClient, metricCache, nodeName, schedulingClient)

// setup cgroup path formatter from cgroup driver type
var detectCgroupDriver system.CgroupDriverType
if pollErr := wait.PollImmediate(time.Second*10, time.Minute, func() (bool, error) {
driver := system.GuessCgroupDriverFromCgroupName()
if driver.Validate() {
detectCgroupDriver = driver
return true, nil
}
klog.Infof("can not detect cgroup driver from 'kubepods' cgroup name")

node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, v1.GetOptions{})
if err != nil || node == nil {
klog.Errorf("Can't get node, err: %v", err)
return false, nil
}

port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if driver, err := system.GuessCgroupDriverFromKubeletPort(port); err == nil && driver.Validate() {
detectCgroupDriver = driver
return true, nil
} else {
klog.Errorf("guess kubelet cgroup driver failed, retry...: %v", err)
return false, nil
}
}); pollErr != nil {
return nil, fmt.Errorf("can not detect kubelet cgroup driver: %v", pollErr)
}
system.SetupCgroupPathFormatter(detectCgroupDriver)
klog.Infof("Node %s use '%s' as cgroup driver", nodeName, string(detectCgroupDriver))

collectorService := metricsadvisor.NewMetricAdvisor(config.CollectorConf, statesInformer, metricCache)

evictVersion, err := util.FindSupportedEvictVersion(kubeClient)
Expand Down
14 changes: 13 additions & 1 deletion pkg/koordlet/util/system/cgroup_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,19 @@ var cgroupPathFormatterInCgroupfs = formatter{
}

// CgroupPathFormatter uses the Systemd cgroup driver by default.
var CgroupPathFormatter = cgroupPathFormatterInSystemd
var CgroupPathFormatter = GetCgroupFormatter()

func GetCgroupPathFormatter(driver CgroupDriverType) formatter {
switch driver {
case Systemd:
return cgroupPathFormatterInSystemd
case Cgroupfs:
return cgroupPathFormatterInCgroupfs
default:
klog.Warningf("cgroup driver formatter not supported: '%s'", string(driver))
return cgroupPathFormatterInSystemd
}
}

func SetupCgroupPathFormatter(driver CgroupDriverType) {
switch driver {
Expand Down
50 changes: 49 additions & 1 deletion pkg/koordlet/util/system/cgroup_driver_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@ package system
import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sys/unix"

"github.com/opencontainers/runc/libcontainer/userns"

"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

const (
Expand All @@ -46,6 +51,49 @@ var (
isUnified bool
)

func GetCgroupFormatter() formatter {
klog.Infoln("start to get cgroup driver formatter...")
// setup cgroup path formatter from cgroup driver type
var detectCgroupDriver CgroupDriverType
nodeName := os.Getenv("NODE_NAME")
if pollErr := wait.PollImmediate(time.Second*10, time.Minute, func() (bool, error) {
driver := GuessCgroupDriverFromCgroupName()
if driver.Validate() {
detectCgroupDriver = driver
return true, nil
}
klog.Infof("can not detect cgroup driver from 'kubepods' cgroup name")

cfg, err := config.GetConfig()
if err != nil {
klog.Errorf("failed to get rest config.err=%v", err)
return false, nil
}
kubeClient := clientset.NewForConfigOrDie(cfg)
node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil || node == nil {
klog.Errorf("Can't get node, err: %v", err)
return false, nil
}

port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
if driver, err := GuessCgroupDriverFromKubeletPort(port); err == nil && driver.Validate() {
detectCgroupDriver = driver
return true, nil
} else {
klog.Errorf("guess kubelet cgroup driver failed, retry...: %v", err)
return false, nil
}
}); pollErr != nil {
klog.Errorf("can not detect kubelet cgroup driver: %v", pollErr)
return cgroupPathFormatterInSystemd
}

klog.Infof("Node %s use '%s' as cgroup driver", nodeName, string(detectCgroupDriver))

return GetCgroupPathFormatter(detectCgroupDriver)
}

func GuessCgroupDriverFromCgroupName() CgroupDriverType {
systemdKubepodDirExists := FileExists(filepath.Join(GetRootCgroupSubfsDir(CgroupCPUDir), KubeRootNameSystemd))
cgroupfsKubepodDirExists := FileExists(filepath.Join(GetRootCgroupSubfsDir(CgroupCPUDir), KubeRootNameCgroupfs))
Expand Down
65 changes: 65 additions & 0 deletions pkg/koordlet/util/system/cgroup_driver_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,68 @@ func Test_GuessCgroupDriverFromCgroupName(t *testing.T) {
})
}
}

func TestGetCgroupFormatter(t *testing.T) {
tests := []struct {
name string
want formatter
preHandle func(cgroupRootDir string)
isCgroupV2 bool
}{
{
name: "neither kubepods nor kubepods.slice exists",
want: cgroupPathFormatterInSystemd,
preHandle: func(cgroupRootDir string) {},
},
{
name: "only have kubepods dir",
want: cgroupPathFormatterInCgroupfs,
preHandle: func(cgroupRootDir string) {
os.MkdirAll(filepath.Join(cgroupRootDir, "cpu", "kubepods"), 0755)
},
},
{
name: "only have kubepods.slice dir",
want: cgroupPathFormatterInSystemd,
preHandle: func(cgroupRootDir string) {
os.MkdirAll(filepath.Join(cgroupRootDir, "cpu", "kubepods.slice"), 0755)
},
},
{
name: "both kubepods and kubepods.slice exists",
want: cgroupPathFormatterInSystemd,
preHandle: func(cgroupRootDir string) {
os.MkdirAll(filepath.Join(cgroupRootDir, "cpu", "kubepods"), 0755)
os.MkdirAll(filepath.Join(cgroupRootDir, "cpu", "kubepods.slice"), 0755)
},
},
{
name: "only have kubepods dir in cgroupv2",
want: cgroupPathFormatterInCgroupfs,
preHandle: func(cgroupRootDir string) {
os.MkdirAll(filepath.Join(cgroupRootDir, "kubepods"), 0755)
},
isCgroupV2: true,
},
{
name: "only have kubepods.slice dir in cgroupv2",
want: cgroupPathFormatterInSystemd,
preHandle: func(cgroupRootDir string) {
os.MkdirAll(filepath.Join(cgroupRootDir, "kubepods.slice"), 0755)
},
isCgroupV2: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper := NewFileTestUtil(t)
defer helper.Cleanup()
helper.SetCgroupsV2(tt.isCgroupV2)
tmpCgroupRoot := helper.TempDir
tt.preHandle(tmpCgroupRoot)

got := GetCgroupFormatter()
assert.Equal(t, tt.want.ParentDir, got.ParentDir)
})
}
}
4 changes: 4 additions & 0 deletions pkg/koordlet/util/system/cgroup_driver_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ func GuessCgroupDriverFromKubeletPort(int) (CgroupDriverType, error) {
func IsUsingCgroupsV2() bool {
return false
}

func GetCgroupFormatter() formatter {
return cgroupPathFormatterInSystemd
}

0 comments on commit 6c5ebad

Please sign in to comment.