Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #119769: Fix a bug that PostFilter plugin may not function if previous #119943

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/runtime/framework.go
Expand Up @@ -616,12 +616,13 @@ func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
// If a non-success status is returned, then the scheduling cycle is aborted.
func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
startTime := time.Now()
skipPlugins := sets.New[string]()
defer func() {
state.SkipFilterPlugins = skipPlugins
metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
}()
var result *framework.PreFilterResult
var pluginsWithNodes []string
skipPlugins := sets.New[string]()
for _, pl := range f.preFilterPlugins {
r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
if s.IsSkip() {
Expand All @@ -647,7 +648,6 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
return nil, framework.NewStatus(framework.Unschedulable, msg)
}
}
state.SkipFilterPlugins = skipPlugins
return result, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/framework/runtime/framework_test.go
Expand Up @@ -1662,8 +1662,8 @@ func TestRunPreFilterPlugins(t *testing.T) {
inj: injectedResult{PreFilterStatus: int(framework.Error)},
},
},
wantPreFilterResult: nil,
wantStatusCode: framework.Error,
wantSkippedPlugins: sets.New("skip"),
wantStatusCode: framework.Error,
},
{
name: "all PreFilter plugins returned skip",
Expand Down
48 changes: 45 additions & 3 deletions test/integration/scheduler/preemption/preemption_test.go
Expand Up @@ -99,9 +99,16 @@ func waitForNominatedNodeName(cs clientset.Interface, pod *v1.Pod) error {

const tokenFilterName = "token-filter"

// tokenFilter is a fake plugin that implements PreFilter and Filter.
// `Token` simulates the allowed pods number a cluster can host.
// If `EnablePreFilter` is set to false or `Token` is positive, PreFilter passes; otherwise returns Unschedulable
// For each Filter() call, `Token` is decreased by one. When `Token` is positive, Filter passes; otherwise return
// Unschedulable or UnschedulableAndUnresolvable (when `Unresolvable` is set to true)
// AddPod()/RemovePod() adds/removes one token to the cluster to simulate the dryrun preemption
type tokenFilter struct {
Tokens int
Unresolvable bool
Tokens int
Unresolvable bool
EnablePreFilter bool
}

// Name returns name of the plugin.
Expand All @@ -123,7 +130,10 @@ func (fp *tokenFilter) Filter(ctx context.Context, state *framework.CycleState,
}

func (fp *tokenFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil, nil
if !fp.EnablePreFilter || fp.Tokens > 0 {
return nil, nil
}
return nil, framework.NewStatus(framework.Unschedulable)
}

func (fp *tokenFilter) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *v1.Pod,
Expand Down Expand Up @@ -195,6 +205,7 @@ func TestPreemption(t *testing.T) {
existingPods []*v1.Pod
pod *v1.Pod
initTokens int
enablePreFilter bool
unresolvable bool
preemptedPodIndexes map[int]struct{}
enablePodDisruptionConditions bool
Expand Down Expand Up @@ -275,6 +286,36 @@ func TestPreemption(t *testing.T) {
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
// This is identical with previous subtest except for setting enablePreFilter to true.
// With this fake plugin returning Unschedulable in PreFilter, it's able to exercise the path
// that in-tree plugins return Skip in PreFilter and their AddPod/RemovePod functions are also
// skipped properly upon preemption.
{
name: "basic pod preemption with preFilter",
initTokens: 1,
enablePreFilter: true,
existingPods: []*v1.Pod{
initPausePod(&testutils.PausePodConfig{
Name: "victim-pod",
Namespace: testCtx.NS.Name,
Priority: &lowPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
},
pod: initPausePod(&testutils.PausePodConfig{
Name: "preemptor-pod",
Namespace: testCtx.NS.Name,
Priority: &highPriority,
Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)},
},
}),
preemptedPodIndexes: map[int]struct{}{0: {}},
},
{
// same as the previous test, but the filter is unresolvable.
name: "basic pod preemption with unresolvable filter",
Expand Down Expand Up @@ -446,6 +487,7 @@ func TestPreemption(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
filter.Tokens = test.initTokens
filter.EnablePreFilter = test.enablePreFilter
filter.Unresolvable = test.unresolvable
pods := make([]*v1.Pod, len(test.existingPods))
// Create and run existingPods.
Expand Down