Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "feature(scheduler): won't run Filter if PreFilter returned a Skip status" #113786

Merged
merged 1 commit into from Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 0 additions & 5 deletions pkg/scheduler/framework/cycle_state.go
Expand Up @@ -19,8 +19,6 @@ package framework
import (
"errors"
"sync"

"k8s.io/apimachinery/pkg/util/sets"
)

var (
Expand Down Expand Up @@ -50,8 +48,6 @@ type CycleState struct {
storage sync.Map
// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
recordPluginMetrics bool
// SkipFilterPlugins are plugins that will be skipped in the Filter extension point.
SkipFilterPlugins sets.String
}

// NewCycleState initializes a new CycleState and returns its pointer.
Expand Down Expand Up @@ -87,7 +83,6 @@ func (c *CycleState) Clone() *CycleState {
return true
})
copy.recordPluginMetrics = c.recordPluginMetrics
copy.SkipFilterPlugins = c.SkipFilterPlugins

return copy
}
Expand Down
1 change: 0 additions & 1 deletion pkg/scheduler/framework/interface.go
Expand Up @@ -91,7 +91,6 @@ const (
// Wait is used when a Permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a Bind plugin chooses to skip binding.
// Also, if a PreFilter plugin returns Skip, coupled Filter plugin will be skipped.
Skip
)

Expand Down
18 changes: 4 additions & 14 deletions pkg/scheduler/framework/runtime/framework.go
Expand Up @@ -599,30 +599,24 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {

// RunPreFilterPlugins runs the set of configured PreFilter plugins. It returns
// *Status and its code is set to non-success if any of the plugins returns
// anything but Success/Skip.
// Plugins that returned Skip status are recorded in the cyclestate,
// and they are skipped in the Filter extension point.
// If a non-success status is returned, then the scheduling cycle is aborted.
// anything but Success. If a non-success status is returned, then the scheduling
// cycle is aborted.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
startTime := time.Now()
defer func() {
metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
var result *framework.PreFilterResult
var pluginsWithNodes []string
skipPlugins := sets.NewString()
for _, pl := range f.preFilterPlugins {
r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
if !s.IsSuccess() && !s.IsSkip() {
if !s.IsSuccess() {
s.SetFailedPlugin(pl.Name())
if s.IsUnschedulable() {
return nil, s
}
return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithFailedPlugin(pl.Name())
}
if s.IsSkip() {
skipPlugins.Insert(pl.Name())
}
if !r.AllNodes() {
pluginsWithNodes = append(pluginsWithNodes, pl.Name())
}
Expand All @@ -634,8 +628,8 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
}
return nil, framework.NewStatus(framework.Unschedulable, msg)
}

}
state.SkipFilterPlugins = skipPlugins
return result, nil
}

