Skip to content

Commit

Permalink
Expose Provisioning Request status as events for Pods/Jobs (#2147)
Browse files Browse the repository at this point in the history
* emit admission check messages as event

* Unit test for pod group

* Change quotes

* Refactor + unit test

* Comment

* New reason for admission check update

* Shorten message

* Fix typo

* Fix typo
  • Loading branch information
pajakd committed May 10, 2024
1 parent 0f0a315 commit b870144
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 8 deletions.
17 changes: 9 additions & 8 deletions pkg/controller/jobframework/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ package jobframework

// JobReconciler event reason list
const (
ReasonStarted = "Started"
ReasonSuspended = "Suspended"
ReasonStopped = "Stopped"
ReasonCreatedWorkload = "CreatedWorkload"
ReasonDeletedWorkload = "DeletedWorkload"
ReasonUpdatedWorkload = "UpdatedWorkload"
ReasonFinishedWorkload = "FinishedWorkload"
ReasonErrWorkloadCompose = "ErrWorkloadCompose"
ReasonStarted = "Started"
ReasonSuspended = "Suspended"
ReasonStopped = "Stopped"
ReasonCreatedWorkload = "CreatedWorkload"
ReasonDeletedWorkload = "DeletedWorkload"
ReasonUpdatedWorkload = "UpdatedWorkload"
ReasonFinishedWorkload = "FinishedWorkload"
ReasonErrWorkloadCompose = "ErrWorkloadCompose"
ReasonUpdatedAdmissionCheck = "UpdatedAdmissionCheck"
)
3 changes: 3 additions & 0 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package jobframework
import (
"context"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -111,6 +112,8 @@ type ComposableJob interface {
FindMatchingWorkloads(ctx context.Context, c client.Client, r record.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error)
// Stop implements the custom stop procedure for ComposableJob
Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error)
// Calls f on each member of the ComposableJob
ForEach(f func(obj runtime.Object))
}

func QueueName(job GenericJob) string {
Expand Down
26 changes: 26 additions & 0 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -453,6 +454,9 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

if workload.HasQuotaReservation(wl) {
r.recordAdmissionCheckUpdate(wl, job)
}
// update queue name if changed.
q := QueueName(job)
if wl.Spec.QueueName != q {
Expand Down Expand Up @@ -484,6 +488,28 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

func (r *JobReconciler) recordAdmissionCheckUpdate(wl *kueue.Workload, job GenericJob) {
message := ""
object := job.Object()
for _, check := range wl.Status.AdmissionChecks {
if check.State == kueue.CheckStatePending && check.Message != "" {
if message != "" {
message += "; "
}
message += check.Name + ": " + check.Message
}
}
if message != "" {
if cJob, isComposable := job.(ComposableJob); isComposable {
cJob.ForEach(func(obj runtime.Object) {
r.record.Eventf(obj, corev1.EventTypeNormal, ReasonUpdatedAdmissionCheck, message)
})
} else {
r.record.Eventf(object, corev1.EventTypeNormal, ReasonUpdatedAdmissionCheck, message)
}
}
}

// IsParentJobManaged checks whether the parent job is managed by kueue.
func (r *JobReconciler) IsParentJobManaged(ctx context.Context, jobObj client.Object, namespace string) (bool, error) {
owner := metav1.GetControllerOf(jobObj)
Expand Down
110 changes: 110 additions & 0 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2498,6 +2498,116 @@ func TestReconciler(t *testing.T) {
},
wantErr: jobframework.ErrNoMatchingWorkloads,
},
"admission check message is emitted as event for job": {
job: *baseJobWrapper.Clone().
Suspend(true).
Obj(),
wantJob: *baseJobWrapper.Clone().
Suspend(true).
Obj(),
workloads: []kueue.Workload{
*baseWorkloadWrapper.Clone().
Admitted(false).
Active(false).
Queue("foo").
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "Reason",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName",
State: kueue.CheckStatePending,
Message: "Not admitted, ETA: 2024-02-22T10:36:40Z.",
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*baseWorkloadWrapper.Clone().
Admitted(false).
Active(false).
Queue("foo").
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "Reason",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName",
State: kueue.CheckStatePending,
Message: "Not admitted, ETA: 2024-02-22T10:36:40Z.",
}).
Obj(),
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "job", Namespace: "ns"},
EventType: "Normal",
Reason: jobframework.ReasonUpdatedAdmissionCheck,
Message: "acName: Not admitted, ETA: 2024-02-22T10:36:40Z.",
},
},
},
"multiple admission check messages are emitted as a single event for job": {
job: *baseJobWrapper.Clone().
Suspend(true).
Obj(),
wantJob: *baseJobWrapper.Clone().
Suspend(true).
Obj(),
workloads: []kueue.Workload{
*baseWorkloadWrapper.Clone().
Admitted(false).
Active(false).
Queue("foo").
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "Reason",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName1",
State: kueue.CheckStatePending,
Message: "Some message.",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName2",
State: kueue.CheckStatePending,
Message: "Another message.",
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*baseWorkloadWrapper.Clone().
Admitted(false).
Active(false).
Queue("foo").
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "Reason",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName1",
State: kueue.CheckStatePending,
Message: "Some message.",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName2",
State: kueue.CheckStatePending,
Message: "Another message.",
}).
Obj(),
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "job", Namespace: "ns"},
EventType: "Normal",
Reason: jobframework.ReasonUpdatedAdmissionCheck,
Message: "acName1: Some message.; acName2: Another message.",
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,16 @@ func (p *Pod) Stop(ctx context.Context, c client.Client, _ []podset.PodSetInfo,
return stoppedNow, nil
}

