Commit
scale down
patch "field not declared in schema" error

commented out podSet immutability in the workload webhook to be able to update that field

added more comments

clean code

nit

debugging

fixed cluster queue reconciliation; it had to do with the TotalRequests info from admission inside the workload Go file

working with scheduler

cleaning code

integration test, but it interferes with the parallelism test, which is expected

updated parallelism integration test

updated wrappers

KEP

removed KEP

removed log lines

clean code

added a better conditional for updating the resize if the job is a RayCluster

added Kind condition

updated test and equivalentToWorkload condition

added podSet assignments check

updated feature gate

updated feature gate

updating equivalentToWorkload

fixed lint

removed changes from scheduler and workload controller

testing

updated workload controller reconciler to update spec and status

nit

update feature gate

update variables

made code more generic

updated workload controller helper method

typo
vicentefb committed Apr 10, 2024
1 parent 9b833df commit 6601a49
Showing 8 changed files with 157 additions and 7 deletions.
37 changes: 37 additions & 0 deletions pkg/controller/core/workload_controller.go
@@ -48,6 +48,7 @@ import (
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
@@ -191,6 +192,13 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if updated, err := r.reconcileOnClusterQueueActiveState(ctx, &wl, cqName); updated || err != nil {
return ctrl.Result{}, err
}
if features.Enabled(features.DynamicallySizedJobs) && downSizeJobIfNecessary(&wl) {
// Update Status
workload.SyncAdmittedCondition(&wl)
if err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true); err != nil {
return ctrl.Result{}, err
}
}

return r.reconcileNotReadyTimeout(ctx, req, &wl)
}
@@ -241,6 +249,35 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

func downSizeJobIfNecessary(wl *kueue.Workload) bool {
statusUpdate := false
// Nothing to downsize if the workload has no admission yet.
if wl.Status.Admission == nil {
return false
}
podSetSize := len(wl.Spec.PodSets)
// Index 0 is the head pod set; only worker pod sets (index >= 1) are considered for a downsize.
for i := 1; i < podSetSize; i++ {
if ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) > wl.Spec.PodSets[i].Count {
// Get the per-pod resource requests and the currently assigned usage.
originalResourceRequests := wl.Spec.PodSets[i].Template.Spec.Containers[0].Resources.Requests
currentAssignedResourceUsage := wl.Status.Admission.PodSetAssignments[i].ResourceUsage

// Release the quota held by the removed pods: subtract request * removedCount from the
// assigned usage of every resource. Work on copies so the pod template's requests are not mutated.
diff := ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) - wl.Spec.PodSets[i].Count
for k := range currentAssignedResourceUsage {
releasedQuantity := originalResourceRequests[k].DeepCopy()
releasedQuantity.Mul(int64(diff))

assignedResourceQuantity := currentAssignedResourceUsage[k]
assignedResourceQuantity.Sub(releasedQuantity)
currentAssignedResourceUsage[k] = assignedResourceQuantity
}

wl.Status.Admission.PodSetAssignments[i].Count = ptr.To(wl.Spec.PodSets[i].Count)
wl.Status.Admission.PodSetAssignments[i].ResourceUsage = currentAssignedResourceUsage

statusUpdate = true
}
}
return statusUpdate
}

func (r *WorkloadReconciler) reconcileCheckBasedEviction(ctx context.Context, wl *kueue.Workload) (bool, error) {
if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadEvicted) || !workload.HasRetryOrRejectedChecks(wl) {
return false, nil
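For intuition, here is a minimal, standalone sketch of the quota arithmetic downSizeJobIfNecessary performs for a single resource. The counts and the 1-CPU request are made-up example values, not taken from the commit.

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    // Example values (assumed): a worker pod set was admitted with 4 pods
    // requesting 1 CPU each, and the job now asks for only 2 pods.
    perPodRequest := resource.MustParse("1") // per-pod CPU request from the pod template
    assignedUsage := resource.MustParse("4") // CPU currently assigned to this pod set
    admittedCount, desiredCount := int32(4), int32(2)

    // Same arithmetic as the controller: release request * removedPods from the usage.
    released := perPodRequest.DeepCopy()
    released.Mul(int64(admittedCount - desiredCount))
    assignedUsage.Sub(released)

    fmt.Println(assignedUsage.String()) // prints "2": the pod set now holds 2 CPUs
}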
5 changes: 5 additions & 0 deletions pkg/controller/jobframework/interface.go
@@ -62,6 +62,11 @@ type JobWithReclaimablePods interface {
ReclaimablePods() ([]kueue.ReclaimablePod, error)
}

type SizableJobs interface {
// IsDownsizable returns true if the job's pod sets are being scaled down relative to its Workload.
IsDownsizable(wl *kueue.Workload) bool
}

type StopReason int

const (
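A sketch of the contract this interface adds, using a hypothetical sampleJob type that is not part of the commit (RayCluster's real implementation appears further down in this diff): an integration only has to report whether its current pod counts are lower than what the Workload was built with.

package example

import (
    kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// sampleJob is a hypothetical integration type, used only for illustration.
type sampleJob struct {
    desiredWorkers int32
}

// IsDownsizable mirrors the RayCluster logic below: pod set 0 is assumed to be
// the leader/head, worker pod sets start at index 1.
func (j *sampleJob) IsDownsizable(wl *kueue.Workload) bool {
    return len(wl.Spec.PodSets) > 1 && wl.Spec.PodSets[1].Count > j.desiredWorkers
}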
20 changes: 16 additions & 4 deletions pkg/controller/jobframework/reconciler.go
@@ -346,6 +346,17 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}
}

// 4.1 update the Workload's pod set counts when a sizable job (e.g. RayCluster) is downsized
if jobSizeable, implementsSizable := job.(SizableJobs); implementsSizable && jobSizeable.IsDownsizable(wl) && workload.IsAdmitted(wl) {
if _, err := r.updateWorkloadToMatchJob(ctx, job, object, wl, "Updated Workload due to resize: %v"); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}

// 5. handle WaitForPodsReady only for a standalone job.
// handle a job when waitForPodsReady is enabled, and it is the main job
if r.waitForPodsReady {
@@ -572,7 +583,7 @@ func (r *JobReconciler) ensureOneWorkload(ctx context.Context, job GenericJob, o
}

if toUpdate != nil {
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate)
return r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated not matching Workload for suspended job: %v")
}

return match, nil
@@ -677,7 +688,8 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob,
jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets())

if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil {
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) {
jobSizeable, implementsSizable := job.(SizableJobs)
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) || (implementsSizable && jobSizeable.IsDownsizable(wl)) {
return true
}
// If the workload is admitted but the job is suspended, do the check
@@ -690,7 +702,7 @@
return equality.ComparePodSetSlices(jobPodSets, wl.Spec.PodSets)
}

func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) (*kueue.Workload, error) {
func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload, message string) (*kueue.Workload, error) {
newWl, err := r.constructWorkload(ctx, job, object)
if err != nil {
return nil, fmt.Errorf("can't construct workload for update: %w", err)
@@ -705,7 +717,7 @@ func (r *JobReconciler) updateWorkloadToMatchJob(ctx context.Context, job Generi
}

r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedWorkload,
"Updated not matching Workload for suspended job: %v", klog.KObj(wl))
message, klog.KObj(wl))
return newWl, nil
}

17 changes: 17 additions & 0 deletions pkg/controller/jobs/raycluster/raycluster_controller.go
@@ -28,6 +28,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
)

@@ -87,6 +88,22 @@ func (j *RayCluster) GVK() schema.GroupVersionKind {
return gvk
}

func (j *RayCluster) IsDownsizable(wl *kueue.Workload) bool {
return j.GVK().Kind == "RayCluster" && features.Enabled(features.DynamicallySizedJobs) && JobHasDownsize(j.PodSets(), wl)
}

// JobHasDownsize reports whether any worker group in the job requests fewer pods
// than the corresponding workload pod set (index 0, the head group, is skipped).
func JobHasDownsize(podSets []kueue.PodSet, wl *kueue.Workload) bool {
for i := 1; i < len(podSets); i++ {
jobPodSetCount := podSets[i].Count
workloadPodSetCount := wl.Spec.PodSets[i].Count
if workloadPodSetCount > jobPodSetCount {
return true
}
}
return false
}

func (j *RayCluster) PodSets() []kueue.PodSet {
// len = workerGroups + head
podSets := make([]kueue.PodSet, len(j.Spec.WorkerGroupSpecs)+1)
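A small usage sketch of JobHasDownsize; the pod set names and counts are illustrative, and only worker groups are compared (the head group at index 0 is ignored).

package main

import (
    "fmt"

    kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
    raycluster "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster"
)

func main() {
    // The job now asks for 2 workers...
    jobPodSets := []kueue.PodSet{
        {Name: "head", Count: 1},
        {Name: "workers-group-0", Count: 2},
    }
    // ...while the Workload was created (and admitted) with 4 workers.
    wl := &kueue.Workload{
        Spec: kueue.WorkloadSpec{
            PodSets: []kueue.PodSet{
                {Name: "head", Count: 1},
                {Name: "workers-group-0", Count: 4},
            },
        },
    }
    fmt.Println(raycluster.JobHasDownsize(jobPodSets, wl)) // true: this is a scale down
}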
5 changes: 5 additions & 0 deletions pkg/features/kube_features.go
@@ -83,6 +83,10 @@ const (
//
// Enables lending limit.
LendingLimit featuregate.Feature = "LendingLimit"
// owner: @vicenteferrara
// kep: <TODO>
// alpha: v0.8
//
// Enables dynamically sized jobs (currently: scaling down RayCluster worker groups without re-suspending the job).
DynamicallySizedJobs featuregate.Feature = "DynamicallySizedJobs"
)

func init() {
@@ -104,6 +108,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
PrioritySortingWithinCohort: {Default: true, PreRelease: featuregate.Beta},
MultiKueue: {Default: false, PreRelease: featuregate.Alpha},
LendingLimit: {Default: false, PreRelease: featuregate.Alpha},
DynamicallySizedJobs: {Default: false, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() {
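The gate defaults to false; a unit test that exercises the downsize path can flip it with the SetFeatureGateDuringTest helper shown above. This is a sketch only; the test name and body are placeholders.

package example_test

import (
    "testing"

    "sigs.k8s.io/kueue/pkg/features"
)

func TestDownsizeBehindGate(t *testing.T) {
    // Enable the alpha gate for the duration of this test and restore it afterwards.
    restore := features.SetFeatureGateDuringTest(t, features.DynamicallySizedJobs, true)
    defer restore()

    if !features.Enabled(features.DynamicallySizedJobs) {
        t.Fatal("expected DynamicallySizedJobs to be enabled")
    }
    // ... exercise downsize-dependent code here ...
}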
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/raycluster/wrappers.go
@@ -80,6 +80,12 @@ func (j *ClusterWrapper) NodeSelectorHeadGroup(k, v string) *ClusterWrapper {
return j
}

// SetReplicaCount sets the replica count of the first worker group.
func (j *ClusterWrapper) SetReplicaCount(c int32) *ClusterWrapper {
j.Spec.WorkerGroupSpecs[0].Replicas = ptr.To(c)
return j
}

// Obj returns the inner Job.
func (j *ClusterWrapper) Obj() *rayv1.RayCluster {
return &j.RayCluster
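For reference, the wrapper chain used by the integration test below looks roughly like this; the cluster, namespace, and queue names are placeholders.

package example

import (
    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
    corev1 "k8s.io/api/core/v1"

    testingraycluster "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster"
)

// buildCluster builds a RayCluster with 4 worker replicas and 1 CPU requested
// by the head pod and by each worker pod, queued to a hypothetical local queue.
func buildCluster() *rayv1.RayCluster {
    return testingraycluster.MakeCluster("dev-job", "default").
        Queue("local-queue").
        SetReplicaCount(4).
        RequestHead(corev1.ResourceCPU, "1").
        RequestWorkerGroup(corev1.ResourceCPU, "1").
        Obj()
}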
8 changes: 6 additions & 2 deletions pkg/webhooks/workload_webhook.go
@@ -347,15 +347,19 @@ func ValidateWorkloadUpdate(newObj, oldObj *kueue.Workload) field.ErrorList {
allErrs = append(allErrs, ValidateWorkload(newObj)...)

if workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
if !features.Enabled(features.DynamicallySizedJobs) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PodSets, oldObj.Spec.PodSets, specPath.Child("podSets"))...)
}
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassSource, oldObj.Spec.PriorityClassSource, specPath.Child("priorityClassSource"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.PriorityClassName, oldObj.Spec.PriorityClassName, specPath.Child("priorityClassName"))...)
}
if workload.HasQuotaReservation(newObj) && workload.HasQuotaReservation(oldObj) {
allErrs = append(allErrs, apivalidation.ValidateImmutableField(newObj.Spec.QueueName, oldObj.Spec.QueueName, specPath.Child("queueName"))...)
allErrs = append(allErrs, validateReclaimablePodsUpdate(newObj, oldObj, field.NewPath("status", "reclaimablePods"))...)
}
allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...)
if !features.Enabled(features.DynamicallySizedJobs) {
allErrs = append(allErrs, validateAdmissionUpdate(newObj.Status.Admission, oldObj.Status.Admission, field.NewPath("status", "admission"))...)
}
allErrs = append(allErrs, validateImmutablePodSetUpdates(newObj, oldObj, statusPath.Child("admissionChecks"))...)

return allErrs
@@ -39,6 +39,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework"
workloadraycluster "sigs.k8s.io/kueue/pkg/controller/jobs/raycluster"
_ "sigs.k8s.io/kueue/pkg/controller/jobs/rayjob" // to enable the framework
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/testing"
testingraycluster "sigs.k8s.io/kueue/pkg/util/testingjobs/raycluster"
testingrayjob "sigs.k8s.io/kueue/pkg/util/testingjobs/rayjob"
@@ -205,7 +206,7 @@ var _ = ginkgo.Describe("RayCluster controller", ginkgo.Ordered, ginkgo.Continue
return apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadQuotaReserved)
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("checking the job gets suspended when parallelism changes and the added node selectors are removed")
ginkgo.By("checking the job is suspended when parallelism increases and the added node selectors are removed")
parallelism := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1)
newParallelism := parallelism + 1
createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newParallelism
@@ -632,6 +633,69 @@ var _ = ginkgo.Describe("RayCluster Job controller interacting with scheduler",
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)
})

ginkgo.BeforeEach(func() {
// DynamicallySizedJobs is alpha and off by default; enable it for the scale-down test below.
gomega.Expect(features.SetEnable(features.DynamicallySizedJobs, true)).Should(gomega.Succeed())
gomega.Expect(features.Enabled(features.DynamicallySizedJobs)).Should(gomega.BeTrue())
})

ginkgo.It("Should not suspend job when there's a scale down", func() {
ginkgo.By("creating localQueue")
localQueue = testing.MakeLocalQueue("local-queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed())

ginkgo.By("checking a dev job starts")
job := testingraycluster.MakeCluster("dev-job", ns.Name).SetReplicaCount(4).Queue(localQueue.Name).
RequestHead(corev1.ResourceCPU, "1").
RequestWorkerGroup(corev1.ResourceCPU, "1").
Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())
createdJob := &rayv1.RayCluster{}
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).
Should(gomega.Succeed())
return *createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())
gomega.Expect(createdJob.Spec.HeadGroupSpec.Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(spotUntaintedFlavor.Name))
gomega.Expect(createdJob.Spec.WorkerGroupSpecs[0].Template.Spec.NodeSelector[instanceKey]).Should(gomega.Equal(onDemandFlavor.Name))
util.ExpectPendingWorkloadsMetric(clusterQueue, 0, 0)
util.ExpectReservingActiveWorkloadsMetric(clusterQueue, 1)

ginkgo.By("reduce the number of replicas, check the job is not suspended")
replicaCount := ptr.Deref(job.Spec.WorkerGroupSpecs[0].Replicas, 1)
newReplicaCount := replicaCount - 2
createdJob.Spec.WorkerGroupSpecs[0].Replicas = &newReplicaCount
gomega.Expect(k8sClient.Update(ctx, createdJob)).Should(gomega.Succeed())
gomega.Eventually(func() bool {
gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: job.Name, Namespace: job.Namespace}, createdJob)).
Should(gomega.Succeed())
return *createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.BeFalse())

ginkgo.By("checking the workload is updated with new count")
createdWorkload := &kueue.Workload{}
wlLookupKey := types.NamespacedName{Name: workloadraycluster.GetWorkloadNameForRayCluster(job.Name, job.UID), Namespace: ns.Name}

gomega.Eventually(func() error {
return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Eventually(func() bool {
if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
return false
}
return createdWorkload.Spec.PodSets[1].Count == newReplicaCount
}, util.Timeout, util.Interval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
if err := k8sClient.Get(ctx, wlLookupKey, createdWorkload); err != nil {
return false
}
return *createdWorkload.Status.Admission.PodSetAssignments[1].Count == newReplicaCount
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

})

})

var _ = ginkgo.Describe("Job controller with preemption enabled", ginkgo.Ordered, ginkgo.ContinueOnFailure, func() {
