Skip to content

Commit

Permalink
[scheduler] Check admission check consistency before admission.
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Sep 19, 2023
1 parent 2426402 commit 8339e25
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
9 changes: 7 additions & 2 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -228,7 +229,7 @@ func (s *Scheduler) schedule(ctx context.Context) {
log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
}
e.status = nominated
if err := s.admit(ctx, e); err != nil {
if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil {
e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
}
}
Expand Down Expand Up @@ -419,7 +420,7 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e
// admit sets the admitting clusterQueue and flavors into the workload of
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
func (s *Scheduler) admit(ctx context.Context, e *entry) error {
func (s *Scheduler) admit(ctx context.Context, e *entry, mustHaveChecks sets.Set[string]) error {
log := ctrl.LoggerFrom(ctx)
newWorkload := e.Obj.DeepCopy()
admission := &kueue.Admission{
Expand All @@ -428,6 +429,10 @@ func (s *Scheduler) admit(ctx context.Context, e *entry) error {
}

workload.SetQuotaReservation(newWorkload, admission)
if workload.HasAllChecks(newWorkload, mustHaveChecks) {
// sync Admitted, ignore the result since an API update is always done.
_ = workload.SyncAdmittedCondition(newWorkload)
}
if err := s.cache.AssumeWorkload(newWorkload); err != nil {
return err
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -316,8 +317,6 @@ func SetQuotaReservation(w *kueue.Workload, admission *kueue.Admission) {
evictedCond.LastTransitionTime = metav1.Now()
}

// sync Admitted, ignore the result since an API update is always done.
_ = SyncAdmittedCondition(w)
}

func SetEvictedCondition(w *kueue.Workload, reason string, message string) {
Expand Down Expand Up @@ -444,6 +443,19 @@ func HasAllChecksReady(wl *kueue.Workload) bool {
return true
}

// Returns true if all the mustHaveChecks are present in the workload.
func HasAllChecks(wl *kueue.Workload, mustHaveChecks sets.Set[string]) bool {
if mustHaveChecks.Len() == 0 {
return true
}

mustHaveChecks = mustHaveChecks.Clone()
for i := range wl.Status.AdmissionChecks {
mustHaveChecks.Delete(wl.Status.AdmissionChecks[i].Type)
}
return mustHaveChecks.Len() == 0
}

// Returns true if any of the workloads checks are Retry or Rejected
func HasRetryOrRejectedChecks(wl *kueue.Workload) bool {
for i := range wl.Status.AdmissionChecks {
Expand Down

0 comments on commit 8339e25

Please sign in to comment.