Skip to content

Commit

Permalink
Fix bugs in pod reconciler, add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
achernevskii committed Nov 17, 2023
1 parent 5695e32 commit 09c09ce
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 57 deletions.
62 changes: 29 additions & 33 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -133,20 +134,16 @@ func NewReconciler(
}
}

func (r *JobReconciler) ReconcileGenericJobWrapper(ctx context.Context, req ctrl.Request, job GenericJob) (ctrl.Result, error) {
func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (result ctrl.Result, err error) {
object := job.Object()
log := ctrl.LoggerFrom(ctx).WithValues("job", req.String(), "gvk", job.GVK())
ctx = ctrl.LoggerInto(ctx, log)

result, err := r.ReconcileGenericJob(ctx, req, job)
defer func() {
err = r.ignoreUnretryableError(log, err)
}()

return result, r.ignoreUnretryableError(ctx, err)
}

func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Request, job GenericJob) (ctrl.Result, error) {
object := job.Object()
log := ctrl.LoggerFrom(ctx)

err := r.client.Get(ctx, req.NamespacedName, object)
err = r.client.Get(ctx, req.NamespacedName, object)

if jws, implements := job.(JobWithSkip); implements {
if jws.Skip() {
Expand Down Expand Up @@ -514,7 +511,7 @@ func equivalentToWorkload(job GenericJob, wl *kueue.Workload) bool {
return false
}

jobPodSets := ResetMinCounts(job.PodSets())
jobPodSets := ClearMinCountsIfFeatureDisabled(job.PodSets())

if !workload.CanBePartiallyAdmitted(wl) || !workload.HasQuotaReservation(wl) {
// the two sets should fully match.
Expand Down Expand Up @@ -644,16 +641,7 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
return nil, err
}

wl.Spec.PodSets = ResetMinCounts(wl.Spec.PodSets)

priorityClassName, source, p, err := r.extractPriority(ctx, wl.Spec.PodSets, job)
if err != nil {
return nil, err
}

wl.Spec.PriorityClassName = priorityClassName
wl.Spec.Priority = &p
wl.Spec.PriorityClassSource = source
wl.Spec.PodSets = ClearMinCountsIfFeatureDisabled(wl.Spec.PodSets)

return wl, nil
}
Expand All @@ -668,7 +656,7 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
Finalizers: []string{kueue.ResourceInUseFinalizerName},
},
Spec: kueue.WorkloadSpec{
PodSets: ResetMinCounts(podSets),
PodSets: ClearMinCountsIfFeatureDisabled(podSets),
QueueName: QueueName(job),
},
}
Expand All @@ -684,19 +672,24 @@ func (r *JobReconciler) constructWorkload(ctx context.Context, job GenericJob, o
)
}

priorityClassName, source, p, err := r.extractPriority(ctx, podSets, job)
if err != nil {
if err := ctrl.SetControllerReference(object, wl, r.client.Scheme()); err != nil {
return nil, err
}
return wl, nil
}

// prepareWorkload adds the priority information for the constructed workload
func (r *JobReconciler) prepareWorkload(ctx context.Context, job GenericJob, wl *kueue.Workload) error {
priorityClassName, source, p, err := r.extractPriority(ctx, wl.Spec.PodSets, job)
if err != nil {
return err
}

wl.Spec.PriorityClassName = priorityClassName
wl.Spec.Priority = &p
wl.Spec.PriorityClassSource = source

if err := ctrl.SetControllerReference(object, wl, r.client.Scheme()); err != nil {
return nil, err
}
return wl, nil
return nil
}

func (r *JobReconciler) extractPriority(ctx context.Context, podSets []kueue.PodSet, job GenericJob) (string, string, int32, error) {
Expand Down Expand Up @@ -764,6 +757,10 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
if err != nil {
return err
}
err = r.prepareWorkload(ctx, job, wl)
if err != nil {
return err
}
if err = r.client.Create(ctx, wl); err != nil {
return err
}
Expand All @@ -772,9 +769,8 @@ func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job Generic
return nil
}

