Skip to content

Commit

Permalink
koordlet: pod/container PSI/CPI metrics cache to tsdb (#1336)
Browse files Browse the repository at this point in the history
Signed-off-by: zhushaohua <zshmmm@163.com>
  • Loading branch information
zshmmm committed Jun 13, 2023
1 parent 7798f3a commit f833fd9
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 1,410 deletions.
273 changes: 0 additions & 273 deletions pkg/koordlet/metriccache/metric_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,9 @@ type MetricCache interface {
GetNodeCPUInfo(param *QueryParam) (*NodeCPUInfo, error)
GetNodeLocalStorageInfo(param *QueryParam) (*NodeLocalStorageInfo, error)
GetBECPUResourceMetric(param *QueryParam) BECPUResourceQueryResult
GetContainerInterferenceMetric(metricName InterferenceMetricName, podUID *string, containerID *string, param *QueryParam) ContainerInterferenceQueryResult
GetPodInterferenceMetric(metricName InterferenceMetricName, podUID *string, param *QueryParam) PodInterferenceQueryResult
InsertNodeCPUInfo(info *NodeCPUInfo) error
InsertNodeLocalStorageInfo(info *NodeLocalStorageInfo) error
InsertBECPUResourceMetric(t time.Time, metric *BECPUResourceMetric) error
InsertContainerInterferenceMetrics(t time.Time, metric *ContainerInterferenceMetric) error
InsertPodInterferenceMetrics(t time.Time, metric *PodInterferenceMetric) error
}

type metricCache struct {
Expand Down Expand Up @@ -302,167 +298,6 @@ func (m *metricCache) GetContainerThrottledMetric(containerID *string, param *Qu
return result
}

// GetContainerInterferenceMetric loads the interference samples selected by metricName
// (container CPI or PSI) for containerID between param.Start and param.End, and reduces
// them with the aggregation function chosen by param.Aggregate.
//
// Failures are reported through result.Error; there is no separate error return. On
// success result.Metric carries the aggregated value together with the given pod and
// container IDs, and result.AggregateInfo.MetricsCount carries the value reported by
// count for the queried samples.
func (m *metricCache) GetContainerInterferenceMetric(metricName InterferenceMetricName, podUID *string, containerID *string, param *QueryParam) ContainerInterferenceQueryResult {
	result := ContainerInterferenceQueryResult{}
	// Both IDs are dereferenced below (messages and the returned metric); reject nil
	// here instead of panicking later.
	if podUID == nil || containerID == nil {
		result.Error = fmt.Errorf("GetContainerInterferenceMetric %v failed, podUID or containerID is nil", metricName)
		return result
	}
	if param == nil || param.Start == nil || param.End == nil {
		result.Error = fmt.Errorf("GetContainerInterferenceMetric %v query parameters are illegal %v", *containerID, param)
		return result
	}

	metrics, err := m.convertAndGetContainerInterferenceMetric(metricName, containerID, param.Start, param.End)
	if err != nil {
		result.Error = fmt.Errorf("GetContainerInterferenceMetric %v of %v failed, query params %v, error %w", metricName, *containerID, param, err)
		return result
	}

	aggregateFunc := getAggregateFunc(param.Aggregate)
	metricValue, err := aggregateContainerInterferenceMetricByName(metricName, metrics, aggregateFunc)
	if err != nil {
		result.Error = fmt.Errorf("GetContainerInterferenceMetric %v aggregate failed, metrics %v, error %w",
			*containerID, metrics, err)
		return result
	}

	// Named metricsCount so the package-level count function is not shadowed.
	metricsCount, err := count(metrics)
	if err != nil {
		result.Error = fmt.Errorf("GetContainerInterferenceMetric %v count failed, metrics %v, error %w",
			*containerID, metrics, err)
		return result
	}

	result.AggregateInfo = &AggregateInfo{MetricsCount: int64(metricsCount)}
	result.Metric = &ContainerInterferenceMetric{
		MetricName:  metricName,
		PodUID:      *podUID,
		ContainerID: *containerID,
		MetricValue: metricValue,
	}
	return result
}

// GetPodInterferenceMetric loads the interference samples selected by metricName
// (pod CPI or PSI) for podUID between param.Start and param.End, and reduces them with
// the aggregation function chosen by param.Aggregate.
//
// Failures are reported through result.Error; there is no separate error return. On
// success result.Metric carries the aggregated value together with the given pod UID,
// and result.AggregateInfo.MetricsCount carries the value reported by count for the
// queried samples.
func (m *metricCache) GetPodInterferenceMetric(metricName InterferenceMetricName, podUID *string, param *QueryParam) PodInterferenceQueryResult {
	result := PodInterferenceQueryResult{}
	// podUID is dereferenced below (messages and the returned metric); reject nil here
	// instead of panicking later.
	if podUID == nil {
		result.Error = fmt.Errorf("GetPodInterferenceMetric %v failed, podUID is nil", metricName)
		return result
	}
	if param == nil || param.Start == nil || param.End == nil {
		result.Error = fmt.Errorf("GetPodInterferenceMetric %v query parameters are illegal %v", *podUID, param)
		return result
	}

	metrics, err := m.convertAndGetPodInterferenceMetric(metricName, podUID, param.Start, param.End)
	if err != nil {
		result.Error = fmt.Errorf("GetPodInterferenceMetric %v of %v failed, query params %v, error %w", metricName, *podUID, param, err)
		return result
	}

	aggregateFunc := getAggregateFunc(param.Aggregate)
	metricValue, err := aggregatePodInterferenceMetricByName(metricName, metrics, aggregateFunc)
	if err != nil {
		result.Error = fmt.Errorf("GetPodInterferenceMetric %v aggregate failed, metrics %v, error %w",
			*podUID, metrics, err)
		return result
	}

	// Named metricsCount so the package-level count function is not shadowed.
	metricsCount, err := count(metrics)
	if err != nil {
		result.Error = fmt.Errorf("GetPodInterferenceMetric %v count failed, metrics %v, error %w",
			*podUID, metrics, err)
		return result
	}

	result.AggregateInfo = &AggregateInfo{MetricsCount: int64(metricsCount)}
	result.Metric = &PodInterferenceMetric{
		MetricName:  metricName,
		PodUID:      *podUID,
		MetricValue: metricValue,
	}
	return result
}

// aggregateContainerInterferenceMetricByName picks the aggregation routine matching a
// container-level metric name: CPI samples go to aggregateCPI, PSI samples go to
// aggregatePSI. Any other name yields an error and a nil value.
func aggregateContainerInterferenceMetricByName(metricName InterferenceMetricName, metrics interface{}, aggregateFunc AggregationFunc) (interface{}, error) {
	if metricName == MetricNameContainerCPI {
		return aggregateCPI(metrics, aggregateFunc)
	}
	if metricName == MetricNameContainerPSI {
		return aggregatePSI(metrics, aggregateFunc)
	}
	return nil, fmt.Errorf("get unknown metric name")
}

// aggregatePodInterferenceMetricByName picks the aggregation routine matching a
// pod-level metric name: CPI samples go to aggregateCPI, PSI samples go to
// aggregatePSI. Any other name yields an error and a nil value.
func aggregatePodInterferenceMetricByName(metricName InterferenceMetricName, metrics interface{}, aggregateFunc AggregationFunc) (interface{}, error) {
	if metricName == MetricNamePodCPI {
		return aggregateCPI(metrics, aggregateFunc)
	}
	if metricName == MetricNamePodPSI {
		return aggregatePSI(metrics, aggregateFunc)
	}
	return nil, fmt.Errorf("get unknown metric name")
}

// aggregateCPI applies aggregateFunc to the "Cycles" and then the "Instructions" field
// of metrics, using "Timestamp" as the time field for both, and returns the two results
// converted to uint64 inside a *CPIMetric. The first aggregation error aborts the call
// and is returned with a nil value.
func aggregateCPI(metrics interface{}, aggregateFunc AggregationFunc) (interface{}, error) {
	cyclesParam := AggregateParam{ValueFieldName: "Cycles", TimeFieldName: "Timestamp"}
	instructionsParam := AggregateParam{ValueFieldName: "Instructions", TimeFieldName: "Timestamp"}

	aggCycles, err := aggregateFunc(metrics, cyclesParam)
	if err != nil {
		return nil, err
	}

	aggInstructions, err := aggregateFunc(metrics, instructionsParam)
	if err != nil {
		return nil, err
	}

	return &CPIMetric{
		Cycles:       uint64(aggCycles),
		Instructions: uint64(aggInstructions),
	}, nil
}

// aggregatePSI builds a *PSIMetric from metrics. The six Avg10 pressure fields (some/full
// for CPU, memory and IO) are each produced by aggregateFunc, while CPUFullSupported is
// read with fieldLastOfMetricListBool; every lookup uses "Timestamp" as the time field.
// Fields are processed in a fixed order and the first error aborts the call, returning
// a nil value together with that error.
func aggregatePSI(metrics interface{}, aggregateFunc AggregationFunc) (interface{}, error) {
	var err error
	psi := &PSIMetric{}

	if psi.SomeCPUAvg10, err = aggregateFunc(metrics, AggregateParam{
		ValueFieldName: "SomeCPUAvg10", TimeFieldName: "Timestamp"}); err != nil {
		return nil, err
	}
	if psi.SomeMemAvg10, err = aggregateFunc(metrics, AggregateParam{
		ValueFieldName: "SomeMemAvg10", TimeFieldName: "Timestamp"}); err != nil {
		return nil, err
	}
	if psi.SomeIOAvg10, err = aggregateFunc(metrics, AggregateParam{
		ValueFieldName: "SomeIOAvg10", TimeFieldName: "Timestamp"}); err != nil {
		return nil, err
	}
	if psi.FullCPUAvg10, err = aggregateFunc(metrics, AggregateParam{
		ValueFieldName: "FullCPUAvg10", TimeFieldName: "Timestamp"}); err != nil {
		return nil, err
	}
	if psi.FullMemAvg10, err = aggregateFunc(metrics, AggregateParam{
		ValueFieldName: "FullMemAvg10", TimeFieldName: "Timestamp"}); err != nil {
		return nil, err
	}
	if psi.FullIOAvg10, err = aggregateFunc(metrics, AggregateParam{
		ValueFieldName: "FullIOAvg10", TimeFieldName: "Timestamp"}); err != nil {
		return nil, err
	}
	// The support flag is not numerically aggregated; it goes through the bool helper.
	if psi.CPUFullSupported, err = fieldLastOfMetricListBool(metrics, AggregateParam{
		ValueFieldName: "CPUFullSupported", TimeFieldName: "Timestamp"}); err != nil {
		return nil, err
	}

	return psi, nil
}

func (m *metricCache) InsertBECPUResourceMetric(t time.Time, metric *BECPUResourceMetric) error {
dbItem := &beCPUResourceMetric{
CPUUsedCores: float64(metric.CPUUsed.MilliValue()) / 1000,
Expand Down Expand Up @@ -519,14 +354,6 @@ func (m *metricCache) InsertContainerThrottledMetrics(t time.Time, metric *Conta
return m.db.InsertContainerThrottledMetric(dbItem)
}

// InsertContainerInterferenceMetrics records one container interference sample taken at
// time t by delegating to convertAndInsertContainerInterferenceMetric, and returns that
// call's error unchanged.
func (m *metricCache) InsertContainerInterferenceMetrics(t time.Time, metric *ContainerInterferenceMetric) error {
	insertErr := m.convertAndInsertContainerInterferenceMetric(t, metric)
	return insertErr
}

// InsertPodInterferenceMetrics records one pod interference sample taken at time t by
// delegating to convertAndInsertPodInterferenceMetric, and returns that call's error
// unchanged.
func (m *metricCache) InsertPodInterferenceMetrics(t time.Time, metric *PodInterferenceMetric) error {
	insertErr := m.convertAndInsertPodInterferenceMetric(t, metric)
	return insertErr
}

func (m *metricCache) aggregateGPUUsages(gpuResourceMetricsByTime [][]gpuResourceMetric, aggregateFunc AggregationFunc) ([]GPUMetric, error) {
if len(gpuResourceMetricsByTime) == 0 {
return nil, nil
Expand Down Expand Up @@ -635,103 +462,3 @@ type PSIMetric struct {

CPUFullSupported bool
}

// convertAndInsertContainerInterferenceMetric converts a container interference sample
// into its database row type and inserts it, stamping the row with t.
//
// metric.MetricName selects the row type: container CPI expects metric.MetricValue to be
// a *CPIMetric, container PSI expects a *PSIMetric. An error is returned (instead of
// panicking) when metric is nil, when the value has the wrong dynamic type or is a nil
// pointer, or when the metric name is not one of the two supported names. Otherwise the
// database insert's error is returned.
func (m *metricCache) convertAndInsertContainerInterferenceMetric(t time.Time, metric *ContainerInterferenceMetric) error {
	if metric == nil {
		return fmt.Errorf("container interference metric is nil")
	}
	switch metric.MetricName {
	case MetricNameContainerCPI:
		// Assert once with comma-ok so a mismatched value becomes an error, not a panic.
		cpi, ok := metric.MetricValue.(*CPIMetric)
		if !ok || cpi == nil {
			return fmt.Errorf("unexpected metric value %T for metric %v", metric.MetricValue, metric.MetricName)
		}
		dbItem := &containerCPIMetric{
			PodUID:       metric.PodUID,
			ContainerID:  metric.ContainerID,
			Cycles:       float64(cpi.Cycles),
			Instructions: float64(cpi.Instructions),
			Timestamp:    t,
		}
		return m.db.InsertContainerCPIMetric(dbItem)
	case MetricNameContainerPSI:
		psi, ok := metric.MetricValue.(*PSIMetric)
		if !ok || psi == nil {
			return fmt.Errorf("unexpected metric value %T for metric %v", metric.MetricValue, metric.MetricName)
		}
		dbItem := &containerPSIMetric{
			PodUID:           metric.PodUID,
			ContainerID:      metric.ContainerID,
			SomeCPUAvg10:     psi.SomeCPUAvg10,
			SomeMemAvg10:     psi.SomeMemAvg10,
			SomeIOAvg10:      psi.SomeIOAvg10,
			FullCPUAvg10:     psi.FullCPUAvg10,
			FullMemAvg10:     psi.FullMemAvg10,
			FullIOAvg10:      psi.FullIOAvg10,
			CPUFullSupported: psi.CPUFullSupported,
			Timestamp:        t,
		}
		return m.db.InsertContainerPSIMetric(dbItem)
	default:
		return fmt.Errorf("get unknown metric name")
	}
}

// convertAndInsertPodInterferenceMetric converts a pod interference sample into its
// database row type and inserts it, stamping the row with t.
//
// Only the pod PSI metric name is handled here, and it expects metric.MetricValue to be
// a *PSIMetric. An error is returned (instead of panicking) when metric is nil, when the
// value has the wrong dynamic type or is a nil pointer, or when the metric name is
// anything else. Otherwise the database insert's error is returned.
func (m *metricCache) convertAndInsertPodInterferenceMetric(t time.Time, metric *PodInterferenceMetric) error {
	if metric == nil {
		return fmt.Errorf("pod interference metric is nil")
	}
	switch metric.MetricName {
	case MetricNamePodPSI:
		// Assert once with comma-ok so a mismatched value becomes an error, not a panic.
		psi, ok := metric.MetricValue.(*PSIMetric)
		if !ok || psi == nil {
			return fmt.Errorf("unexpected metric value %T for metric %v", metric.MetricValue, metric.MetricName)
		}
		dbItem := &podPSIMetric{
			PodUID:           metric.PodUID,
			SomeCPUAvg10:     psi.SomeCPUAvg10,
			SomeMemAvg10:     psi.SomeMemAvg10,
			SomeIOAvg10:      psi.SomeIOAvg10,
			FullCPUAvg10:     psi.FullCPUAvg10,
			FullMemAvg10:     psi.FullMemAvg10,
			FullIOAvg10:      psi.FullIOAvg10,
			CPUFullSupported: psi.CPUFullSupported,
			Timestamp:        t,
		}
		return m.db.InsertPodPSIMetric(dbItem)
	default:
		return fmt.Errorf("get unknown metric name")
	}
}

// convertAndGetContainerInterferenceMetric fetches the stored samples for containerID
// between start and end from the table matching metricName (container CPI or container
// PSI). The query result is handed back as an untyped value; an unrecognised metric
// name yields a nil value and an error.
func (m *metricCache) convertAndGetContainerInterferenceMetric(metricName InterferenceMetricName, containerID *string, start, end *time.Time) (interface{}, error) {
	switch {
	case metricName == MetricNameContainerCPI:
		return m.db.GetContainerCPIMetric(containerID, start, end)
	case metricName == MetricNameContainerPSI:
		return m.db.GetContainerPSIMetric(containerID, start, end)
	}
	return nil, fmt.Errorf("get unknown metric name")
}

// podCPIMetric is a pod-level CPI sample. In this file it is only produced by
// convertAndGetPodInterferenceMetric, which derives it from the pod's container CPI
// rows rather than reading it from a table of its own.
// NOTE(review): aggregateCPI refers to these fields by the names "Cycles",
// "Instructions" and "Timestamp" — presumably via reflection, so renaming a field would
// break aggregation; confirm against the aggregate helpers.
type podCPIMetric struct {
	// PodUID is the UID of the pod the sample belongs to.
	PodUID string
	// Cycles is the sum of Cycles over the pod's container CPI rows.
	Cycles float64
	// Instructions is the sum of Instructions over the pod's container CPI rows.
	Instructions float64
	// Timestamp is copied from the last container CPI row that was summed.
	Timestamp time.Time
}

// convertAndGetPodInterferenceMetric fetches the samples backing a pod-level
// interference metric for podUID between start and end.
//
// Pod PSI rows are returned straight from the database. Pod CPI is derived from the
// pod's container CPI rows: their Cycles and Instructions are summed into a single
// podCPIMetric that carries the timestamp of the last row, returned as a one-element
// slice (or an empty, non-nil slice when there are no rows). An unrecognised metric
// name yields a nil value and an error.
func (m *metricCache) convertAndGetPodInterferenceMetric(metricName InterferenceMetricName, podUID *string, start, end *time.Time) (interface{}, error) {
	if metricName == MetricNamePodPSI {
		return m.db.GetPodPSIMetric(podUID, start, end)
	}
	if metricName != MetricNamePodCPI {
		return nil, fmt.Errorf("get unknown metric name")
	}

	// Pod CPI: read the per-container rows and fold them into one pod-level sample.
	rows, err := m.db.GetContainerCPIMetricByPodUid(podUID, start, end)
	if err != nil {
		return nil, err
	}
	rowCount := len(rows)
	if rowCount == 0 {
		return []podCPIMetric{}, nil
	}

	total := podCPIMetric{
		PodUID:    *podUID,
		Timestamp: rows[rowCount-1].Timestamp,
	}
	for i := range rows {
		total.Cycles += rows[i].Cycles
		total.Instructions += rows[i].Instructions
	}
	return []podCPIMetric{total}, nil
}

0 comments on commit f833fd9

Please sign in to comment.