Pods pending due to insufficient OIR should get scheduled once sufficient OIR becomes available. #41870

Merged · 2 commits · Mar 6, 2017

8 changes: 8 additions & 0 deletions pkg/kubelet/kubelet_node_status.go
@@ -529,6 +529,14 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
if node.Status.Allocatable == nil {
node.Status.Allocatable = make(v1.ResourceList)
}
// Remove opaque integer resources from allocatable that are no longer
// present in capacity.
for k := range node.Status.Allocatable {
_, found := node.Status.Capacity[k]
if !found && v1.IsOpaqueIntResourceName(k) {
delete(node.Status.Allocatable, k)
}
}
allocatableReservation := kl.containerManager.GetNodeAllocatableReservation()
for k, v := range node.Status.Capacity {
value := *(v.Copy())
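
To see the pruning logic in isolation, here is a minimal standalone sketch (not part of this PR). The ResourceList type and the OIR name prefix are simplified stand-ins for the real v1 types and v1.IsOpaqueIntResourceName; deleting keys from a Go map while ranging over it is safe, which is what makes the one-pass loop above correct.

package main

import (
	"fmt"
	"strings"
)

type ResourceName string
type ResourceList map[ResourceName]int64 // stand-in for v1.ResourceList

// isOpaqueIntResourceName mirrors v1.IsOpaqueIntResourceName; the prefix
// below is an assumption based on the OIR naming convention of this release.
func isOpaqueIntResourceName(name ResourceName) bool {
	return strings.HasPrefix(string(name), "pod.alpha.kubernetes.io/opaque-int-resource-")
}

// pruneStaleOpaqueResources removes opaque integer resources from
// allocatable that are no longer present in capacity, like the kubelet
// change above.
func pruneStaleOpaqueResources(capacity, allocatable ResourceList) {
	for k := range allocatable {
		if _, found := capacity[k]; !found && isOpaqueIntResourceName(k) {
			delete(allocatable, k)
		}
	}
}

func main() {
	capacity := ResourceList{"cpu": 4000}
	allocatable := ResourceList{
		"cpu": 4000,
		"pod.alpha.kubernetes.io/opaque-int-resource-foo": 5,
	}
	pruneStaleOpaqueResources(capacity, allocatable)
	fmt.Println(allocatable) // map[cpu:4000]: the stale OIR entry is gone
}
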
28 changes: 25 additions & 3 deletions plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -468,6 +468,30 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *s
return true, nil, nil
}

// Returns a *schedulercache.Resource that covers the largest width in each
// resource dimension. Because init-containers run sequentially, we collect the
// max in each dimension iteratively. In contrast, we sum the resource vectors
// for regular containers since they run simultaneously.
//
// Example:
//
// Pod:
// InitContainers
// IC1:
// CPU: 2
// Memory: 1G
// IC2:
// CPU: 2
// Memory: 3G
// Containers
// C1:
// CPU: 2
// Memory: 1G
// C2:
// CPU: 1
// Memory: 1G
//
// Result: CPU: 3, Memory: 3G
func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
result := schedulercache.Resource{}
for _, container := range pod.Spec.Containers {
@@ -505,10 +529,8 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
default:
Member: feels like this method needs a header comment: // GetResourceRequest calculates the max of the individual init containers (since they're serial) and the sum of the standard pod containers (since they're running together), and takes the max of the entire set.

Contributor Author: Agree, will add this en passant.

Contributor Author: Added a banner comment.

if v1.IsOpaqueIntResourceName(rName) {
value := rQuantity.Value()
// Ensure the opaque resource map is initialized in the result.
result.AddOpaque(rName, int64(0))
if value > result.OpaqueIntResources[rName] {
- result.OpaqueIntResources[rName] = value
+ result.SetOpaque(rName, value)
}
}
}

Member: should we update the unit tests for this fix too?

Contributor Author: Which unit tests do you have in mind? We could add something for SetOpaque.

Member: Yup.
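
The banner comment's worked example can be checked with a small sketch (not part of this PR). It uses plain int64 fields in place of resource.Quantity and a simplified stand-in for schedulercache.Resource; the upstream function folds init containers into the running result the same way, taking the per-dimension max.

package main

import "fmt"

// res is a simplified stand-in for schedulercache.Resource.
type res struct {
	MilliCPU int64
	Memory   int64 // bytes
}

func max64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

// getResourceRequest sums the regular containers (they run concurrently),
// then raises each dimension to the max of any single init container
// (init containers run one at a time, before the regular containers start).
func getResourceRequest(containers, initContainers []res) res {
	var result res
	for _, c := range containers {
		result.MilliCPU += c.MilliCPU
		result.Memory += c.Memory
	}
	for _, ic := range initContainers {
		result.MilliCPU = max64(result.MilliCPU, ic.MilliCPU)
		result.Memory = max64(result.Memory, ic.Memory)
	}
	return result
}

func main() {
	initContainers := []res{
		{MilliCPU: 2000, Memory: 1 << 30}, // IC1: CPU 2, Memory 1G
		{MilliCPU: 2000, Memory: 3 << 30}, // IC2: CPU 2, Memory 3G
	}
	containers := []res{
		{MilliCPU: 2000, Memory: 1 << 30}, // C1: CPU 2, Memory 1G
		{MilliCPU: 1000, Memory: 1 << 30}, // C2: CPU 1, Memory 1G
	}
	// Prints {3000 3221225472}, i.e. CPU: 3, Memory: 3G, matching the
	// worked example in the banner comment.
	fmt.Println(getResourceRequest(containers, initContainers))
}
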
8 changes: 6 additions & 2 deletions plugin/pkg/scheduler/schedulercache/node_info.go
@@ -83,11 +83,15 @@ func (r *Resource) ResourceList() v1.ResourceList {
}

func (r *Resource) AddOpaque(name v1.ResourceName, quantity int64) {
r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
}

func (r *Resource) SetOpaque(name v1.ResourceName, quantity int64) {
// Lazily allocate opaque integer resource map.
if r.OpaqueIntResources == nil {
r.OpaqueIntResources = map[v1.ResourceName]int64{}
}
- r.OpaqueIntResources[name] += quantity
+ r.OpaqueIntResources[name] = quantity
}

// NewNodeInfo returns a ready to use empty NodeInfo object.
@@ -333,7 +337,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
n.allowedPodNumber = int(rQuant.Value())
default:
if v1.IsOpaqueIntResourceName(rName) {
- n.allocatableResource.AddOpaque(rName, rQuant.Value())
+ n.allocatableResource.SetOpaque(rName, rQuant.Value())
}
}
}
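
The AddOpaque/SetOpaque split matters because of semantics: with the old += behavior, re-applying the same node status would accumulate rather than overwrite, while assigning makes the operation idempotent. A trimmed-down sketch (not part of this PR, using a stand-in for schedulercache.Resource) showing why SetOpaque is the right call when applying a node's allocatable:

package main

import "fmt"

type ResourceName string

// Resource is a trimmed-down stand-in for schedulercache.Resource.
type Resource struct {
	OpaqueIntResources map[ResourceName]int64
}

// SetOpaque overwrites the stored quantity. It is idempotent, so
// re-applying a node's allocatable cannot inflate the cached value.
func (r *Resource) SetOpaque(name ResourceName, quantity int64) {
	// Lazily allocate opaque integer resource map.
	if r.OpaqueIntResources == nil {
		r.OpaqueIntResources = map[ResourceName]int64{}
	}
	r.OpaqueIntResources[name] = quantity
}

// AddOpaque accumulates on top of the current value; reading from a nil
// map yields zero, so this is safe even before the map is allocated.
func (r *Resource) AddOpaque(name ResourceName, quantity int64) {
	r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
}

func main() {
	var r Resource
	// Simulate the node's allocatable being applied twice, as happens
	// when node status is re-synced.
	r.SetOpaque("foo", 5)
	r.SetOpaque("foo", 5)
	fmt.Println(r.OpaqueIntResources["foo"]) // 5; with the old += this was 10

	// AddOpaque remains available for summing per-container requests.
	r.AddOpaque("foo", 2)
	fmt.Println(r.OpaqueIntResources["foo"]) // 7
}
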
125 changes: 93 additions & 32 deletions test/e2e/opaque_resource.go
@@ -38,7 +38,7 @@ import (
. "github.com/onsi/gomega"
)

- var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", func() {
+ var _ = framework.KubeDescribe("Opaque resources", func() {
Member: This likely needs to be marked as [Serial] b/c you're mucking with the nodes.

Contributor Author: Added [Serial] tag.

Contributor Author: ...and removed [Serial] tag.
f := framework.NewDefaultFramework("opaque-resource")
opaqueResName := v1.OpaqueIntResourceName("foo")
var node *v1.Node
@@ -59,11 +59,19 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
}
}

removeOpaqueResource(f, node.Name, opaqueResName)
addOpaqueResource(f, node.Name, opaqueResName)
})

// TODO: The suite times out if removeOpaqueResource is called as part of
// an AfterEach closure. For now, it is the last statement in each
// It block.
// AfterEach(func() {
// removeOpaqueResource(f, node.Name, opaqueResName)
// })

Member: This is pretty disconcerting, do you know why?

Contributor Author: No, I am stumped on why this causes a problem. Open to any ideas.

Member: /cc @kubernetes/test-infra-reviewers any insights here?

Member: So I looked at anti-affinity and it does both, so it looks like they just patched and didn't bother to clearly define a happy path.

It("should not break pods that do not consume opaque integer resources.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a vanilla pod")
requests := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.1")}
limits := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.2")}
@@ -74,19 +82,17 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
- predicate := func(e *v1.Event) bool {
- return e.Type == v1.EventTypeNormal &&
- e.Reason == "Scheduled" &&
- // Here we don't check for the bound node name since it can land on
- // any one (this pod doesn't require any of the opaque resource.)
- strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v", pod.Name))
- }
+ // Here we don't check for the bound node name since it can land on
+ // any one (this pod doesn't require any of the opaque resource.)
+ predicate := scheduleSuccess(pod.Name, "")
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should schedule pods that do consume opaque integer resources.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
requests := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0.1"),
@@ -103,17 +109,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
- predicate := func(e *v1.Event) bool {
- return e.Type == v1.EventTypeNormal &&
- e.Reason == "Scheduled" &&
- strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
- }
+ predicate := scheduleSuccess(pod.Name, node.Name)
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should not schedule pods that exceed the available amount of opaque integer resource.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod that requires more of the opaque resource than is allocatable on any node")
requests := v1.ResourceList{opaqueResName: resource.MustParse("6")}
limits := v1.ResourceList{}
@@ -123,17 +127,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(newTestPod(f, "over-max-oir", requests, limits))
return err
}
- predicate := func(e *v1.Event) bool {
- return e.Type == "Warning" &&
- e.Reason == "FailedScheduling" &&
- strings.Contains(e.Message, "failed to fit in any node")
- }
+ predicate := scheduleFailure("over-max-oir")
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should account opaque integer resources in pods with multiple containers.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod with two containers that together require less of the opaque resource than is allocatable on a node")
requests := v1.ResourceList{opaqueResName: resource.MustParse("1")}
limits := v1.ResourceList{}
@@ -170,11 +172,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
- predicate := func(e *v1.Event) bool {
- return e.Type == v1.EventTypeNormal &&
- e.Reason == "Scheduled" &&
- strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
- }
+ predicate := scheduleSuccess(pod.Name, node.Name)
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
@@ -214,11 +212,53 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
_, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
return err
}
- predicate = func(e *v1.Event) bool {
- return e.Type == "Warning" &&
- e.Reason == "FailedScheduling" &&
- strings.Contains(e.Message, "failed to fit in any node")
+ predicate = scheduleFailure(pod.Name)
success, err = observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
})

It("should schedule pods that initially do not fit after enough opaque integer resources are freed.", func() {
defer removeOpaqueResource(f, node.Name, opaqueResName)

By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
requests := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0.1"),
opaqueResName: resource.MustParse("3"),
}
limits := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0.2"),
opaqueResName: resource.MustParse("3"),
}
pod1 := newTestPod(f, "oir-1", requests, limits)
pod2 := newTestPod(f, "oir-2", requests, limits)

By("Observing an event that indicates one pod was scheduled")
action := func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod1)
return err
}
predicate := scheduleSuccess(pod1.Name, node.Name)
success, err := observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))

By("Observing an event that indicates a subsequent pod was not scheduled")
action = func() error {
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod2)
return err
}
predicate = scheduleFailure(pod2.Name)
success, err = observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))

By("Observing an event that indicates the second pod was scheduled after deleting the first pod")
action = func() error {
err := f.ClientSet.Core().Pods(f.Namespace.Name).Delete(pod1.Name, nil)
return err
}
predicate = scheduleSuccess(pod2.Name, node.Name)
success, err = observeEventAfterAction(f, predicate, action)
Expect(err).NotTo(HaveOccurred())
Expect(success).To(Equal(true))
@@ -228,12 +268,14 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
// Adds the opaque resource to a node.
func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
action := func() error {
By(fmt.Sprintf("Adding OIR to node [%s]", nodeName))
patch := []byte(fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`, escapeForJSONPatch(opaqueResName)))
return f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error()
}
predicate := func(n *v1.Node) bool {
capacity, foundCap := n.Status.Capacity[opaqueResName]
allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
return foundCap && capacity.MilliValue() == int64(5000) &&
foundAlloc && allocatable.MilliValue() == int64(5000)
}
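
The JSON Patch path above embeds the resource name, which itself contains a "/". The escapeForJSONPatch helper is not shown in this diff; JSON Patch paths are JSON Pointers (RFC 6901), in which "~" must be escaped as "~0" and "/" as "~1". A plausible sketch of the helper (an assumption, not the PR's actual code), relying on the strings and v1 imports this file already has:

// Hypothetical sketch of the helper used above: escape a resource name
// (e.g. "pod.alpha.kubernetes.io/opaque-int-resource-foo") for use in a
// JSON Pointer path. Order matters: escape "~" before "/".
func escapeForJSONPatch(resName v1.ResourceName) string {
	s := strings.Replace(string(resName), "~", "~0", -1)
	return strings.Replace(s, "/", "~1", -1)
}
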
@@ -245,14 +287,16 @@ func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1
// Removes the opaque resource from a node.
func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
action := func() error {
By(fmt.Sprintf("Removing OIR from node [%s]", nodeName))
patch := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/status/capacity/%s"}]`, escapeForJSONPatch(opaqueResName)))
f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do()
return nil // Ignore error -- the opaque resource may not exist.
}
predicate := func(n *v1.Node) bool {
- _, foundCap := n.Status.Capacity[opaqueResName]
- _, foundAlloc := n.Status.Allocatable[opaqueResName]
- return !foundCap && !foundAlloc
+ capacity, foundCap := n.Status.Capacity[opaqueResName]
+ allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
+ By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
+ return (!foundCap || capacity.IsZero()) && (!foundAlloc || allocatable.IsZero())
}
success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
Expect(err).NotTo(HaveOccurred())
@@ -345,7 +389,7 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
e, ok := obj.(*v1.Event)
By(fmt.Sprintf("Considering event: \nType = [%s], Reason = [%s], Message = [%s]", e.Type, e.Reason, e.Message))
By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
Expect(ok).To(Equal(true))
if ok && eventPredicate(e) {
observedMatchingEvent = true
@@ -373,3 +417,20 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
})
return err == nil, err
}

func scheduleSuccess(podName, nodeName string) func(*v1.Event) bool {
return func(e *v1.Event) bool {
return e.Type == v1.EventTypeNormal &&
e.Reason == "Scheduled" &&
strings.HasPrefix(e.Name, podName) &&
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", podName, nodeName))
}
}
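
Note that passing an empty nodeName, as the vanilla-pod test does with scheduleSuccess(pod.Name, ""), reduces the message check to the prefix "Successfully assigned <pod> to ", which matches assignment to any node. The HasPrefix check on e.Name works because event names are generated from the involved object's name plus a suffix.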

func scheduleFailure(podName string) func(*v1.Event) bool {
return func(e *v1.Event) bool {
return strings.HasPrefix(e.Name, podName) &&
e.Type == "Warning" &&
e.Reason == "FailedScheduling"
}
}

Member: why isn't !scheduleSuccess enough to check for failure?

Contributor Author: Because then we would have to wait for an event not to appear, which is much slower.

Member: b/c it's an event checker, it would wait for the timeouts on failure.