diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go
index f60985900d..0fce70d193 100644
--- a/pkg/controller/core/workload_controller.go
+++ b/pkg/controller/core/workload_controller.go
@@ -153,20 +153,25 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	// If a deactivated workload is re-activated, we need to reset the RequeueState.
 	if wl.Status.RequeueState != nil && ptr.Deref(wl.Spec.Active, true) && workload.IsEvictedByDeactivation(&wl) {
 		wl.Status.RequeueState = nil
+		log.Info("[VICENTE] FIRST IF")
 		return ctrl.Result{}, workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
 	}

 	if len(wl.ObjectMeta.OwnerReferences) == 0 && !wl.DeletionTimestamp.IsZero() {
+		log.Info("[VICENTE] SECOND IF")
 		return ctrl.Result{}, workload.RemoveFinalizer(ctx, r.client, &wl)
 	}

 	if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
+		log.Info("[VICENTE] THIRD IF")
 		return ctrl.Result{}, nil
 	}

 	cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
 	if cqOk {
+		log.Info("[VICENTE] CQOK IF")
 		if updated, err := r.reconcileSyncAdmissionChecks(ctx, &wl, cqName); updated || err != nil {
+			log.Info("[VICENTE] FOURTH IF")
 			return ctrl.Result{}, err
 		}
 	}
@@ -177,7 +182,9 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 		if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
 			return ctrl.Result{}, err
 		}
+		log.Info("[VICENTE] ADMISSION STATUS WAS APPLIED")
 		if workload.IsAdmitted(&wl) {
+			log.Info("[VICENTE] FIFTH IF")
 			c := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadQuotaReserved)
 			r.recorder.Eventf(&wl, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time since reservation was %.0fs", wl.Status.Admission.ClusterQueue, time.Since(c.LastTransitionTime.Time).Seconds())
 		}
@@ -185,14 +192,18 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 	if workload.HasQuotaReservation(&wl) {
+		log.Info("[VICENTE] SIXTH IF WORKLOAD HAS QUOTA ALREADY")
 		if evictionTriggered, err := r.reconcileCheckBasedEviction(ctx, &wl); evictionTriggered || err != nil {
+			log.Info("[VICENTE] SIXTH IF WORKLOAD HAS QUOTA reconcileCheckBasedEviction", "err", err)
 			return ctrl.Result{}, err
 		}
 		if updated, err := r.reconcileOnClusterQueueActiveState(ctx, &wl, cqName); updated || err != nil {
+			log.Info("[VICENTE] SIXTH IF WORKLOAD HAS QUOTA reconcileOnClusterQueueActiveState", "err", err)
 			return ctrl.Result{}, err
 		}
-
+		log.Info("[VICENTE] SIXTH IF WORKLOAD HAS QUOTA reconcileNotReadyTimeout", "wl", wl.Status.Admission)
+		workload.SyncAdmittedCondition(&wl)
 		return r.reconcileNotReadyTimeout(ctx, req, &wl)
 	}
@@ -238,7 +249,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
 			return ctrl.Result{}, client.IgnoreNotFound(err)
 		}
 	}
-
+	log.Info("[VICENTE] NO ERRORS RETURNING")
 	return ctrl.Result{}, nil
 }
@@ -585,37 +596,15 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
 	default:
 		// Workload update in the cache is handled here; however, some fields are immutable
 		// and are not supposed to actually change anything.
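+		// Debug-only breadcrumbs: the [VICENTE] log lines added throughout this
+		// patch mark which branch executed; they are assumed to be temporary tracing.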
+ log.Info("[VICENTE INSIDE UPDATE METHOD UPDATING WORKLOAD IN CACHE") if err := r.cache.UpdateWorkload(oldWl, wlCopy); err != nil { log.Error(err, "Updating workload in cache") } - // This forces you to go through the scheduler to update PodSetAssignments if there's a difference between the - // worker group podSetAssignments.Count of the old and new workload - if features.Enabled(features.DynamicallySizedJobs) && compareAdmissionPodSetAssignmentCount(oldWl.Status.Admission, wlCopy.Status.Admission) { - if !r.queues.UpdateWorkload(oldWl, wlCopy) { - log.V(2).Info("Updated workload due to resize.") - } - } } return true } -func compareAdmissionPodSetAssignmentCount(oldWlAdmission *kueue.Admission, newWlAdmisson *kueue.Admission) bool { - // this is specific to RayClusters, it contains a PodSet of length 2, containing head and workers - oldWlPsa := len(oldWlAdmission.PodSetAssignments) - newWlPsa := len(newWlAdmisson.PodSetAssignments) - if oldWlPsa == newWlPsa { - for i := 0; i < oldWlPsa; i++ { - if oldWlAdmission.PodSetAssignments[i].Name == newWlAdmisson.PodSetAssignments[i].Name { - if oldWlAdmission.PodSetAssignments[i].Count != newWlAdmisson.PodSetAssignments[i].Count { - return true - } - } - } - } - return false -} - func (r *WorkloadReconciler) Generic(e event.GenericEvent) bool { r.log.V(3).Info("Ignore generic event", "obj", klog.KObj(e.Object), "kind", e.Object.GetObjectKind().GroupVersionKind()) return false diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index b6b722dc40..eeedb52875 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -265,9 +265,6 @@ func (s *Scheduler) schedule(ctx context.Context) { s.cache.WaitForPodsReady(ctx) log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition") } - if features.Enabled(features.DynamicallySizedJobs) && e.status == assumed { - continue - } e.status = nominated if err := s.admit(ctx, e, cq.AdmissionChecks); err != nil { e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err) @@ -326,21 +323,8 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna ns := corev1.Namespace{} e := entry{Info: w} if s.cache.IsAssumedOrAdmittedWorkload(w) { - if features.Enabled(features.DynamicallySizedJobs) { - // here we want to get the flavors and resources assigned again so that we can update PodSetAssignments - e.assignment, e.preemptionTargets = s.getResizeAssignment(log, &e.Info, &snap) - e.inadmissibleMsg = e.assignment.Message() - e.Info.LastAssignment = &e.assignment.LastState - e.status = assumed - if err := s.updateResizePodSetAssignments(ctx, e); err != nil { - log.Error(err, "Could not apploy admission to assumed workload") - continue - } - - } else { - log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj)) - continue - } + log.Info("Workload skipped from admission because it's already assumed or admitted", "workload", klog.KObj(w.Obj)) + continue } else if workload.HasRetryOrRejectedChecks(w.Obj) { e.inadmissibleMsg = "The workload has failed admission checks" } else if snap.InactiveClusterQueueSets.Has(w.ClusterQueue) { @@ -403,29 +387,6 @@ type partialAssignment struct { preemptionTargets []*workload.Info } -func (s *Scheduler) getResizeAssignment(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) { - cq := snap.ClusterQueues[wl.ClusterQueue] - flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors) - 
-	resizeAssignment := flvAssigner.Assign(log, nil)
-	var faPreemtionTargets []*workload.Info
-
-	arm := resizeAssignment.RepresentativeMode()
-	if arm == flavorassigner.Fit {
-		return resizeAssignment, nil
-	}
-
-	if arm == flavorassigner.Preempt {
-		faPreemtionTargets = s.preemptor.GetTargets(*wl, resizeAssignment, snap)
-	}
-
-	// if the feature gate is not enabled or we can preempt
-	if !features.Enabled(features.PartialAdmission) || len(faPreemtionTargets) > 0 {
-		return resizeAssignment, faPreemtionTargets
-	}
-
-	return resizeAssignment, nil
-}
-
 func (s *Scheduler) getAssignments(log logr.Logger, wl *workload.Info, snap *cache.Snapshot) (flavorassigner.Assignment, []*workload.Info) {
 	cq := snap.ClusterQueues[wl.ClusterQueue]
 	flvAssigner := flavorassigner.New(wl, cq, snap.ResourceFlavors)
@@ -526,28 +487,6 @@ func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) e
 	return nil
 }

-// admit sets the admitting clusterQueue and flavors into the workload of
-// the entry, and asynchronously updates the object in the apiserver after
-// assuming it in the cache.
-func (s *Scheduler) updateResizePodSetAssignments(ctx context.Context, e entry) error {
-	newWorkload := e.Obj.DeepCopy()
-	admission := &kueue.Admission{
-		ClusterQueue:      kueue.ClusterQueueReference(e.ClusterQueue),
-		PodSetAssignments: e.assignment.ToAPI(),
-	}
-
-	workload.SetQuotaReservation(newWorkload, admission)
-	_ = workload.SyncAdmittedCondition(newWorkload)
-
-	if e.status == assumed {
-		// Apply admission means to update the workload with the new admission status, this is for the case of a scale down
-		// we shouldn't requeue a scale down we should only update the workload
-		return s.applyAdmission(ctx, newWorkload)
-	}
-
-	return nil
-}
-
 // admit sets the admitting clusterQueue and flavors into the workload of
 // the entry, and asynchronously updates the object in the apiserver after
 // assuming it in the cache.
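Note (not part of the patch): for readers skimming the removed hunks, the essence of the deleted resize path condenses to a few lines. This is a sketch assembled only from identifiers that appear verbatim in the diff above (kueue.Admission, workload.SetQuotaReservation, workload.SyncAdmittedCondition, s.applyAdmission); nothing new is introduced.

	// Sketch: build an Admission from the entry's flavor assignment,
	// record the quota reservation, refresh the Admitted condition,
	// and apply the status asynchronously without requeueing.
	newWorkload := e.Obj.DeepCopy()
	admission := &kueue.Admission{
		ClusterQueue:      kueue.ClusterQueueReference(e.ClusterQueue),
		PodSetAssignments: e.assignment.ToAPI(),
	}
	workload.SetQuotaReservation(newWorkload, admission)
	_ = workload.SyncAdmittedCondition(newWorkload)
	return s.applyAdmission(ctx, newWorkload)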