Skip to content

Commit

Permalink
Passing parameters to ProvisioningRequest from annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
PBundyra committed Mar 26, 2024
1 parent 9dd55ae commit 2b8b01c
Show file tree
Hide file tree
Showing 16 changed files with 277 additions and 36 deletions.
27 changes: 20 additions & 7 deletions pkg/controller/admissionchecks/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"maps"
"regexp"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -45,6 +46,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/api"
Expand Down Expand Up @@ -173,7 +175,7 @@ func (c *Controller) activeOrLastPRForChecks(ctx context.Context, wl *kueue.Work
// PRs relevant for the admission check
if matches(req, wl.Name, checkName) {
prc, err := c.helper.ConfigForAdmissionCheck(ctx, checkName)
if err == nil && c.reqIsNeeded(ctx, wl, prc) && requestHasParameters(req, prc) {
if err == nil && c.reqIsNeeded(ctx, wl, prc) && provReqSyncedWithConfig(req, prc) {
if currPr, exists := activeOrLastPRForChecks[checkName]; !exists || getAttempt(ctx, currPr, wl.Name, checkName) < getAttempt(ctx, req, wl.Name, checkName) {
activeOrLastPRForChecks[checkName] = req
}
Expand Down Expand Up @@ -267,6 +269,7 @@ func (c *Controller) syncOwnedProvisionRequest(ctx context.Context, wl *kueue.Wo
Parameters: parametersKueueToProvisioning(prc.Spec.Parameters),
},
}
passProvReqParams(wl, req)

expectedPodSets := requiredPodSets(wl.Spec.PodSets, prc.Spec.ManagedResources)
psaMap := slices.ToRefMap(wl.Status.Admission.PodSetAssignments, func(p *kueue.PodSetAssignment) string { return p.Name })
Expand Down Expand Up @@ -434,21 +437,31 @@ func parametersKueueToProvisioning(in map[string]kueue.Parameter) map[string]aut
return out
}

func requestHasParameters(req *autoscaling.ProvisioningRequest, prc *kueue.ProvisioningRequestConfig) bool {
// provReqSyncedWithConfig checks if the provisioning request has the same provisioningClassName as the provisioning request config
// and contains all the parameters from the config
func provReqSyncedWithConfig(req *autoscaling.ProvisioningRequest, prc *kueue.ProvisioningRequestConfig) bool {
if req.Spec.ProvisioningClassName != prc.Spec.ProvisioningClassName {
return false
}
if len(req.Spec.Parameters) != len(prc.Spec.Parameters) {
return false
}
for k, vReq := range req.Spec.Parameters {
if vCfg, found := prc.Spec.Parameters[k]; !found || vReq != autoscaling.Parameter(vCfg) {
for k, vCfg := range prc.Spec.Parameters {
if vReq, found := req.Spec.Parameters[k]; !found || string(vReq) != string(vCfg) {
return false
}
}
return true
}

// passProvReqParams copies onto req.Spec.Parameters every Workload annotation
// returned by admissioncheck.FilterProvReqAnnotations. Each parameter is named
// after its annotation with constants.ProvReqAnnotationPrefix stripped, and a
// parameter already present under that name is overwritten. The Parameters map
// is allocated when nil, so it is never nil after this call.
func passProvReqParams(wl *kueue.Workload, req *autoscaling.ProvisioningRequest) {
	params := req.Spec.Parameters
	if params == nil {
		// Writing to a nil map panics, so allocate before the copy loop.
		params = map[string]autoscaling.Parameter{}
		req.Spec.Parameters = params
	}
	for key, value := range admissioncheck.FilterProvReqAnnotations(wl.Annotations) {
		params[strings.TrimPrefix(key, constants.ProvReqAnnotationPrefix)] = autoscaling.Parameter(value)
	}
}

func (c *Controller) syncCheckStates(ctx context.Context, wl *kueue.Workload, checks []string, activeOrLastPRForChecks map[string]*autoscaling.ProvisioningRequest) error {
log := ctrl.LoggerFrom(ctx)
checksMap := slices.ToRefMap(wl.Status.AdmissionChecks, func(c *kueue.AdmissionCheckState) string { return c.Name })
Expand Down
43 changes: 43 additions & 0 deletions pkg/controller/admissionchecks/provisioning/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func TestReconcile(t *testing.T) {
}).
Obj()

basePodSet := []autoscaling.PodSet{{PodTemplateRef: autoscaling.Reference{Name: "ppt-wl-check1-1-main"}, Count: 1}}

baseWorkloadWithCheck1Ready := baseWorkload.DeepCopy()
workload.SetAdmissionCheckState(&baseWorkloadWithCheck1Ready.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: "check1",
Expand Down Expand Up @@ -311,6 +313,47 @@ func TestReconcile(t *testing.T) {
},
},
},
"workload with provreq annotation": {
workload: utiltesting.MakeWorkload("wl", TestNamespace).
Annotations(map[string]string{
"provreq.kueue.x-k8s.io/ValidUntilSeconds": "0",
"invalid-provreq-prefix/Foo1": "Bar1",
"another-invalid-provreq-prefix/Foo2": "Bar2"}).
AdmissionChecks(kueue.AdmissionCheckState{
Name: "check1",
State: kueue.CheckStatePending}).
ReserveQuota(utiltesting.MakeAdmission("q1").Obj()).
Obj(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
configs: []kueue.ProvisioningRequestConfig{{ObjectMeta: metav1.ObjectMeta{Name: "config1"}}},
wantRequests: map[string]*autoscaling.ProvisioningRequest{
GetProvisioningRequestName("wl", baseCheck.Name, 1): {
ObjectMeta: metav1.ObjectMeta{
Namespace: TestNamespace,
Name: GetProvisioningRequestName("wl", baseCheck.Name, 1),
OwnerReferences: []metav1.OwnerReference{
{
Name: "wl",
},
},
},
Spec: autoscaling.ProvisioningRequestSpec{
Parameters: map[string]autoscaling.Parameter{
"ValidUntilSeconds": "0",
},
PodSets: basePodSet,
},
},
},
wantEvents: []utiltesting.EventRecord{
{
Key: client.ObjectKeyFromObject(baseWorkload),
EventType: corev1.EventTypeNormal,
Reason: "ProvisioningRequestCreated",
Message: `Created ProvisioningRequest: "wl-check1-1"`,
},
},
},
"remove unnecessary requests": {
workload: baseWorkload.DeepCopy(),
checks: []kueue.AdmissionCheck{*baseCheck.DeepCopy()},
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ const (
// workloadPriorityClass name.
// This label is always mutable because it might be useful for the preemption.
WorkloadPriorityClassLabel = "kueue.x-k8s.io/priority-class"

// ProvReqAnnotationPrefix is the prefix for annotations that should be passed to ProvisioningRequest as Parameters.
ProvReqAnnotationPrefix = "provreq.kueue.x-k8s.io/"
)
10 changes: 6 additions & 4 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/maps"
Expand Down Expand Up @@ -803,10 +804,11 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o

wl := &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: GetWorkloadNameForOwnerWithGVK(object.GetName(), object.GetUID(), job.GVK()),
Namespace: object.GetNamespace(),
Labels: map[string]string{},
Finalizers: []string{kueue.ResourceInUseFinalizerName},
Name: GetWorkloadNameForOwnerWithGVK(object.GetName(), object.GetUID(), job.GVK()),
Namespace: object.GetNamespace(),
Labels: map[string]string{},
Finalizers: []string{kueue.ResourceInUseFinalizerName},
Annotations: admissioncheck.FilterProvReqAnnotations(job.Object().GetAnnotations()),
},
Spec: kueue.WorkloadSpec{
PodSets: podSets,
Expand Down
32 changes: 32 additions & 0 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,38 @@ func TestReconciler(t *testing.T) {
wantEvents []utiltesting.EventRecord
wantErr error
}{
"when workload is created, it has its owner ProvReq annotations": {
job: *baseJobWrapper.Clone().
SetAnnotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val").
SetAnnotation("invalid-provreq-prefix/test-annotation-2", "test-val-2").
UID("test-uid").
Obj(),
wantJob: *baseJobWrapper.Clone().
SetAnnotation(controllerconsts.ProvReqAnnotationPrefix+"test-annotation", "test-val").
SetAnnotation("invalid-provreq-prefix/test-annotation-2", "test-val-2").
UID("test-uid").
Suspend(true).
Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("job", "ns").
Annotations(map[string]string{controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}).
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
Queue("foo").
Priority(0).
Labels(map[string]string{controllerconsts.JobUIDLabel: "test-uid"}).
Obj(),
},

wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "job", Namespace: "ns"},
EventType: "Normal",
Reason: "CreatedWorkload",
Message: "Created Workload: ns/" + GetWorkloadNameForJob(baseJobWrapper.Name, types.UID("test-uid")),
},
},
},
"when workload is admitted the PodSetUpdates are propagated to job": {
job: *baseJobWrapper.Clone().
Obj(),
Expand Down
23 changes: 17 additions & 6 deletions pkg/controller/jobs/jobset/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingjobset "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
Expand Down Expand Up @@ -202,7 +203,8 @@ var (
}
workloadCmpOpts = []cmp.Option{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta"),
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "ResourceVersion", "OwnerReferences", "Finalizers"),
cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.PodSet{}, "Template"),
Expand All @@ -223,7 +225,7 @@ func TestReconciler(t *testing.T) {
wantWorkloads []kueue.Workload
wantErr error
}{
"workload is created with podsets": {
"workload is created with podsets and a ProvReq annotation": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(true),
},
Expand All @@ -239,8 +241,12 @@ func TestReconciler(t *testing.T) {
Replicas: 2,
Completions: 2,
Parallelism: 2,
},
).Obj(),
}).
Annotations(map[string]string{
controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val",
"invalid-provreq-prefix/test-annotation-2": "test-val-2",
}).
Obj(),
wantJob: testingjobset.MakeJobSet("jobset", "ns").ReplicatedJobs(
testingjobset.ReplicatedJobRequirements{
Name: "replicated-job-1",
Expand All @@ -253,10 +259,15 @@ func TestReconciler(t *testing.T) {
Replicas: 2,
Completions: 2,
Parallelism: 2,
},
).Obj(),
}).
Annotations(map[string]string{
controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val",
"invalid-provreq-prefix/test-annotation-2": "test-val-2",
}).
Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("jobset", "ns").
Annotations(map[string]string{controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}).
PodSets(
*utiltesting.MakePodSet("replicated-job-1", 1).Obj(),
*utiltesting.MakePodSet("replicated-job-2", 4).Obj(),
Expand Down
31 changes: 29 additions & 2 deletions pkg/controller/jobs/kubeflow/jobs/mxjob/mxjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingmxjob "sigs.k8s.io/kueue/pkg/util/testingjobs/mxjob"
Expand Down Expand Up @@ -301,8 +302,8 @@ var (
}
workloadCmpOpts = cmp.Options{
cmpopts.EquateEmpty(),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta", "ObjectMeta"),
cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"),
cmpopts.IgnoreFields(kueue.Workload{}, "TypeMeta"),
cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "ResourceVersion", "OwnerReferences", "Finalizers"), cmpopts.IgnoreFields(kueue.WorkloadSpec{}, "Priority"),
cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"),
cmpopts.IgnoreFields(kueue.PodSet{}, "Template"),
}
Expand Down Expand Up @@ -334,6 +335,32 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"workload is created with a ProvReq annotation": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(true),
},
job: testingmxjob.MakeMXJob("mxjob", "ns").
Annotations(map[string]string{
controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val",
"invalid-provreq-prefix/test-annotation-2": "test-val-2",
}).
Obj(),
wantJob: testingmxjob.MakeMXJob("mxjob", "ns").
Annotations(map[string]string{
controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val",
"invalid-provreq-prefix/test-annotation-2": "test-val-2",
}).Obj(),
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("mxjob", "ns").
Annotations(map[string]string{controllerconsts.ProvReqAnnotationPrefix + "test-annotation": "test-val"}).
PodSets(
*utiltesting.MakePodSet("scheduler", 1).Obj(),
*utiltesting.MakePodSet("server", 1).Obj(),
*utiltesting.MakePodSet("worker", 1).Obj(),
).
Obj(),
},
},
"workload isn't created due to manageJobsWithoutQueueName=false": {
reconcilerOptions: []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(false),
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
"sigs.k8s.io/kueue/pkg/util/parallelize"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
Expand Down Expand Up @@ -897,9 +898,10 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,

wl := &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Namespace: p.pod.GetNamespace(),
Labels: map[string]string{},
Finalizers: []string{kueue.ResourceInUseFinalizerName},
Namespace: p.pod.GetNamespace(),
Labels: map[string]string{},
Finalizers: []string{kueue.ResourceInUseFinalizerName},
Annotations: admissioncheck.FilterProvReqAnnotations(p.pod.GetAnnotations()),
},
Spec: kueue.WorkloadSpec{
QueueName: jobframework.QueueName(p),
Expand Down

0 comments on commit 2b8b01c

Please sign in to comment.