From 7442bc57840d9f068e7ac83870f1e2e187b23a82 Mon Sep 17 00:00:00 2001
From: Joseph
Date: Wed, 27 Jul 2022 10:35:58 +0800
Subject: [PATCH] koordlet: fix build error on macOS caused by GPU (#413)

Signed-off-by: Joseph
---
 pkg/koordlet/metricsadvisor/collector_gpu.go  | 242 ----------------
 .../metricsadvisor/collector_gpu_linux.go     | 265 ++++++++++++++++++
 ...pu_test.go => collector_gpu_linux_test.go} |   5 +-
 .../collector_gpu_unsupported.go              |  25 ++
 4 files changed, 293 insertions(+), 244 deletions(-)
 create mode 100644 pkg/koordlet/metricsadvisor/collector_gpu_linux.go
 rename pkg/koordlet/metricsadvisor/{collector_gpu_test.go => collector_gpu_linux_test.go} (99%)
 create mode 100644 pkg/koordlet/metricsadvisor/collector_gpu_unsupported.go

diff --git a/pkg/koordlet/metricsadvisor/collector_gpu.go b/pkg/koordlet/metricsadvisor/collector_gpu.go
index bf5bfc15f..fdb720aba 100644
--- a/pkg/koordlet/metricsadvisor/collector_gpu.go
+++ b/pkg/koordlet/metricsadvisor/collector_gpu.go
@@ -17,20 +17,9 @@ limitations under the License.
 package metricsadvisor
 
 import (
-	"errors"
-	"fmt"
-	"sort"
-	"sync"
-	"time"
-
-	"github.com/NVIDIA/go-nvml/pkg/nvml"
 	corev1 "k8s.io/api/core/v1"
-	"k8s.io/apimachinery/pkg/api/resource"
-	"k8s.io/klog/v2"
 
-	"github.com/koordinator-sh/koordinator/pkg/features"
 	"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
-	"github.com/koordinator-sh/koordinator/pkg/util"
 )
 
 type GPUDeviceManager interface {
@@ -61,237 +50,6 @@ func (d *dummyDeviceManager) shutdown() error {
 	return nil
 }
 
-type gpuDeviceManager struct {
-	sync.RWMutex
-	deviceCount      int
-	devices          []*device
-	collectTime      time.Time
-	processesMetrics map[uint32][]*rawGPUMetric
-}
-
-type rawGPUMetric struct {
-	SMUtil     uint32 // current utilization rate for the device
-	MemoryUsed uint64
-}
-
-type device struct {
-	Minor       int32 // index starting from 0
-	DeviceUUID  string
-	MemoryTotal uint64
-	Device      nvml.Device
-}
-
-// initGPUDeviceManager will not retry if init fails,
-func initGPUDeviceManager() GPUDeviceManager {
-	if !features.DefaultKoordletFeatureGate.Enabled(features.Accelerators) {
-		return &dummyDeviceManager{}
-	}
-	if ret := nvml.Init(); ret != nvml.SUCCESS {
-		if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
-			klog.Warning("nvml init failed, library not found")
-			return &dummyDeviceManager{}
-		}
-		klog.Warningf("nvml init failed, return %s", nvml.ErrorString(ret))
-		return &dummyDeviceManager{}
-	}
-	manager := &gpuDeviceManager{}
-	if err := manager.initGPUData(); err != nil {
-		klog.Warningf("nvml init gpu data, error %s", err)
-		manager.shutdown()
-		return &dummyDeviceManager{}
-	}
-
-	return manager
-}
-
-func (g *gpuDeviceManager) shutdown() error {
-	rt := nvml.Shutdown()
-	if rt != nvml.SUCCESS {
-		return fmt.Errorf("nvml shutdown error, code: %s", nvml.ErrorString(rt))
-	}
-	return nil
-}
-
-func (g *gpuDeviceManager) initGPUData() error {
-	count, ret := nvml.DeviceGetCount()
-	if ret != nvml.SUCCESS {
-		return fmt.Errorf("unable to get device count: %v", nvml.ErrorString(ret))
-	}
-	if count == 0 {
-		return errors.New("no gpu device found")
-	}
-	devices := make([]*device, count)
-	for deviceIndex := 0; deviceIndex < count; deviceIndex++ {
-		gpudevice, ret := nvml.DeviceGetHandleByIndex(deviceIndex)
-		if ret != nvml.SUCCESS {
-			return fmt.Errorf("unable to get device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
-		}
-
-		uuid, ret := gpudevice.GetUUID()
-		if ret != nvml.SUCCESS {
-			return fmt.Errorf("unable to get device uuid: %v", nvml.ErrorString(ret))
-		}
-
-		minor, ret := gpudevice.GetMinorNumber()
-		if ret != nvml.SUCCESS {
-			return fmt.Errorf("unable to get device minor number: %v", nvml.ErrorString(ret))
-		}
-
-		memory, ret := gpudevice.GetMemoryInfo()
-		if ret != nvml.SUCCESS {
-			return fmt.Errorf("unable to get device memory info: %v", nvml.ErrorString(ret))
-		}
-		devices[deviceIndex] = &device{
-			DeviceUUID:  uuid,
-			Minor:       int32(minor),
-			MemoryTotal: memory.Total,
-			Device:      gpudevice,
-		}
-	}
-
-	g.Lock()
-	defer g.Unlock()
-	g.deviceCount = count
-	g.devices = devices
-	return nil
-}
-
-func (g *gpuDeviceManager) getNodeGPUUsage() []metriccache.GPUMetric {
-	g.RLock()
-	defer g.RUnlock()
-	tmp := make([]rawGPUMetric, g.deviceCount)
-	for i := 0; i < g.deviceCount; i++ {
-		tmp[i] = rawGPUMetric{}
-	}
-	for _, p := range g.processesMetrics {
-		for idx := 0; idx < g.deviceCount; idx++ {
-			if m := p[uint32(idx)]; m != nil {
-				tmp[idx].SMUtil += p[uint32(idx)].SMUtil
-				tmp[idx].MemoryUsed += p[uint32(idx)].MemoryUsed
-			}
-		}
-	}
-	rtn := make([]metriccache.GPUMetric, g.deviceCount)
-	for i := 0; i < g.deviceCount; i++ {
-		rtn[i] = metriccache.GPUMetric{
-			DeviceUUID:  g.devices[i].DeviceUUID,
-			Minor:       g.devices[i].Minor,
-			SMUtil:      tmp[i].SMUtil,
-			MemoryUsed:  *resource.NewQuantity(int64(tmp[i].MemoryUsed), resource.BinarySI),
-			MemoryTotal: *resource.NewQuantity(int64(g.devices[i].MemoryTotal), resource.BinarySI),
-		}
-	}
-	return rtn
-}
-
-func (g *gpuDeviceManager) getTotalGPUUsageOfPIDs(pids []uint64) []metriccache.GPUMetric {
-	g.RLock()
-	defer g.RUnlock()
-	tmp := make(map[int]*rawGPUMetric)
-	for _, pid := range pids {
-		if metrics, exist := g.processesMetrics[uint32(pid)]; exist {
-			for idx, metric := range metrics {
-				if metric == nil {
-					continue
-				}
-				if _, found := tmp[idx]; !found {
-					tmp[idx] = &rawGPUMetric{}
-				}
-				tmp[idx].MemoryUsed += metric.MemoryUsed
-				tmp[idx].SMUtil += metric.SMUtil
-			}
-		}
-	}
-	if len(tmp) == 0 {
-		return nil
-	}
-	rtn := make([]metriccache.GPUMetric, 0)
-	for i := 0; i < g.deviceCount; i++ {
-		if value, ok := tmp[i]; ok {
-			rtn = append(rtn, metriccache.GPUMetric{
-				DeviceUUID:  g.devices[i].DeviceUUID,
-				Minor:       g.devices[i].Minor,
-				SMUtil:      value.SMUtil,
-				MemoryUsed:  *resource.NewQuantity(int64(value.MemoryUsed), resource.BinarySI),
-				MemoryTotal: *resource.NewQuantity(int64(g.devices[i].MemoryTotal), resource.BinarySI),
-			})
-		}
-	}
-	return rtn
-}
-
-func (g *gpuDeviceManager) getPodGPUUsage(podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.GPUMetric, error) {
-	runningContainer := make([]corev1.ContainerStatus, 0)
-	for _, c := range cs {
-		if c.State.Running == nil {
-			klog.V(5).Infof("non-running container %s", c.ContainerID)
-			continue
-		}
-		runningContainer = append(runningContainer, c)
-	}
-	if len(runningContainer) == 0 {
-		return nil, nil
-	}
-	pids, err := util.GetPIDsInPod(podParentDir, cs)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get pid, error: %v", err)
-	}
-	return g.getTotalGPUUsageOfPIDs(pids), nil
-}
-
-func (g *gpuDeviceManager) getContainerGPUUsage(podParentDir string, c *corev1.ContainerStatus) ([]metriccache.GPUMetric, error) {
-	if c.State.Running == nil {
-		klog.V(5).Infof("non-running container %s", c.ContainerID)
-		return nil, nil
-	}
-	currentPIDs, err := util.GetPIDsInContainer(podParentDir, c)
-	if err != nil {
-		return nil, fmt.Errorf("failed to get pid, error: %v", err)
-	}
-	return g.getTotalGPUUsageOfPIDs(currentPIDs), nil
-}
-
-func (g *gpuDeviceManager) collectGPUUsage() {
-	processesGPUUsages := make(map[uint32][]*rawGPUMetric)
-	for deviceIndex, gpuDevice := range g.devices {
-		processesInfos, ret := gpuDevice.Device.GetComputeRunningProcesses()
-		if ret != nvml.SUCCESS {
-			klog.Warningf("Unable to get process info for device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
-			continue
-		}
-		processUtilizations, ret := gpuDevice.Device.GetProcessUtilization(1024)
-		if ret != nvml.SUCCESS {
-			klog.Warningf("Unable to get process utilization for device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
-			continue
-		}
-
-		// Sort by pid.
-		sort.Slice(processesInfos, func(i, j int) bool {
-			return processesInfos[i].Pid < processesInfos[j].Pid
-		})
-		sort.Slice(processUtilizations, func(i, j int) bool {
-			return processUtilizations[i].Pid < processUtilizations[j].Pid
-		})
-
-		klog.V(3).Infof("Found %d processes on device %d\n", len(processesInfos), deviceIndex)
-		for idx, info := range processesInfos {
-			if _, ok := processesGPUUsages[info.Pid]; !ok {
-				// pid not exist.
-				// init processes gpu metric array.
-				processesGPUUsages[info.Pid] = make([]*rawGPUMetric, g.deviceCount)
-			}
-			processesGPUUsages[info.Pid][deviceIndex] = &rawGPUMetric{
-				SMUtil:     processUtilizations[idx].SmUtil,
-				MemoryUsed: info.UsedGpuMemory,
-			}
-		}
-	}
-	g.Lock()
-	g.processesMetrics = processesGPUUsages
-	g.collectTime = time.Now()
-	g.Unlock()
-}
-
 func (c *collector) collectGPUUsage() {
 	c.context.gpuDeviceManager.collectGPUUsage()
 }
diff --git a/pkg/koordlet/metricsadvisor/collector_gpu_linux.go b/pkg/koordlet/metricsadvisor/collector_gpu_linux.go
new file mode 100644
index 000000000..45fa78cc5
--- /dev/null
+++ b/pkg/koordlet/metricsadvisor/collector_gpu_linux.go
@@ -0,0 +1,265 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metricsadvisor
+
+import (
+	"errors"
+	"fmt"
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/NVIDIA/go-nvml/pkg/nvml"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/klog/v2"
+
+	"github.com/koordinator-sh/koordinator/pkg/features"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
+	"github.com/koordinator-sh/koordinator/pkg/util"
+)
+
+type gpuDeviceManager struct {
+	sync.RWMutex
+	deviceCount      int
+	devices          []*device
+	collectTime      time.Time
+	processesMetrics map[uint32][]*rawGPUMetric
+}
+
+type rawGPUMetric struct {
+	SMUtil     uint32 // current utilization rate for the device
+	MemoryUsed uint64
+}
+
+type device struct {
+	Minor       int32 // index starting from 0
+	DeviceUUID  string
+	MemoryTotal uint64
+	Device      nvml.Device
+}
+
+// initGPUDeviceManager will not retry if init fails.
+func initGPUDeviceManager() GPUDeviceManager {
+	if !features.DefaultKoordletFeatureGate.Enabled(features.Accelerators) {
+		return &dummyDeviceManager{}
+	}
+	if ret := nvml.Init(); ret != nvml.SUCCESS {
+		if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
+			klog.Warning("nvml init failed, library not found")
+			return &dummyDeviceManager{}
+		}
+		klog.Warningf("nvml init failed, return %s", nvml.ErrorString(ret))
+		return &dummyDeviceManager{}
+	}
+	manager := &gpuDeviceManager{}
+	if err := manager.initGPUData(); err != nil {
+		klog.Warningf("nvml failed to init gpu data, error %s", err)
+		manager.shutdown()
+		return &dummyDeviceManager{}
+	}
+
+	return manager
+}
+
+func (g *gpuDeviceManager) shutdown() error {
+	rt := nvml.Shutdown()
+	if rt != nvml.SUCCESS {
+		return fmt.Errorf("nvml shutdown error, code: %s", nvml.ErrorString(rt))
+	}
+	return nil
+}
+
+func (g *gpuDeviceManager) initGPUData() error {
+	count, ret := nvml.DeviceGetCount()
+	if ret != nvml.SUCCESS {
+		return fmt.Errorf("unable to get device count: %v", nvml.ErrorString(ret))
+	}
+	if count == 0 {
+		return errors.New("no gpu device found")
+	}
+	devices := make([]*device, count)
+	for deviceIndex := 0; deviceIndex < count; deviceIndex++ {
+		gpudevice, ret := nvml.DeviceGetHandleByIndex(deviceIndex)
+		if ret != nvml.SUCCESS {
+			return fmt.Errorf("unable to get device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
+		}
+
+		uuid, ret := gpudevice.GetUUID()
+		if ret != nvml.SUCCESS {
+			return fmt.Errorf("unable to get device uuid: %v", nvml.ErrorString(ret))
+		}
+
+		minor, ret := gpudevice.GetMinorNumber()
+		if ret != nvml.SUCCESS {
+			return fmt.Errorf("unable to get device minor number: %v", nvml.ErrorString(ret))
+		}
+
+		memory, ret := gpudevice.GetMemoryInfo()
+		if ret != nvml.SUCCESS {
+			return fmt.Errorf("unable to get device memory info: %v", nvml.ErrorString(ret))
+		}
+		devices[deviceIndex] = &device{
+			DeviceUUID:  uuid,
+			Minor:       int32(minor),
+			MemoryTotal: memory.Total,
+			Device:      gpudevice,
+		}
+	}
+
+	g.Lock()
+	defer g.Unlock()
+	g.deviceCount = count
+	g.devices = devices
+	return nil
+}
+
+func (g *gpuDeviceManager) getNodeGPUUsage() []metriccache.GPUMetric {
+	g.RLock()
+	defer g.RUnlock()
+	tmp := make([]rawGPUMetric, g.deviceCount)
+	for i := 0; i < g.deviceCount; i++ {
+		tmp[i] = rawGPUMetric{}
+	}
+	for _, p := range g.processesMetrics {
+		for idx := 0; idx < g.deviceCount; idx++ {
+			if m := p[uint32(idx)]; m != nil {
+				tmp[idx].SMUtil += p[uint32(idx)].SMUtil
+				tmp[idx].MemoryUsed += p[uint32(idx)].MemoryUsed
+			}
+		}
+	}
+	rtn := make([]metriccache.GPUMetric, g.deviceCount)
+	for i := 0; i < g.deviceCount; i++ {
+		rtn[i] = metriccache.GPUMetric{
+			DeviceUUID:  g.devices[i].DeviceUUID,
+			Minor:       g.devices[i].Minor,
+			SMUtil:      tmp[i].SMUtil,
+			MemoryUsed:  *resource.NewQuantity(int64(tmp[i].MemoryUsed), resource.BinarySI),
+			MemoryTotal: *resource.NewQuantity(int64(g.devices[i].MemoryTotal), resource.BinarySI),
+		}
+	}
+	return rtn
+}
+
+func (g *gpuDeviceManager) getTotalGPUUsageOfPIDs(pids []uint64) []metriccache.GPUMetric {
+	g.RLock()
+	defer g.RUnlock()
+	tmp := make(map[int]*rawGPUMetric)
+	for _, pid := range pids {
+		if metrics, exist := g.processesMetrics[uint32(pid)]; exist {
+			for idx, metric := range metrics {
+				if metric == nil {
+					continue
+				}
+				if _, found := tmp[idx]; !found {
+					tmp[idx] = &rawGPUMetric{}
+				}
+				tmp[idx].MemoryUsed += metric.MemoryUsed
+				tmp[idx].SMUtil += metric.SMUtil
+			}
+		}
+	}
+	if len(tmp) == 0 {
+		return nil
+	}
+	rtn := make([]metriccache.GPUMetric, 0)
+	for i := 0; i < g.deviceCount; i++ {
+		if value, ok := tmp[i]; ok {
+			rtn = append(rtn, metriccache.GPUMetric{
+				DeviceUUID:  g.devices[i].DeviceUUID,
+				Minor:       g.devices[i].Minor,
+				SMUtil:      value.SMUtil,
+				MemoryUsed:  *resource.NewQuantity(int64(value.MemoryUsed), resource.BinarySI),
+				MemoryTotal: *resource.NewQuantity(int64(g.devices[i].MemoryTotal), resource.BinarySI),
+			})
+		}
+	}
+	return rtn
+}
+
+func (g *gpuDeviceManager) getPodGPUUsage(podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.GPUMetric, error) {
+	runningContainer := make([]corev1.ContainerStatus, 0)
+	for _, c := range cs {
+		if c.State.Running == nil {
+			klog.V(5).Infof("non-running container %s", c.ContainerID)
+			continue
+		}
+		runningContainer = append(runningContainer, c)
+	}
+	if len(runningContainer) == 0 {
+		return nil, nil
+	}
+	pids, err := util.GetPIDsInPod(podParentDir, runningContainer)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get pids, error: %v", err)
+	}
+	return g.getTotalGPUUsageOfPIDs(pids), nil
+}
+
+func (g *gpuDeviceManager) getContainerGPUUsage(podParentDir string, c *corev1.ContainerStatus) ([]metriccache.GPUMetric, error) {
+	if c.State.Running == nil {
+		klog.V(5).Infof("non-running container %s", c.ContainerID)
+		return nil, nil
+	}
+	currentPIDs, err := util.GetPIDsInContainer(podParentDir, c)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get pids, error: %v", err)
+	}
+	return g.getTotalGPUUsageOfPIDs(currentPIDs), nil
+}
+
+func (g *gpuDeviceManager) collectGPUUsage() {
+	processesGPUUsages := make(map[uint32][]*rawGPUMetric)
+	for deviceIndex, gpuDevice := range g.devices {
+		processesInfos, ret := gpuDevice.Device.GetComputeRunningProcesses()
+		if ret != nvml.SUCCESS {
+			klog.Warningf("Unable to get process info for device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
+			continue
+		}
+		processUtilizations, ret := gpuDevice.Device.GetProcessUtilization(1024)
+		if ret != nvml.SUCCESS {
+			klog.Warningf("Unable to get process utilization for device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
+			continue
+		}
+
+		// Sort both slices by pid so entries at the same index refer to the same process.
+		sort.Slice(processesInfos, func(i, j int) bool {
+			return processesInfos[i].Pid < processesInfos[j].Pid
+		})
+		sort.Slice(processUtilizations, func(i, j int) bool {
+			return processUtilizations[i].Pid < processUtilizations[j].Pid
+		})
+
+		klog.V(3).Infof("Found %d processes on device %d", len(processesInfos), deviceIndex)
+		for idx, info := range processesInfos {
+			if _, ok := processesGPUUsages[info.Pid]; !ok {
+				// pid not seen yet;
+				// init the process's gpu metric array.
+				processesGPUUsages[info.Pid] = make([]*rawGPUMetric, g.deviceCount)
+			}
+			processesGPUUsages[info.Pid][deviceIndex] = &rawGPUMetric{
+				SMUtil:     processUtilizations[idx].SmUtil,
+				MemoryUsed: info.UsedGpuMemory,
+			}
+		}
+	}
+	g.Lock()
+	g.processesMetrics = processesGPUUsages
+	g.collectTime = time.Now()
+	g.Unlock()
+}
diff --git a/pkg/koordlet/metricsadvisor/collector_gpu_test.go b/pkg/koordlet/metricsadvisor/collector_gpu_linux_test.go
similarity index 99%
rename from pkg/koordlet/metricsadvisor/collector_gpu_test.go
rename to pkg/koordlet/metricsadvisor/collector_gpu_linux_test.go
index 5fb0437f7..cfee07301 100644
--- a/pkg/koordlet/metricsadvisor/collector_gpu_test.go
+++ b/pkg/koordlet/metricsadvisor/collector_gpu_linux_test.go
@@ -24,11 +24,12 @@ import (
 	"testing"
 	"time"
 
-	"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
-	"github.com/koordinator-sh/koordinator/pkg/util/system"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
+	"github.com/koordinator-sh/koordinator/pkg/util/system"
 )
 
 func Test_gpuUsageDetailRecord_GetNodeGPUUsage(t *testing.T) {
diff --git a/pkg/koordlet/metricsadvisor/collector_gpu_unsupported.go b/pkg/koordlet/metricsadvisor/collector_gpu_unsupported.go
new file mode 100644
index 000000000..17312ca65
--- /dev/null
+++ b/pkg/koordlet/metricsadvisor/collector_gpu_unsupported.go
@@ -0,0 +1,25 @@
+//go:build !linux
+// +build !linux
+
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package metricsadvisor
+
+// initGPUDeviceManager will not retry if init fails; on non-Linux platforms it always returns the no-op dummy manager.
+func initGPUDeviceManager() GPUDeviceManager {
+	return &dummyDeviceManager{}
+}
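
The fix relies on Go's standard conditional compilation. The "_linux.go" file-name suffix restricts collector_gpu_linux.go (and its renamed test) to Linux builds, while the explicit "//go:build !linux" constraint compiles the stub on every other platform, so macOS builds no longer pull in the NVML bindings at all. Below is a minimal sketch of the same pattern under assumed names (a hypothetical devicemgr package with a Manager interface; illustrative only, not koordlet's actual API):

// File: devicemgr/manager.go; platform-neutral declarations shared by all builds.
package devicemgr

// Manager abstracts device metric collection so that callers never
// import platform-specific dependencies directly.
type Manager interface {
	Collect() error
	Shutdown() error
}

// File: devicemgr/manager_linux.go; the "_linux" suffix implies a
// linux-only build constraint, so no build tag is needed here.
package devicemgr

// NewManager returns the Linux implementation.
func NewManager() Manager { return &linuxManager{} }

type linuxManager struct{}

func (m *linuxManager) Collect() error  { return nil } // real driver queries would live here
func (m *linuxManager) Shutdown() error { return nil }

// File: devicemgr/manager_unsupported.go; compiled on every non-Linux platform.
//go:build !linux

package devicemgr

// NewManager returns a no-op fallback, mirroring dummyDeviceManager in this patch.
func NewManager() Manager { return &dummyManager{} }

type dummyManager struct{}

func (m *dummyManager) Collect() error  { return nil }
func (m *dummyManager) Shutdown() error { return nil }

With this split, GOOS=darwin go build ./... should compile only the stub, which is exactly how the patch unbreaks the macOS build.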