Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(scheduler): won't run Filter if PreFilter returned a Skip status #112637

Merged
merged 1 commit into from Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/scheduler/framework/cycle_state.go
Expand Up @@ -19,6 +19,8 @@ package framework
import (
"errors"
"sync"

"k8s.io/apimachinery/pkg/util/sets"
)

var (
Expand Down Expand Up @@ -48,6 +50,8 @@ type CycleState struct {
storage sync.Map
// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
recordPluginMetrics bool
// SkipFilterPlugins are plugins that will be skipped in the Filter extension point.
SkipFilterPlugins sets.String
}

// NewCycleState initializes a new CycleState and returns its pointer.
Expand Down Expand Up @@ -83,6 +87,7 @@ func (c *CycleState) Clone() *CycleState {
return true
})
copy.recordPluginMetrics = c.recordPluginMetrics
copy.SkipFilterPlugins = c.SkipFilterPlugins

return copy
}
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/framework/interface.go
Expand Up @@ -91,6 +91,7 @@ const (
// Wait is used when a Permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a Bind plugin chooses to skip binding.
// Also, if a PreFilter plugin returns Skip, coupled Filter plugin will be skipped.
Skip
)

Expand Down
18 changes: 14 additions & 4 deletions pkg/scheduler/framework/runtime/framework.go
Expand Up @@ -592,24 +592,30 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {

// RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
// anything but Success/Skip.
// Plugins that returned Skip status are recorded in the cyclestate,
// and they are skipped in the Filter extension point.
// If a non-success status is returned, then the scheduling cycle is aborted.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
var result *framework.PreFilterResult
var pluginsWithNodes []string
skipPlugins := sets.NewString()
for _, pl := range f.preFilterPlugins {
r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
if !s.IsSuccess() {
if !s.IsSuccess() && !s.IsSkip() {
sanposhiho marked this conversation as resolved.
Show resolved Hide resolved
s.SetFailedPlugin(pl.Name())
if s.IsUnschedulable() {
return nil, s
}
return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithFailedPlugin(pl.Name())
}
if s.IsSkip() {
skipPlugins.Insert(pl.Name())
}
if !r.AllNodes() {
pluginsWithNodes = append(pluginsWithNodes, pl.Name())
}
Expand All @@ -621,8 +627,8 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
}
return nil, framework.NewStatus(framework.Unschedulable, msg)
}

}
state.SkipFilterPlugins = skipPlugins
return result, nil
}

Expand Down Expand Up @@ -716,8 +722,12 @@ func (f *frameworkImpl) RunFilterPlugins(
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
) framework.PluginToStatus {
skippedPlugins := state.SkipFilterPlugins
statuses := make(framework.PluginToStatus)
for _, pl := range f.filterPlugins {
if skippedPlugins.Has(pl.Name()) {
continue
}
pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
if !pluginStatus.IsSuccess() {
if !pluginStatus.IsUnschedulable() {
Expand Down
188 changes: 159 additions & 29 deletions pkg/scheduler/framework/runtime/framework_test.go
Expand Up @@ -1325,7 +1325,7 @@ func TestPreFilterPlugins(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPreFilterPlugins(ctx, nil, nil)
f.RunPreFilterPlugins(ctx, framework.NewCycleState(), nil)
f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil)

Expand All @@ -1344,40 +1344,152 @@ func TestPreFilterPlugins(t *testing.T) {
})
}

func TestRunPreFilterPluginsStatus(t *testing.T) {
preFilter := &TestPlugin{
name: preFilterPluginName,
inj: injectedResult{PreFilterStatus: int(framework.Error)},
func TestRunPreFilterPlugins(t *testing.T) {
tests := []struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So much better, thanks!

name string
plugins []*TestPlugin
wantPreFilterResult *framework.PreFilterResult
wantSkippedPlugins sets.String
wantStatus *framework.Status
}{
sanposhiho marked this conversation as resolved.
Show resolved Hide resolved
{
name: "all PreFilter returned success",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "success2",
},
},
wantPreFilterResult: nil,
wantStatus: nil,
},
{
name: "one PreFilter plugin returned success, but another PreFilter plugin returned non-success",
plugins: []*TestPlugin{
{
name: "success",
},
{
name: "error",
inj: injectedResult{PreFilterStatus: int(framework.Error)},
},
},
wantPreFilterResult: nil,
wantStatus: framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"),
},
{
name: "one PreFilter plugin returned skip, but another PreFilter plugin returned non-success",
plugins: []*TestPlugin{
{
name: "skip",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "error",
inj: injectedResult{PreFilterStatus: int(framework.Error)},
},
},
wantPreFilterResult: nil,
wantStatus: framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"),
},
{
name: "all PreFilter plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "skip2",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "skip3",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
},
wantPreFilterResult: nil,
wantSkippedPlugins: sets.NewString("skip1", "skip2", "skip3"),
wantStatus: nil,
},
{
name: "some PreFilter plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "success1",
},
{
name: "skip2",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "success2",
},
},
wantPreFilterResult: nil,
wantSkippedPlugins: sets.NewString("skip1", "skip2"),
wantStatus: nil,
},
}
r := make(Registry)
r.Register(preFilterPluginName,
func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return preFilter, nil
})
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := make(Registry)
enabled := make([]config.Plugin, len(tt.plugins))
for i, p := range tt.plugins {
p := p
enabled[i].Name = p.name
r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return p, nil
})
}

plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterPluginName}}}}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
f, err := newFrameworkWithQueueSortAndBind(
r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}

f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done())
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
_, status := f.RunPreFilterPlugins(ctx, nil, nil)
wantStatus := framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", preFilter.Name(), errInjectedStatus)).WithFailedPlugin(preFilter.Name())
if !reflect.DeepEqual(status, wantStatus) {
t.Errorf("wrong status. got: %v, want:%v", status, wantStatus)
state := framework.NewCycleState()
result, status := f.RunPreFilterPlugins(ctx, state, nil)
if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" {
t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d)
}
if d := cmp.Diff(status, tt.wantStatus, cmp.Comparer(func(a, b *framework.Status) bool {
if a.Code() == framework.Error && b.Code() == framework.Error {
// we assume two error status is equal to each other if both contain the same reasons.
return cmp.Equal(a.Reasons(), b.Reasons())
}
return a.Equal(b)
})); d != "" {
t.Errorf("wrong status. got: %v, want: %v, diff: %s", status, tt.wantStatus, d)
}
skipped := state.SkipFilterPlugins
if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" {
t.Errorf("wrong skip filter plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d)
}
})
}
}

func TestFilterPlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
wantStatus *framework.Status
wantStatusMap framework.PluginToStatus
name string
plugins []*TestPlugin
skippedPlugins sets.String
wantStatus *framework.Status
wantStatusMap framework.PluginToStatus
}{
{
name: "SuccessFilter",
Expand All @@ -1387,7 +1499,6 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(framework.Success)},
},
},
wantStatus: nil,
wantStatusMap: framework.PluginToStatus{},
},
{
Expand Down Expand Up @@ -1465,6 +1576,23 @@ func TestFilterPlugins(t *testing.T) {
wantStatus: nil,
wantStatusMap: framework.PluginToStatus{},
},
{
name: "SuccessAndSkipFilters",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{FilterStatus: int(framework.Success)},
},

{
name: "TestPlugin2",
inj: injectedResult{FilterStatus: int(framework.Error)}, // To make sure this plugins isn't called, set error as an injected result.
},
},
wantStatus: nil,
skippedPlugins: sets.NewString("TestPlugin2"),
wantStatusMap: framework.PluginToStatus{},
},
{
name: "ErrorAndSuccessFilters",
plugins: []*TestPlugin{
Expand Down Expand Up @@ -1545,7 +1673,9 @@ func TestFilterPlugins(t *testing.T) {
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
gotStatusMap := f.RunFilterPlugins(ctx, nil, pod, nil)
state := framework.NewCycleState()
state.SkipFilterPlugins = tt.skippedPlugins
gotStatusMap := f.RunFilterPlugins(ctx, state, pod, nil)
gotStatus := gotStatusMap.Merge()
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("wrong status code. got: %v, want:%v", gotStatus, tt.wantStatus)
Expand Down Expand Up @@ -1779,7 +1909,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
t.Fatalf("fail to create framework: %s", err)
}
tt.nodeInfo.SetNode(tt.node)
gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo)
gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, framework.NewCycleState(), tt.pod, tt.nodeInfo)
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
}
Expand Down