Skip to content

Commit

Permalink
[Framework] Add UnschedulableAndUnresolvable status code
Browse files Browse the repository at this point in the history
The status can be used by (Pre)Filter plugins to indicate that
preemption wouldn't change the decision of the filter.

Signed-off-by: Aldo Culquicondor <acondor@google.com>
  • Loading branch information
alculquicondor committed Aug 28, 2019
1 parent a1f1f0b commit 3c1f8a8
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 29 deletions.
11 changes: 7 additions & 4 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, scheduleErr error) (*v1.Node, []
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
Expand Down Expand Up @@ -509,7 +509,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
if !status.IsSuccess() {
predicateResultLock.Lock()
filteredNodesStatuses[nodeName] = status
if status.Code() != framework.Unschedulable {
if !status.IsUnschedulable() {
errs[status.Message()]++
}
predicateResultLock.Unlock()
Expand Down Expand Up @@ -1166,10 +1166,13 @@ func unresolvablePredicateExists(failedPredicates []predicates.PredicateFailureR

// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node {
potentialNodes := []*v1.Node{}
for _, node := range nodes {
failedPredicates, _ := failedPredicatesMap[node.Name]
if fitErr.FilteredNodesStatuses[node.Name].Code() == framework.UnschedulableAndUnresolvable {
continue
}
failedPredicates, _ := fitErr.FailedPredicates[node.Name]
// If we assume that scheduler looks at all nodes and populates the failedPredicateMap
// (which is the case today), the !found case should never happen, but we'd prefer
// to rely less on such assumptions in the code when checking does not impose
Expand Down
83 changes: 66 additions & 17 deletions pkg/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, []sched
// FakeFilterPlugin is a test filter plugin used by default scheduler.
type FakeFilterPlugin struct {
numFilterCalled int32
failFilter bool
returnCode framework.Code
}

var filterPlugin = &FakeFilterPlugin{}
Expand All @@ -157,19 +157,19 @@ func (fp *FakeFilterPlugin) Name() string {
// reset is used to reset filter plugin.
func (fp *FakeFilterPlugin) reset() {
fp.numFilterCalled = 0
fp.failFilter = false
fp.returnCode = framework.Success
}

// Filter is a test function that returns nil when "returnCode" is Success, or
// a status carrying "returnCode" otherwise.
func (fp *FakeFilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
atomic.AddInt32(&fp.numFilterCalled, 1)

if fp.failFilter {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("injecting failure for pod %v", pod.Name))
if fp.returnCode == framework.Success {
return nil
}

return nil
return framework.NewStatus(fp.returnCode, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}

// NewFilterPlugin is the factory for filter plugin.
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestGenericScheduler(t *testing.T) {
pod *v1.Pod
pods []*v1.Pod
buildPredMeta bool // build predicates metadata or not
failFilter bool
filterReturnCode framework.Code
expectedHosts sets.String
expectsErr bool
wErr error
Expand Down Expand Up @@ -598,14 +598,14 @@ func TestGenericScheduler(t *testing.T) {
wErr: nil,
},
{
name: "test with failed filter plugin",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
failFilter: true,
expectsErr: true,
name: "test with filter plugin returning Unschedulable status",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
filterReturnCode: framework.Unschedulable,
expectsErr: true,
wErr: &FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1,
Expand All @@ -615,10 +615,28 @@ func TestGenericScheduler(t *testing.T) {
},
},
},
{
name: "test with filter plugin returning UnschedulableAndUnresolvable status",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
filterReturnCode: framework.UnschedulableAndUnresolvable,
expectsErr: true,
wErr: &FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1,
FailedPredicates: FailedPredicateMap{},
FilteredNodesStatuses: framework.NodeToStatusMap{
"3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injecting failure for pod test-filter"),
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
filterPlugin.failFilter = test.failFilter
filterPlugin.returnCode = test.filterReturnCode

cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, pod := range test.pods {
Expand Down Expand Up @@ -1495,14 +1513,15 @@ func TestPickOneNodeForPreemption(t *testing.T) {

func TestNodesWherePreemptionMightHelp(t *testing.T) {
// Prepare 4 node names.
nodeNames := []string{}
nodeNames := make([]string, 0, 4)
for i := 1; i < 5; i++ {
nodeNames = append(nodeNames, fmt.Sprintf("machine%d", i))
}

tests := []struct {
name string
failedPredMap FailedPredicateMap
nodesStatuses framework.NodeToStatusMap
expected map[string]bool // set of expected node names. Value is ignored.
}{
{
Expand Down Expand Up @@ -1586,11 +1605,41 @@ func TestNodesWherePreemptionMightHelp(t *testing.T) {
},
expected: map[string]bool{"machine1": true, "machine3": true, "machine4": true},
},
{
name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried",
failedPredMap: FailedPredicateMap{},
nodesStatuses: framework.NodeToStatusMap{
"machine2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
"machine3": framework.NewStatus(framework.Unschedulable, ""),
"machine4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
},
expected: map[string]bool{"machine1": true, "machine3": true},
},
{
name: "Failed predicates and statuses should be evaluated",
failedPredMap: FailedPredicateMap{
"machine1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrPodAffinityNotMatch},
"machine2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrPodAffinityNotMatch},
"machine3": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrPodNotMatchHostName},
"machine4": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrPodNotMatchHostName},
},
nodesStatuses: framework.NodeToStatusMap{
"machine1": framework.NewStatus(framework.Unschedulable, ""),
"machine2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
"machine3": framework.NewStatus(framework.Unschedulable, ""),
"machine4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
},
expected: map[string]bool{"machine1": true},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
nodes := nodesWherePreemptionMightHelp(makeNodeList(nodeNames), test.failedPredMap)
fitErr := FitError{
FailedPredicates: test.failedPredMap,
FilteredNodesStatuses: test.nodesStatuses,
}
nodes := nodesWherePreemptionMightHelp(makeNodeList(nodeNames), &fitErr)
if len(test.expected) != len(nodes) {
t.Errorf("number of nodes is not the same as expected. exptectd: %d, got: %d. Nodes: %v", len(test.expected), len(nodes), nodes)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/framework/v1alpha1/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (f *framework) RunPreFilterPlugins(
for _, pl := range f.preFilterPlugins {
status := pl.PreFilter(pc, pod)
if !status.IsSuccess() {
if status.Code() == Unschedulable {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
Expand All @@ -315,7 +315,7 @@ func (f *framework) RunFilterPlugins(pc *PluginContext,
for _, pl := range f.filterPlugins {
status := pl.Filter(pc, pod, nodeName)
if !status.IsSuccess() {
if status.Code() != Unschedulable {
if !status.IsUnschedulable() {
errMsg := fmt.Sprintf("error while running %q filter plugin for pod %q: %v",
pl.Name(), pod.Name, status.Message())
klog.Error(errMsg)
Expand Down Expand Up @@ -433,7 +433,7 @@ func (f *framework) RunPreBindPlugins(
for _, pl := range f.preBindPlugins {
status := pl.PreBind(pc, pod, nodeName)
if !status.IsSuccess() {
if status.Code() == Unschedulable {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at prebind: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
Expand Down Expand Up @@ -513,7 +513,7 @@ func (f *framework) RunPermitPlugins(
for _, pl := range f.permitPlugins {
status, d := pl.Permit(pc, pod, nodeName)
if !status.IsSuccess() {
if status.Code() == Unschedulable {
if status.IsUnschedulable() {
msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message())
klog.V(4).Infof(msg)
return NewStatus(status.Code(), msg)
Expand Down Expand Up @@ -547,7 +547,7 @@ func (f *framework) RunPermitPlugins(
return NewStatus(Unschedulable, msg)
case s := <-w.s:
if !s.IsSuccess() {
if s.Code() == Unschedulable {
if s.IsUnschedulable() {
msg := fmt.Sprintf("rejected while waiting at permit: %v", s.Message())
klog.V(4).Infof(msg)
return NewStatus(s.Code(), msg)
Expand Down
15 changes: 14 additions & 1 deletion pkg/scheduler/framework/v1alpha1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,16 @@ const (
Success Code = iota
// Error is used for internal plugin errors, unexpected input, etc.
Error
// Unschedulable is used when a plugin finds a pod unschedulable.
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable
// UnschedulableAndUnresolvable is used when a (pre-)filter plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
UnschedulableAndUnresolvable
// Wait is used when a permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a bind plugin chooses to skip binding.
Expand Down Expand Up @@ -100,6 +107,12 @@ func (s *Status) IsSuccess() bool {
return s.Code() == Success
}

// IsUnschedulable reports whether the code of "Status" is one of the
// unschedulable codes, i.e. Unschedulable or UnschedulableAndUnresolvable.
func (s *Status) IsUnschedulable() bool {
	switch s.Code() {
	case Unschedulable, UnschedulableAndUnresolvable:
		return true
	default:
		return false
	}
}

// AsError returns an "error" object with the same message as that of the Status.
func (s *Status) AsError() error {
if s.IsSuccess() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (sched *Scheduler) scheduleOne() {
permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
if !permitStatus.IsSuccess() {
var reason string
if permitStatus.Code() == framework.Unschedulable {
if permitStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
Expand All @@ -635,7 +635,7 @@ func (sched *Scheduler) scheduleOne() {
preBindStatus := fwk.RunPreBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
if !preBindStatus.IsSuccess() {
var reason string
if preBindStatus.Code() == framework.Unschedulable {
if preBindStatus.IsUnschedulable() {
metrics.PodScheduleFailures.Inc()
reason = v1.PodReasonUnschedulable
} else {
Expand Down

0 comments on commit 3c1f8a8

Please sign in to comment.