Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run all PreFilter when the preemption will happen later in the same scheduling cycle #119779

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ type PreFilterPlugin interface {
// plugins must return success or the pod will be rejected. PreFilter could optionally
// return a PreFilterResult to influence which nodes to evaluate downstream. This is useful
// for cases where it is possible to determine the subset of nodes to process in O(1) time.
// When PreFilterResult filters out some Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable".
// i.e., those Nodes will be out of the candidates of the preemption.
//
// When it returns Skip status, returned PreFilterResult and other fields in status are just ignored,
// and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle.
PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status)
Expand Down
18 changes: 15 additions & 3 deletions pkg/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
if verboseLogs {
logger = klog.LoggerWithName(logger, "PreFilter")
}
var returnStatus *framework.Status
for _, pl := range f.preFilterPlugins {
ctx := ctx
if verboseLogs {
Expand All @@ -683,9 +684,18 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
}
if !s.IsSuccess() {
s.SetPlugin(pl.Name())
if s.IsRejected() {
if s.Code() == framework.UnschedulableAndUnresolvable {
// In this case, the preemption shouldn't happen in this scheduling cycle.
// So, no need to execute all PreFilter.
return nil, s
}
if s.Code() == framework.Unschedulable {
// In this case, the preemption should happen later in this scheduling cycle.
// So we need to execute all PreFilter.
// https://github.com/kubernetes/kubernetes/issues/119770
returnStatus = s
continue
}
return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name())
}
if !r.AllNodes() {
Expand All @@ -697,10 +707,12 @@ func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framewor
if len(pluginsWithNodes) == 1 {
msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
}
return nil, framework.NewStatus(framework.Unschedulable, msg)

// When PreFilterResult filters out Nodes, the framework considers Nodes that are filtered out as getting "UnschedulableAndUnresolvable".
return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg)
}
}
return result, nil
return result, returnStatus
}

func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
Expand Down
81 changes: 66 additions & 15 deletions pkg/scheduler/framework/runtime/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (pl *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
}

func (pl *TestPlugin) PreFilter(ctx context.Context, state *framework.CycleState, p *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
return nil, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), injectReason)
return pl.inj.PreFilterResult, framework.NewStatus(framework.Code(pl.inj.PreFilterStatus), injectReason)
}

