Predicate caching and cleanup #33763

Merged
6 changes: 1 addition & 5 deletions plugin/pkg/scheduler/algorithm/listers.go
@@ -76,7 +76,7 @@ func (f FakeServiceLister) List(labels.Selector) ([]*api.Service, error) {
return f, nil
}

// GetPodServices gets the services that have the selector that match the labels on the given pod
// GetPodServices gets the services that have the selector that match the labels on the given pod.
func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service, err error) {
var selector labels.Selector

@@ -91,10 +91,6 @@ func (f FakeServiceLister) GetPodServices(pod *api.Pod) (services []*api.Service
services = append(services, service)
}
}
if len(services) == 0 {
err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}

return
Member Author:

This makes the if err == nil check which woj suggested less noisy, and more importantly it makes the semantics conform to the other GetPodServices tests.
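A hypothetical caller-side sketch of the pattern this enables (not part of this diff; serviceLister, pod, and the glog call are assumed from the surrounding predicate code): an empty result now simply means "no matching services" rather than an error.

services, err := serviceLister.GetPodServices(pod)
if err != nil {
	// A real lookup failure - log or propagate it.
	glog.Errorf("could not list services for pod %s: %v", pod.Name, err)
} else if len(services) == 0 {
	// The pod is not backed by any service - nothing to constrain, and no longer an error.
}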

}

59 changes: 59 additions & 0 deletions plugin/pkg/scheduler/algorithm/predicates/metadata.go
@@ -0,0 +1,59 @@
/*
Member:

@jayunit100 - thanks, this is exactly what I was asking for.

As you wrote somewhere, this PR became pretty big. Would you be able to separate this change (I mean this metadata file together with all the changes to integrate it in the factory etc.) into a separate PR? And then in this PR you would leave the changes related to service-affinity?
It would be easier to detect a problem if it appears (and easier for me to review it :))

Member Author:

@wojtek-t I could break it up, but my first question would be - how should we separate the changes?

  • the metadata.go changes affect predicates.go, because now predicates.go doesn't implement the CheckService algorithm
  • the changes to types.go are only meaningful in the context of enabling matching pod metadata / node information when calculating predicates via a factory
  • the changes to plugins.go are simply machinery to enable the predicates.go metadata to be plugged in via a factory
  • also the changes to generic_scheduler are really only there for enabling factory-ization

So it's a pretty cohesive change set (a rough wiring sketch is included below).

Either way is ok, just let me know... but I figured I would double-check that you think it's necessary...
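A minimal wiring sketch (not part of this diff) of how those pieces are intended to fit together, using the names introduced in this PR; the listers, the node map, the "region" label, and the function name checkAllNodes are illustrative assumptions, and the real plugins.go/factory code differs in detail.

package sketch

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

func checkAllNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo,
	podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo predicates.NodeInfo) {
	// plugins.go-style machinery: build the predicate and register its precomputation.
	predicate, precompute := predicates.NewServiceAffinityPredicate(podLister, serviceLister, nodeInfo, []string{"region"})
	predicates.RegisterPredicatePrecomputation("ServiceAffinity", precompute)

	// The factory hands the scheduler a MetadataProducer (metadata.go in this PR);
	// generic_scheduler calls it once per pod, which runs every registered precomputation.
	getMetadata := predicates.NewPredicateMetadataFactory(podLister)
	meta := getMetadata(pod, nodeNameToInfo)

	// Each candidate node is then checked against the shared, precomputed metadata.
	for name, info := range nodeNameToInfo {
		fits, reasons, err := predicate(pod, meta, info)
		fmt.Println(name, fits, reasons, err)
	}
}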

Member:

I think what you could separate is all the changes to ServiceAffinity (which means all the changes in "checkServiceAffinity" and the utils and utils_test.go files). That would significantly reduce the size of this PR.
Do you agree?

Member Author:

hmmm...

  • checkServiceAffinity: functionally, all of the changes to checkServiceAffinity are dependent on precomputation working properly though :). Without it, checkServiceAffinity will still compile - but it won't ever have a complete pod affinity cache, so the match will return true for all nodes, because it won't see any affinity constraints, and then the unit tests would fail (see the guard sketched below).
  • unit tests: indeed you are right, they are an atomic functional patch, but I doubt that would really make your life easier - it's just a trivial PR in an isolated fileset.

However, if you want, I can think more deeply about separating it.

I'll let you make the final call - either way is ok with me. I know you're very busy, so anything I can do to help, I'll do :)
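For reference, the guard that addresses this in the final version of the predicate looks roughly like the sketch below; it mirrors the fallback branch of checkServiceAffinity further down in this diff, and the field names are the ones this PR adds to predicateMetadata.

// If the precomputation never ran, the metadata carries no service-affinity data;
// recompute it on the fly rather than silently matching every node.
pm, ok := meta.(*predicateMetadata)
if !ok || (pm.serviceAffinityMatchingPodList == nil && pm.serviceAffinityMatchingPodServices == nil) {
	pm = &predicateMetadata{pod: pod}
	s.serviceAffinityPrecomputation(pm)
}
pods, services := pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices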

Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package predicates

import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

type PredicateMetadataFactory struct {
podLister algorithm.PodLister
}

func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.MetadataProducer {
factory := &PredicateMetadataFactory{
podLister,
}
return factory.GetMetadata
}

// GetMetadata returns the predicateMetadata which will be used by various predicates.
func (pfactory *PredicateMetadataFactory) GetMetadata(pod *api.Pod, nodeNameToInfoMap map[string]*schedulercache.NodeInfo) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeNameToInfoMap)
if err != nil {
return nil
}
predicateMetadata := &predicateMetadata{
pod: pod,
podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod),
podPorts: GetUsedPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
}
for predicateName, precomputeFunc := range predicatePrecomputations {
glog.V(4).Infof("Precompute: %v", predicateName)
precomputeFunc(predicateMetadata)
}
return predicateMetadata
}
125 changes: 74 additions & 51 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -36,6 +36,19 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// predicatePrecomputations: Helper types/variables...
type PredicateMetadataModifier func(pm *predicateMetadata)

var predicatePrecomputeRegisterLock sync.Mutex
Member:

nit: can you please move it above predicatePrecomputations? For me it's always easier to see the lock before the variable it's locking.

var predicatePrecomputations map[string]PredicateMetadataModifier = make(map[string]PredicateMetadataModifier)

func RegisterPredicatePrecomputation(predicateName string, precomp PredicateMetadataModifier) {
predicatePrecomputeRegisterLock.Lock()
defer predicatePrecomputeRegisterLock.Unlock()
predicatePrecomputations[predicateName] = precomp
}

// Other types for predicate functions...
type NodeInfo interface {
GetNodeInfo(nodeID string) (*api.Node, error)
}
@@ -67,34 +80,21 @@ func (c *CachedNodeInfo) GetNodeInfo(id string) (*api.Node, error) {
return node.(*api.Node), nil
}

// predicateMetadata is a type that is passed as metadata for predicate functions
type predicateMetadata struct {
podBestEffort bool
podRequest *schedulercache.Resource
podPorts map[int]bool
matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
}

// Note that predicateMetadata and matchingPodAntiAffinityTerm need to be declared in the same file
// due to the way declarations are processed in predicate declaration unit tests.
type matchingPodAntiAffinityTerm struct {
term *api.PodAffinityTerm
node *api.Node
}

func PredicateMetadata(pod *api.Pod, nodeInfoMap map[string]*schedulercache.NodeInfo) interface{} {
// If we cannot compute metadata, just return nil
if pod == nil {
return nil
}
matchingTerms, err := getMatchingAntiAffinityTerms(pod, nodeInfoMap)
if err != nil {
return nil
}
return &predicateMetadata{
podBestEffort: isPodBestEffort(pod),
podRequest: GetResourceRequest(pod),
podPorts: GetUsedPorts(pod),
matchingAntiAffinityTerms: matchingTerms,
}
type predicateMetadata struct {
pod *api.Pod
podBestEffort bool
podRequest *schedulercache.Resource
podPorts map[int]bool
matchingAntiAffinityTerms []matchingPodAntiAffinityTerm
serviceAffinityMatchingPodList []*api.Pod
serviceAffinityMatchingPodServices []*api.Service
}

func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
@@ -627,20 +627,42 @@ type ServiceAffinity struct {
labels []string
}

func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) algorithm.FitPredicate {
// serviceAffinityPrecomputation should be run once by the scheduler before looping through the predicates. It is a helper function that
// should only be referenced by NewServiceAffinityPredicate.
func (s *ServiceAffinity) serviceAffinityPrecomputation(pm *predicateMetadata) {
if pm.pod == nil {
glog.Errorf("Cannot precompute service affinity, a pod is required to calculate service affinity.")
return
}

var errSvc, errList error
// Store services which match the pod.
pm.serviceAffinityMatchingPodServices, errSvc = s.serviceLister.GetPodServices(pm.pod)
selector := CreateSelectorFromLabels(pm.pod.Labels)
// consider only the pods that belong to the same namespace
allMatches, errList := s.podLister.List(selector)

Member:

nit: remove empty line

// In the future maybe we will return them as part of the function.
if errSvc != nil || errList != nil {
glog.Errorf("Some errors were found while precomputing svc affinity: \nservices:%v, \npods:%v", errSvc, errList)
}
pm.serviceAffinityMatchingPodList = FilterPodsByNamespace(allMatches, pm.pod.Namespace)
}

func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister algorithm.ServiceLister, nodeInfo NodeInfo, labels []string) (algorithm.FitPredicate, PredicateMetadataModifier) {
affinity := &ServiceAffinity{
podLister: podLister,
serviceLister: serviceLister,
nodeInfo: nodeInfo,
labels: labels,
}
return affinity.CheckServiceAffinity
return affinity.checkServiceAffinity, affinity.serviceAffinityPrecomputation
}

// The checkServiceAffinity predicate matches nodes in such a way to force that
// ServiceAffinity.labels are homogenous for pods added to a node.
// (i.e. it returns true IFF this pod can be added to this node, such
// that all other pods in the same service are running on nodes w/
// checkServiceAffinity is a predicate which matches nodes in such a way to force that
// ServiceAffinity.labels are homogenous for pods that are scheduled to a node.
// (i.e. it returns true IFF this pod can be added to this node such that all other pods in
// the same service are running on nodes with
// the exact same ServiceAffinity.label values).
//
// Details:
@@ -650,46 +672,47 @@ func NewServiceAffinityPredicate(podLister algorithm.PodLister, serviceLister al
// the match.
// Otherwise:
// Create an "implicit selector" which guarantees pods will land on nodes with similar values
// for the affinity labels.
// for the affinity labels.
//
// To do this, we "reverse engineer" a selector by introspecting existing pods running under the same service+namespace.
// These backfilled labels in the selector "L" are defined like so:
// - L is a label that the ServiceAffinity object needs as a matching constraint.
// - L is not defined in the pod itself already.
// - and SOME pod, from a service, in the same namespace, ALREADY scheduled onto a node, has a matching value.
func (s *ServiceAffinity) CheckServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
//
// WARNING: This Predicate is NOT guaranteed to work if some of the predicateMetadata data isn't precomputed...
// For that reason it is not exported, i.e. it is highly coupled to the implementation of the FitPredicate construction.
func (s *ServiceAffinity) checkServiceAffinity(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
var services []*api.Service
var pods []*api.Pod
if pm, ok := meta.(*predicateMetadata); ok && (pm.serviceAffinityMatchingPodList != nil || pm.serviceAffinityMatchingPodServices != nil) {
services = pm.serviceAffinityMatchingPodServices
pods = pm.serviceAffinityMatchingPodList
} else {
// Make the predicate resilient in case metadata is missing.
pm = &predicateMetadata{pod: pod}
s.serviceAffinityPrecomputation(pm)
pods, services = pm.serviceAffinityMatchingPodList, pm.serviceAffinityMatchingPodServices
}
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
}

// check if the pod being scheduled has the affinity labels specified in its NodeSelector
affinityLabels := FindLabelsInSet(s.labels, labels.Set(pod.Spec.NodeSelector))

// Introspect services IFF we didn't predefine all the affinity labels in the pod itself.
// Step 1: If we don't have all constraints, introspect nodes to find the missing constraints.
if len(s.labels) > len(affinityLabels) {
services, err := s.serviceLister.GetPodServices(pod)
if err == nil && len(services) > 0 {
// just use the first service and get the other pods within the service
// TODO: a separate predicate can be created that tries to handle all services for the pod
selector := labels.SelectorFromSet(services[0].Spec.Selector)
servicePods, err := s.podLister.List(selector)
if err != nil {
return false, nil, err
}
// consider only the pods that belong to the same namespace
nsServicePods := FilterPodsByNamespace(servicePods, pod.Namespace)
if len(nsServicePods) > 0 {
// consider any service pod and fetch the node its hosted on
otherNode, err := s.nodeInfo.GetNodeInfo(nsServicePods[0].Spec.NodeName)
if len(services) > 0 {
if len(pods) > 0 {
nodeWithAffinityLabels, err := s.nodeInfo.GetNodeInfo(pods[0].Spec.NodeName)
if err != nil {
return false, nil, err
}
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(otherNode.Labels))
AddUnsetLabelsToMap(affinityLabels, s.labels, labels.Set(nodeWithAffinityLabels.Labels))
}
}
}

// check if the node matches the selector
// Step 2: Finally complete the affinity predicate based on whatever set of labels we were able to find.
if CreateSelectorFromLabels(affinityLabels).Matches(labels.Set(node.Labels)) {
return true, nil, nil
}
54 changes: 36 additions & 18 deletions plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -119,6 +119,11 @@ func newResourceInitPod(pod *api.Pod, usage ...schedulercache.Resource) *api.Pod
return pod
}

func PredicateMetadata(p *api.Pod, nodeInfo map[string]*schedulercache.NodeInfo) interface{} {
pm := PredicateMetadataFactory{algorithm.FakePodLister{p}}
return pm.GetMetadata(p, nodeInfo)
}

func TestPodFitsResources(t *testing.T) {
enoughPodsTests := []struct {
pod *api.Pod
@@ -233,7 +238,6 @@ func TestPodFitsResources(t *testing.T) {
for _, test := range enoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 0, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32)}}
test.nodeInfo.SetNode(&node)

fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
@@ -289,7 +293,6 @@ func TestPodFitsResources(t *testing.T) {
for _, test := range notEnoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 0, 1)}}
test.nodeInfo.SetNode(&node)

fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
@@ -1310,22 +1313,38 @@ func TestServiceAffinity(t *testing.T) {
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrServiceAffinityViolated}

for _, test := range tests {
nodes := []api.Node{node1, node2, node3, node4, node5}
serviceAffinity := ServiceAffinity{algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels}
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(test.node)
fits, reasons, err := serviceAffinity.CheckServiceAffinity(test.pod, PredicateMetadata(test.pod, nil), nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
testIt := func(skipPrecompute bool) {
nodes := []api.Node{node1, node2, node3, node4, node5}
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(test.node)
nodeInfoMap := map[string]*schedulercache.NodeInfo{test.node.Name: nodeInfo}
// Reimplement the logic that the scheduler uses: any time it builds a predicate, it also registers its precomputations.
predicate, precompute := NewServiceAffinityPredicate(algorithm.FakePodLister(test.pods), algorithm.FakeServiceLister(test.services), FakeNodeListInfo(nodes), test.labels)
// Register the precomputation, or rewrite it to a no-op, depending on the state we want to test.
RegisterPredicatePrecomputation("checkServiceAffinity-unitTestPredicate", func(pm *predicateMetadata) {
if !skipPrecompute {
precompute(pm)
}
})
if pmeta, ok := (PredicateMetadata(test.pod, nodeInfoMap)).(*predicateMetadata); ok {
fits, reasons, err := predicate(test.pod, pmeta, nodeInfo)
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
}
if !fits && !reflect.DeepEqual(reasons, expectedFailureReasons) {
t.Errorf("%s: unexpected failure reasons: %v, want: %v", test.test, reasons, expectedFailureReasons)
}
if fits != test.fits {
t.Errorf("%s: expected: %v got %v", test.test, test.fits, fits)
}
} else {
t.Errorf("Error casting.")
}
}

testIt(false) // Confirm that the predicate works with the precomputed data (better performance)
testIt(true)  // Confirm that the predicate works without precomputed data (resilience)
}
}

@@ -1586,7 +1605,6 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
}
return "", false
},

FilterPersistentVolume: func(pv *api.PersistentVolume) (string, bool) {
if pv.Spec.AWSElasticBlockStore != nil {
return pv.Spec.AWSElasticBlockStore.VolumeID, true
@@ -1652,7 +1670,7 @@ func TestPredicatesRegistered(t *testing.T) {
if err == nil {
functions = append(functions, fileFunctions...)
} else {
t.Errorf("unexpected error when parsing %s", filePath)
t.Errorf("unexpected error %s when parsing %s", err, filePath)
}
}

6 changes: 5 additions & 1 deletion plugin/pkg/scheduler/algorithm/predicates/utils_test.go
@@ -49,7 +49,11 @@ func ExampleFindLabelsInSet() {
},
},

{}, // a third pod which will have no effect on anything.
{
ObjectMeta: api.ObjectMeta{
Name: "pod3ThatWeWontSee",
Member Author:

This modification makes the unit test a little more robust for debugging, in terms of the expected output.

},
},
}
fmt.Println(FindLabelsInSet([]string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)["label3"])
AddUnsetLabelsToMap(labelSubset, []string{"label1", "label2", "label3"}, nsPods[0].ObjectMeta.Labels)
1 change: 1 addition & 0 deletions plugin/pkg/scheduler/algorithm/types.go
@@ -54,6 +54,7 @@ type PriorityConfig struct {
Weight int
}

// EmptyMetadataProducer is a no-op MetadataProducer: it always returns nil metadata.
func EmptyMetadataProducer(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) interface{} {
return nil
}
5 changes: 5 additions & 0 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -66,6 +66,11 @@ func init() {
return priorities.PriorityMetadata
})

factory.RegisterPredicateMetadataProducerFactory(
func(args factory.PluginFactoryArgs) algorithm.MetadataProducer {
return predicates.NewPredicateMetadataFactory(args.PodLister)
})

// EqualPriority is a prioritizer function that gives an equal weight of one to all nodes
// Register the priority function so that its available
// but do not include it as part of the default priorities