Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement "post-filter" extension point for scheduling framework #78097

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/scheduler/core/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
Expand Down
42 changes: 27 additions & 15 deletions pkg/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -89,9 +89,10 @@ type FailedPredicateMap map[string][]predicates.PredicateFailureReason

// FitError describes a fit error of a pod.
type FitError struct {
Pod *v1.Pod
NumAllNodes int
FailedPredicates FailedPredicateMap
Pod *v1.Pod
NumAllNodes int
FailedPredicates FailedPredicateMap
FilteredNodesStatuses framework.NodeToStatusMap
}

// ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods.
Expand All @@ -111,6 +112,10 @@ func (f *FitError) Error() string {
}
}

for _, status := range f.FilteredNodesStatuses {
reasons[status.Message()]++
}

sortReasonsHistogram := func() []string {
reasonStrings := []string{}
for k, v := range reasons {
Expand Down Expand Up @@ -206,16 +211,23 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister

trace.Step("Basic checks done")
startPredicateEvalTime := time.Now()
filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pluginContext, pod, nodeLister)
filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod, nodeLister)
if err != nil {
return result, err
}

// Run "postfilter" plugins.
postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
if !postfilterStatus.IsSuccess() {
return result, postfilterStatus.AsError()
}

if len(filteredNodes) == 0 {
return result, &FitError{
Pod: pod,
NumAllNodes: numNodes,
FailedPredicates: failedPredicateMap,
Pod: pod,
NumAllNodes: numNodes,
FailedPredicates: failedPredicateMap,
FilteredNodesStatuses: filteredNodesStatuses,
}
}
trace.Step("Computing predicates done")
Expand Down Expand Up @@ -449,9 +461,10 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i

// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, error) {
func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
filteredNodesStatuses := framework.NodeToStatusMap{}

if len(g.predicates) == 0 {
filtered = nodeLister.ListNodes()
Expand Down Expand Up @@ -495,8 +508,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
status := g.framework.RunFilterPlugins(pluginContext, pod, nodeName)
if !status.IsSuccess() {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = append(failedPredicateMap[nodeName],
predicates.NewFailureReason(status.Message()))
filteredNodesStatuses[nodeName] = status
if status.Code() != framework.Unschedulable {
errs[status.Message()]++
}
Expand Down Expand Up @@ -524,7 +536,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte

filtered = filtered[:filteredLen]
if len(errs) > 0 {
return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, errors.CreateAggregateFromMessageCountMap(errs)
}
}

Expand All @@ -539,9 +551,9 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
extender, err)
continue
} else {
return []*v1.Node{}, FailedPredicateMap{}, err
}

return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
}

for failedNodeName, failedMsg := range failedMap {
Expand All @@ -556,7 +568,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
}
}
}
return filtered, failedPredicateMap, nil
return filtered, failedPredicateMap, filteredNodesStatuses, nil
}

// addNominatedPods adds pods with equal or greater priority which are nominated
Expand Down
92 changes: 86 additions & 6 deletions pkg/scheduler/core/generic_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
"time"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -139,6 +140,42 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
var EmptyPluginRegistry = framework.Registry{}
var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, []schedulerconfig.PluginConfig{})

// FakeFilterPlugin is a test filter plugin used by default scheduler.
type FakeFilterPlugin struct {
numFilterCalled int
failFilter bool
}

var filterPlugin = &FakeFilterPlugin{}

// Name returns name of the plugin.
func (fp *FakeFilterPlugin) Name() string {
return "fake-filter-plugin"
}

// reset is used to reset filter plugin.
func (fp *FakeFilterPlugin) reset() {
fp.numFilterCalled = 0
fp.failFilter = false
}

// Filter is a test function that returns an error or nil, depending on the
// value of "failFilter".
func (fp *FakeFilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
fp.numFilterCalled++

if fp.failFilter {
return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("injecting failure for pod %v", pod.Name))
}

return nil
}

// NewFilterPlugin is the factory for filtler plugin.
func NewFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
return filterPlugin, nil
}

