feat(sysadvisor): add stability status of region
sun-yuliang committed Aug 3, 2023
1 parent 883ecbc commit 04ee57f
Showing 10 changed files with 169 additions and 28 deletions.
17 changes: 13 additions & 4 deletions pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/advisor.go
@@ -49,6 +49,12 @@ import (
// todo:
// 1. Support dedicated without and with numa binding but non numa exclusive containers

// metric names for resource advisor
const (
	metricRegionStatus    = "region_status"
	metricRegionOvershoot = "region_overshoot"
)

func init() {
	provisionpolicy.RegisterInitializer(types.CPUProvisionPolicyCanonical, provisionpolicy.NewPolicyCanonical)
	provisionpolicy.RegisterInitializer(types.CPUProvisionPolicyRama, provisionpolicy.NewPolicyRama)
@@ -219,12 +225,15 @@ func (cra *cpuResourceAdvisor) update() {

	klog.Infof("[qosaware-cpu] region map: %v", general.ToString(cra.regionMap))

	// assemble provision result from each region and notify cpu server
	calculationResult, err := cra.assembleProvision()
	// assemble provision result from each region
	calculationResult, boundUpper, err := cra.assembleProvision()
	if err != nil {
		klog.Errorf("[qosaware-cpu] assemble provision failed: %v", err)
		return
	}
	cra.updateRegionStatus(boundUpper)

	// notify cpu server
	select {
	case cra.sendCh <- calculationResult:
		general.Infof("notify cpu server: %+v", calculationResult)
@@ -405,9 +414,9 @@ func (cra *cpuResourceAdvisor) updateAdvisorEssentials() {
// assembleProvision generates internal calculation result.
// must make sure pool names from cpu provision following qrm definition;
// numa ID set as -1 means no numa-preference is needed.
func (cra *cpuResourceAdvisor) assembleProvision() (types.InternalCPUCalculationResult, error) {
func (cra *cpuResourceAdvisor) assembleProvision() (types.InternalCPUCalculationResult, bool, error) {
	if cra.provisionAssembler == nil {
		return types.InternalCPUCalculationResult{}, fmt.Errorf("no legal provision assembler")
		return types.InternalCPUCalculationResult{}, false, fmt.Errorf("no legal provision assembler")
	}

	return cra.provisionAssembler.AssembleProvision()
@@ -29,6 +29,7 @@ import (
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/assembler/provisionassembler"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/plugin/qosaware/resource/cpu/region"
"github.com/kubewharf/katalyst-core/pkg/agent/sysadvisor/types"
"github.com/kubewharf/katalyst-core/pkg/metrics"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
)

@@ -269,3 +270,34 @@ func (cra *cpuResourceAdvisor) updateRegionHeadroom() {
		_ = cra.metaCache.SetRegionInfo(regionName, regionInfo)
	}
}

func (cra *cpuResourceAdvisor) updateRegionStatus(boundUpper bool) {
	for regionName, r := range cra.regionMap {
		regionInfo, ok := cra.metaCache.GetRegionInfo(regionName)
		if !ok {
			continue
		}

		status := r.GetStatus()
		regionInfo.RegionStatus = status

		// set bound upper
		if boundUpper {
			regionInfo.RegionStatus.BoundType = types.BoundUpper
		}

		_ = cra.metaCache.SetRegionInfo(regionName, regionInfo)

		// emit metrics
		period := cra.conf.SysAdvisorPluginsConfiguration.QoSAwarePluginConfiguration.SyncPeriod
		basicTags := region.GetRegionBasicMetricTags(r)

		_ = cra.emitter.StoreInt64(metricRegionStatus, int64(period.Seconds()), metrics.MetricTypeNameCount, basicTags...)

		tags := basicTags
		for k, v := range r.GetStatus().OvershootStatus {
			tags = append(tags, metrics.MetricTag{Key: k, Val: string(v)})
		}
		_ = cra.emitter.StoreInt64(metricRegionOvershoot, int64(period.Seconds()), metrics.MetricTypeNameCount, tags...)
	}
}
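
The RegionStatus type consumed above is defined in the sysadvisor types package, whose hunk is not captured on this page. A hedged reconstruction from its usage in this commit (the field and constant names all appear in the diff; the string values are guesses):

type OvershootType string

type BoundType string

const (
	OvershootUnknown OvershootType = "overshoot_unknown" // values hypothetical
	OvershootTrue    OvershootType = "overshoot_true"
	OvershootFalse   OvershootType = "overshoot_false"

	BoundUnknown BoundType = "bound_unknown"
	BoundNone    BoundType = "bound_none"
	BoundLower   BoundType = "bound_lower"
	BoundUpper   BoundType = "bound_upper"
)

// RegionStatus carries the per-region stability status added by this commit.
type RegionStatus struct {
	OvershootStatus map[string]OvershootType // keyed by indicator name
	BoundType       BoundType
}

Note that updateRegionStatus emits the sync period, in seconds, as a count-type metric, so region_status and region_overshoot accumulate roughly the number of seconds a region has spent in each tagged state.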
@@ -32,7 +32,7 @@ import (
// Advisor data elements are shared ONLY by assemblers as pointer to avoid rebuild in advisor,
// and NOT supposed to be used by other components.
type ProvisionAssembler interface {
	AssembleProvision() (types.InternalCPUCalculationResult, error)
	AssembleProvision() (types.InternalCPUCalculationResult, bool, error)
}
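
Any assembler implementation must adopt the widened signature. A minimal conforming stub, purely illustrative (noopAssembler is not part of the commit):

// noopAssembler satisfies ProvisionAssembler with an empty result and
// reports that no resource upper bound was reached.
type noopAssembler struct{}

func (noopAssembler) AssembleProvision() (types.InternalCPUCalculationResult, bool, error) {
	return types.InternalCPUCalculationResult{}, false, nil
}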

type InitFunc func(conf *config.Configuration, extraConf interface{}, regionMap *map[string]region.QoSRegion,
@@ -59,7 +59,7 @@ func NewProvisionAssemblerCommon(conf *config.Configuration, _ interface{}, regi
	}
}

func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalculationResult, error) {
func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalculationResult, bool, error) {
	enableReclaim := pa.conf.GetDynamicConfiguration().EnableReclaim

	calculationResult := types.InternalCPUCalculationResult{
@@ -81,7 +81,7 @@ func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalcul
	for _, r := range *pa.regionMap {
		controlKnob, err := r.GetProvision()
		if err != nil {
			return types.InternalCPUCalculationResult{}, err
			return types.InternalCPUCalculationResult{}, false, err
		}

		switch r.Type() {
@@ -126,7 +126,7 @@ func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalcul
	if shares+isolationUppers > shareAndIsolatedPoolAvailable {
		shareAndIsolatePoolSizes = general.MergeMapInt(sharePoolSizes, isolationLowerSizes)
	}
	regulatePoolSizes(shareAndIsolatePoolSizes, shareAndIsolatedPoolAvailable, enableReclaim)
	boundUpper := regulatePoolSizes(shareAndIsolatePoolSizes, shareAndIsolatedPoolAvailable, enableReclaim)

	// fill in regulated share-and-isolated pool entries
	for poolName, poolSize := range shareAndIsolatePoolSizes {
@@ -136,7 +136,7 @@ func (pa *ProvisionAssemblerCommon) AssembleProvision() (types.InternalCPUCalcul
	reclaimPoolSizeOfNonBindingNumas := shareAndIsolatedPoolAvailable - general.SumUpMapValues(shareAndIsolatePoolSizes) + pa.getNumasReservedForReclaim(*pa.nonBindingNumas)
	calculationResult.SetPoolEntry(state.PoolNameReclaim, cpuadvisor.FakedNUMAID, reclaimPoolSizeOfNonBindingNumas)

	return calculationResult, nil
	return calculationResult, boundUpper, nil
}

func (pa *ProvisionAssemblerCommon) getNumasReservedForReclaim(numas machine.CPUSet) int {
@@ -34,21 +34,30 @@ func getNumasAvailableResource(numaAvailable map[int]int, numas machine.CPUSet)

// regulatePoolSizes modifies pool size map to legal values, taking total available
// resource and config such as enable reclaim into account. should be compatible with
// any case and not return error.
func regulatePoolSizes(poolSizes map[string]int, available int, enableReclaim bool) {
// any case and not return error. return true if reach resource upper bound.
func regulatePoolSizes(poolSizes map[string]int, available int, enableReclaim bool) bool {
	targetSum := general.SumUpMapValues(poolSizes)
	boundUpper := false

	// use all available resource for pools when reclaim is disabled
	if !enableReclaim || targetSum > available {
		// use up all available resource for pools in this case
		targetSum = available
	}

	// use all available resource when reaching resource upper bound
	if targetSum > available {
		targetSum = available
		boundUpper = true
	}

	if err := normalizePoolSizes(poolSizes, targetSum); err != nil {
		// all pools share available resource as fallback if normalization failed
		for k := range poolSizes {
			poolSizes[k] = available
		}
	}

	return boundUpper
}
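
A short usage sketch of the new return value (pool names and numbers are illustrative; the behavior is as described by the doc comment above):

poolSizes := map[string]int{"share": 8, "isolation-pod1": 6} // requests sum to 14
boundUpper := regulatePoolSizes(poolSizes, 10, true)         // only 10 cores available
// poolSizes is normalized in place so its values sum to the 10 available cores,
// and boundUpper reports that the requests were capped at the upper bound.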

func normalizePoolSizes(poolSizes map[string]int, targetSum int) error {
@@ -64,4 +64,7 @@ type QoSRegion interface {
	// GetHeadRoomPolicy returns headroom policy for this region,
	// the first is policy with top priority, while the second is the policy that is in-use currently
	GetHeadRoomPolicy() (types.CPUHeadroomPolicyName, types.CPUHeadroomPolicyName)

	// GetStatus returns region status
	GetStatus() types.RegionStatus
}
@@ -60,14 +60,19 @@ type QoSRegionBase struct {
	name          string
	ownerPoolName string
	regionType    types.QoSRegionType
	regionStatus  types.RegionStatus

	types.ResourceEssentials
	types.ControlEssentials

	// bindingNumas records numas assigned to this region
	bindingNumas machine.CPUSet
	// podSet records current pod and containers in region keyed by pod uid and container name
	podSet types.PodSet
	// containerTopologyAwareAssignment changes dynamically by adding container
	containerTopologyAwareAssignment types.TopologyAwareAssignment
	// indicatorCurrentGetters stores metrics getters for indicators interested in
	indicatorCurrentGetters map[string]types.IndicatorCurrentGetter

	// provisionPolicies for comparing and merging different provision policy results,
	// the former has higher priority; provisionPolicyNameInUse indicates the provision
@@ -284,6 +289,10 @@ func (r *QoSRegionBase) GetHeadRoomPolicy() (policyTopPriority types.CPUHeadroom
	return
}

func (r *QoSRegionBase) GetStatus() types.RegionStatus {
	return r.regionStatus
}

// getRegionNameFromMetaCache returns region name owned by container from metacache,
// to restore region info after restart. If numaID is specified, binding numas of the
// region will be checked, otherwise only one region should be owned by container.
@@ -360,7 +369,7 @@ func (r *QoSRegionBase) initHeadroomPolicy(conf *config.Configuration, extraConf
}

// getIndicators gets indicators by given map of indicator name to the getter of current value
func (r *QoSRegionBase) getIndicators(indicatorCurrentGetterMap map[string]types.IndicatorCurrentGetter) (types.Indicator, error) {
func (r *QoSRegionBase) getIndicators(indicatorCurrentGetters map[string]types.IndicatorCurrentGetter) (types.Indicator, error) {
	ctx := context.Background()
	indicatorTargetConfig, ok := r.conf.RegionIndicatorTargetConfiguration[r.regionType]
	if !ok {
@@ -371,7 +380,7 @@ func (r *QoSRegionBase) getIndicators(indicatorCurrentGetterMap map[string]types
	for _, indicator := range indicatorTargetConfig {
		indicatorName := indicator.Name
		defaultTarget := indicator.Target
		indicatorCurrentGetter, ok := indicatorCurrentGetterMap[indicatorName]
		indicatorCurrentGetter, ok := indicatorCurrentGetters[indicatorName]
		if !ok {
			continue
		}
@@ -446,3 +455,44 @@ func (r *QoSRegionBase) getPodIndicatorTarget(ctx context.Context, podUID string

	return &indicatorTarget, nil
}

// updateStatus updates region status based on resource and control essentials
func (r *QoSRegionBase) updateStatus() {
	// reset entries
	r.regionStatus.OvershootStatus = make(map[string]types.OvershootType)
	r.regionStatus.BoundType = types.BoundUnknown

	for indicatorName := range r.indicatorCurrentGetters {
		r.regionStatus.OvershootStatus[indicatorName] = types.OvershootUnknown
	}

	// fill in overshoot entry
	for indicatorName, indicator := range r.ControlEssentials.Indicators {
		if indicator.Current > indicator.Target {
			r.regionStatus.OvershootStatus[indicatorName] = types.OvershootTrue
		} else {
			r.regionStatus.OvershootStatus[indicatorName] = types.OvershootFalse
		}
	}

	// fill in bound entry
	if v, ok := r.ControlEssentials.ControlKnobs[types.ControlKnobNonReclaimedCPUSize]; ok {
		if v.Value <= r.ResourceEssentials.ResourceLowerBound {
			r.regionStatus.BoundType = types.BoundLower
		} else {
			r.regionStatus.BoundType = types.BoundNone
		}
	}
}
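
A worked example of the status derivation (all values illustrative):

// For a share region tracking cpu_sched_wait with Current=520 and Target=460,
// updateStatus records
//     OvershootStatus["cpu_sched_wait"] = OvershootTrue   // 520 > 460
// and, if ControlKnobNonReclaimedCPUSize is 4 while ResourceLowerBound is 4,
//     BoundType = BoundLower                              // 4 <= 4
// Indicators whose getters returned nothing stay OvershootUnknown, and
// BoundUpper is set later by the advisor from the assembler's boundUpper flag.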

// GetRegionBasicMetricTags returns metric tag slice of basic region info
func GetRegionBasicMetricTags(r QoSRegion) []metrics.MetricTag {
	ret := []metrics.MetricTag{
		{Key: "region_name", Val: r.Name()},
		{Key: "region_type", Val: string(r.Type())},
		{Key: "owner_pool_name", Val: r.OwnerPoolName()},
		{Key: "binding_numas", Val: r.GetBindingNumas().String()},
		{Key: "bound_type", Val: string(r.GetStatus().BoundType)},
	}
	return ret
}
@@ -38,7 +38,6 @@ import (

type QoSRegionDedicatedNumaExclusive struct {
	*QoSRegionBase
	indicatorCurrentGetterMap map[string]types.IndicatorCurrentGetter
}

// NewQoSRegionDedicatedNumaExclusive returns a region instance for dedicated cores
@@ -55,7 +54,7 @@ func NewQoSRegionDedicatedNumaExclusive(ci *types.ContainerInfo, conf *config.Co
		QoSRegionBase: NewQoSRegionBase(regionName, ci.OwnerPoolName, types.QoSRegionTypeDedicatedNumaExclusive, conf, extraConf, metaReader, metaServer, emitter),
	}
	r.bindingNumas = machine.NewCPUSet(numaID)
	r.indicatorCurrentGetterMap = map[string]types.IndicatorCurrentGetter{
	r.indicatorCurrentGetters = map[string]types.IndicatorCurrentGetter{
		string(workloadapis.TargetIndicatorNameCPI): r.getPodCPICurrent,
	}

@@ -66,25 +65,27 @@ func (r *QoSRegionDedicatedNumaExclusive) TryUpdateProvision() {
	r.Lock()
	defer r.Unlock()

	controlEssentials := types.ControlEssentials{
	r.ControlEssentials = types.ControlEssentials{
		ControlKnobs:   r.getControlKnobs(),
		ReclaimOverlap: true,
	}

	indicators, err := r.getIndicators(r.indicatorCurrentGetterMap)
	indicators, err := r.getIndicators(r.indicatorCurrentGetters)
	if err != nil {
		general.Errorf("get indicators failed: %v", err)
	} else {
		controlEssentials.Indicators = indicators
		r.ControlEssentials.Indicators = indicators
	}

	r.updateStatus()

	for _, internal := range r.provisionPolicies {
		internal.updateStatus = types.PolicyUpdateFailed

		// set essentials for policy and regulator
		internal.policy.SetPodSet(r.podSet)
		internal.policy.SetBindingNumas(r.bindingNumas)
		internal.policy.SetEssentials(r.ResourceEssentials, controlEssentials)
		internal.policy.SetEssentials(r.ResourceEssentials, r.ControlEssentials)

		// run an episode of policy update
		if err := internal.policy.Update(); err != nil {
@@ -31,11 +31,8 @@ import (
"github.com/kubewharf/katalyst-core/pkg/util/metric"
)

// todo: support rama policy, with cpu schedwait avg as indicator

type QoSRegionShare struct {
*QoSRegionBase
indicatorCurrentGetterMap map[string]types.IndicatorCurrentGetter
}

// NewQoSRegionShare returns a region instance for shared pool
@@ -51,7 +48,7 @@ func NewQoSRegionShare(ci *types.ContainerInfo, conf *config.Configuration, extr
		QoSRegionBase: NewQoSRegionBase(regionName, ci.OwnerPoolName, types.QoSRegionTypeShare, conf, extraConf, metaReader, metaServer, emitter),
	}

	r.indicatorCurrentGetterMap = map[string]types.IndicatorCurrentGetter{
	r.indicatorCurrentGetters = map[string]types.IndicatorCurrentGetter{
		string(v1alpha1.TargetIndicatorNameCPUSchedWait): r.getPodSchedWaitCurrent,
	}
	return r
@@ -61,24 +58,26 @@ func (r *QoSRegionShare) TryUpdateProvision() {
	r.Lock()
	defer r.Unlock()

	controlEssentials := types.ControlEssentials{
	r.ControlEssentials = types.ControlEssentials{
		ControlKnobs:   r.getControlKnobs(),
		ReclaimOverlap: false,
	}

	indicators, err := r.getIndicators(r.indicatorCurrentGetterMap)
	indicators, err := r.getIndicators(r.indicatorCurrentGetters)
	if err != nil {
		general.Errorf("get indicators failed: %v", err)
	} else {
		controlEssentials.Indicators = indicators
		r.ControlEssentials.Indicators = indicators
	}

	r.updateStatus()

	for _, internal := range r.provisionPolicies {
		internal.updateStatus = types.PolicyUpdateFailed

		// set essentials for policy and regulator
		internal.policy.SetPodSet(r.podSet)
		internal.policy.SetEssentials(r.ResourceEssentials, controlEssentials)
		internal.policy.SetEssentials(r.ResourceEssentials, r.ControlEssentials)

		// run an episode of policy update
		if err := internal.policy.Update(); err != nil {
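
Both region types now follow the same provision-update sequence. A condensed sketch of the shared flow (bodies elided; ReclaimOverlap differs per region type):

// Essentials are stored on the embedded QoSRegionBase instead of a local
// variable, so updateStatus can compare freshly fetched indicator currents
// against their targets before the provision policies run.
r.ControlEssentials = types.ControlEssentials{ControlKnobs: r.getControlKnobs()}
if indicators, err := r.getIndicators(r.indicatorCurrentGetters); err == nil {
	r.ControlEssentials.Indicators = indicators
}
r.updateStatus() // snapshot region stability for this sync period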
