Skip to content

Commit

Permalink
feat(qrm): native policy support get reserved cpus from kubelet
Browse files Browse the repository at this point in the history
Signed-off-by: caohe <caohe9603@gmail.com>
  • Loading branch information
caohe committed Jul 17, 2023
1 parent 7c5e6b0 commit c30503c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 15 deletions.
73 changes: 58 additions & 15 deletions pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nativepolicy
import (
"context"
"fmt"
"math"
"sync"
"time"

Expand All @@ -30,7 +31,6 @@ import (

"github.com/kubewharf/katalyst-api/pkg/plugins/skeleton"
"github.com/kubewharf/katalyst-core/cmd/katalyst-agent/app/agent"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/calculator"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/util"
"github.com/kubewharf/katalyst-core/pkg/config"
Expand Down Expand Up @@ -89,7 +89,6 @@ type NativePolicy struct {

// those are parsed from configurations
// todo if we want to use dynamic configuration, we'd better not use self-defined conf
// todo: not support reservedCPUs?
reservedCPUs machine.CPUSet
cpuPluginSocketAbsPath string
extraStateFileAbsPath string
Expand All @@ -106,16 +105,7 @@ type NativePolicy struct {

func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
_ interface{}, agentName string) (bool, agent.Component, error) {
// TODO: support reserved cpus
allCPUs := agentCtx.CPUDetails.CPUs().Clone()
reservedCPUsNum := conf.ReservedCPUCores

reservedCPUs, _, reserveErr := calculator.TakeHTByNUMABalance(agentCtx.KatalystMachineInfo, allCPUs, reservedCPUsNum)
if reserveErr != nil {
return false, agent.ComponentStub{}, fmt.Errorf("takeByNUMABalance for reservedCPUsNum: %d failed with error: %v",
conf.ReservedCPUCores, reserveErr)
}
general.Infof("take reservedCPUs: %s by reservedCPUsNum: %d", reservedCPUs.String(), reservedCPUsNum)
general.Infof("new native policy")

stateImpl, stateErr := state.NewCheckpointState(conf.GenericQRMPluginConfiguration.StateFileDirectory, cpuPluginStateFileName,
coreconsts.CPUResourcePluginPolicyNameNative, agentCtx.CPUTopology, conf.SkipCPUStateCorruption)
Expand All @@ -141,7 +131,6 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
residualHitMap: make(map[string]int64),
cpusToReuse: make(map[string]machine.CPUSet),
state: stateImpl,
reservedCPUs: reservedCPUs,
dynamicConfig: conf.DynamicAgentConfiguration,
cpuPluginSocketAbsPath: conf.CPUPluginSocketAbsPath,
extraStateFileAbsPath: conf.ExtraStateFileAbsPath,
Expand All @@ -150,7 +139,11 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,
cpuAllocationOption: conf.CPUAllocationOption,
}

state.GetContainerRequestedCores = policyImplement.getContainerRequestedCores
if err := policyImplement.setReservedCPUs(agentCtx.CPUDetails.CPUs().Clone()); err != nil {
return false, agent.ComponentStub{}, fmt.Errorf("native policy set reserved CPUs failed with error: %v", err)
}

state.SetContainerRequestedCores(policyImplement.getContainerRequestedCores)

err := agentCtx.MetaServer.ConfigurationManager.AddConfigWatcher(crd.AdminQoSConfigurationGVR)
if err != nil {
Expand All @@ -159,7 +152,7 @@ func NewNativePolicy(agentCtx *agent.GenericContext, conf *config.Configuration,

pluginWrapper, err := skeleton.NewRegistrationPluginWrapper(policyImplement, conf.QRMPluginSocketDirs, nil)
if err != nil {
return false, agent.ComponentStub{}, fmt.Errorf("dynamic policy new plugin wrapper failed with error: %v", err)
return false, agent.ComponentStub{}, fmt.Errorf("native policy new plugin wrapper failed with error: %v", err)
}

return true, &agent.PluginWrapper{GenericPlugin: pluginWrapper}, nil
Expand Down Expand Up @@ -677,3 +670,53 @@ func (p *NativePolicy) getContainerRequestedCores(allocationInfo *state.Allocati
}
return allocationInfo.RequestQuantity
}

// setReservedCPUs calculates and sets the reservedCPUs field
func (p *NativePolicy) setReservedCPUs(allCPUs machine.CPUSet) error {
klConfig, err := p.metaServer.GetKubeletConfig(context.TODO())

Check failure on line 676 in pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go

View workflow job for this annotation

GitHub Actions / Lint

p.metaServer.GetKubeletConfig undefined (type *metaserver.MetaServer has no field or method GetKubeletConfig) (typecheck)

Check failure on line 676 in pkg/agent/qrm-plugins/cpu/nativepolicy/policy.go

View workflow job for this annotation

GitHub Actions / Unit Test

p.metaServer.GetKubeletConfig undefined (type *metaserver.MetaServer has no field or method GetKubeletConfig)
if err != nil {
return fmt.Errorf("NewNativePolicy failed because get kubelet config failed with error: %v", err)
}

reservedQuantity, err := getKubeletReservedQuantity(klConfig)
if err != nil {
return fmt.Errorf("getKubeletReservedQuantity failed because get kubelet reserved quantity failed with error: %v", err)
}

if reservedQuantity.IsZero() {
// The native policy requires this to be nonzero. Zero CPU reservation
// would allow the shared pool to be completely exhausted. At that point
// either we would violate our guarantee of exclusivity or need to evict
// any pod that has at least one container that requires zero CPUs.
// See the comments in policy_static.go for more details.
return fmt.Errorf("the native policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
}

// Take the ceiling of the reservation, since fractional CPUs cannot be
// exclusively allocated.
reservedCPUsFloat := float64(reservedQuantity.MilliValue()) / 1000
numReservedCPUs := int(math.Ceil(reservedCPUsFloat))

var reserved machine.CPUSet
reservedCPUs, err := machine.Parse(klConfig.ReservedSystemCPUs)
if err != nil {
return fmt.Errorf("NewNativePolicy parse cpuset for reserved-cpus failed with error: %v", err)
}
if reservedCPUs.Size() > 0 {
reserved = reservedCPUs
} else {
// takeByTopology allocates CPUs associated with low-numbered cores from
// allCPUs.
reserved, _ = p.takeByTopology(allCPUs, numReservedCPUs)
}

if reserved.Size() != numReservedCPUs {
return fmt.Errorf("unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)
}

general.Infof("take reservedCPUs: %s by reservedCPUsNum: %d", reservedCPUs.String(), numReservedCPUs)

p.reservedCPUs = reserved

return nil
}
26 changes: 26 additions & 0 deletions pkg/agent/qrm-plugins/cpu/nativepolicy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ limitations under the License.
package nativepolicy

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"

"github.com/kubewharf/katalyst-core/pkg/agent/qrm-plugins/cpu/dynamicpolicy/state"
coreconsts "github.com/kubewharf/katalyst-core/pkg/consts"
"github.com/kubewharf/katalyst-core/pkg/util/machine"
Expand All @@ -25,3 +31,23 @@ import (
func generateMachineStateFromPodEntries(topology *machine.CPUTopology, podEntries state.PodEntries) (state.NUMANodeMap, error) {
return state.GenerateMachineStateFromPodEntries(topology, podEntries, coreconsts.CPUResourcePluginPolicyNameNative)
}

func getKubeletReservedQuantity(klConfig *kubeletconfigv1beta1.KubeletConfiguration) (resource.Quantity, error) {
reservedQuantity := resource.NewQuantity(0, resource.DecimalSI)
if klConfig.KubeReserved != nil {
kubeReserved, err := resource.ParseQuantity(klConfig.KubeReserved[string(v1.ResourceCPU)])
if err != nil {
return resource.MustParse("0"), fmt.Errorf("getKubeletReservedQuantity failed because parse cpu quantity for kube-reserved failed with error: %v", err)
}
reservedQuantity.Add(kubeReserved)
}
if klConfig.SystemReserved != nil {
systemReserved, err := resource.ParseQuantity(klConfig.SystemReserved[string(v1.ResourceCPU)])
if err != nil {
return resource.MustParse("0"), fmt.Errorf("getKubeletReservedQuantity parse cpu quantity for system-reserved failed with error: %v", err)
}
reservedQuantity.Add(systemReserved)
}

return *reservedQuantity, nil
}

0 comments on commit c30503c

Please sign in to comment.