Skip to content


add nominated pods in resource compute
Browse files Browse the repository at this point in the history
  • Loading branch information
denkensk committed Oct 5, 2021
1 parent 1a3ad3f commit 213025d
Show file tree
Hide file tree
Showing 6 changed files with 304 additions and 57 deletions.
4 changes: 1 addition & 3 deletions pkg/apis/config/v1beta1/defaults.go
Expand Up @@ -100,9 +100,7 @@ func SetDefaultsNodeResourcesAllocatableArgs(obj *NodeResourcesAllocatableArgs)

// SetDefaultsCapacitySchedulingArgs sets the default parameters for CapacityScheduling plugin.
func SetDefaultsCapacitySchedulingArgs(obj *CapacitySchedulingArgs) {
if obj.KubeConfigPath == nil {
obj.KubeConfigPath = &defaultKubeConfigPath
// TODO(k/k#96427): get KubeConfigPath and KubeMaster from configuration or command args.

// SetDefaultTargetLoadPackingArgs sets the default parameters for TargetLoadPacking plugin
Expand Down
192 changes: 167 additions & 25 deletions pkg/capacityscheduling/capacity_scheduling.go
Expand Up @@ -62,7 +62,17 @@ type CapacityScheduling struct {

// PreFilterState computed at PreFilter and used at PostFilter or Reserve.
type PreFilterState struct {
podReq framework.Resource

// nominatedPodsReqInEQWithPodReq is the sum of podReq and the requested resources of the nominated pods
// that are subject to the same quota (namespace) as the preemptor and are more important than the preemptor.
nominatedPodsReqInEQWithPodReq framework.Resource

// nominatedPodsReqWithPodReq is the sum of podReq and the requested resources of the nominated pods
// across all quotas (namespaces). The nominated pods counted here are of two kinds:
// 1. pods that are subject to the same quota (namespace) as the preemptor and are more important than the preemptor.
// 2. pods that are subject to a different quota (namespace) whose usage does not exceed min.
nominatedPodsReqWithPodReq framework.Resource

// Clone the preFilter state.
Expand Down Expand Up @@ -195,24 +205,76 @@ func (c *CapacityScheduling) EventsToRegister() []framework.ClusterEvent {
// 1. Check if the (pod.request + eq.allocated) is less than eq.max.
// 2. Check if the sum(eq's usage) > sum(eq's min).
func (c *CapacityScheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) *framework.Status {
// TODO improve the efficiency of taking snapshot
// e.g. use a two-pointer data structure to only copy the updated EQs when necessary.
snapshotElasticQuota := c.snapshotElasticQuota()
preFilterState := computePodResourceRequest(pod)
podReq := computePodResourceRequest(pod)

state.Write(preFilterStateKey, preFilterState)
state.Write(ElasticQuotaSnapshotKey, snapshotElasticQuota)

elasticQuotaInfos := snapshotElasticQuota.elasticQuotaInfos
eq := snapshotElasticQuota.elasticQuotaInfos[pod.Namespace]
if eq == nil {
return framework.NewStatus(framework.Success, "skipCapacityScheduling")
preFilterState := &PreFilterState{
podReq: *podReq,
state.Write(preFilterStateKey, preFilterState)
return framework.NewStatus(framework.Success)

if eq.overUsed(preFilterState.Resource, eq.Max) {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Pod %v/%v is rejected in Prefilter because ElasticQuota %v is more than Max", pod.Namespace, pod.Name, eq.Namespace))
// nominatedPodsReqInEQWithPodReq is the sum of podReq and the requested resources of the nominated pods
// that are subject to the same quota (namespace) as the preemptor and are more important than the preemptor.
nominatedPodsReqInEQWithPodReq := &framework.Resource{}
// nominatedPodsReqWithPodReq is the sum of podReq and the requested resources of the nominated pods
// across all quotas (namespaces). The nominated pods counted here are of two kinds:
// 1. pods that are subject to the same quota (namespace) as the preemptor and are more important than the preemptor.
// 2. pods that are subject to a different quota (namespace) whose usage does not exceed min.
nominatedPodsReqWithPodReq := &framework.Resource{}

nodeList, err := c.fh.SnapshotSharedLister().NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, fmt.Sprintf("Error getting the nodelist: %v", err))

if elasticQuotaInfos.aggregatedMinOverUsedWithPod(preFilterState.Resource) {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Pod %v/%v is rejected in Prefilter because total ElasticQuota used is more than min", pod.Namespace, pod.Name))
for _, node := range nodeList {
nominatedPods := c.fh.NominatedPodsForNode(node.Node().Name)
for _, p := range nominatedPods {
if p.Pod.UID == pod.UID {
ns := p.Pod.Namespace
info := c.elasticQuotaInfos[ns]
if info != nil {
pResourceRequest := computePodResourceRequest(p.Pod).ResourceList()
// If p and pod are subject to the same quota (namespace) and p's priority is not lower than pod's,
// p will be added to both nominatedPodsReqInEQWithPodReq and nominatedPodsReqWithPodReq.
// If they are subject to different quotas (namespaces) and the usage of p's quota does not exceed min,
// p will be added to nominatedPodsReqWithPodReq only.
if ns == pod.Namespace && corev1helpers.PodPriority(p.Pod) >= corev1helpers.PodPriority(pod) {
} else if ns != pod.Namespace && !info.usedOverMin() {

preFilterState := &PreFilterState{
podReq: *podReq,
nominatedPodsReqInEQWithPodReq: *nominatedPodsReqInEQWithPodReq,
nominatedPodsReqWithPodReq: *nominatedPodsReqWithPodReq,
state.Write(preFilterStateKey, preFilterState)

if eq.usedOverMaxWith(nominatedPodsReqInEQWithPodReq) {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Pod %v/%v is rejected in PreFilter because ElasticQuota %v is more than Max", pod.Namespace, pod.Name, eq.Namespace))

if elasticQuotaInfos.aggregatedUsedOverMinWith(*nominatedPodsReqWithPodReq) {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Pod %v/%v is rejected in PreFilter because total ElasticQuota used is more than min", pod.Namespace, pod.Name))

return framework.NewStatus(framework.Success, "")
Expand Down Expand Up @@ -316,7 +378,7 @@ func (c *CapacityScheduling) preempt(ctx context.Context, state *framework.Cycle

// 1) Ensure the preemptor is eligible to preempt other pods.
if !defaultpreemption.PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
if !c.PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName], state) {
klog.V(5).InfoS("Pod is not eligible for more preemption.", "pod", klog.KObj(pod))
return "", nil
Expand Down Expand Up @@ -347,6 +409,85 @@ func (c *CapacityScheduling) preempt(ctx context.Context, state *framework.Cycle
return bestCandidate.Name(), nil

// PodEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func (c *CapacityScheduling) PodEligibleToPreemptOthers(pod *v1.Pod, nodeInfos framework.NodeInfoLister, nominatedNodeStatus *framework.Status, state *framework.CycleState) bool {
if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
return false

preFilterState, err := getPreFilterState(state)
if err != nil {
klog.Errorf("error reading %q from cycleState: %v", preFilterStateKey, err)
return false

nomNodeName := pod.Status.NominatedNodeName
if len(nomNodeName) > 0 {
// If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters,
// then the pod should be considered for preempting again.
if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
return true

elasticQuotaSnapshotState, err := getElasticQuotaSnapshotState(state)
if err != nil {
klog.Errorf("error reading %q from cycleState: %v", ElasticQuotaSnapshotKey, err)
return true

nodeInfo, _ := nodeInfos.Get(nomNodeName)
if nodeInfo == nil {
return true

podPriority := corev1helpers.PodPriority(pod)
preemptorEQInfo, preemptorWithEQ := elasticQuotaSnapshotState.elasticQuotaInfos[pod.Namespace]
if preemptorWithEQ {
moreThanMinWithPreemptor := preemptorEQInfo.usedOverMinWith(&preFilterState.nominatedPodsReqWithPodReq)
for _, p := range nodeInfo.Pods {
if p.Pod.DeletionTimestamp != nil {
eqInfo, withEQ := elasticQuotaSnapshotState.elasticQuotaInfos[p.Pod.Namespace]
if !withEQ {
if p.Pod.Namespace == pod.Namespace && corev1helpers.PodPriority(p.Pod) < podPriority {
// There is a terminating pod on the nominated node.
// If the terminating pod is in the same namespace as the preemptor
// and is less important than the preemptor,
// return false to avoid preempting more pods.
return false
} else if p.Pod.Namespace != pod.Namespace && !moreThanMinWithPreemptor && eqInfo.usedOverMin() {
// There is a terminating pod on the nominated node.
// The terminating pod is not in the same namespace as the preemptor.
// If moreThanMinWithPreemptor is false, the preemptor can preempt pods in other EQs whose usage is over min.
// If the usage of the terminating pod's quota is over min, the room released by the terminating pod on the nominated node can be used by the preemptor,
// so return false to avoid preempting more pods.
return false
} else {
for _, p := range nodeInfo.Pods {
_, withEQ := elasticQuotaSnapshotState.elasticQuotaInfos[p.Pod.Namespace]
if withEQ {
if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority {
// There is a terminating pod on the nominated node.
return false
return true

// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given <pod> schedulable.
func (c *CapacityScheduling) FindCandidates(ctx context.Context, cs kubernetes.Interface, state *framework.CycleState, pod *v1.Pod,
Expand Down Expand Up @@ -452,6 +593,10 @@ func selectVictimsOnNode(
return nil, 0, framework.NewStatus(framework.Unschedulable, msg)

var nominatedPodsReqInEQWithPodReq framework.Resource
var nominatedPodsReqWithPodReq framework.Resource
podReq := preFilterState.podReq

removePod := func(rpi *framework.PodInfo) error {
if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
return err
Expand All @@ -475,20 +620,17 @@ func selectVictimsOnNode(
podPriority := corev1helpers.PodPriority(pod)
preemptorElasticQuotaInfo, preemptorWithElasticQuota := elasticQuotaInfos[pod.Namespace]

var moreThanMinWithPreemptor bool
// Check if there is elastic quota in the preemptor's namespace.
if preemptorWithElasticQuota {
moreThanMinWithPreemptor = preemptorElasticQuotaInfo.overUsed(preFilterState.Resource, preemptorElasticQuotaInfo.Min)

// sort the pods in node by the priority class
sort.Slice(nodeInfo.Pods, func(i, j int) bool { return !util.MoreImportantPod(nodeInfo.Pods[i].Pod, nodeInfo.Pods[j].Pod) })

var potentialVictims []*framework.PodInfo
if preemptorWithElasticQuota {
nominatedPodsReqInEQWithPodReq = preFilterState.nominatedPodsReqInEQWithPodReq
nominatedPodsReqWithPodReq = preFilterState.nominatedPodsReqWithPodReq
moreThanMinWithPreemptor := preemptorElasticQuotaInfo.usedOverMinWith(&nominatedPodsReqInEQWithPodReq)
for _, p := range nodeInfo.Pods {
pElasticQuotaInfo, pWithElasticQuota := elasticQuotaInfos[p.Pod.Namespace]
if !pWithElasticQuota {
eqInfo, withEQ := elasticQuotaInfos[p.Pod.Namespace]
if !withEQ {

Expand All @@ -512,7 +654,7 @@ func selectVictimsOnNode(
// will be chosen from Quotas that allocates more resources
// than its min, i.e., borrowing resources from other
// Quotas.
if p.Pod.Namespace != pod.Namespace && moreThanMin(*pElasticQuotaInfo.Used, *pElasticQuotaInfo.Min) {
if p.Pod.Namespace != pod.Namespace && eqInfo.usedOverMin() {
potentialVictims = append(potentialVictims, p)
if err := removePod(p); err != nil {
return nil, 0, framework.AsStatus(err)
Expand All @@ -522,8 +664,8 @@ func selectVictimsOnNode(
} else {
for _, p := range nodeInfo.Pods {
_, pWithElasticQuota := elasticQuotaInfos[p.Pod.Namespace]
if pWithElasticQuota {
_, withEQ := elasticQuotaInfos[p.Pod.Namespace]
if withEQ {
if corev1helpers.PodPriority(p.Pod) < podPriority {
Expand Down Expand Up @@ -555,8 +697,8 @@ func selectVictimsOnNode(
// after removing all the lower priority pods,
// we are almost done and this node is not suitable for preemption.
if preemptorWithElasticQuota {
if preemptorElasticQuotaInfo.overUsed(preFilterState.Resource, preemptorElasticQuotaInfo.Max) ||
elasticQuotaInfos.aggregatedMinOverUsedWithPod(preFilterState.Resource) {
if preemptorElasticQuotaInfo.usedOverMaxWith(&podReq) ||
elasticQuotaInfos.aggregatedUsedOverMinWith(podReq) {
return nil, 0, framework.NewStatus(framework.Unschedulable, "global quota max exceeded")
Expand All @@ -583,7 +725,7 @@ func selectVictimsOnNode(
klog.V(5).InfoS("Found a potential preemption victim on node", "pod", klog.KObj(p), "node", klog.KObj(nodeInfo.Node()))

if preemptorWithElasticQuota && (preemptorElasticQuotaInfo.overUsed(preFilterState.Resource, preemptorElasticQuotaInfo.Max) || elasticQuotaInfos.aggregatedMinOverUsedWithPod(preFilterState.Resource)) {
if preemptorWithElasticQuota && (preemptorElasticQuotaInfo.usedOverMaxWith(&nominatedPodsReqInEQWithPodReq) || elasticQuotaInfos.aggregatedUsedOverMinWith(nominatedPodsReqWithPodReq)) {
if err := removePod(pi); err != nil {
return false, err
Expand Down Expand Up @@ -798,8 +940,8 @@ func getPodDisruptionBudgets(pdbLister policylisters.PodDisruptionBudgetLister)
// Memory: 1G
// Result: CPU: 3, Memory: 3G
func computePodResourceRequest(pod *v1.Pod) *PreFilterState {
result := &PreFilterState{}
func computePodResourceRequest(pod *v1.Pod) *framework.Resource {
result := &framework.Resource{}
for _, container := range pod.Spec.Containers {
Expand Down
29 changes: 26 additions & 3 deletions pkg/capacityscheduling/capacity_scheduling_test.go
Expand Up @@ -21,7 +21,7 @@ import (

gocmp ""

Expand Down Expand Up @@ -123,8 +123,26 @@ func TestPreFilter(t *testing.T) {
for _, tt := range tests {
t.Run(, func(t *testing.T) {
var registerPlugins []st.RegisterPluginFunc
registeredPlugins := append(
st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),

fwk, err := st.NewFramework(
registeredPlugins, "",
frameworkruntime.WithSnapshotSharedLister(testutil.NewFakeSharedLister(make([]*v1.Pod, 0), make([]*v1.Node, 0))),

if err != nil {

cs := &CapacityScheduling{
elasticQuotaInfos: tt.elasticQuotas,
fh: fwk,

pods := make([]*v1.Pod, 0)
Expand Down Expand Up @@ -291,10 +309,15 @@ func TestFindCandidates(t *testing.T) {
t.Errorf("Unexpected preFilterStatus: %v", preFilterStatus)

prefilterStatue := computePodResourceRequest(tt.pod)
podReq := computePodResourceRequest(tt.pod)
elasticQuotaSnapshotState := &ElasticQuotaSnapshotState{
elasticQuotaInfos: tt.elasticQuotas,
prefilterStatue := &PreFilterState{
podReq: *podReq,
nominatedPodsReqWithPodReq: *podReq,
nominatedPodsReqInEQWithPodReq: *podReq,
state.Write(preFilterStateKey, prefilterStatue)
state.Write(ElasticQuotaSnapshotKey, elasticQuotaSnapshotState)

Expand All @@ -319,7 +342,7 @@ func TestFindCandidates(t *testing.T) {
sort.Slice(got, func(i, j int) bool {
return got[i].Name() < got[j].Name()
if diff := cmp.Diff(tt.want, got, cmp.AllowUnexported(candidate{})); diff != "" {
if diff := gocmp.Diff(tt.want, got, gocmp.AllowUnexported(candidate{})); diff != "" {
t.Errorf("Unexpected candidates (-want, +got): %s", diff)
Expand Down

0 comments on commit 213025d

Please sign in to comment.