Permalink
Browse files

Merge pull request #57168 from yastij/predicates-ordering

Automatic merge from submit-queue (batch tested with PRs 57252, 57168). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Implementing predicates ordering

**What this PR does / why we need it**: implements predicates ordering for the scheduler

**Which issue(s) this PR fixes** : Fixes #53812 

**Special notes for your reviewer**:


@bsalamat @gmarek @resouer as discussed on slack, to implement ordering we have to choices:

- use a layered approach with a list that indexes the order of the predicates map

- change the underlying data structure used to represent a collection of predicates (a map in our case) into a list of predicates objects. 
Going with this solution might be "cleaner" but it will require a lot of changes and will increase the cost for accessing predicates from O(1) to O(n) (n being the number of predicates used by the scheduler).

we might go with this solution for now. If the number of predicates start growing, we might switch to the second option.
 
**Release note**:

```release-note
adding predicates ordering for the kubernetes scheduler.
```
  • Loading branch information...
k8s-merge-robot committed Dec 20, 2017
2 parents dd9bca8 + e62952d commit 51fbd6e63774f494ce0fef8ced58ce41c1af180a
@@ -49,9 +49,25 @@ import (
)
const (
MatchInterPodAffinity = "MatchInterPodAffinity"
CheckVolumeBinding = "CheckVolumeBinding"
MatchInterPodAffinityPred = "MatchInterPodAffinity"
CheckVolumeBindingPred = "CheckVolumeBinding"
CheckNodeConditionPred = "CheckNodeCondition"
GeneralPred = "GeneralPredicates"
HostNamePred = "HostName"
PodFitsHostPortsPred = "PodFitsHostPorts"
MatchNodeSelectorPred = "MatchNodeSelector"
PodFitsResourcesPred = "PodFitsResources"
NoDiskConflictPred = "NoDiskConflict"
PodToleratesNodeTaintsPred = "PodToleratesNodeTaints"
PodToleratesNodeNoExecuteTaintsPred = "PodToleratesNodeNoExecuteTaints"
CheckNodeLabelPresencePred = "CheckNodeLabelPresence"
checkServiceAffinityPred = "checkServiceAffinity"
MaxEBSVolumeCountPred = "MaxEBSVolumeCount"
MaxGCEPDVolumeCountPred = "MaxGCEPDVolumeCount"
MaxAzureDiskVolumeCountPred = "MaxAzureDiskVolumeCount"
NoVolumeZoneConflictPred = "NoVolumeZoneConflict"
CheckNodeMemoryPressurePred = "CheckNodeMemoryPressure"
CheckNodeDiskPressurePred = "CheckNodeDiskPressure"
// DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE
// GCE instances can have up to 16 PD volumes attached.
DefaultMaxGCEPDVolumes = 16
@@ -79,6 +95,21 @@ const (
// For example:
// https://github.com/kubernetes/kubernetes/blob/36a218e/plugin/pkg/scheduler/factory/factory.go#L422
// IMPORTANT NOTE: this list contains the ordering of the predicates, if you develop a new predicate
// it is mandatory to add its name to this list.
// Otherwise it won't be processed, see generic_scheduler#podFitsOnNode().
// The order is based on the restrictiveness & complexity of predicates.
// Design doc: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/predicates-ordering.md
var (
predicatesOrdering = []string{CheckNodeConditionPred,
GeneralPred, HostNamePred, PodFitsHostPortsPred,
MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
checkServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
CheckNodeMemoryPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)
// NodeInfo: Other types for predicate functions...
type NodeInfo interface {
GetNodeInfo(nodeID string) (*v1.Node, error)
@@ -93,6 +124,14 @@ type CachedPersistentVolumeInfo struct {
corelisters.PersistentVolumeLister
}
func PredicatesOrdering() []string {
return predicatesOrdering
}
func SetPredicatesOrdering(names []string) {
predicatesOrdering = names
}
func (c *CachedPersistentVolumeInfo) GetPersistentVolumeInfo(pvID string) (*v1.PersistentVolume, error) {
return c.Get(pvID)
}
@@ -65,17 +65,17 @@ func init() {
// Fit is defined based on the absence of port conflicts.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate("PodFitsHostPorts", predicates.PodFitsHostPorts)
factory.RegisterFitPredicate(predicates.PodFitsHostPortsPred, predicates.PodFitsHostPorts)
// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate("PodFitsResources", predicates.PodFitsResources)
factory.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
// Fit is determined by the presence of the Host parameter and a string match
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate("HostName", predicates.PodFitsHost)
factory.RegisterFitPredicate(predicates.HostNamePred, predicates.PodFitsHost)
// Fit is determined by node selector query.
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodMatchNodeSelector)
factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)
// Use equivalence class to speed up heavy predicates phase.
factory.RegisterGetEquivalencePodFunction(
@@ -117,62 +117,62 @@ func defaultPredicates() sets.String {
return sets.NewString(
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
"NoVolumeZoneConflict",
predicates.NoVolumeZoneConflictPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo, args.StorageClassInfo)
},
),
// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxEBSVolumeCount",
predicates.MaxEBSVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxGCEPDVolumeCount",
predicates.MaxGCEPDVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxAzureDiskVolumeCount",
predicates.MaxAzureDiskVolumeCountPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by inter-pod affinity.
factory.RegisterFitPredicateFactory(
predicates.MatchInterPodAffinity,
predicates.MatchInterPodAffinityPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewPodAffinityPredicate(args.NodeInfo, args.PodLister)
},
),
// Fit is determined by non-conflicting disk volumes.
factory.RegisterFitPredicate("NoDiskConflict", predicates.NoDiskConflict),
factory.RegisterFitPredicate(predicates.NoDiskConflictPred, predicates.NoDiskConflict),
// GeneralPredicates are the predicates that are enforced by all Kubernetes components
// (e.g. kubelet and all schedulers)
factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),
factory.RegisterFitPredicate(predicates.GeneralPred, predicates.GeneralPredicates),
// Fit is determined by node memory pressure condition.
factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate),
factory.RegisterFitPredicate(predicates.CheckNodeMemoryPressurePred, predicates.CheckNodeMemoryPressurePredicate),
// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
factory.RegisterFitPredicate(predicates.CheckNodeDiskPressurePred, predicates.CheckNodeDiskPressurePredicate),
// Fit is determined by node conditions: not ready, network unavailable or out of disk.
factory.RegisterMandatoryFitPredicate("CheckNodeCondition", predicates.CheckNodeConditionPredicate),
factory.RegisterMandatoryFitPredicate(predicates.CheckNodeConditionPred, predicates.CheckNodeConditionPredicate),
// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),
factory.RegisterFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints),
// Fit is determined by volume topology requirements.
factory.RegisterFitPredicateFactory(
predicates.CheckVolumeBinding,
predicates.CheckVolumeBindingPred,
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeBindingPredicate(args.VolumeBinder)
},
@@ -185,18 +185,18 @@ func ApplyFeatureGates() {
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
// Remove "CheckNodeCondition" predicate
factory.RemoveFitPredicate("CheckNodeCondition")
factory.RemoveFitPredicate(predicates.CheckNodeConditionPred)
// Remove Key "CheckNodeCondition" From All Algorithm Provider
// The key will be removed from all providers which in algorithmProviderMap[]
// if you just want remove specific provider, call func RemovePredicateKeyFromAlgoProvider()
factory.RemovePredicateKeyFromAlgorithmProviderMap("CheckNodeCondition")
factory.RemovePredicateKeyFromAlgorithmProviderMap(predicates.CheckNodeConditionPred)
// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterMandatoryFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints)
factory.RegisterMandatoryFitPredicate(predicates.PodToleratesNodeTaintsPred, predicates.PodToleratesNodeTaints)
// Insert Key "PodToleratesNodeTaints" To All Algorithm Provider
// The key will insert to all providers which in algorithmProviderMap[]
// if you just want insert to specific provider, call func InsertPredicateKeyToAlgoProvider()
factory.InsertPredicateKeyToAlgorithmProviderMap("PodToleratesNodeTaints")
factory.InsertPredicateKeyToAlgorithmProviderMap(predicates.PodToleratesNodeTaintsPred)
glog.Warningf("TaintNodesByCondition is enabled, PodToleratesNodeTaints predicate is mandatory")
}
@@ -78,7 +78,7 @@ func TestDefaultPredicates(t *testing.T) {
"CheckNodeDiskPressure",
"CheckNodeCondition",
"PodToleratesNodeTaints",
predicates.CheckVolumeBinding,
predicates.CheckVolumeBindingPred,
)
if expected := defaultPredicates(); !result.Equal(expected) {
@@ -444,34 +444,37 @@ func podFitsOnNode(
// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
// when pods are nominated or their nominations change.
eCacheAvailable = eCacheAvailable && !podsAdded
for predicateKey, predicate := range predicateFuncs {
if eCacheAvailable {
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}
// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
// checking other predicates?
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
for _, predicateKey := range predicates.PredicatesOrdering() {
//TODO (yastij) : compute average predicate restrictiveness to export it as promethus metric
if predicate, exist := predicateFuncs[predicateKey]; exist {
if eCacheAvailable {
// Store data to update eCache after this loop.
if res, exists := predicateResults[predicateKey]; exists {
res.Fit = res.Fit && fit
res.FailReasons = append(res.FailReasons, reasons...)
predicateResults[predicateKey] = res
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
// PredicateWithECache will return its cached predicate results.
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivalenceHash)
}
// TODO(bsalamat): When one predicate fails and fit is false, why do we continue
// checking other predicates?
if !eCacheAvailable || invalid {
// we need to execute predicate functions since equivalence cache does not work
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []algorithm.PredicateFailureReason{}, err
}
if eCacheAvailable {
// Store data to update eCache after this loop.
if res, exists := predicateResults[predicateKey]; exists {
res.Fit = res.Fit && fit
res.FailReasons = append(res.FailReasons, reasons...)
predicateResults[predicateKey] = res
} else {
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
}
}
}
}
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
if !fit {
// eCache is available and valid, and predicates result is unfit, record the fail reasons
failedPredicates = append(failedPredicates, reasons...)
}
}
}
}
@@ -42,6 +42,10 @@ import (
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
var (
order = []string{"false", "true", "matches", "nopods", predicates.MatchInterPodAffinityPred}
)
func falsePredicate(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
return false, []algorithm.PredicateFailureReason{algorithmpredicates.ErrFakePredicate}, nil
}
@@ -181,6 +185,7 @@ func TestSelectHost(t *testing.T) {
}
func TestGenericScheduler(t *testing.T) {
predicates.SetPredicatesOrdering(order)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
@@ -401,6 +406,7 @@ func TestGenericScheduler(t *testing.T) {
}
func TestFindFitAllError(t *testing.T) {
predicates.SetPredicatesOrdering(order)
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "false": falsePredicate}
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
@@ -430,8 +436,9 @@ func TestFindFitAllError(t *testing.T) {
}
func TestFindFitSomeError(t *testing.T) {
predicates.SetPredicatesOrdering(order)
nodes := []string{"3", "2", "1"}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "match": matchesPredicate}
predicates := map[string]algorithm.FitPredicate{"true": truePredicate, "matches": matchesPredicate}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1"}}
nodeNameToInfo := map[string]*schedulercache.NodeInfo{
"3": schedulercache.NewNodeInfo(),
@@ -741,6 +748,7 @@ var negPriority, lowPriority, midPriority, highPriority, veryHighPriority = int3
// TestSelectNodesForPreemption tests selectNodesForPreemption. This test assumes
// that podsFitsOnNode works correctly and is tested separately.
func TestSelectNodesForPreemption(t *testing.T) {
predicates.SetPredicatesOrdering(order)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
@@ -864,7 +872,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
nodes = append(nodes, node)
}
if test.addAffinityPredicate {
test.predicates[predicates.MatchInterPodAffinity] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
test.predicates[predicates.MatchInterPodAffinityPred] = algorithmpredicates.NewPodAffinityPredicate(FakeNodeInfo(*nodes[0]), schedulertesting.FakePodLister(test.pods))
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(test.pods, nodes)
nodeToPods, err := selectNodesForPreemption(test.pod, nodeNameToInfo, nodes, test.predicates, PredicateMetadata, nil, nil)
@@ -879,6 +887,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
// TestPickOneNodeForPreemption tests pickOneNodeForPreemption.
func TestPickOneNodeForPreemption(t *testing.T) {
predicates.SetPredicatesOrdering(order)
tests := []struct {
name string
predicates map[string]algorithm.FitPredicate
@@ -409,7 +409,7 @@ func (c *configFactory) invalidatePredicatesForPv(pv *v1.PersistentVolume) {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBinding)
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
@@ -480,7 +480,7 @@ func (c *configFactory) invalidatePredicatesForPvc(pvc *v1.PersistentVolumeClaim
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// Add/delete impacts the available PVs to choose from
invalidPredicates.Insert(predicates.CheckVolumeBinding)
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
c.equivalencePodCache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
@@ -491,7 +491,7 @@ func (c *configFactory) invalidatePredicatesForPvcUpdate(old, new *v1.Persistent
if old.Spec.VolumeName != new.Spec.VolumeName {
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
// PVC volume binding has changed
invalidPredicates.Insert(predicates.CheckVolumeBinding)
invalidPredicates.Insert(predicates.CheckVolumeBindingPred)
}
// The bound volume type may change
invalidPredicates.Insert(maxPDVolumeCountPredicateKeys...)
@@ -278,7 +278,7 @@ func (sched *Scheduler) assumeAndBindVolumes(assumed *v1.Pod, host string) error
err = fmt.Errorf("Volume binding started, waiting for completion")
if bindingRequired {
if sched.config.Ecache != nil {
invalidPredicates := sets.NewString(predicates.CheckVolumeBinding)
invalidPredicates := sets.NewString(predicates.CheckVolumeBindingPred)
sched.config.Ecache.InvalidateCachedPredicateItemOfAllNodes(invalidPredicates)
}
@@ -619,7 +619,7 @@ func setupTestSchedulerWithVolumeBinding(fakeVolumeBinder *volumebinder.VolumeBi
scache.AddNode(&testNode)
predicateMap := map[string]algorithm.FitPredicate{
"VolumeBindingChecker": predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
predicates.CheckVolumeBindingPred: predicates.NewVolumeBindingPredicate(fakeVolumeBinder),
}
recorder := broadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "scheduler"})
@@ -637,6 +637,8 @@ func makePredicateError(failReason string) error {
}
func TestSchedulerWithVolumeBinding(t *testing.T) {
order := []string{predicates.CheckVolumeBindingPred, predicates.GeneralPred}
predicates.SetPredicatesOrdering(order)
findErr := fmt.Errorf("find err")
assumeErr := fmt.Errorf("assume err")
bindErr := fmt.Errorf("bind err")

0 comments on commit 51fbd6e

Please sign in to comment.