Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler: skip updates of assumed pods #100286

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 7 additions & 55 deletions pkg/scheduler/eventhandlers.go
Expand Up @@ -174,9 +174,15 @@ func (sched *Scheduler) updatePodInSchedulingQueue(oldObj, newObj interface{}) {
if oldPod.ResourceVersion == newPod.ResourceVersion {
return
}
if sched.skipPodUpdate(newPod) {

isAssumed, err := sched.SchedulerCache.IsAssumedPod(newPod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", newPod.Namespace, newPod.Name, err))
}
if isAssumed {
return
}

if err := sched.SchedulingQueue.Update(oldPod, newPod); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
}
Expand Down Expand Up @@ -299,60 +305,6 @@ func responsibleForPod(pod *v1.Pod, profiles profile.Map) bool {
return profiles.HandlesSchedulerName(pod.Spec.SchedulerName)
}

// skipPodUpdate checks whether the specified pod update should be ignored.
// This function will return true if
// - The pod has already been assumed, AND
// - The pod has only its ResourceVersion, Spec.NodeName, Annotations,
// ManagedFields, Finalizers and/or Conditions updated.
func (sched *Scheduler) skipPodUpdate(pod *v1.Pod) bool {
// Non-assumed pods should never be skipped.
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}
if !isAssumed {
return false
}

// Gets the assumed pod from the cache.
assumedPod, err := sched.SchedulerCache.GetPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to get assumed pod %s/%s from cache: %v", pod.Namespace, pod.Name, err))
return false
}

// Compares the assumed pod in the cache with the pod update. If they are
// equal (with certain fields excluded), this pod update will be skipped.
f := func(pod *v1.Pod) *v1.Pod {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I say we should migrate some of these to isPodUpdated in scheduling_queue.go.

The function is used to determine if an unschedulable pod should be moved out into backoff or active.

@ahg-g? WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we can add ManagedFields and Finalizers. I am not sure about Annotations, and NodeName isn't going to trigger a queue update.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, this comment hasn't been addressed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also p.Status.Conditions please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

p.Status = v1.PodStatus{} has been assigned in isPodUpdated

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, missed that

/lgtm

p := pod.DeepCopy()
// ResourceVersion must be excluded because each object update will
// have a new resource version.
p.ResourceVersion = ""
// Spec.NodeName must be excluded because the pod assumed in the cache
// is expected to have a node assigned while the pod update may or may
// not have this field set.
p.Spec.NodeName = ""
// Annotations must be excluded for the reasons described in
// https://github.com/kubernetes/kubernetes/issues/52914.
p.Annotations = nil
// Same as above, when annotations are modified with ServerSideApply,
// ManagedFields may also change and must be excluded
p.ManagedFields = nil
// The following might be changed by external controllers, but they don't
// affect scheduling decisions.
p.Finalizers = nil
p.Status.Conditions = nil
return p
}
assumedPodCopy, podCopy := f(assumedPod), f(pod)
if !reflect.DeepEqual(assumedPodCopy, podCopy) {
return false
}
klog.V(3).InfoS("Pod update ignored because changes won't affect scheduling", "pod", klog.KObj(pod))
return true
}

// addAllEventHandlers is a helper function used in tests and in Scheduler
// to add event handlers for various informers.
func addAllEventHandlers(
Expand Down
216 changes: 0 additions & 216 deletions pkg/scheduler/eventhandlers_test.go
Expand Up @@ -26,225 +26,9 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/internal/cache"
fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
)

