Store node information in NodeInfo
wojtek-t committed Apr 25, 2016
1 parent c5df0bf commit 1835c85
Showing 11 changed files with 269 additions and 185 deletions.
5 changes: 3 additions & 2 deletions pkg/kubelet/kubelet.go
@@ -2351,8 +2351,9 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
otherPods = append(otherPods, p)
}
}
nodeInfo := schedulercache.CreateNodeNameToInfoMap(otherPods)[kl.nodeName]
fit, err := predicates.RunGeneralPredicates(pod, kl.nodeName, nodeInfo, node)
nodeInfo := schedulercache.NewNodeInfo(otherPods...)
nodeInfo.SetNode(node)
fit, err := predicates.GeneralPredicates(pod, kl.nodeName, nodeInfo)
if !fit {
if re, ok := err.(*predicates.PredicateFailureError); ok {
reason := re.PredicateName
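The kubelet hunk above is the pattern this commit establishes: build a NodeInfo from the pods already admitted to the node, attach the node object with SetNode, and hand the pair to the predicates. Below is a minimal sketch of the NodeInfo surface that usage assumes; it is illustrative only and deliberately omits the locking and aggregated resource bookkeeping the real schedulercache type carries.

// Illustrative sketch of the schedulercache.NodeInfo accessors assumed by the
// diff above; not the actual implementation from this commit.
package schedulercache

import "k8s.io/kubernetes/pkg/api"

type NodeInfo struct {
	node *api.Node  // attached via SetNode, read back via Node()
	pods []*api.Pod // pods already bound to this node
}

// NewNodeInfo returns a NodeInfo pre-populated with the given pods.
func NewNodeInfo(pods ...*api.Pod) *NodeInfo {
	return &NodeInfo{pods: pods}
}

// SetNode stores the node object so predicates can read it directly instead
// of fetching it through a NodeInfo/GetNodeInfo lookup interface.
func (n *NodeInfo) SetNode(node *api.Node) {
	n.node = node
}

// Node returns the stored node, or nil if SetNode was never called; the
// predicates in this commit treat nil as "node not found".
func (n *NodeInfo) Node() *api.Node {
	return n.node
}

// Pods returns the pods assigned to the node.
func (n *NodeInfo) Pods() []*api.Pod {
	return n.pods
}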
133 changes: 35 additions & 98 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -21,7 +21,6 @@ import (

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -42,27 +41,6 @@ type PersistentVolumeClaimInfo interface {
GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*api.PersistentVolumeClaim, error)
}

type StaticNodeInfo struct {
*api.NodeList
}

func (nodes StaticNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
for ix := range nodes.Items {
if nodes.Items[ix].Name == nodeID {
return &nodes.Items[ix], nil
}
}
return nil, fmt.Errorf("failed to find node: %s, %#v", nodeID, nodes)
}

type ClientNodeInfo struct {
*client.Client
}

func (nodes ClientNodeInfo) GetNodeInfo(nodeID string) (*api.Node, error) {
return nodes.Nodes().Get(nodeID)
}

type CachedNodeInfo struct {
*cache.StoreToNodeLister
}
@@ -271,9 +249,8 @@ var GCEPDVolumeFilter VolumeFilter = VolumeFilter{
}

type VolumeZoneChecker struct {
nodeInfo NodeInfo
pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo
pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo
}

// VolumeZonePredicate evaluates if a pod can fit due to the volumes it requests, given
@@ -290,20 +267,16 @@ type VolumeZoneChecker struct {
// determining the zone of a volume during scheduling, and that is likely to
// require calling out to the cloud provider. It seems that we are moving away
// from inline volume declarations anyway.
func NewVolumeZonePredicate(nodeInfo NodeInfo, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
func NewVolumeZonePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
c := &VolumeZoneChecker{
nodeInfo: nodeInfo,
pvInfo: pvInfo,
pvcInfo: pvcInfo,
pvInfo: pvInfo,
pvcInfo: pvcInfo,
}
return c.predicate
}

func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := c.nodeInfo.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found: %q", nodeName)
}
@@ -372,10 +345,6 @@ func (c *VolumeZoneChecker) predicate(pod *api.Pod, nodeName string, nodeInfo *s
return true, nil
}

type ResourceFit struct {
info NodeInfo
}

type resourceRequest struct {
milliCPU int64
memory int64
@@ -422,8 +391,12 @@ func podName(pod *api.Pod) string {
return pod.Namespace + "/" + pod.Name
}

func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, info *api.Node) (bool, error) {
allocatable := info.Status.Allocatable
func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found: %q", nodeName)
}
allocatable := node.Status.Allocatable
allowedPodNumber := allocatable.Pods().Value()
if int64(len(nodeInfo.Pods()))+1 > allowedPodNumber {
return false,
@@ -450,26 +423,8 @@ func podFitsResourcesInternal(pod *api.Pod, nodeName string, nodeInfo *scheduler
return true, nil
}

func (r *NodeStatus) PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
info, err := r.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
return podFitsResourcesInternal(pod, nodeName, nodeInfo, info)
}

func NewResourceFitPredicate(info NodeInfo) algorithm.FitPredicate {
fit := &NodeStatus{
info: info,
}
return fit.PodFitsResources
}

func NewSelectorMatchPredicate(info NodeInfo) algorithm.FitPredicate {
selector := &NodeStatus{
info: info,
}
return selector.PodSelectorMatches
func PodFitsResources(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
return podFitsResourcesInternal(pod, nodeName, nodeInfo)
}

// nodeMatchesNodeSelectorTerms checks if a node's labels satisfy a list of node selector terms,
@@ -541,14 +496,10 @@ func PodMatchesNodeLabels(pod *api.Pod, node *api.Node) bool {
return nodeAffinityMatches
}

type NodeSelector struct {
info NodeInfo
}

func (n *NodeStatus) PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := n.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
func PodSelectorMatches(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found: %q", nodeName)
}
if PodMatchesNodeLabels(pod, node) {
return true, nil
@@ -567,14 +518,12 @@ func PodFitsHost(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInf
}

type NodeLabelChecker struct {
info NodeInfo
labels []string
presence bool
}

func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algorithm.FitPredicate {
func NewNodeLabelPredicate(labels []string, presence bool) algorithm.FitPredicate {
labelChecker := &NodeLabelChecker{
info: info,
labels: labels,
presence: presence,
}
@@ -594,11 +543,12 @@ func NewNodeLabelPredicate(info NodeInfo, labels []string, presence bool) algori
// A node may have a label with "retiring" as key and the date as the value
// and it may be desirable to avoid scheduling new pods on this node
func (n *NodeLabelChecker) CheckNodeLabelPresence(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
var exists bool
node, err := n.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
node := nodeInfo.Node()
if node == nil {
return false, fmt.Errorf("node not found: %q", nodeName)
}

var exists bool
nodeLabels := labels.Set(node.Labels)
for _, label := range n.labels {
exists = nodeLabels.Has(label)
@@ -725,11 +675,16 @@ func PodFitsHostPorts(pod *api.Pod, nodeName string, nodeInfo *schedulercache.No
}

func getUsedPorts(pods ...*api.Pod) map[int]bool {
// TODO: Aggregate it at the NodeInfo level.
ports := make(map[int]bool)
for _, pod := range pods {
for _, container := range pod.Spec.Containers {
for _, podPort := range container.Ports {
ports[podPort.HostPort] = true
// "0" is explicitly ignored in PodFitsHostPorts,
// which is the only function that uses this value.
if podPort.HostPort != 0 {
ports[podPort.HostPort] = true
}
}
}
}
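The getUsedPorts change above stops recording HostPort 0, which containers use to mean "no host port requested"; as the new comment notes, PodFitsHostPorts is the only caller of this set. A rough in-package test sketch of the intended behavior follows. The helper newPodWithHostPorts and the test itself are hypothetical, written only for illustration; they would live alongside the existing predicates tests, which already import the testing and api packages.

// Hypothetical sketch, not part of this commit: exercises getUsedPorts to
// show that HostPort 0 is skipped while real host ports are recorded.
func newPodWithHostPorts(hostPorts ...int) *api.Pod {
	var ports []api.ContainerPort
	for _, hp := range hostPorts {
		ports = append(ports, api.ContainerPort{HostPort: hp})
	}
	return &api.Pod{
		Spec: api.PodSpec{
			Containers: []api.Container{{Ports: ports}},
		},
	}
}

func TestGetUsedPortsSkipsZeroHostPort(t *testing.T) {
	used := getUsedPorts(newPodWithHostPorts(0, 8080))
	if used[0] {
		t.Errorf("HostPort 0 should not be counted as a used port")
	}
	if !used[8080] {
		t.Errorf("HostPort 8080 should be counted as a used port")
	}
}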
@@ -748,27 +703,8 @@ func haveSame(a1, a2 []string) bool {
return false
}

type NodeStatus struct {
info NodeInfo
}

func GeneralPredicates(info NodeInfo) algorithm.FitPredicate {
node := &NodeStatus{
info: info,
}
return node.SchedulerGeneralPredicates
}

func (n *NodeStatus) SchedulerGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
node, err := n.info.GetNodeInfo(nodeName)
if err != nil {
return false, err
}
return RunGeneralPredicates(pod, nodeName, nodeInfo, node)
}

func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo, node *api.Node) (bool, error) {
fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo, node)
func GeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
fit, err := podFitsResourcesInternal(pod, nodeName, nodeInfo)
if !fit {
return fit, err
}
@@ -781,8 +717,9 @@ func RunGeneralPredicates(pod *api.Pod, nodeName string, nodeInfo *schedulercach
if !fit {
return fit, err
}
if !PodMatchesNodeLabels(pod, node) {
return false, ErrNodeSelectorNotMatch
fit, err = PodSelectorMatches(pod, nodeName, nodeInfo)
if !fit {
return fit, err
}
return true, nil
}
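With the NodeInfo/GetNodeInfo indirection removed, GeneralPredicates is now a plain algorithm.FitPredicate, so the kubelet and the scheduler can invoke the same function with the same arguments. Below is a hedged sketch of a hypothetical caller; checkNode is illustrative wiring, not code from this commit.

// Illustrative sketch only. It assumes the accessors shown in the diff
// (schedulercache.NewNodeInfo, SetNode) and the new function-style
// predicates.GeneralPredicates signature.
func checkNode(pod *api.Pod, node *api.Node, podsOnNode []*api.Pod) (bool, error) {
	nodeInfo := schedulercache.NewNodeInfo(podsOnNode...)
	nodeInfo.SetNode(node)
	// GeneralPredicates chains the general fit checks (resources, host name,
	// host ports, node selector) and returns on the first failure it hits.
	return predicates.GeneralPredicates(pod, node.Name, nodeInfo)
}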
23 changes: 14 additions & 9 deletions plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -158,9 +158,9 @@ func TestPodFitsResources(t *testing.T) {

for _, test := range enoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: makeResources(10, 20, 32).Capacity, Allocatable: makeAllocatableResources(10, 20, 32)}}
test.nodeInfo.SetNode(&node)

fit := NodeStatus{FakeNodeInfo(node)}
fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
fits, err := PodFitsResources(test.pod, "machine", test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
}
@@ -203,9 +203,9 @@ func TestPodFitsResources(t *testing.T) {
}
for _, test := range notEnoughPodsTests {
node := api.Node{Status: api.NodeStatus{Capacity: api.ResourceList{}, Allocatable: makeAllocatableResources(10, 20, 1)}}
test.nodeInfo.SetNode(&node)

fit := NodeStatus{FakeNodeInfo(node)}
fits, err := fit.PodFitsResources(test.pod, "machine", test.nodeInfo)
fits, err := PodFitsResources(test.pod, "machine", test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
}
@@ -994,9 +994,10 @@ func TestPodFitsSelector(t *testing.T) {

for _, test := range tests {
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: test.labels}}
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&node)

fit := NodeStatus{FakeNodeInfo(node)}
fits, err := fit.PodSelectorMatches(test.pod, "machine", schedulercache.NewNodeInfo())
fits, err := PodSelectorMatches(test.pod, "machine", nodeInfo)
if !reflect.DeepEqual(err, ErrNodeSelectorNotMatch) && err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -1057,8 +1058,11 @@ func TestNodeLabelPresence(t *testing.T) {
}
for _, test := range tests {
node := api.Node{ObjectMeta: api.ObjectMeta{Labels: label}}
labelChecker := NodeLabelChecker{FakeNodeInfo(node), test.labels, test.presence}
fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", schedulercache.NewNodeInfo())
nodeInfo := schedulercache.NewNodeInfo()
nodeInfo.SetNode(&node)

labelChecker := NodeLabelChecker{test.labels, test.presence}
fits, err := labelChecker.CheckNodeLabelPresence(test.pod, "machine", nodeInfo)
if !reflect.DeepEqual(err, ErrNodeLabelPresenceViolated) && err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -1550,7 +1554,8 @@ func TestRunGeneralPredicates(t *testing.T) {
},
}
for _, test := range resourceTests {
fits, err := RunGeneralPredicates(test.pod, test.nodeName, test.nodeInfo, test.node)
test.nodeInfo.SetNode(test.node)
fits, err := GeneralPredicates(test.pod, test.nodeName, test.nodeInfo)
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("%s: unexpected error: %v, want: %v", test.test, err, test.wErr)
}
25 changes: 5 additions & 20 deletions plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go
@@ -84,16 +84,13 @@ func init() {
// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicateFactory(
"PodFitsResources",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewResourceFitPredicate(args.NodeInfo)
},
)
factory.RegisterFitPredicate("PodFitsResources", predicates.PodFitsResources)
// Fit is determined by the presence of the Host parameter and a string match
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
factory.RegisterFitPredicate("HostName", predicates.PodFitsHost)
// Fit is determined by node selector query.
factory.RegisterFitPredicate("MatchNodeSelector", predicates.PodSelectorMatches)
}

func defaultPredicates() sets.String {
@@ -104,14 +101,7 @@ func defaultPredicates() sets.String {
factory.RegisterFitPredicateFactory(
"NoVolumeZoneConflict",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeZonePredicate(args.NodeInfo, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by node selector query.
factory.RegisterFitPredicateFactory(
"MatchNodeSelector",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewSelectorMatchPredicate(args.NodeInfo)
return predicates.NewVolumeZonePredicate(args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many AWS EBS volumes attached to the node
@@ -134,12 +124,7 @@ func defaultPredicates() sets.String {
),
// GeneralPredicates are the predicates that are enforced by all Kubernetes components
// (e.g. kubelet and all schedulers)
factory.RegisterFitPredicateFactory(
"GeneralPredicates",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.GeneralPredicates(args.NodeInfo)
},
),
factory.RegisterFitPredicate("GeneralPredicates", predicates.GeneralPredicates),
)
}

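The defaults.go hunks swap RegisterFitPredicateFactory for RegisterFitPredicate wherever a predicate no longer needs construction-time arguments such as args.NodeInfo. The snippet below is a simplified sketch of how the two registration calls plausibly relate; the map-based body is an assumption for illustration, and the real plugin/pkg/scheduler/factory package carries extra validation and bookkeeping.

// Simplified sketch, not the actual factory package code.
// A factory builds a predicate from runtime arguments (PV/PVC listers, etc.).
type FitPredicateFactory func(args PluginFactoryArgs) algorithm.FitPredicate

var fitPredicateMap = map[string]FitPredicateFactory{}

// RegisterFitPredicateFactory registers a predicate that still needs
// arguments at construction time, e.g.
// NewVolumeZonePredicate(args.PVInfo, args.PVCInfo).
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
	fitPredicateMap[name] = predicateFactory
	return name
}

// RegisterFitPredicate is the shorthand used above for predicates that are
// already plain functions (PodFitsResources, MatchNodeSelector,
// GeneralPredicates): it wraps them in a factory that ignores the args.
func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string {
	return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate {
		return predicate
	})
}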