Store node information in NodeInfo #24598

Merged
5 changes: 3 additions & 2 deletions pkg/kubelet/kubelet.go
@@ -2351,8 +2351,9 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
 			otherPods = append(otherPods, p)
 		}
 	}
-	nodeInfo := schedulercache.CreateNodeNameToInfoMap(otherPods)[kl.nodeName]
-	fit, err := predicates.RunGeneralPredicates(pod, kl.nodeName, nodeInfo, node)
+	nodeInfo := schedulercache.NewNodeInfo(otherPods...)
+	nodeInfo.SetNode(node)
+	fit, err := predicates.GeneralPredicates(pod, kl.nodeName, nodeInfo)
 	if !fit {
 		if re, ok := err.(*predicates.PredicateFailureError); ok {
 			reason := re.PredicateName
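
The kubelet hunk above is the PR in miniature: rather than resolving the node through a NodeInfo lookup interface, the caller now stores the node on the schedulercache.NodeInfo and the shared predicates read it back with Node(). A minimal sketch of the new call pattern, assuming the package paths in this tree; the wrapper name canAdmit and the fallback reason string are illustrative, not part of the PR:

// Package kubeletsketch illustrates the post-PR admission flow.
package kubeletsketch

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// canAdmit mirrors the kubelet's canAdmitPod logic after this change: build a
// NodeInfo from the pods already running, attach the node up front, and run
// the shared GeneralPredicates.
func canAdmit(node *api.Node, nodeName string, runningPods []*api.Pod, pod *api.Pod) (bool, string, error) {
	nodeInfo := schedulercache.NewNodeInfo(runningPods...)
	nodeInfo.SetNode(node)

	fit, err := predicates.GeneralPredicates(pod, nodeName, nodeInfo)
	if !fit {
		// Typed failures carry the name of the predicate that rejected the pod.
		if re, ok := err.(*predicates.PredicateFailureError); ok {
			return false, re.PredicateName, err
		}
		return false, "UnexpectedError", err
	}
	return true, "", nil
}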
133 changes: 35 additions & 98 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -21,7 +21,6 @@ import (

 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/cache"
-	client "k8s.io/kubernetes/pkg/client/unversioned"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -42,27 +41,6 @@ type PersistentVolumeClaimInfo interface {
 	GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*api.PersistentVolumeClaim, error)
 }

-type StaticNodeInfo struct {
-	*api.NodeList
-}
-
-func (nodes StaticNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
-	for ix := range nodes.Items {
-		if nodes.Items[ix].Name == nodeID {
-			return &nodes.Items[ix], nil
-		}
-	}
-	return nil, fmt.Errorf("failed to find node: %s, %#v", nodeID, nodes)
-}
-
-type ClientNodeInfo struct {
-	*client.Client
-}
-
-func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
-	return nodes.Nodes().Get(nodeID)
-}
-
 type CachedNodeInfo struct {
 	*cache.StoreToNodeLister
 }
@@ -271,9 +249,8 @@ var GCEPDVolumeFilter VolumeFilter = VolumeFilter{
 }

 type VolumeZoneChecker struct {
-	nodeInfo NodeInfo
-	pvInfo   PersistentVolumeInfo
-	pvcInfo  PersistentVolumeClaimInfo
+	pvInfo  PersistentVolumeInfo
+	pvcInfo PersistentVolumeClaimInfo
 }

 // VolumeZonePredicate evaluates if a pod can fit due to the volumes it requests, given
@@ -290,20 +267,16 @@ type VolumeZoneChecker struct {
 // determining the zone of a volume during scheduling, and that is likely to
 // require calling out to the cloud provider. It seems that we are moving away
 // from inline volume declarations anyway.
-func NewVolumeZonePredicate(nodeInfo NodeInfo, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
+func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
 	c := &VolumeZoneChecker{
-		nodeInfo: nodeInfo,
-		pvInfo:   pvInfo,
-		pvcInfo:  pvcInfo,
+		pvInfo:  pvInfo,
+		pvcInfo: pvcInfo,
 	}
 	return c.predicate
 }

 func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-	node, err := c.nodeInfo.GetNodeInfo(nodeName)
-	if err != nil {
-		return false, err
-	}
+	node := nodeInfo.Node()
 	if node == nil {
 		return false, fmt.Errorf("node not found: %q", nodeName)
 	}
@@ -372,10 +345,6 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *s
 	return true, nil
 }

-type ResourceFit struct {
-	info NodeInfo
-}
-
 type resourceRequest struct {
 	milliCPU int64
 	memory   int64
@@ -422,8 +391,12 @@ func podName(pod *api.Pod) string {
 	return pod.Namespace + "/" + pod.Name
 }

-func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, info *api.Node) (bool, error) {
-	allocatable := info.Status.Allocatable
+func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return false, fmt.Errorf("node not found: %q", nodeName)
+	}
+	allocatable := node.Status.Allocatable
 	allowedPodNumber := allocatable.Pods().Value()
 	if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber {
 		return false,
@@ -450,26 +423,8 @@ func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *scheduler
 	return true, nil
 }

-func (r *NodeStatus) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-	info, err := r.info.GetNodeInfo(nodeName)
-	if err != nil {
-		return false, err
-	}
-	return podFitsResourcesInternal(pod, nodeName, nodeInfo, info)
-}
-
-func NewResourceFitPredicate(info NodeInfo) algorithm.FitPredicate {
-	fit := &NodeStatus{
-		info: info,
-	}
-	return fit.PodFitsResources
-}
-
-func NewSelectorMatchPredicate(info NodeInfo) algorithm.FitPredicate {
-	selector := &NodeStatus{
-		info: info,
-	}
-	return selector.PodSelectorMatches
+func PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+	return podFitsResourcesInternal(pod, nodeName, nodeInfo)
 }

 // nodeMatchesNodeSelectorTerms checks if a node's labels satisfy a list of node selector terms,
@@ -541,14 +496,10 @@ func PodMatchesNodeLabels(pod *api.Pod, node *api.Node) bool {
 	return nodeAffinityMatches
 }

-type NodeSelector struct {
-	info NodeInfo
-}
-
-func (n *NodeStatus) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-	node, err := n.info.GetNodeInfo(nodeName)
-	if err != nil {
-		return false, err
+func PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+	node := nodeInfo.Node()
+	if node == nil {
+		return false, fmt.Errorf("node not found: %q", nodeName)

[Review thread on the added error return]
Contributor: Nit: I am not sure about the k8s policy here, but it is not a good idea to return a very general, privately defined error from a public function.
Member (author): We definitely shouldn't return such errors to the outside world (users). But this is more of an internal scheduling function, and there are a bunch of other such examples in the code (even in this file), so I think it's ok.
Contributor: I would like to see a FitError (the name already exists in generic_scheduler) or a PredicateError to encapsulate these. Not arguing here; we can add it later in separate smaller commits.

 	}
 	if PodMatchesNodeLabels(pod, node) {
 		return true, nil
@@ -567,14 +518,12 @@ func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInf
 }

 type NodeLabelChecker struct {
-	info     NodeInfo
 	labels   []string
 	presence bool
 }

-func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algorithm.FitPredicate {
+func NewNodeLabelPredicate(labels []string, presence bool) algorithm.FitPredicate {
 	labelChecker := &NodeLabelChecker{
-		info:     info,
 		labels:   labels,
 		presence: presence,
 	}
@@ -594,11 +543,12 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algori
 // A node may have a label with "retiring" as key and the date as the value
 // and it may be desirable to avoid scheduling new pods on this node
 func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-	var exists bool
-	node, err := n.info.GetNodeInfo(nodeName)
-	if err != nil {
-		return false, err
+	node := nodeInfo.Node()
+	if node == nil {
+		return false, fmt.Errorf("node not found: %q", nodeName)
 	}
+
+	var exists bool
 	nodeLabels := labels.Set(node.Labels)
 	for _, label := range n.labels {
 		exists = nodeLabels.Has(label)
@@ -725,11 +675,16 @@ func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.No
 }

 func getUsedPorts(pods ...*api.Pod) map[int]bool {
+	// TODO: Aggregate it at the NodeInfo level.
 	ports := make(map[int]bool)
 	for _, pod := range pods {
 		for _, container := range pod.Spec.Containers {
 			for _, podPort := range container.Ports {
-				ports[podPort.HostPort] = true
+				// "0" is explicitly ignored in PodFitsHostPorts,
+				// which is the only function that uses this value.
+				if podPort.HostPort != 0 {
+					ports[podPort.HostPort] = true
+				}
 			}
 		}
 	}
@@ -748,27 +703,8 @@ func haveSame(a1, a2 []string) bool {
 	return false
 }

-type NodeStatus struct {
-	info NodeInfo
-}
-
-func GeneralPredicates(info NodeInfo) algorithm.FitPredicate {
-	node := &NodeStatus{
-		info: info,
-	}
-	return node.SchedulerGeneralPredicates
-}
-
-func (n *NodeStatus) SchedulerGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
-	node, err := n.info.GetNodeInfo(nodeName)
-	if err != nil {
-		return false, err
-	}
-	return RunGeneralPredicates(pod, nodeName, nodeInfo, node)
-}
-
-func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, node *api.Node) (bool, error) {
-	fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo, node)
+func GeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
+	fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo)
 	if !fit {
 		return fit, err
 	}
@@ -781,8 +717,9 @@ func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercach
 	if !fit {
 		return fit, err
 	}
-	if !PodMatchesNodeLabels(pod, node) {
-		return false, ErrNodeSelectorNotMatch
+	fit, err = PodSelectorMatches(pod, nodeName, nodeInfo)
+	if !fit {
+		return fit, err
 	}
 	return true, nil
 }
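
The review thread above asks for a typed error instead of the bare fmt.Errorf("node not found: %q", ...) now returned from exported predicates. A hypothetical sketch of what such a wrapper could look like — none of this is in the PR, and the type and field names are invented for illustration:

// Package predicatessketch sketches the reviewer's suggested error type.
package predicatessketch

import "fmt"

// PredicateError marks an error as originating inside a predicate, so callers
// can distinguish "the pod does not fit" from "the check itself broke".
type PredicateError struct {
	PredicateName string // which predicate produced the error
	Reason        string // human-readable detail, e.g. `node not found: "m1"`
}

func (e *PredicateError) Error() string {
	return fmt.Sprintf("predicate %s failed: %s", e.PredicateName, e.Reason)
}

// newNodeNotFoundError shows how the repeated `node not found: %q` returns in
// predicates.go could be funneled through the typed error.
func newNodeNotFoundError(predicate, nodeName string) error {
	return &PredicateError{
		PredicateName: predicate,
		Reason:        fmt.Sprintf("node not found: %q", nodeName),
	}
}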
23 changes: 14 additions & 9 deletions plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -158,9 +158,9 @@ func TestPodFitsResources(t *testing.T) {

 	for _, test := range enoughPodsTests {
 		node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}
+		test.nodeInfo.SetNode(&node)

-		fit := NodeStatus{FakeNodeInfo(node)}
-		fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
+		fits, err := PodFitsResources(test.pod, "machine", test.nodeInfo)
 		if !reflect.DeepEqual(err, test.wErr) {
 			t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
 		}
@@ -203,9 +203,9 @@ func TestPodFitsResources(t *testing.T) {
 	}
 	for _, test := range notEnoughPodsTests {
 		node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1)}}
+		test.nodeInfo.SetNode(&node)

-		fit := NodeStatus{FakeNodeInfo(node)}
-		fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
+		fits, err := PodFitsResources(test.pod, "machine", test.nodeInfo)
 		if !reflect.DeepEqual(err, test.wErr) {
 			t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
 		}
@@ -994,9 +994,10 @@ func TestPodFitsSelector(t *testing.T) {

 	for _, test := range tests {
 		node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}}
+		nodeInfo := schedulercache.NewNodeInfo()
+		nodeInfo.SetNode(&node)

-		fit := NodeStatus{FakeNodeInfo(node)}
-		fits, err := fit.PodSelectorMatches(test.pod, "machine", schedulercache.NewNodeInfo())
+		fits, err := PodSelectorMatches(test.pod, "machine", nodeInfo)
 		if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1057,8 +1058,11 @@ func TestNodeLabelPresence(t *testing.T) {
 	}
 	for _, test := range tests {
 		node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}}
-		labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence}
-		fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", schedulercache.NewNodeInfo())
+		nodeInfo := schedulercache.NewNodeInfo()
+		nodeInfo.SetNode(&node)
+
+		labelChecker := NodeLabelChecker{test.labels, test.presence}
+		fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", nodeInfo)
 		if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1550,7 +1554,8 @@ func TestRunGeneralPredicates(t *testing.T) {
 		},
 	}
 	for _, test := range resourceTests {
-		fits, err := RunGeneralPredicates(test.pod, test.nodeName, test.nodeInfo, test.node)
+		test.nodeInfo.SetNode(test.node)
+		fits, err := GeneralPredicates(test.pod, test.nodeName, test.nodeInfo)
 		if !reflect.DeepEqual(err, test.wErr) {
 			t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
 		}
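
The test updates all follow one pattern: because predicates now read the node from NodeInfo.Node(), every test must call SetNode before invoking a predicate, or the predicate fails with the node-not-found error. A sketch of that setup, assuming the post-PR behavior of PodSelectorMatches (including that a pod with no node selector matches any node):

// Sketch of a test in the predicates package following the new setup.
package predicates

import (
	"testing"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

func TestPredicateNeedsNode(t *testing.T) {
	node := api.Node{ObjectMeta: api.ObjectMeta{Name: "machine"}}

	// Without SetNode, any node-reading predicate must fail.
	empty := schedulercache.NewNodeInfo()
	if fits, err := PodSelectorMatches(&api.Pod{}, "machine", empty); fits || err == nil {
		t.Errorf("expected node-not-found error, got fits=%v err=%v", fits, err)
	}

	// With SetNode, a pod without a node selector matches any node.
	ready := schedulercache.NewNodeInfo()
	ready.SetNode(&node)
	if fits, err := PodSelectorMatches(&api.Pod{}, "machine", ready); !fits || err != nil {
		t.Errorf("expected fit, got fits=%v err=%v", fits, err)
	}
}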
25 changes: 5 additions & 20 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -84,16 +84,13 @@ func init() {
 	// Fit is determined by resource availability.
 	// This predicate is actually a default predicate, because it is invoked from
 	// predicates.GeneralPredicates()
-	factory.RegisterFitPredicateFactory(
-		"PodFitsResources",
-		func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
-			return predicates.NewResourceFitPredicate(args.NodeInfo)
-		},
-	)
+	factory.RegisterFitPredicate("PodFitsResources", predicates.PodFitsResources)
 	// Fit is determined by the presence of the Host parameter and a string match
 	// This predicate is actually a default predicate, because it is invoked from
 	// predicates.GeneralPredicates()
 	factory.RegisterFitPredicate("HostName", predicates.PodFitsHost)
+	// Fit is determined by node selector query.
+	factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches)
 }

 func defaultPredicates() sets.String {
@@ -104,14 +101,7 @@ func defaultPredicates() sets.String {
 		factory.RegisterFitPredicateFactory(
 			"NoVolumeZoneConflict",
 			func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
-				return predicates.NewVolumeZonePredicate(args.NodeInfo, args.PVInfo, args.PVCInfo)
-			},
-		),
-		// Fit is determined by node selector query.
-		factory.RegisterFitPredicateFactory(
-			"MatchNodeSelector",
-			func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
-				return predicates.NewSelectorMatchPredicate(args.NodeInfo)
+				return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo)
 			},
 		),
 		// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
@@ -134,12 +124,7 @@ func defaultPredicates() sets.String {
 		),
 		// GeneralPredicates are the predicates that are enforced by all Kubernetes components
 		// (e.g. kubelet and all schedulers)
-		factory.RegisterFitPredicateFactory(
-			"GeneralPredicates",
-			func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
-				return predicates.GeneralPredicates(args.NodeInfo)
-			},
-		),
+		factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),
 	)
 }
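
The defaults.go diff shows the registration API splitting cleanly in two after this PR: predicates that only need the pod and the NodeInfo register as plain functions, while predicates that still depend on other cluster state keep the factory form so the framework can inject it. A sketch of the distinction, using only registrations visible in this diff; the wrapper function name is hypothetical:

// Package defaultsketch illustrates the two registration styles left behind.
package defaultsketch

import (
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
	"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
)

func registerExamples() {
	// Stateless: everything the predicate needs now lives in NodeInfo.
	factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates)

	// Stateful: zone checking still needs PV/PVC lookups, so it is built
	// per scheduler from the injected plugin args.
	factory.RegisterFitPredicateFactory(
		"NoVolumeZoneConflict",
		func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
			return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo)
		},
	)
}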