// TestSkipPodUpdate is a table-driven test for Scheduler.skipPodUpdate.
// Each case builds a Scheduler whose SchedulerCache is a fakecache.Cache
// backed by the case's stub functions, calls skipPodUpdate on the case's pod,
// and compares the boolean result with the expected value.
func TestSkipPodUpdate(t *testing.T) {
for _, test := range []struct {
// pod is the pod passed to skipPodUpdate.
pod *v1.Pod
// isAssumedPodFunc is installed as the fake cache's IsAssumedPodFunc.
isAssumedPodFunc func(*v1.Pod) bool
// getPodFunc is installed as the fake cache's GetPodFunc; the pod it
// returns is the one the test pod is compared against.
getPodFunc func(*v1.Pod) *v1.Pod
// expected is the value skipPodUpdate should return.
expected bool
// name is the subtest name.
name string
}{
// The cache reports the pod as not assumed, so the update is not skipped.
{
name: "Non-assumed pod",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool { return false },
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
}
},
expected: false,
},
// The two pods differ only in Annotations, ResourceVersion and
// Spec.NodeName; the update is expected to be skipped.
{
name: "with changes on ResourceVersion, Spec.NodeName and/or Annotations",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "b"},
ResourceVersion: "0",
},
Spec: v1.PodSpec{
NodeName: "node-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"c": "d"},
ResourceVersion: "1",
},
Spec: v1.PodSpec{
NodeName: "node-1",
},
}
},
expected: true,
},
// In addition to Annotations, ResourceVersion and Spec.NodeName, the
// ManagedFields entries differ between the two pods; the update is
// still expected to be skipped.
{
name: "with ServerSideApply changes on Annotations",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "b"},
ResourceVersion: "0",
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "some-actor",
Operation: metav1.ManagedFieldsOperationApply,
APIVersion: "v1",
FieldsType: "FieldsV1",
FieldsV1: &metav1.FieldsV1{
Raw: []byte(`
"f:metadata": {
"f:annotations": {
"f:a: {}
}
}
`),
},
},
},
},
Spec: v1.PodSpec{
NodeName: "node-0",
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Annotations: map[string]string{"a": "c", "d": "e"},
ResourceVersion: "1",
ManagedFields: []metav1.ManagedFieldsEntry{
{
Manager: "some-actor",
Operation: metav1.ManagedFieldsOperationApply,
APIVersion: "v1",
FieldsType: "FieldsV1",
FieldsV1: &metav1.FieldsV1{
Raw: []byte(`
"f:metadata": {
"f:annotations": {
"f:a: {}
"f:d: {}
}
}
`),
},
},
{
Manager: "some-actor",
Operation: metav1.ManagedFieldsOperationApply,
APIVersion: "v1",
FieldsType: "FieldsV1",
FieldsV1: &metav1.FieldsV1{
Raw: []byte(`
"f:metadata": {
"f:annotations": {
"f:a: {}
}
}
`),
},
},
},
},
Spec: v1.PodSpec{
NodeName: "node-1",
},
}
},
expected: true,
},
// Labels differ between the two pods; the update must not be skipped.
{
name: "with changes on Labels",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"a": "b"},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Labels: map[string]string{"c": "d"},
},
}
},
expected: false,
},
// Only Finalizers differ; the update is expected to be skipped.
{
name: "with changes on Finalizers",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Finalizers: []string{"a", "b"},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
Finalizers: []string{"c", "d"},
},
}
},
expected: true,
},
// Only Status.Conditions differ (set on the test pod, absent on the pod
// returned by the cache); the update is expected to be skipped.
{
name: "with changes on Conditions",
pod: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{Type: "foo"},
},
},
},
isAssumedPodFunc: func(*v1.Pod) bool {
return true
},
getPodFunc: func(*v1.Pod) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-0",
},
}
},
expected: true,
},
} {
t.Run(test.name, func(t *testing.T) {
// Only SchedulerCache is populated; the other Scheduler fields are
// left at their zero values.
c := &Scheduler{
SchedulerCache: &fakecache.Cache{
IsAssumedPodFunc: test.isAssumedPodFunc,
GetPodFunc: test.getPodFunc,
},
}
got := c.skipPodUpdate(test.pod)
if got != test.expected {
t.Errorf("skipPodUpdate() = %t, expected = %t", got, test.expected)
}
})
}
}

func TestNodeAllocatableChanged(t *testing.T) {
newQuantity := func(value int64) resource.Quantity {
return *resource.NewQuantity(value, resource.BinarySI)
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/internal/queue/scheduling_queue.go
Expand Up @@ -426,6 +426,8 @@ func isPodUpdated(oldPod, newPod *v1.Pod) bool {
p.ResourceVersion = ""
p.Generation = 0
p.Status = v1.PodStatus{}
p.ManagedFields = nil
p.Finalizers = nil
return p
}
return !reflect.DeepEqual(strip(oldPod), strip(newPod))
Expand Down
12 changes: 7 additions & 5 deletions pkg/scheduler/scheduler.go
Expand Up @@ -28,6 +28,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
Expand Down Expand Up @@ -639,14 +640,15 @@ func (sched *Scheduler) skipPodSchedule(fwk framework.Framework, pod *v1.Pod) bo
return true
}

// Case 2: pod has been assumed and pod updates could be skipped.
// Case 2: pod that has been assumed could be skipped.
// An assumed pod can be added again to the scheduling queue if it got an update event
// during its previous scheduling cycle but before getting assumed.
if sched.skipPodUpdate(pod) {
return true
isAssumed, err := sched.SchedulerCache.IsAssumedPod(pod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
return false
}

return false
return isAssumed
}

func defaultAlgorithmSourceProviderName() *string {
Expand Down