add stability status of region #180

Merged · 1 commit · Aug 3, 2023
17 changes: 13 additions & 4 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
@@ -49,6 +49,12 @@ import (
// todo:
// 1. Support dedicated containers without numa binding, and with numa binding but non numa-exclusive

// metric names for resource advisor
const (
metricRegionStatus = "region_status"
metricRegionOvershoot = "region_overshoot"
)

func init() {
provisionpolicy.RegisterInitializer(types.CPUProvisionPolicyCanonical, provisionpolicy.NewPolicyCanonical)
provisionpolicy.RegisterInitializer(types.CPUProvisionPolicyRama, provisionpolicy.NewPolicyRama)
@@ -219,12 +225,15 @@ func (cra *cpuResourceAdvisor) update() {

klog.Infof("[qosaware-cpu] region map: %v", general.ToString(cra.regionMap))

// assemble provision result from each region and notify cpu server
calculationResult, err := cra.assembleProvision()
// assemble provision result from each region
calculationResult, boundUpper, err := cra.assembleProvision()
if err != nil {
klog.Errorf("[qosaware-cpu] assemble provision failed: %v", err)
return
}
cra.updateRegionStatus(boundUpper)

// notify cpu server
select {
case cra.sendCh <- calculationResult:
general.Infof("notify cpu server: %+v", calculationResult)
@@ -405,9 +414,9 @@ func (cra *cpuResourceAdvisor) updateAdvisorEssentials() {
// assembleProvision generates internal calculation result.
// must make sure pool names from cpu provision follow the qrm definition;
// numa ID set as -1 means no numa-preference is needed.
func (cra *cpuResourceAdvisor) assembleProvision() (types.InternalCPUCalculationResult, error) {
func (cra *cpuResourceAdvisor) assembleProvision() (types.InternalCPUCalculationResult, bool, error) {
if cra.provisionAssembler == nil {
return types.InternalCPUCalculationResult{}, fmt.Errorf("no legal provision assembler")
return types.InternalCPUCalculationResult{}, false, fmt.Errorf("no legal provision assembler")
}

return cra.provisionAssembler.AssembleProvision()
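To make the new control flow concrete, here is a minimal, runnable sketch of how the extra boolean threads from `AssembleProvision` through `update()` to `updateRegionStatus`; the stub types are assumptions standing in for katalyst's real API, not the actual implementation:

```go
package main

import "fmt"

// calculationResult is a stub for types.InternalCPUCalculationResult.
type calculationResult struct{ pools map[string]int }

// provisionAssembler mirrors the updated ProvisionAssembler contract: the
// extra bool reports whether pool sizes were clamped to the upper bound.
type provisionAssembler interface {
	AssembleProvision() (calculationResult, bool, error)
}

type stubAssembler struct{}

func (stubAssembler) AssembleProvision() (calculationResult, bool, error) {
	// report boundUpper=true: requested pool sizes exceeded available CPUs
	return calculationResult{pools: map[string]int{"share": 6, "reclaim": 4}}, true, nil
}

// update mirrors cpuResourceAdvisor.update: assemble, stamp region status,
// then notify the cpu server over the channel.
func update(pa provisionAssembler, updateRegionStatus func(bool), sendCh chan<- calculationResult) {
	result, boundUpper, err := pa.AssembleProvision()
	if err != nil {
		fmt.Printf("assemble provision failed: %v\n", err)
		return
	}

	// stamp region status before notifying the cpu server
	updateRegionStatus(boundUpper)

	select {
	case sendCh <- result:
		fmt.Printf("notify cpu server: %+v\n", result)
	}
}

func main() {
	ch := make(chan calculationResult, 1)
	update(stubAssembler{}, func(bound bool) { fmt.Println("boundUpper:", bound) }, ch)
}
```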
@@ -29,6 +29,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/assembler/provisionassembler"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

@@ -269,3 +270,34 @@ func (cra *cpuResourceAdvisor) updateRegionHeadroom() {
_ = cra.metaCache.SetRegionInfo(regionName, regionInfo)
}
}

func (cra *cpuResourceAdvisor) updateRegionStatus(boundUpper bool) {
for regionName, r := range cra.regionMap {
regionInfo, ok := cra.metaCache.GetRegionInfo(regionName)
if !ok {
continue
}

status := r.GetStatus()
regionInfo.RegionStatus = status

// set bound upper
if boundUpper {
regionInfo.RegionStatus.BoundType = types.BoundUpper
}

_ = cra.metaCache.SetRegionInfo(regionName, regionInfo)

// emit metrics
period := cra.conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod
basicTags := region.GetRegionBasicMetricTags(r)

_ = cra.emitter.StoreInt64(metricRegionStatus, int64(period.Seconds()), metrics.MetricTypeNameCount, basicTags...)

tags := basicTags
for k, v := range r.GetStatus().OvershootStatus {
tags = append(tags, metrics.MetricTag{Key: k, Val: string(v)})
}
_ = cra.emitter.StoreInt64(metricRegionOvershoot, int64(period.Seconds()), metrics.MetricTypeNameCount, tags...)
}
}
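A sketch of the emission pattern above, with local stand-in types rather than katalyst's `pkg/metrics` API: each sync period the advisor bumps a count-type metric by the period length in seconds, so the accumulated counter approximates the time a region has spent in a given status, and `region_overshoot` carries one extra tag per indicator. The sketch copies the basic tags before appending; in the PR the plain `tags := basicTags` is safe because `GetRegionBasicMetricTags` returns a freshly built slice with len == cap, so `append` allocates a new backing array.

```go
package main

import (
	"fmt"
	"time"
)

// MetricTag is a local stand-in for pkg/metrics.MetricTag.
type MetricTag struct{ Key, Val string }

// emitCount stands in for emitter.StoreInt64 with MetricTypeNameCount.
func emitCount(name string, value int64, tags []MetricTag) {
	fmt.Printf("emit %s value=%d tags=%v\n", name, value, tags)
}

func main() {
	syncPeriod := 10 * time.Second
	basicTags := []MetricTag{{Key: "region_name", Val: "share-0"}} // plus the other basic tags

	// region_status carries only the basic tags.
	emitCount("region_status", int64(syncPeriod.Seconds()), basicTags)

	// region_overshoot appends one tag per indicator; copy first so the
	// append cannot touch basicTags' backing array.
	overshootStatus := map[string]string{"cpu_sched_wait": "true"}
	tags := append([]MetricTag{}, basicTags...)
	for k, v := range overshootStatus {
		tags = append(tags, MetricTag{Key: k, Val: v})
	}
	emitCount("region_overshoot", int64(syncPeriod.Seconds()), tags)
}
```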
@@ -92,7 +92,7 @@ func newTestCPUResourceAdvisor(t *testing.T, pods []*v1.Pod, conf *config.Config
err = metaServer.SetServiceProfilingManager(spd.NewDummyServiceProfilingManager(profiles))
require.NoError(t, err)

cra := NewCPUResourceAdvisor(conf, struct{}{}, metaCache, metaServer, nil)
cra := NewCPUResourceAdvisor(conf, struct{}{}, metaCache, metaServer, metrics.DummyMetrics{})
require.NotNil(t, cra)

return cra, metaCache
@@ -32,7 +32,7 @@ import (
// Advisor data elements are shared ONLY with assemblers, as pointers, to avoid rebuilding them in the advisor,
// and are NOT supposed to be used by other components.
type ProvisionAssembler interface {
AssembleProvision() (types.InternalCPUCalculationResult, error)
AssembleProvision() (types.InternalCPUCalculationResult, bool, error)
}

type InitFunc func(conf *config.Configuration, extraConf interface{}, regionMap *map[string]region.QoSRegion,
@@ -59,7 +59,7 @@ func NewProvisionAssemblerCommon(conf *config.Configuration, _ interface{}, regi
}
}

func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalculationResult, error) {
func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalculationResult, bool, error) {
enableReclaim := pa.conf.GetDynamicConfiguration().EnableReclaim

calculationResult := types.InternalCPUCalculationResult{
@@ -81,7 +81,7 @@ func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalcul
for _, r := range *pa.regionMap {
controlKnob, err := r.GetProvision()
if err != nil {
return types.InternalCPUCalculationResult{}, err
return types.InternalCPUCalculationResult{}, false, err
}

switch r.Type() {
@@ -126,7 +126,7 @@ func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalcul
if shares+isolationUppers > shareAndIsolatedPoolAvailable {
shareAndIsolatePoolSizes = general.MergeMapInt(sharePoolSizes, isolationLowerSizes)
}
regulatePoolSizes(shareAndIsolatePoolSizes, shareAndIsolatedPoolAvailable, enableReclaim)
boundUpper := regulatePoolSizes(shareAndIsolatePoolSizes, shareAndIsolatedPoolAvailable, enableReclaim)

// fill in regulated share-and-isolated pool entries
for poolName, poolSize := range shareAndIsolatePoolSizes {
@@ -136,7 +136,7 @@ func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalcul
reclaimPoolSizeOfNonBindingNumas := shareAndIsolatedPoolAvailable - general.SumUpMapValues(shareAndIsolatePoolSizes) + pa.getNumasReservedForReclaim(*pa.nonBindingNumas)
calculationResult.SetPoolEntry(state.PoolNameReclaim, cpuadvisor.FakedNUMAID, reclaimPoolSizeOfNonBindingNumas)

return calculationResult, nil
return calculationResult, boundUpper, nil
}

func (pa *ProvisionAssemblerCommon) getNumasReservedForReclaim(numas machine.CPUSet) int {
@@ -34,21 +34,30 @@ func getNumasAvailableResource(numaAvailable map[int]int, numas machine.CPUSet)

// regulatePoolSizes modifies pool size map to legal values, taking total available
// resource and config such as enable reclaim into account. should be compatible with
// any case and not return error.
func regulatePoolSizes(poolSizes map[string]int, available int, enableReclaim bool) {
// any case and not return error. returns true if the resource upper bound is reached.
func regulatePoolSizes(poolSizes map[string]int, available int, enableReclaim bool) bool {
targetSum := general.SumUpMapValues(poolSizes)
boundUpper := false

// use all available resource for pools when reclaim is disabled
if !enableReclaim || targetSum > available {
// use up all available resource for pools in this case
targetSum = available
}

// use all available resource when reaching resource upper bound
if targetSum > available {
targetSum = available
boundUpper = true
}

if err := normalizePoolSizes(poolSizes, targetSum); err != nil {
// all pools share available resource as fallback if normalization failed
for k := range poolSizes {
poolSizes[k] = available
}
}

return boundUpper
}

func normalizePoolSizes(poolSizes map[string]int, targetSum int) error {
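A worked example of what `regulatePoolSizes` is meant to do, as a runnable sketch. Two assumptions are baked in: the returned bool reports that the requested pool sizes exceeded the available CPUs, and `normalizePoolSizes` (whose body is not part of this diff) scales pool sizes proportionally to the target sum:

```go
package main

import "fmt"

// regulatePoolSizes is a simplified sketch of the intent, not the merged code.
func regulatePoolSizes(poolSizes map[string]int, available int, enableReclaim bool) bool {
	requested := 0
	for _, v := range poolSizes {
		requested += v
	}
	if requested == 0 {
		return false
	}

	boundUpper := requested > available
	targetSum := requested
	if !enableReclaim || boundUpper {
		// use up all available resource in these cases
		targetSum = available
	}

	// assumed proportional normalization, with the rounding leftover
	// handed to one arbitrary pool
	acc := 0
	for k, v := range poolSizes {
		poolSizes[k] = v * targetSum / requested
		acc += poolSizes[k]
	}
	for k := range poolSizes {
		poolSizes[k] += targetSum - acc
		break
	}
	return boundUpper
}

func main() {
	// share + isolation request 14 CPUs but only 10 are available:
	// sizes are squeezed proportionally and boundUpper is reported.
	pools := map[string]int{"share": 8, "isolation-pod1": 6}
	bound := regulatePoolSizes(pools, 10, true)
	fmt.Println(pools, bound) // e.g. map[isolation-pod1:4 share:6] true
}
```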
@@ -18,6 +18,7 @@ package region

import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

@@ -64,4 +65,19 @@ type QoSRegion interface {
// GetHeadRoomPolicy returns headroom policy for this region,
// the first is the policy with top priority, while the second is the policy currently in use
GetHeadRoomPolicy() (types.CPUHeadroomPolicyName, types.CPUHeadroomPolicyName)

// GetStatus returns region status
GetStatus() types.RegionStatus
}

// GetRegionBasicMetricTags returns the metric tags carrying basic region info
func GetRegionBasicMetricTags(r QoSRegion) []metrics.MetricTag {
ret := []metrics.MetricTag{
{Key: "region_name", Val: r.Name()},
{Key: "region_type", Val: string(r.Type())},
{Key: "owner_pool_name", Val: r.OwnerPoolName()},
{Key: "binding_numas", Val: r.GetBindingNumas().String()},
{Key: "bound_type", Val: string(r.GetStatus().BoundType)},
}
return ret
}
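For illustration, the tag set this helper would produce for a hypothetical share region; the `MetricTag` struct below is a local stand-in for `pkg/metrics.MetricTag`, and the concrete values (region name, NUMA range, `bound_type` string) are invented for the example, not taken from the PR:

```go
package main

import "fmt"

// MetricTag is a local stand-in for pkg/metrics.MetricTag.
type MetricTag struct{ Key, Val string }

func main() {
	// Hypothetical share region "share-0" bound to NUMA 0-1 whose last
	// regulation hit the resource upper bound.
	tags := []MetricTag{
		{Key: "region_name", Val: "share-0"},
		{Key: "region_type", Val: "share"},
		{Key: "owner_pool_name", Val: "share"},
		{Key: "binding_numas", Val: "0-1"},
		{Key: "bound_type", Val: "upper"},
	}
	for _, t := range tags {
		fmt.Printf("%s=%s\n", t.Key, t.Val)
	}
}
```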
@@ -60,14 +60,19 @@ type QoSRegionBase struct {
name string
ownerPoolName string
regionType types.QoSRegionType
regionStatus types.RegionStatus

types.ResourceEssentials
types.ControlEssentials

// bindingNumas records numas assigned to this region
bindingNumas machine.CPUSet
// podSet records current pods and containers in the region, keyed by pod uid and container name
podSet types.PodSet
// containerTopologyAwareAssignment changes dynamically as containers are added
containerTopologyAwareAssignment types.TopologyAwareAssignment
// indicatorCurrentGetters stores current-value getters for the indicators this region cares about
indicatorCurrentGetters map[string]types.IndicatorCurrentGetter

// provisionPolicies for comparing and merging different provision policy results,
// the former has higher priority; provisionPolicyNameInUse indicates the provision
@@ -284,6 +289,10 @@ func (r *QoSRegionBase) GetHeadRoomPolicy() (policyTopPriority types.CPUHeadroom
return
}

func (r *QoSRegionBase) GetStatus() types.RegionStatus {
return r.regionStatus
}

// getRegionNameFromMetaCache returns region name owned by container from metacache,
// to restore region info after restart. If numaID is specified, binding numas of the
// region will be checked, otherwise only one region should be owned by container.
@@ -360,7 +369,7 @@ func (r *QoSRegionBase) initHeadroomPolicy(conf *config.Configuration, extraConf
}

// getIndicators gets indicators by given map of indicator name to the getter of current value
func (r *QoSRegionBase) getIndicators(indicatorCurrentGetterMap map[string]types.IndicatorCurrentGetter) (types.Indicator, error) {
func (r *QoSRegionBase) getIndicators(indicatorCurrentGetters map[string]types.IndicatorCurrentGetter) (types.Indicator, error) {
ctx := context.Background()
indicatorTargetConfig, ok := r.conf.RegionIndicatorTargetConfiguration[r.regionType]
if !ok {
@@ -371,7 +380,7 @@ func (r *QoSRegionBase) getIndicators(indicatorCurrentGetterMap map[string]types
for _, indicator := range indicatorTargetConfig {
indicatorName := indicator.Name
defaultTarget := indicator.Target
indicatorCurrentGetter, ok := indicatorCurrentGetterMap[indicatorName]
indicatorCurrentGetter, ok := indicatorCurrentGetters[indicatorName]
if !ok {
continue
}
@@ -446,3 +455,32 @@ func (r *QoSRegionBase) getPodIndicatorTarget(ctx context.Context, podUID string

return &indicatorTarget, nil
}

// updateStatus updates region status based on resource and control essentials
func (r *QoSRegionBase) updateStatus() {
// reset entries
r.regionStatus.OvershootStatus = make(map[string]types.OvershootType)
r.regionStatus.BoundType = types.BoundUnknown

for indicatorName := range r.indicatorCurrentGetters {
r.regionStatus.OvershootStatus[indicatorName] = types.OvershootUnknown
}

// fill in overshoot entry
for indicatorName, indicator := range r.ControlEssentials.Indicators {
if indicator.Current > indicator.Target {
r.regionStatus.OvershootStatus[indicatorName] = types.OvershootTrue
} else {
r.regionStatus.OvershootStatus[indicatorName] = types.OvershootFalse
}
}

// fill in bound entry
if v, ok := r.ControlEssentials.ControlKnobs[types.ControlKnobNonReclaimedCPUSize]; ok {
if v.Value <= r.ResourceEssentials.ResourceLowerBound {
r.regionStatus.BoundType = types.BoundLower
} else {
r.regionStatus.BoundType = types.BoundNone
}
}
}
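A worked example of the derivation above, with local stand-in types rather than the katalyst `types` package. Note that `updateStatus` itself only ever yields `BoundLower` or `BoundNone` (or `BoundUnknown` when the control knob is absent); `BoundUpper` is stamped later by the advisor's `updateRegionStatus` when `assembleProvision` reports `boundUpper`:

```go
package main

import "fmt"

// indicator is a local stand-in for types.Indicator.
type indicator struct{ Current, Target float64 }

func main() {
	indicators := map[string]indicator{
		"cpu_sched_wait": {Current: 520, Target: 460}, // above target -> overshoot
		"cpi":            {Current: 1.1, Target: 1.4}, // within target
	}

	// fill in overshoot entries, as updateStatus does
	overshootStatus := map[string]string{}
	for name, ind := range indicators {
		if ind.Current > ind.Target {
			overshootStatus[name] = "true"
		} else {
			overshootStatus[name] = "false"
		}
	}

	// bound entry: the non-reclaimed cpu size knob sitting at the region's
	// lower bound marks the region BoundLower (values are illustrative)
	nonReclaimedCPUSize, resourceLowerBound := 4.0, 4.0
	boundType := "none"
	if nonReclaimedCPUSize <= resourceLowerBound {
		boundType = "lower"
	}

	fmt.Println(overshootStatus, boundType) // map[cpi:false cpu_sched_wait:true] lower
}
```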
@@ -38,7 +38,6 @@ import (

type QoSRegionDedicatedNumaExclusive struct {
*QoSRegionBase
indicatorCurrentGetterMap map[string]types.IndicatorCurrentGetter
}

// NewQoSRegionDedicatedNumaExclusive returns a region instance for dedicated cores
@@ -55,7 +54,7 @@ func NewQoSRegionDedicatedNumaExclusive(ci *types.ContainerInfo, conf *config.Co
QoSRegionBase: NewQoSRegionBase(regionName, ci.OwnerPoolName, types.QoSRegionTypeDedicatedNumaExclusive, conf, extraConf, metaReader, metaServer, emitter),
}
r.bindingNumas = machine.NewCPUSet(numaID)
r.indicatorCurrentGetterMap = map[string]types.IndicatorCurrentGetter{
r.indicatorCurrentGetters = map[string]types.IndicatorCurrentGetter{
string(workloadapis.TargetIndicatorNameCPI): r.getPodCPICurrent,
}

@@ -66,25 +65,27 @@ func (r *QoSRegionDedicatedNumaExclusive) TryUpdateProvision() {
r.Lock()
defer r.Unlock()

controlEssentials := types.ControlEssentials{
r.ControlEssentials = types.ControlEssentials{
ControlKnobs: r.getControlKnobs(),
ReclaimOverlap: true,
}

indicators, err := r.getIndicators(r.indicatorCurrentGetterMap)
indicators, err := r.getIndicators(r.indicatorCurrentGetters)
if err != nil {
general.Errorf("get indicators failed: %v", err)
} else {
controlEssentials.Indicators = indicators
r.ControlEssentials.Indicators = indicators
}

r.updateStatus()

for _, internal := range r.provisionPolicies {
internal.updateStatus = types.PolicyUpdateFailed

// set essentials for policy and regulator
internal.policy.SetPodSet(r.podSet)
internal.policy.SetBindingNumas(r.bindingNumas)
internal.policy.SetEssentials(r.ResourceEssentials, controlEssentials)
internal.policy.SetEssentials(r.ResourceEssentials, r.ControlEssentials)

// run an episode of policy update
if err := internal.policy.Update(); err != nil {
@@ -31,11 +31,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/metric"
)

// todo: support rama policy, with cpu schedwait avg as indicator

type QoSRegionShare struct {
*QoSRegionBase
indicatorCurrentGetterMap map[string]types.IndicatorCurrentGetter
}

// NewQoSRegionShare returns a region instance for shared pool
@@ -51,7 +48,7 @@ func NewQoSRegionShare(ci *types.ContainerInfo, conf *config.Configuration, extr
QoSRegionBase: NewQoSRegionBase(regionName, ci.OwnerPoolName, types.QoSRegionTypeShare, conf, extraConf, metaReader, metaServer, emitter),
}

r.indicatorCurrentGetterMap = map[string]types.IndicatorCurrentGetter{
r.indicatorCurrentGetters = map[string]types.IndicatorCurrentGetter{
string(v1alpha1.TargetIndicatorNameCPUSchedWait): r.getPodSchedWaitCurrent,
}
return r
@@ -61,24 +58,26 @@ func (r *QoSRegionShare) TryUpdateProvision() {
r.Lock()
defer r.Unlock()

controlEssentials := types.ControlEssentials{
r.ControlEssentials = types.ControlEssentials{
ControlKnobs: r.getControlKnobs(),
ReclaimOverlap: false,
}

indicators, err := r.getIndicators(r.indicatorCurrentGetterMap)
indicators, err := r.getIndicators(r.indicatorCurrentGetters)
if err != nil {
general.Errorf("get indicators failed: %v", err)
} else {
controlEssentials.Indicators = indicators
r.ControlEssentials.Indicators = indicators
}

r.updateStatus()

for _, internal := range r.provisionPolicies {
internal.updateStatus = types.PolicyUpdateFailed

// set essentials for policy and regulator
internal.policy.SetPodSet(r.podSet)
internal.policy.SetEssentials(r.ResourceEssentials, controlEssentials)
internal.policy.SetEssentials(r.ResourceEssentials, r.ControlEssentials)

// run an episode of policy update
if err := internal.policy.Update(); err != nil {