func makeNodeList(nodeNames []string) []*v1.Node {
result := make([]*v1.Node, 0, len(nodeNames))
for _, nodeName := range nodeNames {
Expand Down Expand Up @@ -219,6 +256,21 @@ func TestSelectHost(t *testing.T) {

func TestGenericScheduler(t *testing.T) {
defer algorithmpredicates.SetPredicatesOrderingDuringTest(order)()

filterPluginRegistry := framework.Registry{filterPlugin.Name(): NewFilterPlugin}
filterFramework, err := framework.NewFramework(filterPluginRegistry, &schedulerconfig.Plugins{
Filter: &schedulerconfig.PluginSet{
Enabled: []schedulerconfig.Plugin{
{
Name: filterPlugin.Name(),
},
},
},
}, []schedulerconfig.PluginConfig{})
if err != nil {
t.Errorf("Unexpected error when initialize scheduling framework, err :%v", err.Error())
}

tests := []struct {
name string
predicates map[string]algorithmpredicates.FitPredicate
Expand All @@ -229,6 +281,7 @@ func TestGenericScheduler(t *testing.T) {
pod *v1.Pod
pods []*v1.Pod
buildPredMeta bool // build predicates metadata or not
failFilter bool
expectedHosts sets.String
expectsErr bool
wErr error
Expand All @@ -246,7 +299,9 @@ func TestGenericScheduler(t *testing.T) {
FailedPredicates: FailedPredicateMap{
"machine1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
"machine2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
}},
},
FilteredNodesStatuses: framework.NodeToStatusMap{},
},
},
{
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
Expand Down Expand Up @@ -309,6 +364,7 @@ func TestGenericScheduler(t *testing.T) {
"2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
"1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
},
FilteredNodesStatuses: framework.NodeToStatusMap{},
},
},
{
Expand Down Expand Up @@ -339,6 +395,7 @@ func TestGenericScheduler(t *testing.T) {
"1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
"2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
},
FilteredNodesStatuses: framework.NodeToStatusMap{},
},
},
{
Expand Down Expand Up @@ -426,6 +483,7 @@ func TestGenericScheduler(t *testing.T) {
FailedPredicates: FailedPredicateMap{
"1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate, algorithmpredicates.ErrFakePredicate},
},
FilteredNodesStatuses: framework.NodeToStatusMap{},
},
},
{
Expand Down Expand Up @@ -538,9 +596,29 @@ func TestGenericScheduler(t *testing.T) {
expectedHosts: sets.NewString("machine2", "machine3"),
wErr: nil,
},
{
name: "test with failed filter plugin",
predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
prioritizers: []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
nodes: []string{"3"},
pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
expectedHosts: nil,
failFilter: true,
expectsErr: true,
wErr: &FitError{
Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
NumAllNodes: 1,
FailedPredicates: FailedPredicateMap{},
FilteredNodesStatuses: framework.NodeToStatusMap{
"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter"),
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
filterPlugin.failFilter = test.failFilter

cache := internalcache.New(time.Duration(0), wait.NeverStop)
for _, pod := range test.pods {
cache.AddPod(pod)
Expand All @@ -564,7 +642,7 @@ func TestGenericScheduler(t *testing.T) {
predMetaProducer,
test.prioritizers,
priorities.EmptyPriorityMetadataProducer,
emptyFramework,
filterFramework,
[]algorithm.SchedulerExtender{},
nil,
pvcLister,
Expand All @@ -575,11 +653,13 @@ func TestGenericScheduler(t *testing.T) {
false)
result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
if !reflect.DeepEqual(err, test.wErr) {
t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr)
t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
}
if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)
}

filterPlugin.reset()
})
}
}
Expand Down Expand Up @@ -613,7 +693,7 @@ func TestFindFitAllError(t *testing.T) {
nodes := makeNodeList([]string{"3", "2", "1"})
scheduler := makeScheduler(predicates, nodes)

_, predicateMap, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes))
_, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes))

if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down Expand Up @@ -643,7 +723,7 @@ func TestFindFitSomeError(t *testing.T) {
scheduler := makeScheduler(predicates, nodes)

pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
_, predicateMap, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes))
_, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes))

if err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down
Loading