Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: Extend ExtenderFilterResult to include UnschedulableAndUnresolvable nodes #92866

Merged
merged 2 commits into from Feb 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 9 additions & 7 deletions pkg/scheduler/core/extender.go
Expand Up @@ -269,11 +269,13 @@ func convertToNodeNameToMetaVictims(

// Filter based on extender implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list; otherwise the function returns an error.
// failedNodesMap optionally contains the list of failed nodes and failure reasons.
// The failedNodes and failedAndUnresolvableNodes optionally contains the list
// of failed nodes and failure reasons, except nodes in the latter are
// unresolvable.
func (h *HTTPExtender) Filter(
pod *v1.Pod,
nodes []*v1.Node,
) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
) (filteredList []*v1.Node, failedNodes, failedAndUnresolvableNodes extenderv1.FailedNodesMap, err error) {
var (
result extenderv1.ExtenderFilterResult
nodeList *v1.NodeList
Expand All @@ -287,7 +289,7 @@ func (h *HTTPExtender) Filter(
}

if h.filterVerb == "" {
return nodes, extenderv1.FailedNodesMap{}, nil
return nodes, extenderv1.FailedNodesMap{}, extenderv1.FailedNodesMap{}, nil
}

if h.nodeCacheCapable {
Expand All @@ -310,10 +312,10 @@ func (h *HTTPExtender) Filter(
}

if err := h.send(h.filterVerb, args, &result); err != nil {
return nil, nil, err
return nil, nil, nil, err
}
if result.Error != "" {
return nil, nil, fmt.Errorf(result.Error)
return nil, nil, nil, fmt.Errorf(result.Error)
}

if h.nodeCacheCapable && result.NodeNames != nil {
Expand All @@ -322,7 +324,7 @@ func (h *HTTPExtender) Filter(
if n, ok := fromNodeName[nodeName]; ok {
nodeResult[i] = n
} else {
return nil, nil, fmt.Errorf(
return nil, nil, nil, fmt.Errorf(
"extender %q claims a filtered node %q which is not found in the input node list",
h.extenderURL, nodeName)
}
Expand All @@ -334,7 +336,7 @@ func (h *HTTPExtender) Filter(
}
}

return nodeResult, result.FailedNodes, nil
return nodeResult, result.FailedNodes, result.FailedAndUnresolvableNodes, nil
}

// Prioritize based on extender implemented priority functions. Weight*priority is added
Expand Down
26 changes: 25 additions & 1 deletion pkg/scheduler/core/generic_scheduler.go
Expand Up @@ -345,14 +345,23 @@ func (g *genericScheduler) findNodesThatPassFilters(
}

func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes []*v1.Node, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {
// Extenders are called sequentially.
// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
// extender in a decreasing manner.
for _, extender := range g.extenders {
if len(feasibleNodes) == 0 {
break
}
if !extender.IsInterested(pod) {
continue
}
feasibleList, failedMap, err := extender.Filter(pod, feasibleNodes)

// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
// particular nodes, and this may eventually improve preemption efficiency.
// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
// status ahead of others.
feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
if err != nil {
if extender.IsIgnorable() {
klog.InfoS("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
Expand All @@ -361,13 +370,28 @@ func (g *genericScheduler) findNodesThatPassExtenders(pod *v1.Pod, feasibleNodes
return nil, err
}

for failedNodeName, failedMsg := range failedAndUnresolvableMap {
var aggregatedReasons []string
if _, found := statuses[failedNodeName]; found {
aggregatedReasons = statuses[failedNodeName].Reasons()
}
aggregatedReasons = append(aggregatedReasons, failedMsg)
statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
}

for failedNodeName, failedMsg := range failedMap {
if _, found := failedAndUnresolvableMap[failedNodeName]; found {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this happen?
I can only think of the case where the extender returns both states for the same Node.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, ignore the nodes in failedMap if they are already in failedAndUnresolvableMap

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a comment that this only happens if the extender returns it in both

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added, PTAL

// failedAndUnresolvableMap takes precedence over failedMap
// note that this only happens if the extender returns the node in both maps
continue
}
if _, found := statuses[failedNodeName]; !found {
statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
} else {
statuses[failedNodeName].AppendReason(failedMsg)
}
}

feasibleNodes = feasibleList
}
return feasibleNodes, nil
Expand Down
174 changes: 174 additions & 0 deletions pkg/scheduler/core/generic_scheduler_test.go
Expand Up @@ -273,6 +273,180 @@ func TestSelectHost(t *testing.T) {
}
}

func TestFindNodesThatPassExtenders(t *testing.T) {
tests := []struct {
name string
extenders []st.FakeExtender
nodes []*v1.Node
filteredNodesStatuses framework.NodeToStatusMap
expectsErr bool
expectedNodes []*v1.Node
expectedStatuses framework.NodeToStatusMap
}{
{
name: "error",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{st.ErrorPredicateExtender},
},
},
nodes: makeNodeList([]string{"a"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: true,
},
{
name: "success",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{st.TruePredicateExtender},
},
},
nodes: makeNodeList([]string{"a"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: make(framework.NodeToStatusMap),
},
{
name: "unschedulable",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
},
},
{
name: "unschedulable and unresolvable",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
if node.Name == "b" {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
},
{
name: "extender may overwrite the statuses",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
if node.Name == "b" {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: framework.NodeToStatusMap{
"c": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c")),
},
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c"), fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
},
{
name: "multiple extenders",
extenders: []st.FakeExtender{
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
if node.Name == "b" {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
{
Predicates: []st.FitPredicate{func(pod *v1.Pod, node *v1.Node) *framework.Status {
if node.Name == "a" {
return framework.NewStatus(framework.Success)
}
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name))
}},
},
},
nodes: makeNodeList([]string{"a", "b", "c"}),
filteredNodesStatuses: make(framework.NodeToStatusMap),
expectsErr: false,
expectedNodes: makeNodeList([]string{"a"}),
expectedStatuses: framework.NodeToStatusMap{
"b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
"c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
},
},
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although a little rare, do you think it's necessary to add a case to cover the path that "extender may overwrite the statuses"? i.e., the tuple {feasibleNodes, statuses} passed onto the extender(s) is {[a, b], {c: Unchedulable}}, extenders may return feasibleNodes as [a], while statuses as {c: UnschedualbeAndUnresolvable, b: Unschedulable}.

Copy link
Member Author

@cofyc cofyc Jan 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's necessary. The case "extender may overwrite the statuses" is added (https://github.com/kubernetes/kubernetes/pull/92866/files#diff-af29ae211c469a644b28df812d85bf717e76e623126ef3d5a14d07989c883fddR353). PTAL.


cmpOpts := []cmp.Option{
cmp.Comparer(func(s1 framework.Status, s2 framework.Status) bool {
return s1.Code() == s2.Code() && reflect.DeepEqual(s1.Reasons(), s2.Reasons())
}),
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var extenders []framework.Extender
for ii := range tt.extenders {
extenders = append(extenders, &tt.extenders[ii])
}
scheduler := &genericScheduler{
extenders: extenders,
}
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
got, err := scheduler.findNodesThatPassExtenders(pod, tt.nodes, tt.filteredNodesStatuses)
if tt.expectsErr {
if err == nil {
t.Error("Unexpected non-error")
}
} else {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if diff := cmp.Diff(tt.expectedNodes, got); diff != "" {
t.Errorf("filtered nodes (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tt.expectedStatuses, tt.filteredNodesStatuses, cmpOpts...); diff != "" {
t.Errorf("filtered statuses (-want,+got):\n%s", diff)
}
}
})
}
}

func TestGenericScheduler(t *testing.T) {
tests := []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/factory_test.go
Expand Up @@ -674,8 +674,8 @@ func (f *fakeExtender) SupportsPreemption() bool {
return false
}

func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error) {
return nil, nil, nil
func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
return nil, nil, nil, nil
}

func (f *fakeExtender) Prioritize(
Expand Down
8 changes: 5 additions & 3 deletions pkg/scheduler/framework/extender.go
Expand Up @@ -29,9 +29,11 @@ type Extender interface {
Name() string

// Filter based on extender-implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list. failedNodesMap optionally contains
// the list of failed nodes and failure reasons.
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
// expected to be a subset of the supplied list.
// The failedNodes and failedAndUnresolvableNodes optionally contains the list
// of failed nodes and failure reasons, except nodes in the latter are
// unresolvable.
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, failedAndUnresolvable extenderv1.FailedNodesMap, err error)

// Prioritize based on extender-implemented priority functions. The returned scores & weight
// are used to compute the weighted score for an extender. The weighted scores are added to
Expand Down