func (p *Pod) ForEach(f func(obj runtime.Object)) {
if p.isGroup {
for _, pod := range p.list.Items {
f(&pod)
}
} else {
f(&p.pod)
}
}

func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
if err := indexer.IndexField(ctx, &corev1.Pod{}, PodGroupNameCacheKey, IndexPodGroupName); err != nil {
return err
Expand Down
168 changes: 168 additions & 0 deletions pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3515,6 +3515,174 @@ func TestReconciler(t *testing.T) {
wantWorkloads: nil,
wantErr: errPodGroupLabelsMismatch,
},
"admission check message is recorded as event for a single pod": {
pods: []corev1.Pod{*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Obj()},
wantPods: nil,
workloads: []kueue.Workload{
*utiltesting.MakeWorkload(GetWorkloadNameForPod(basePodWrapper.GetName(), basePodWrapper.GetUID()), "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Active(false).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName1",
State: kueue.CheckStatePending,
Message: "Not admitted.",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName2",
State: kueue.CheckStatePending,
Message: "Test message.",
}).
PodSets(
*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).
Request(corev1.ResourceCPU, "1").
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Obj(),
).
Queue("user-queue").
Priority(0).
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid").
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(false).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload(GetWorkloadNameForPod(basePodWrapper.GetName(), basePodWrapper.GetUID()), "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Active(false).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName1",
State: kueue.CheckStatePending,
Message: "Not admitted.",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName2",
State: kueue.CheckStatePending,
Message: "Test message.",
}).
PodSets(
*utiltesting.MakePodSet(kueue.DefaultPodSetName, 1).
Request(corev1.ResourceCPU, "1").
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Obj(),
).
Queue("user-queue").
Priority(0).
ControllerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid").
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(false).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "pod", Namespace: "ns"},
EventType: "Normal",
Reason: jobframework.ReasonUpdatedAdmissionCheck,
Message: "acName1: Not admitted.; acName2: Test message.",
},
},
},
"admission check message is recorded as event for each pod in the group": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Name("pod1").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
Obj(),
},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Active(false).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName",
State: kueue.CheckStatePending,
Message: "Not admitted, ETA: 2024-02-22T10:36:40Z.",
}).
PodSets(
*utiltesting.MakePodSet("dc85db45", 2).
Request(corev1.ResourceCPU, "1").
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Obj(),
).
Queue("user-queue").
Priority(0).
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid").
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(false).
Obj(),
},
wantPods: nil,
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Active(false).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "acName",
State: kueue.CheckStatePending,
Message: "Not admitted, ETA: 2024-02-22T10:36:40Z.",
}).
PodSets(
*utiltesting.MakePodSet("dc85db45", 2).
Request(corev1.ResourceCPU, "1").
SchedulingGates(corev1.PodSchedulingGate{Name: "kueue.x-k8s.io/admission"}).
Obj(),
).
Queue("user-queue").
Priority(0).
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid").
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(false).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "pod1", Namespace: "ns"},
EventType: "Normal",
Reason: jobframework.ReasonUpdatedAdmissionCheck,
Message: "acName: Not admitted, ETA: 2024-02-22T10:36:40Z.",
},
{
Key: types.NamespacedName{Name: "pod2", Namespace: "ns"},
EventType: "Normal",
Reason: jobframework.ReasonUpdatedAdmissionCheck,
Message: "acName: Not admitted, ETA: 2024-02-22T10:36:40Z.",
},
},
},
}

for name, tc := range testCases {
Expand Down

0 comments on commit b870144

Please sign in to comment.