activate unschedulable pods only if the node became more schedulable #71551

Merged 1 commit on Dec 10, 2018
1 change: 1 addition & 0 deletions pkg/scheduler/factory/BUILD
@@ -74,6 +74,7 @@ go_test(
"//pkg/scheduler/internal/queue:go_default_library",
"//pkg/scheduler/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
56 changes: 55 additions & 1 deletion pkg/scheduler/factory/factory.go
@@ -998,7 +998,14 @@ func (c *configFactory) updateNodeInCache(oldObj, newObj interface{}) {
}

c.invalidateCachedPredicatesOnNodeUpdate(newNode, oldNode)
c.podQueue.MoveAllToActiveQueue()
// Only activate unschedulable pods if the node became more schedulable.
Member:
An optimization we can perform here is to look at the unschedulableQueue and, if there are no pods in it, skip the check for changes in the node object and call MoveAllToActiveQueue. This optimization will be useful in large clusters where many nodes send updates and there are no unschedulable pods in the cluster.

Contributor Author:
Sorry, I missed this optimization; I will add it later.

Member:
Please expand the comment by pointing out that:

"We skip the node property comparison when there is no unschedulable pods in the queue to save processing cycles. We still trigger a move to active queue to cover the case that a pod being processed by the scheduler is determined unschedulable. We want this pod to be reevaluated when a change in the cluster happens."

Contributor Author:
Added

// We skip the node property comparison when there is no unschedulable pods in the queue
// to save processing cycles. We still trigger a move to active queue to cover the case
// that a pod being processed by the scheduler is determined unschedulable. We want this
// pod to be reevaluated when a change in the cluster happens.
if c.podQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
c.podQueue.MoveAllToActiveQueue()
}
}

func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node, oldNode *v1.Node) {
@@ -1070,6 +1077,53 @@ func (c *configFactory) invalidateCachedPredicatesOnNodeUpdate(newNode *v1.Node,
}
}

func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
Member:
Can we move this to helper/util and make it public? So others can reuse it with predicates and priorities :)

Member:
xref kubernetes-retired/kube-batch#491

@jiaxuanzhou, something like this PR seems better :)

if nodeSpecUnschedulableChanged(newNode, oldNode) {
return true
}
if nodeAllocatableChanged(newNode, oldNode) {
Member:
We should order these so that the cheaper functions run before the more expensive ones.

Contributor Author:
Updated the order to: Spec.Unschedulable -> Status.Allocatable -> Labels -> Taints -> Conditions. In general, Status.Allocatable changes more frequently than Labels and Taints, so I made it second. Conditions is put at the end because checking it does more work to strip each condition.

Member:
The order sounds good. Condition changes are fairly frequent though, as the kubelet updates the last transition time of conditions often, so it might be better to check that before some of the others. But without actual data it is hard to find the right order; we can go with the current order and study the impact of reordering later.

Contributor:
Can we use a map of func to the total time each func has spent?
That way, we can dynamically pick the cheapest func first at runtime.

return true
}
if nodeLabelsChanged(newNode, oldNode) {
return true
}
if nodeTaintsChanged(newNode, oldNode) {
return true
}
if nodeConditionsChanged(newNode, oldNode) {
return true
}

return false
}
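
A contributor asked above whether these checks could be ordered dynamically by their measured cost rather than in a fixed order. That idea is not part of this PR; a rough sketch of it, with invented names such as nodeCheck and adaptiveChecker, could track the cumulative time each check has consumed and try the historically cheapest one first:

// Hypothetical sketch, not part of this PR. It assumes the imports of this
// file plus "sort", "sync", and "time". The checks slice would be seeded with
// the helpers in this file (nodeSpecUnschedulableChanged,
// nodeAllocatableChanged, nodeLabelsChanged, and so on).
type nodeCheck struct {
    name  string
    fn    func(newNode, oldNode *v1.Node) bool
    spent time.Duration // cumulative time this check has consumed
}

type adaptiveChecker struct {
    mu     sync.Mutex
    checks []nodeCheck
}

// changed runs the checks in ascending order of accumulated cost and records
// how long each invocation takes, so cheap checks migrate to the front.
func (a *adaptiveChecker) changed(newNode, oldNode *v1.Node) bool {
    a.mu.Lock()
    defer a.mu.Unlock()

    sort.Slice(a.checks, func(i, j int) bool {
        return a.checks[i].spent < a.checks[j].spent
    })
    for i := range a.checks {
        start := time.Now()
        hit := a.checks[i].fn(newNode, oldNode)
        a.checks[i].spent += time.Since(start)
        if hit {
            return true
        }
    }
    return false
}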

func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable)
}

func nodeLabelsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels())
}

func nodeTaintsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
}

func nodeConditionsChanged(newNode *v1.Node, oldNode *v1.Node) bool {
strip := func(conditions []v1.NodeCondition) map[v1.NodeConditionType]v1.ConditionStatus {
conditionStatuses := make(map[v1.NodeConditionType]v1.ConditionStatus, len(conditions))
for i := range conditions {
conditionStatuses[conditions[i].Type] = conditions[i].Status
}
return conditionStatuses
}
return !reflect.DeepEqual(strip(oldNode.Status.Conditions), strip(newNode.Status.Conditions))
}

func nodeSpecUnschedulableChanged(newNode *v1.Node, oldNode *v1.Node) bool {
return newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && newNode.Spec.Unschedulable == false
}
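
Note that nodeSpecUnschedulableChanged is deliberately asymmetric: it reports a change only when the node flips from unschedulable to schedulable, because a node becoming unschedulable cannot make a pending pod schedulable. An illustrative table-driven test in the style of the ones added below, not part of this PR, might look like:

// Illustrative only; this would sit alongside the other tests in factory_test.go.
func TestNodeSpecUnschedulableChanged(t *testing.T) {
    for _, c := range []struct {
        Name             string
        Changed          bool
        OldUnschedulable bool
        NewUnschedulable bool
    }{
        {
            Name:             "node became schedulable",
            Changed:          true,
            OldUnschedulable: true,
            NewUnschedulable: false,
        },
        {
            Name:             "node became unschedulable",
            Changed:          false,
            OldUnschedulable: false,
            NewUnschedulable: true,
        },
        {
            Name:             "no change",
            Changed:          false,
            OldUnschedulable: false,
            NewUnschedulable: false,
        },
    } {
        oldNode := &v1.Node{Spec: v1.NodeSpec{Unschedulable: c.OldUnschedulable}}
        newNode := &v1.Node{Spec: v1.NodeSpec{Unschedulable: c.NewUnschedulable}}
        changed := nodeSpecUnschedulableChanged(newNode, oldNode)
        if changed != c.Changed {
            t.Errorf("Test case %q failed: want %t, got %t", c.Name, c.Changed, changed)
        }
    }
}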

func (c *configFactory) deleteNodeFromCache(obj interface{}) {
var node *v1.Node
switch t := obj.(type) {
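
A review thread above suggested moving nodeSchedulingPropertiesChanged into a shared helper package so that predicates and priorities could reuse it; this PR keeps the helpers unexported in the factory package. A minimal sketch of the suggested shape, assuming a hypothetical nodeutil package and omitting the conditions check for brevity, might look like:

// Hypothetical shared helper suggested in review; not part of this PR.
package nodeutil

import (
    "reflect"

    "k8s.io/api/core/v1"
)

// NodeSchedulingPropertiesChanged reports whether a node update touched a
// property the scheduler cares about: the unschedulable flag (only the
// unschedulable -> schedulable transition counts), allocatable resources,
// labels, or taints.
func NodeSchedulingPropertiesChanged(newNode, oldNode *v1.Node) bool {
    if newNode.Spec.Unschedulable != oldNode.Spec.Unschedulable && !newNode.Spec.Unschedulable {
        return true
    }
    if !reflect.DeepEqual(oldNode.Status.Allocatable, newNode.Status.Allocatable) {
        return true
    }
    if !reflect.DeepEqual(oldNode.GetLabels(), newNode.GetLabels()) {
        return true
    }
    return !reflect.DeepEqual(newNode.Spec.Taints, oldNode.Spec.Taints)
}
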
144 changes: 144 additions & 0 deletions pkg/scheduler/factory/factory_test.go
@@ -24,6 +24,7 @@ import (
"time"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@@ -653,3 +654,146 @@ func testGetBinderFunc(expectedBinderType, podName string, extenders []algorithm
t.Errorf("Expected binder %q but got %q", expectedBinderType, binderType)
}
}

func TestNodeAllocatableChanged(t *testing.T) {
newQuantity := func(value int64) resource.Quantity {
return *resource.NewQuantity(value, resource.BinarySI)
}
for _, c := range []struct {
Name string
Changed bool
OldAllocatable v1.ResourceList
NewAllocatable v1.ResourceList
}{
{
Name: "no allocatable resources changed",
Changed: false,
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
},
{
Name: "new node has more allocatable resources",
Changed: true,
OldAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024)},
NewAllocatable: v1.ResourceList{v1.ResourceMemory: newQuantity(1024), v1.ResourceStorage: newQuantity(1024)},
},
} {
oldNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.OldAllocatable}}
newNode := &v1.Node{Status: v1.NodeStatus{Allocatable: c.NewAllocatable}}
changed := nodeAllocatableChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("nodeAllocatableChanged should be %t, got %t", c.Changed, changed)
}
}
}

func TestNodeLabelsChanged(t *testing.T) {
for _, c := range []struct {
Name string
Changed bool
OldLabels map[string]string
NewLabels map[string]string
}{
{
Name: "no labels changed",
Changed: false,
OldLabels: map[string]string{"foo": "bar"},
NewLabels: map[string]string{"foo": "bar"},
},
// Labels changed.
{
Name: "new node has more labels",
Changed: true,
OldLabels: map[string]string{"foo": "bar"},
NewLabels: map[string]string{"foo": "bar", "test": "value"},
},
} {
oldNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.OldLabels}}
newNode := &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: c.NewLabels}}
changed := nodeLabelsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
}
}
}

func TestNodeTaintsChanged(t *testing.T) {
for _, c := range []struct {
Name string
Changed bool
OldTaints []v1.Taint
NewTaints []v1.Taint
}{
{
Name: "no taint changed",
Changed: false,
OldTaints: []v1.Taint{{Key: "key", Value: "value"}},
NewTaints: []v1.Taint{{Key: "key", Value: "value"}},
},
{
Name: "taint value changed",
Changed: true,
OldTaints: []v1.Taint{{Key: "key", Value: "value1"}},
NewTaints: []v1.Taint{{Key: "key", Value: "value2"}},
},
} {
oldNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.OldTaints}}
newNode := &v1.Node{Spec: v1.NodeSpec{Taints: c.NewTaints}}
changed := nodeTaintsChanged(newNode, oldNode)
if changed != c.Changed {
Member:
A nit: changed -> got, c.Changed -> want

t.Errorf("Test case %q failed: should be %t, not %t", c.Name, c.Changed, changed)
}
}
}

func TestNodeConditionsChanged(t *testing.T) {
nodeConditionType := reflect.TypeOf(v1.NodeCondition{})
if nodeConditionType.NumField() != 6 {
t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
}

for _, c := range []struct {
Member:
To prevent changes to the NodeCondition type from being missed here, please add the following code:

    n := v1.NodeCondition{}
    t := reflect.TypeOf(n)
    if t.NumField() != 6 {
    	t.Errorf("NodeCondition type has changed. The nodeConditionsChanged() function must be reevaluated.")
    }

Name string
Changed bool
OldConditions []v1.NodeCondition
NewConditions []v1.NodeCondition
}{
{
Name: "no condition changed",
Changed: false,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
},
{
Name: "only LastHeartbeatTime changed",
Changed: false,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(1, 0)}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue, LastHeartbeatTime: metav1.Unix(2, 0)}},
},
{
Name: "new node has more healthy conditions",
Changed: true,
OldConditions: []v1.NodeCondition{},
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
},
{
Name: "new node has less unhealthy conditions",
Changed: true,
OldConditions: []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}},
NewConditions: []v1.NodeCondition{},
},
{
Name: "condition status changed",
Changed: true,
OldConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
NewConditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
},
} {
oldNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.OldConditions}}
newNode := &v1.Node{Status: v1.NodeStatus{Conditions: c.NewConditions}}
changed := nodeConditionsChanged(newNode, oldNode)
if changed != c.Changed {
t.Errorf("Test case %q failed: should be %t, got %t", c.Name, c.Changed, changed)
}
}
}
14 changes: 14 additions & 0 deletions pkg/scheduler/internal/queue/scheduling_queue.go
@@ -71,6 +71,8 @@ type SchedulingQueue interface {
Close()
// DeleteNominatedPodIfExists deletes nominatedPod from internal cache
DeleteNominatedPodIfExists(pod *v1.Pod)
// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
NumUnschedulablePods() int
}

// NewSchedulingQueue initializes a new scheduling queue. If pod priority is
@@ -164,6 +166,11 @@ func (f *FIFO) Close() {
// DeleteNominatedPodIfExists does nothing in FIFO.
func (f *FIFO) DeleteNominatedPodIfExists(pod *v1.Pod) {}

// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
func (f *FIFO) NumUnschedulablePods() int {
return 0
}

// NewFIFO creates a FIFO object.
func NewFIFO() *FIFO {
return &FIFO{FIFO: cache.NewFIFO(cache.MetaNamespaceKeyFunc)}
@@ -701,6 +708,13 @@ func (p *PriorityQueue) podsCompareBackoffCompleted(p1, p2 interface{}) bool {
return bo1.Before(bo2)
}

// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
func (p *PriorityQueue) NumUnschedulablePods() int {
p.lock.RLock()
defer p.lock.RUnlock()
return len(p.unschedulableQ.pods)
}

// UnschedulablePodsMap holds pods that cannot be scheduled. This data structure
// is used to implement unschedulableQ.
type UnschedulablePodsMap struct {