func (pl *TestPlugin) PreFilterExtensions() framework.PreFilterExtensions {
Expand Down Expand Up @@ -1622,6 +1622,56 @@ func TestRunPreFilterPlugins(t *testing.T) {
wantSkippedPlugins: sets.New("skip1", "skip2"),
wantStatusCode: framework.Success,
},
{
name: "one PreFilter plugin returned Unschedulable, but another PreFilter plugin should be executed",
plugins: []*TestPlugin{
{
name: "unschedulable",
inj: injectedResult{PreFilterStatus: int(framework.Unschedulable)},
},
{
// to make sure this plugin is executed, this plugin return Skip and we confirm it via wantSkippedPlugins.
name: "skip",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
},
wantPreFilterResult: nil,
wantSkippedPlugins: sets.New("skip"),
wantStatusCode: framework.Unschedulable,
},
{
name: "one PreFilter plugin returned UnschedulableAndUnresolvable, and all other plugins aren't executed",
plugins: []*TestPlugin{
{
name: "unresolvable",
inj: injectedResult{PreFilterStatus: int(framework.UnschedulableAndUnresolvable)},
},
{
// to make sure this plugin is not executed, this plugin return Skip and we confirm it via wantSkippedPlugins.
name: "skip",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
},
wantPreFilterResult: nil,
wantStatusCode: framework.UnschedulableAndUnresolvable,
},
{
name: "all nodes are filtered out by prefilter result, but other plugins aren't executed because we consider all nodes are filtered out by UnschedulableAndUnresolvable",
plugins: []*TestPlugin{
{
name: "reject-all-nodes",
inj: injectedResult{PreFilterResult: &framework.PreFilterResult{NodeNames: sets.New[string]()}},
},
{
// to make sure this plugin is not executed, this plugin return Skip and we confirm it via wantSkippedPlugins.
name: "skip",
inj: injectedResult{PreFilterStatus: int(framework.Skip)},
},
},
wantPreFilterResult: &framework.PreFilterResult{NodeNames: sets.New[string]()},
wantSkippedPlugins: sets.New[string](), // "skip" plugin isn't executed.
wantStatusCode: framework.UnschedulableAndUnresolvable,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -3224,20 +3274,21 @@ func buildScoreConfigWithWeights(weights map[string]int32, ps ...string) *config
}

type injectedResult struct {
ScoreRes int64 `json:"scoreRes,omitempty"`
NormalizeRes int64 `json:"normalizeRes,omitempty"`
ScoreStatus int `json:"scoreStatus,omitempty"`
NormalizeStatus int `json:"normalizeStatus,omitempty"`
PreFilterStatus int `json:"preFilterStatus,omitempty"`
PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"`
PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"`
FilterStatus int `json:"filterStatus,omitempty"`
PostFilterStatus int `json:"postFilterStatus,omitempty"`
PreScoreStatus int `json:"preScoreStatus,omitempty"`
ReserveStatus int `json:"reserveStatus,omitempty"`
PreBindStatus int `json:"preBindStatus,omitempty"`
BindStatus int `json:"bindStatus,omitempty"`
PermitStatus int `json:"permitStatus,omitempty"`
ScoreRes int64 `json:"scoreRes,omitempty"`
NormalizeRes int64 `json:"normalizeRes,omitempty"`
ScoreStatus int `json:"scoreStatus,omitempty"`
NormalizeStatus int `json:"normalizeStatus,omitempty"`
PreFilterResult *framework.PreFilterResult `json:"preFilterResult,omitempty"`
PreFilterStatus int `json:"preFilterStatus,omitempty"`
PreFilterAddPodStatus int `json:"preFilterAddPodStatus,omitempty"`
PreFilterRemovePodStatus int `json:"preFilterRemovePodStatus,omitempty"`
FilterStatus int `json:"filterStatus,omitempty"`
PostFilterStatus int `json:"postFilterStatus,omitempty"`
PreScoreStatus int `json:"preScoreStatus,omitempty"`
ReserveStatus int `json:"reserveStatus,omitempty"`
PreBindStatus int `json:"preBindStatus,omitempty"`
BindStatus int `json:"bindStatus,omitempty"`
PermitStatus int `json:"permitStatus,omitempty"`
}

func setScoreRes(inj injectedResult) (int64, *framework.Status) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/framework/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ const ExtenderName = "Extender"

// Diagnosis records the details to diagnose a scheduling failure.
type Diagnosis struct {
// NodeToStatusMap records the status of each node
// if they're rejected in PreFilter (via PreFilterResult) or Filter plugins.
// Nodes that pass PreFilter/Filter plugins are not included in this map.
NodeToStatusMap NodeToStatusMap
// UnschedulablePlugins are plugins that returns Unschedulable or UnschedulableAndUnresolvable.
UnschedulablePlugins sets.Set[string]
Expand Down
12 changes: 7 additions & 5 deletions pkg/scheduler/schedule_one.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,14 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
nodes := allNodes
if !preRes.AllNodes() {
nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
for n := range preRes.NodeNames {
nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
if err != nil {
return nil, diagnosis, err
for _, n := range allNodes {
if !preRes.NodeNames.Has(n.Node().Name) {
// We consider Nodes that are filtered out by PreFilterResult as rejected via UnschedulableAndUnresolvable.
// We have to record them in NodeToStatusMap so that they won't be considered as candidates in the preemption.
diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
continue
}
nodes = append(nodes, nInfo)
nodes = append(nodes, n)
}
}
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
Expand Down
43 changes: 36 additions & 7 deletions pkg/scheduler/schedule_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2227,7 +2227,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
nodes: []string{"node1", "node2", "node3"},
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
wantNodes: sets.New("node2"),
wantEvaluatedNodes: ptr.To[int32](1),
wantEvaluatedNodes: ptr.To[int32](3),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this number shouldn't have changed?

Although it's only useful for debugging.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be changed.

Please refer to the change in pkg/scheduler/scheduler.go in this PR: After this PR, EvaluatedNodes contains the number of nodes that filtered out by PreFilterResult.
https://github.com/kubernetes/kubernetes/pull/119779/files#r1439161728

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand that it's not a bug. However, it was useful to have the value 1 to know that only one node passed PreFilter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Then, we have to change how to calculate EvaluatedNodes.
https://github.com/sanposhiho/kubernetes/blob/master/pkg/scheduler/schedule_one.go#L435


Created #124705

},
{
name: "test prefilter plugin returning non-intersecting nodes",
Expand All @@ -2254,9 +2254,9 @@ func TestSchedulerSchedulePod(t *testing.T) {
NumAllNodes: 3,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node2": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node3": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
},
UnschedulablePlugins: sets.Set[string]{},
PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously",
Expand Down Expand Up @@ -2284,13 +2284,42 @@ func TestSchedulerSchedulePod(t *testing.T) {
NumAllNodes: 1,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.Unschedulable, "node(s) didn't satisfy plugin FakePreFilter2"),
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2"),
},
UnschedulablePlugins: sets.Set[string]{},
PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2",
},
},
},
{
name: "test some nodes are filtered out by prefilter plugin and other are filtered out by filter plugin",
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterPreFilterPlugin(
"FakePreFilter",
tf.NewFakePreFilterPlugin("FakePreFilter", &framework.PreFilterResult{NodeNames: sets.New[string]("node2")}, nil),
),
tf.RegisterFilterPlugin(
"FakeFilter",
tf.NewFakeFilterPlugin(map[string]framework.Code{"node2": framework.Unschedulable}),
),
tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
},
nodes: []string{"node1", "node2"},
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
wErr: &framework.FitError{
Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
NumAllNodes: 2,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{
"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result"),
"node2": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"),
},
UnschedulablePlugins: sets.New("FakeFilter"),
PreFilterMsg: "",
},
},
},
{
name: "test prefilter plugin returning skip",
registerPlugins: []tf.RegisterPluginFunc{
Expand Down Expand Up @@ -2356,7 +2385,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
wantEvaluatedNodes: ptr.To[int32](1),
},
{
name: "test no score plugin, prefilter plugin returning 2 nodes, only 1 node is evaluated",
name: "test no score plugin, prefilter plugin returning 2 nodes, only 1 node is evaluated in Filter",
registerPlugins: []tf.RegisterPluginFunc{
tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
tf.RegisterPreFilterPlugin(
Expand All @@ -2368,7 +2397,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
nodes: []string{"node1", "node2", "node3"},
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
wantNodes: sets.New("node1", "node2"),
wantEvaluatedNodes: ptr.To[int32](1),
wantEvaluatedNodes: ptr.To[int32](2),
},
}
for _, test := range tests {
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type ScheduleResult struct {
SuggestedHost string
// The number of nodes the scheduler evaluated the pod against in the filtering
// phase and beyond.
// Note that it contains the number of nodes that filtered out by PreFilterResult.
EvaluatedNodes int
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EvaluatedNodes is only used in logging, not being used at anywhere else like any functionalities or metrics. So, this change doesn't cause anything bad.

logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)

(It leads us to another question; whether this EvaluatedNodes is worthwhile to keep or not in the first place though)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've find this information useful when debugging in the past.

// The number of nodes out of the evaluated ones that fit the pod.
FeasibleNodes int
Expand Down