Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix predicate return #3553

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions example/extender/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ func predicate(w http.ResponseWriter, r *http.Request) {

resp := &extender.PredicateResponse{}
if req.Task.BestEffort && len(req.Node.Tasks) > 10 {
sts := api.Status{}
sts.Code = api.Unschedulable
sts.Reason = "Too many tasks on the node"
resp.Status = append(resp.Status, &sts)
resp.ErrorMessage = "Too many tasks on the node"
lowang-bh marked this conversation as resolved.
Show resolved Hide resolved
resp.Code = api.Unschedulable
}
response, err := json.Marshal(resp)
if err != nil {
Expand Down
15 changes: 3 additions & 12 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,23 +303,14 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
}
}

func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
func (alloc *Action) predicate(task *api.TaskInfo, node *api.NodeInfo) error {
// Check for Resource Predicate
var statusSets api.StatusSets
if ok, resources := task.InitResreq.LessEqualWithResourcesName(node.FutureIdle(), api.Zero); !ok {
statusSets = append(statusSets, &api.Status{Code: api.Unschedulable, Reason: api.WrapInsufficientResourceReason(resources)})
return nil, api.NewFitErrWithStatus(task, node, statusSets...)
return api.NewFitErrWithStatus(task, node, statusSets...)
}
statusSets, err := alloc.session.PredicateFn(task, node)
if err != nil {
return nil, api.NewFitError(task, node, err.Error())
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
return nil, api.NewFitErrWithStatus(task, node, statusSets...)
}
return nil, nil
return alloc.session.PredicateForAllocateAction(task, node)
}

func (alloc *Action) UnInitialize() {}
16 changes: 1 addition & 15 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package backfill

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand All @@ -44,20 +43,7 @@ func (backfill *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Backfill ...")
defer klog.V(5).Infof("Leaving Backfill ...")

predicateFunc := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
var statusSets api.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, err
}

// predicateHelper.PredicateNodes will print the log if predicate failed, so don't print log anymore here
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf(statusSets.Message()) // should not include variables in api node errors
return nil, err
}
return nil, nil
}
predicateFunc := ssn.PredicateForAllocateAction

// TODO (k82cn): When backfill, it's also need to balance between Queues.
pendingTasks := backfill.pickUpPendingTasks(ssn)
Expand Down
12 changes: 1 addition & 11 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,17 +213,7 @@ func preempt(
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
var statusSets api.StatusSets
statusSets, _ = ssn.PredicateFn(task, node)

// When filtering candidate nodes, need to consider the node statusSets instead of the err information.
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
return nil, api.NewFitErrWithStatus(task, node, statusSets...)
}
return nil, nil
}
predicateFn := ssn.PredicateForPreemptAction
// we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
allNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(preemptor)
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)
Expand Down
9 changes: 3 additions & 6 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,13 @@ func (ra *Action) Execute(ssn *framework.Session) {
// we should filter out those nodes that are UnschedulableAndUnresolvable status got in allocate action
totalNodes := ssn.GetUnschedulableAndUnresolvableNodesForTask(task)
for _, n := range totalNodes {
var statusSets api.StatusSets
statusSets, _ = ssn.PredicateFn(task, n)

// When filtering candidate nodes, need to consider the node statusSets instead of the err information.
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
klog.V(5).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, reason is %s.",
task.Namespace, task.Name, n.Name, statusSets.Message())
if err := ssn.PredicateForPreemptAction(task, n); err != nil {
klog.V(4).Infof("Reclaim predicate for task %s/%s on node %s return error %v ", task.Namespace, task.Name, n.Name, err)
continue
}

klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name)

var reclaimees []*api.TaskInfo
Expand Down
33 changes: 32 additions & 1 deletion pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,15 @@ const (
Wait
// Skip is used when a Bind plugin chooses to skip binding.
Skip
// There is a Pending status in k8s.
// Pending means that the scheduling process is finished successfully,
// but the plugin wants to stop the scheduling cycle/binding cycle here.
)

