koordlet: fix build error caused by GPU #413

Merged on Jul 27, 2022 (1 commit)
242 changes: 0 additions & 242 deletions pkg/koordlet/metricsadvisor/collector_gpu.go
@@ -17,20 +17,9 @@ limitations under the License.
package metricsadvisor

import (
    "errors"
    "fmt"
    "sort"
    "sync"
    "time"

    "github.com/NVIDIA/go-nvml/pkg/nvml"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/klog/v2"

    "github.com/koordinator-sh/koordinator/pkg/features"
    "github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
    "github.com/koordinator-sh/koordinator/pkg/util"
)

type GPUDeviceManager interface {
@@ -61,237 +50,6 @@ func (d *dummyDeviceManager) shutdown() error {
    return nil
}

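// gpuDeviceManager holds the NVML device handles discovered at startup and the
// most recently collected per-process GPU metrics, guarded by the embedded RWMutex.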
type gpuDeviceManager struct {
    sync.RWMutex
    deviceCount      int
    devices          []*device
    collectTime      time.Time
    processesMetrics map[uint32][]*rawGPUMetric
}

type rawGPUMetric struct {
    SMUtil     uint32 // current utilization rate for the device
    MemoryUsed uint64
}

type device struct {
    Minor       int32 // index starting from 0
    DeviceUUID  string
    MemoryTotal uint64
    Device      nvml.Device
}

// initGPUDeviceManager does not retry if initialization fails; it falls back to a dummy device manager.
func initGPUDeviceManager() GPUDeviceManager {
    if !features.DefaultKoordletFeatureGate.Enabled(features.Accelerators) {
        return &dummyDeviceManager{}
    }
    if ret := nvml.Init(); ret != nvml.SUCCESS {
        if ret == nvml.ERROR_LIBRARY_NOT_FOUND {
            klog.Warning("nvml init failed, library not found")
            return &dummyDeviceManager{}
        }
        klog.Warningf("nvml init failed, return %s", nvml.ErrorString(ret))
        return &dummyDeviceManager{}
    }
    manager := &gpuDeviceManager{}
    if err := manager.initGPUData(); err != nil {
        klog.Warningf("nvml init gpu data failed, error %s", err)
        manager.shutdown()
        return &dummyDeviceManager{}
    }

    return manager
}

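// shutdown releases the NVML handle acquired in initGPUDeviceManager.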
func (g *gpuDeviceManager) shutdown() error {
    rt := nvml.Shutdown()
    if rt != nvml.SUCCESS {
        return fmt.Errorf("nvml shutdown error, code: %s", nvml.ErrorString(rt))
    }
    return nil
}

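// initGPUData enumerates all GPUs via NVML and caches their UUID, minor number,
// total memory and device handle.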
func (g *gpuDeviceManager) initGPUData() error {
    count, ret := nvml.DeviceGetCount()
    if ret != nvml.SUCCESS {
        return fmt.Errorf("unable to get device count: %v", nvml.ErrorString(ret))
    }
    if count == 0 {
        return errors.New("no gpu device found")
    }
    devices := make([]*device, count)
    for deviceIndex := 0; deviceIndex < count; deviceIndex++ {
        gpudevice, ret := nvml.DeviceGetHandleByIndex(deviceIndex)
        if ret != nvml.SUCCESS {
            return fmt.Errorf("unable to get device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
        }

        uuid, ret := gpudevice.GetUUID()
        if ret != nvml.SUCCESS {
            return fmt.Errorf("unable to get device uuid: %v", nvml.ErrorString(ret))
        }

        minor, ret := gpudevice.GetMinorNumber()
        if ret != nvml.SUCCESS {
            return fmt.Errorf("unable to get device minor number: %v", nvml.ErrorString(ret))
        }

        memory, ret := gpudevice.GetMemoryInfo()
        if ret != nvml.SUCCESS {
            return fmt.Errorf("unable to get device memory info: %v", nvml.ErrorString(ret))
        }
        devices[deviceIndex] = &device{
            DeviceUUID:  uuid,
            Minor:       int32(minor),
            MemoryTotal: memory.Total,
            Device:      gpudevice,
        }
    }

    g.Lock()
    defer g.Unlock()
    g.deviceCount = count
    g.devices = devices
    return nil
}

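// getNodeGPUUsage aggregates the cached per-process metrics into node-level
// SM utilization and memory usage for every device.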
func (g *gpuDeviceManager) getNodeGPUUsage() []metriccache.GPUMetric {
    g.RLock()
    defer g.RUnlock()
    tmp := make([]rawGPUMetric, g.deviceCount)
    for i := 0; i < g.deviceCount; i++ {
        tmp[i] = rawGPUMetric{}
    }
    for _, p := range g.processesMetrics {
        for idx := 0; idx < g.deviceCount; idx++ {
            if m := p[uint32(idx)]; m != nil {
                tmp[idx].SMUtil += p[uint32(idx)].SMUtil
                tmp[idx].MemoryUsed += p[uint32(idx)].MemoryUsed
            }
        }
    }
    rtn := make([]metriccache.GPUMetric, g.deviceCount)
    for i := 0; i < g.deviceCount; i++ {
        rtn[i] = metriccache.GPUMetric{
            DeviceUUID:  g.devices[i].DeviceUUID,
            Minor:       g.devices[i].Minor,
            SMUtil:      tmp[i].SMUtil,
            MemoryUsed:  *resource.NewQuantity(int64(tmp[i].MemoryUsed), resource.BinarySI),
            MemoryTotal: *resource.NewQuantity(int64(g.devices[i].MemoryTotal), resource.BinarySI),
        }
    }
    return rtn
}

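// getTotalGPUUsageOfPIDs sums the cached GPU metrics of the given processes,
// returning one entry per device that any of the pids actually used.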
func (g *gpuDeviceManager) getTotalGPUUsageOfPIDs(pids []uint64) []metriccache.GPUMetric {
    g.RLock()
    defer g.RUnlock()
    tmp := make(map[int]*rawGPUMetric)
    for _, pid := range pids {
        if metrics, exist := g.processesMetrics[uint32(pid)]; exist {
            for idx, metric := range metrics {
                if metric == nil {
                    continue
                }
                if _, found := tmp[idx]; !found {
                    tmp[idx] = &rawGPUMetric{}
                }
                tmp[idx].MemoryUsed += metric.MemoryUsed
                tmp[idx].SMUtil += metric.SMUtil
            }
        }
    }
    if len(tmp) == 0 {
        return nil
    }
    rtn := make([]metriccache.GPUMetric, 0)
    for i := 0; i < g.deviceCount; i++ {
        if value, ok := tmp[i]; ok {
            rtn = append(rtn, metriccache.GPUMetric{
                DeviceUUID:  g.devices[i].DeviceUUID,
                Minor:       g.devices[i].Minor,
                SMUtil:      value.SMUtil,
                MemoryUsed:  *resource.NewQuantity(int64(value.MemoryUsed), resource.BinarySI),
                MemoryTotal: *resource.NewQuantity(int64(g.devices[i].MemoryTotal), resource.BinarySI),
            })
        }
    }
    return rtn
}

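// getPodGPUUsage reports the GPU usage of a pod by resolving the pids of its
// running containers and summing their per-device metrics.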
func (g *gpuDeviceManager) getPodGPUUsage(podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.GPUMetric, error) {
    runningContainer := make([]corev1.ContainerStatus, 0)
    for _, c := range cs {
        if c.State.Running == nil {
            klog.V(5).Infof("non-running container %s", c.ContainerID)
            continue
        }
        runningContainer = append(runningContainer, c)
    }
    if len(runningContainer) == 0 {
        return nil, nil
    }
    pids, err := util.GetPIDsInPod(podParentDir, cs)
    if err != nil {
        return nil, fmt.Errorf("failed to get pid, error: %v", err)
    }
    return g.getTotalGPUUsageOfPIDs(pids), nil
}

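// getContainerGPUUsage reports the GPU usage of a single running container.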
func (g *gpuDeviceManager) getContainerGPUUsage(podParentDir string, c *corev1.ContainerStatus) ([]metriccache.GPUMetric, error) {
    if c.State.Running == nil {
        klog.V(5).Infof("non-running container %s", c.ContainerID)
        return nil, nil
    }
    currentPIDs, err := util.GetPIDsInContainer(podParentDir, c)
    if err != nil {
        return nil, fmt.Errorf("failed to get pid, error: %v", err)
    }
    return g.getTotalGPUUsageOfPIDs(currentPIDs), nil
}

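// collectGPUUsage queries NVML for the compute processes and their utilization
// on every device and refreshes the cached per-process metrics.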
func (g *gpuDeviceManager) collectGPUUsage() {
    processesGPUUsages := make(map[uint32][]*rawGPUMetric)
    for deviceIndex, gpuDevice := range g.devices {
        processesInfos, ret := gpuDevice.Device.GetComputeRunningProcesses()
        if ret != nvml.SUCCESS {
            klog.Warningf("Unable to get process info for device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
            continue
        }
        processUtilizations, ret := gpuDevice.Device.GetProcessUtilization(1024)
        if ret != nvml.SUCCESS {
            klog.Warningf("Unable to get process utilization for device at index %d: %v", deviceIndex, nvml.ErrorString(ret))
            continue
        }

        // Sort both lists by pid so that entries at the same index refer to the same process.
        sort.Slice(processesInfos, func(i, j int) bool {
            return processesInfos[i].Pid < processesInfos[j].Pid
        })
        sort.Slice(processUtilizations, func(i, j int) bool {
            return processUtilizations[i].Pid < processUtilizations[j].Pid
        })

        klog.V(3).Infof("Found %d processes on device %d\n", len(processesInfos), deviceIndex)
        for idx, info := range processesInfos {
            if _, ok := processesGPUUsages[info.Pid]; !ok {
                // The pid has not been seen yet; initialize its per-device metric slice.
                processesGPUUsages[info.Pid] = make([]*rawGPUMetric, g.deviceCount)
            }
            processesGPUUsages[info.Pid][deviceIndex] = &rawGPUMetric{
                SMUtil:     processUtilizations[idx].SmUtil,
                MemoryUsed: info.UsedGpuMemory,
            }
        }
    }
    g.Lock()
    g.processesMetrics = processesGPUUsages
    g.collectTime = time.Now()
    g.Unlock()
}

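// collectGPUUsage delegates to the configured GPUDeviceManager (the NVML-backed
// manager, or the dummy implementation when the Accelerators feature gate is off).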
func (c *collector) collectGPUUsage() {
    c.context.gpuDeviceManager.collectGPUUsage()
}