Expand Down Expand Up @@ -729,12 +723,8 @@ func (f *frameworkImpl) RunFilterPlugins(
pod *v1.Pod,
nodeInfo *framework.NodeInfo,
) framework.PluginToStatus {
skippedPlugins := state.SkipFilterPlugins
statuses := make(framework.PluginToStatus)
for _, pl := range f.filterPlugins {
if skippedPlugins.Has(pl.Name()) {
continue
}
pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
if !pluginStatus.IsSuccess() {
if !pluginStatus.IsUnschedulable() {
Expand Down
188 changes: 29 additions & 159 deletions pkg/scheduler/framework/runtime/framework_test.go
Expand Up @@ -1393,7 +1393,7 @@ func TestPreFilterPlugins(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
f.RunPreFilterPlugins(ctx, framework.NewCycleState(), nil)
f.RunPreFilterPlugins(ctx, nil, nil)
f.RunPreFilterExtensionAddPod(ctx, nil, nil, nil, nil)
f.RunPreFilterExtensionRemovePod(ctx, nil, nil, nil, nil)

Expand All @@ -1412,152 +1412,40 @@ func TestPreFilterPlugins(t *testing.T) {
})
}

func TestRunPreFilterPlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
wantPreFilterResult *framework.PreFilterResult
wantSkippedPlugins sets.String
wantStatus *framework.Status
}{
{
name: "all PreFilter returned success",
plugins: []*TestPlugin{
{
name: "success1",
},
{
name: "success2",
},
},
wantPreFilterResult: nil,
wantStatus: nil,
},
{
name: "one PreFilter plugin returned success, but another PreFilter plugin returned non-success",
plugins: []*TestPlugin{
{
name: "success",
},
{
name: "error",
inj: injectedResult{PreFilterStatus: int(framework.Error)},
},
},
wantPreFilterResult: nil,
wantStatus: framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"),
},
{
name: "one PreFilter plugin returned skip, but another PreFilter plugin returned non-success",
plugins: []*TestPlugin{
{
name: "skip",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "error",
inj: injectedResult{PreFilterStatus: int(framework.Error)},
},
},
wantPreFilterResult: nil,
wantStatus: framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", "error", errInjectedStatus)).WithFailedPlugin("error"),
},
{
name: "all PreFilter plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "skip2",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "skip3",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
},
wantPreFilterResult: nil,
wantSkippedPlugins: sets.NewString("skip1", "skip2", "skip3"),
wantStatus: nil,
},
{
name: "some PreFilter plugins returned skip",
plugins: []*TestPlugin{
{
name: "skip1",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "success1",
},
{
name: "skip2",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
{
name: "success2",
},
},
wantPreFilterResult: nil,
wantSkippedPlugins: sets.NewString("skip1", "skip2"),
wantStatus: nil,
},
func TestRunPreFilterPluginsStatus(t *testing.T) {
preFilter := &TestPlugin{
name: preFilterPluginName,
inj: injectedResult{PreFilterStatus: int(framework.Error)},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := make(Registry)
enabled := make([]config.Plugin, len(tt.plugins))
for i, p := range tt.plugins {
p := p
enabled[i].Name = p.name
r.Register(p.name, func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return p, nil
})
}
r := make(Registry)
r.Register(preFilterPluginName,
func(_ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
return preFilter, nil
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
plugins := &config.Plugins{PreFilter: config.PluginSet{Enabled: []config.Plugin{{Name: preFilterPluginName}}}}

f, err := newFrameworkWithQueueSortAndBind(
r,
config.KubeSchedulerProfile{Plugins: &config.Plugins{PreFilter: config.PluginSet{Enabled: enabled}}},
ctx.Done(),
)
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
profile := config.KubeSchedulerProfile{Plugins: plugins}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

state := framework.NewCycleState()
result, status := f.RunPreFilterPlugins(ctx, state, nil)
if d := cmp.Diff(result, tt.wantPreFilterResult); d != "" {
t.Errorf("wrong status. got: %v, want: %v, diff: %s", result, tt.wantPreFilterResult, d)
}
if d := cmp.Diff(status, tt.wantStatus, cmp.Comparer(func(a, b *framework.Status) bool {
if a.Code() == framework.Error && b.Code() == framework.Error {
// we assume two error status is equal to each other if both contain the same reasons.
return cmp.Equal(a.Reasons(), b.Reasons())
}
return a.Equal(b)
})); d != "" {
t.Errorf("wrong status. got: %v, want: %v, diff: %s", status, tt.wantStatus, d)
}
skipped := state.SkipFilterPlugins
if d := cmp.Diff(skipped, tt.wantSkippedPlugins); d != "" {
t.Errorf("wrong skip filter plugins. got: %v, want: %v, diff: %s", skipped, tt.wantSkippedPlugins, d)
}
})
f, err := newFrameworkWithQueueSortAndBind(r, profile, ctx.Done())
if err != nil {
t.Fatalf("Failed to create framework for testing: %v", err)
}
_, status := f.RunPreFilterPlugins(ctx, nil, nil)
wantStatus := framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", preFilter.Name(), errInjectedStatus)).WithFailedPlugin(preFilter.Name())
if !reflect.DeepEqual(status, wantStatus) {
t.Errorf("wrong status. got: %v, want:%v", status, wantStatus)
}
}

func TestFilterPlugins(t *testing.T) {
tests := []struct {
name string
plugins []*TestPlugin
skippedPlugins sets.String
wantStatus *framework.Status
wantStatusMap framework.PluginToStatus
name string
plugins []*TestPlugin
wantStatus *framework.Status
wantStatusMap framework.PluginToStatus
}{
{
name: "SuccessFilter",
Expand All @@ -1567,6 +1455,7 @@ func TestFilterPlugins(t *testing.T) {
inj: injectedResult{FilterStatus: int(framework.Success)},
},
},
wantStatus: nil,
wantStatusMap: framework.PluginToStatus{},
},
{
Expand Down Expand Up @@ -1644,23 +1533,6 @@ func TestFilterPlugins(t *testing.T) {
wantStatus: nil,
wantStatusMap: framework.PluginToStatus{},
},
{
name: "SuccessAndSkipFilters",
plugins: []*TestPlugin{
{
name: "TestPlugin1",
inj: injectedResult{FilterStatus: int(framework.Success)},
},

{
name: "TestPlugin2",
inj: injectedResult{FilterStatus: int(framework.Error)}, // To make sure this plugins isn't called, set error as an injected result.
},
},
wantStatus: nil,
skippedPlugins: sets.NewString("TestPlugin2"),
wantStatusMap: framework.PluginToStatus{},
},
{
name: "ErrorAndSuccessFilters",
plugins: []*TestPlugin{
Expand Down Expand Up @@ -1741,9 +1613,7 @@ func TestFilterPlugins(t *testing.T) {
if err != nil {
t.Fatalf("fail to create framework: %s", err)
}
state := framework.NewCycleState()
state.SkipFilterPlugins = tt.skippedPlugins
gotStatusMap := f.RunFilterPlugins(ctx, state, pod, nil)
gotStatusMap := f.RunFilterPlugins(ctx, nil, pod, nil)
gotStatus := gotStatusMap.Merge()
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("wrong status code. got: %v, want:%v", gotStatus, tt.wantStatus)
Expand Down Expand Up @@ -1977,7 +1847,7 @@ func TestFilterPluginsWithNominatedPods(t *testing.T) {
t.Fatalf("fail to create framework: %s", err)
}
tt.nodeInfo.SetNode(tt.node)
gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, framework.NewCycleState(), tt.pod, tt.nodeInfo)
gotStatus := f.RunFilterPluginsWithNominatedPods(ctx, nil, tt.pod, tt.nodeInfo)
if !reflect.DeepEqual(gotStatus, tt.wantStatus) {
t.Errorf("Unexpected status. got: %v, want: %v", gotStatus, tt.wantStatus)
}
Expand Down