type Status struct {
Code int
Reason string
Plugin string
}

// String represents status string
Expand Down Expand Up @@ -227,6 +231,33 @@ func (s StatusSets) Reasons() []string {
return all
}

// ConvertPredicateStatus return predicate status from k8sframework status
func ConvertPredicateStatus(status *k8sframework.Status) *Status {
internalStatus := &Status{}
if status != nil {
internalStatus.Plugin = status.Plugin() // function didn't check whether Status is nil
}
switch status.Code() {
case k8sframework.Error:
internalStatus.Code = Error
case k8sframework.Unschedulable:
internalStatus.Code = Unschedulable
case k8sframework.UnschedulableAndUnresolvable:
internalStatus.Code = UnschedulableAndUnresolvable
case k8sframework.Wait:
internalStatus.Code = Wait
case k8sframework.Skip:
internalStatus.Code = Skip
default:
internalStatus.Code = Success
}
// in case that pod's scheduling message is not identifiable with message: 'all nodes are unavailable'
if internalStatus.Code != Success {
internalStatus.Reason = status.Message()
}
return internalStatus
}

// ValidateExFn is the func declaration used to validate the result.
type ValidateExFn func(interface{}) *ValidateResult

Expand All @@ -237,7 +268,7 @@ type VoteFn func(interface{}) int
type JobEnqueuedFn func(interface{})

// PredicateFn is the func declaration used to predicate node for task.
type PredicateFn func(*TaskInfo, *NodeInfo) ([]*Status, error)
type PredicateFn func(*TaskInfo, *NodeInfo) error

// PrePredicateFn is the func declaration used to pre-predicate node for task.
type PrePredicateFn func(*TaskInfo) error
Expand Down
46 changes: 46 additions & 0 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,52 @@ func (ssn *Session) GetUnschedulableAndUnresolvableNodesForTask(task *api.TaskIn
return ret
}

// PredicateForAllocateAction checks if the predicate error contains
// - Unschedulable
// - UnschedulableAndUnresolvable
// - ErrorSkipOrWait
func (ssn *Session) PredicateForAllocateAction(task *api.TaskInfo, node *api.NodeInfo) error {
err := ssn.PredicateFn(task, node)
if err == nil {
return nil
}

fitError, ok := err.(*api.FitError)
if !ok {
return api.NewFitError(task, node, err.Error())
}

statusSets := fitError.Status
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
return fitError
}
return nil
}

// PredicateForPreemptAction checks if the predicate error contains:
// - UnschedulableAndUnresolvable
// - ErrorSkipOrWait
func (ssn *Session) PredicateForPreemptAction(task *api.TaskInfo, node *api.NodeInfo) error {
err := ssn.PredicateFn(task, node)
if err == nil {
return nil
}

fitError, ok := err.(*api.FitError)
if !ok {
return api.NewFitError(task, node, err.Error())
}

// When filtering candidate nodes, need to consider the node statusSets instead of the err information.
// refer to kube-scheduler preemption code: https://github.com/kubernetes/kubernetes/blob/9d87fa215d9e8020abdc17132d1252536cd752d2/pkg/scheduler/framework/preemption/preemption.go#L422
statusSets := fitError.Status
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
return fitError
}
return nil
}

// Statement returns new statement object
func (ssn *Session) Statement() *Statement {
return &Statement{
Expand Down
10 changes: 4 additions & 6 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,7 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
}

// PredicateFn invoke predicate function of the plugins
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus := make([]*api.Status, 0)
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPredicate) {
Expand All @@ -638,14 +637,13 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.
if !found {
continue
}
status, err := pfn(task, node)
predicateStatus = append(predicateStatus, status...)
err := pfn(task, node)
if err != nil {
return predicateStatus, err
return err
}
}
}
return predicateStatus, nil
return nil
}

// PrePredicateFn invoke predicate function of the plugins
Expand Down
21 changes: 0 additions & 21 deletions pkg/scheduler/framework/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,24 +262,3 @@ func (nl *NodeLister) List() ([]*v1.Node, error) {
}
return nodes, nil
}