func (r *JobReconciler) ignoreUnretryableError(ctx context.Context, err error) error {
func (r *JobReconciler) ignoreUnretryableError(log logr.Logger, err error) error {
if IsUnretryableError(err) {
log := ctrl.LoggerFrom(ctx)
log.V(2).Info("Received an unretryable error", "error", err)
return nil
}
Expand Down Expand Up @@ -832,7 +828,7 @@ type genericReconciler struct {
}

func (r *genericReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return r.jr.ReconcileGenericJobWrapper(ctx, req, r.newJob())
return r.jr.ReconcileGenericJob(ctx, req, r.newJob())
}

func (r *genericReconciler) SetupWithManager(mgr ctrl.Manager) error {
Expand All @@ -845,8 +841,8 @@ func (r *genericReconciler) SetupWithManager(mgr ctrl.Manager) error {
return b.Complete(r)
}

// ResetMinCounts resets the minCount for all podSets if the PartialAdmission feature is not enabled
func ResetMinCounts(in []kueue.PodSet) []kueue.PodSet {
// ClearMinCountsIfFeatureDisabled sets the minCount for all podSets to nil if the PartialAdmission feature is not enabled
func ClearMinCountsIfFeatureDisabled(in []kueue.PodSet) []kueue.PodSet {
if features.Enabled(features.PartialAdmission) || len(in) == 0 {
return in
}
Expand Down
26 changes: 17 additions & 9 deletions pkg/controller/jobs/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,11 @@ func (p *Pod) constructGroupPodSets(podsInGroup corev1.PodList) ([]kueue.PodSet,
var resultPodSets []kueue.PodSet

for _, podInGroup := range podsInGroup.Items {
// Skip failed pods
if podInGroup.Status.Phase == corev1.PodFailed {
break
}

tc, err := strconv.Atoi(podInGroup.GetAnnotations()[GroupTotalCountAnnotation])
if err != nil {
return nil, fmt.Errorf("failed to extract '%s' annotation from the pod '%s': %w",
Expand Down Expand Up @@ -353,7 +358,7 @@ func (p *Pod) ConstructComposableWorkload(ctx context.Context, c client.Client,
} else {
if err := c.List(ctx, &podsInGroup, client.MatchingLabels{
GroupNameLabel: p.groupName(),
}); err != nil {
}, client.InNamespace(p.Namespace)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -439,7 +444,7 @@ func (p *Pod) FindMatchingWorkloads(ctx context.Context, c client.Client) (*kueu
var podsInGroup corev1.PodList
if err := c.List(ctx, &podsInGroup, client.MatchingLabels{
GroupNameLabel: p.groupName(),
}); err != nil {
}, client.InNamespace(p.Namespace)); err != nil {
return nil, nil, err
}

Expand All @@ -456,21 +461,24 @@ func (p *Pod) FindMatchingWorkloads(ctx context.Context, c client.Client) (*kueu
}

func (p *Pod) equivalentToWorkload(wl *kueue.Workload, jobPodSets []kueue.PodSet) bool {
workloadFinished := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadFinished) != nil
workloadFinished := apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished)

if wl.GetName() != p.groupName() {
return false
}

if !workloadFinished && len(jobPodSets) != len(wl.Spec.PodSets) {
if !workloadFinished && len(wl.Spec.PodSets) < len(jobPodSets) {
return false
}

for i := range wl.Spec.PodSets {
if !workloadFinished && wl.Spec.PodSets[i].Count != jobPodSets[i].Count {
if i >= len(jobPodSets) {
return true
}
if !workloadFinished && wl.Spec.PodSets[i].Count < jobPodSets[i].Count {
return false
}
if i < len(jobPodSets) && wl.Spec.PodSets[i].Name != jobPodSets[i].Name {
if wl.Spec.PodSets[i].Name != jobPodSets[i].Name {
return false
}
}
Expand Down Expand Up @@ -516,13 +524,13 @@ func (p *Pod) IsComposableJobFinished(ctx context.Context, c client.Client) (met
Status: metav1.ConditionTrue,
Reason: "JobFinished",
Message: fmt.Sprintf(
"Pod group has finished. Pods succeeded: %d/%d. Pods failed: %d/%d",
succeededPodCount, groupTotalCount, groupTotalCount-succeededPodCount, groupTotalCount,
"Pods succeeded: %d/%d.",
succeededPodCount, groupTotalCount,
),
}

if succeededPodCount < groupTotalCount {
condition.Status = metav1.ConditionFalse
return metav1.Condition{}, false
}

return condition, true
Expand Down
32 changes: 19 additions & 13 deletions pkg/controller/jobs/pod/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func TestReconciler(t *testing.T) {
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodFailed).
StatusPhase(corev1.PodSucceeded).
Obj(),
},
wantPods: []corev1.Pod{
Expand All @@ -718,7 +718,7 @@ func TestReconciler(t *testing.T) {
Label("kueue.x-k8s.io/managed", "true").
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodFailed).
StatusPhase(corev1.PodSucceeded).
Obj(),
},
workloads: []kueue.Workload{
Expand All @@ -734,7 +734,7 @@ func TestReconciler(t *testing.T) {
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
*utiltesting.MakeWorkload("test-group", "ns").
PodSets(
*utiltesting.MakePodSet("b990493b", 2).
Request(corev1.ResourceCPU, "1").
Expand All @@ -745,15 +745,15 @@ func TestReconciler(t *testing.T) {
Admitted(true).
Condition(metav1.Condition{
Type: "Finished",
Status: "False",
Status: "True",
Reason: "JobFinished",
Message: "Pod group has finished. Pods succeeded: 1/2. Pods failed: 1/2",
Message: "Pods succeeded: 2/2.",
}).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"workload is deleted if the pod in group has been deleted after admission": {
"workload is not deleted if the pod in group has been deleted after admission": {
pods: []corev1.Pod{*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
Expand All @@ -767,12 +767,6 @@ func TestReconciler(t *testing.T) {
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusConditions(corev1.PodCondition{
Type: "TerminationTarget",
Status: corev1.ConditionTrue,
Reason: "StoppedByKueue",
Message: "No matching Workload",
}).
Obj()},
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Expand All @@ -786,7 +780,18 @@ func TestReconciler(t *testing.T) {
Admitted(true).
Obj(),
},
wantErr: jobframework.ErrNoMatchingWorkloads,
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
*utiltesting.MakePodSet("b990493b", 2).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(1).Obj()).
Admitted(true).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
},
"pod group is stopped when workload is evicted": {
Expand Down Expand Up @@ -887,6 +892,7 @@ func TestReconciler(t *testing.T) {
*basePodWrapper.
Clone().
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
StatusPhase(corev1.PodSucceeded).
Expand Down

0 comments on commit 09c09ce

Please sign in to comment.