// ConvertPredicateStatus return predicate status from k8sframework status
func ConvertPredicateStatus(status *k8sframework.Status) *api.Status {
internalStatus := &api.Status{}
if status.Code() == k8sframework.Success {
internalStatus.Code = api.Success
return internalStatus
} else if status.Code() == k8sframework.Unschedulable {
internalStatus.Code = api.Unschedulable
internalStatus.Reason = status.Message()
return internalStatus
} else if status.Code() == k8sframework.UnschedulableAndUnresolvable {
internalStatus.Code = api.UnschedulableAndUnresolvable
internalStatus.Reason = status.Message()
return internalStatus
} else {
internalStatus.Code = api.Error
internalStatus.Reason = status.Message()
return internalStatus
}
}
12 changes: 6 additions & 6 deletions pkg/scheduler/plugins/deviceshare/deviceshare.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package deviceshare

import (
"context"
"fmt"
"math"
"reflect"

Expand Down Expand Up @@ -109,7 +108,7 @@ func getDeviceScore(ctx context.Context, pod *v1.Pod, node *api.NodeInfo, schedu

func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) {
// Register event handlers to update task info in PodLister & nodeMap
ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
predicateStatus := make([]*api.Status, 0)
// Check PredicateWithCache
for _, val := range api.RegisteredDevices {
Expand All @@ -120,21 +119,22 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) {
predicateStatus = append(predicateStatus, &api.Status{
Code: devices.Unschedulable,
Reason: "node not initialized with device" + val,
Plugin: PluginName,
})
return predicateStatus, fmt.Errorf("node not initialized with device %s", val)
return api.NewFitErrWithStatus(task, node, predicateStatus...)
}
klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name)
continue
}
code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy)
if err != nil {
predicateStatus = append(predicateStatus, createStatus(code, msg))
return predicateStatus, err
return api.NewFitErrWithStatus(task, node, predicateStatus...)
}
filterNodeStatus := createStatus(code, msg)
if filterNodeStatus.Code != api.Success {
predicateStatus = append(predicateStatus, filterNodeStatus)
return predicateStatus, fmt.Errorf("plugin device filternode predicates failed %s", msg)
return api.NewFitErrWithStatus(task, node, predicateStatus...)
}
} else {
klog.Warningf("Devices %s assertion conversion failed, skip", val)
Expand All @@ -144,7 +144,7 @@ func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(4).Infof("checkDevices predicates Task <%s/%s> on Node <%s>: fit ",
task.Namespace, task.Name, node.Name)

return predicateStatus, nil
return nil
})

ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/plugins/extender/argument.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type PredicateRequest struct {
}

type PredicateResponse struct {
Status []*api.Status `json:"status"`
ErrorMessage string `json:"status"`
Code int `json:"code"`
}

type PrioritizeRequest struct {
Expand Down
17 changes: 12 additions & 5 deletions pkg/scheduler/plugins/extender/extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,20 +161,27 @@ func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) {
}

if ep.config.predicateVerb != "" {
ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
resp := &PredicateResponse{}
err := ep.send(ep.config.predicateVerb, &PredicateRequest{Task: task, Node: node}, resp)
if err != nil {
klog.Warningf("Predicate failed with error %v", err)

if ep.config.ignorable {
return nil, nil
return nil
}
return nil, err
return api.NewFitError(task, node, err.Error())
}

predicateStatus := resp.Status
return predicateStatus, nil
if len(resp.ErrorMessage) == 0 {
return nil
}
// keep compatibility with old behavior: error messages length is not zero,
// but didn't return a code, and code will be 0 for default. Change code to Error for corresponding
if resp.Code == api.Success {
resp.Code = api.Error
}
return api.NewFitErrWithStatus(task, node, &api.Status{Code: resp.Code, Reason: resp.ErrorMessage, Plugin: PluginName})
})
}

Expand Down
Loading
Loading