Skip to content

Commit

Permalink
cronjob controller: ensure already existing jobs are added to Active …
Browse files Browse the repository at this point in the history
…list of cronjobs

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>
  • Loading branch information
andrewsykim committed Sep 22, 2023
1 parent 7be63ed commit 895a822
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 7 deletions.
36 changes: 30 additions & 6 deletions pkg/controller/cronjob/cronjob_controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ func (jm *ControllerV2) syncCronJob(
}
}

jobAlreadyExists := false
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
if err != nil {
klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
Expand All @@ -595,18 +596,41 @@ func (jm *ControllerV2) syncCronJob(
// anything because any creation will fail
return nil, updateStatus, err
case errors.IsAlreadyExists(err):
// If the job is created by other actor, assume it has updated the cronjob status accordingly
klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
return nil, updateStatus, err
// If the job is created by other actor, assume it has updated the cronjob status accordingly.
// However, if the job was created by cronjob controller, this means we've previously created the job
// but failed to update the active list in the status, in which case we should reattempt to add the job
// into the active list and update the status.
jobAlreadyExists = true
job, err := jm.jobControl.GetJob(jobReq.GetNamespace(), jobReq.GetName())
if err != nil {
return nil, updateStatus, err
}
jobResp = job

// check that this job is owned by cronjob controller, otherwise do nothing and assume external controller
// is updating the status.
if !metav1.IsControlledBy(job, cronJob) {
return nil, updateStatus, nil
}

// Recheck if the job is missing from the active list before attempting to update the status again.
found := inActiveList(*cronJob, job.ObjectMeta.UID)
if found {
return nil, updateStatus, nil
}
case err != nil:
// default error handling
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return nil, updateStatus, err
}

metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
if jobAlreadyExists {
klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
} else {
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
}

// ------------------------------------------------------------------ //

Expand Down
115 changes: 114 additions & 1 deletion pkg/controller/cronjob/cronjob_controllerv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/utils/pointer"

"fmt"

_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -475,10 +476,22 @@ func TestControllerV2SyncCronJob(t *testing.T) {
jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(),
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""),
expectErr: true,
expectErr: false,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true,
},
"prev ran but done, is time, job not present in CJ active status, create job failed, A": {
concurrencyPolicy: "Allow",
schedule: onTheHour,
deadline: noDead,
ranPreviously: true,
jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(),
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""),
expectErr: false,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: false,
},
"prev ran but done, is time, F": {
concurrencyPolicy: "Forbid",
schedule: onTheHour,
Expand Down Expand Up @@ -1797,3 +1810,103 @@ func TestControllerV2CleanupFinishedJobs(t *testing.T) {
})
}
}

// TestControllerV2JobAlreadyExistsButNotInActiveStatus validates that an already created job that was not added to the status
// of a CronJob initially will be added back on the next sync. Previously, if we failed to update the status after creating a job,
// cronjob controller would retry continuously because it would attempt to create a job that already exists.
func TestControllerV2JobAlreadyExistsButNotInActiveStatus(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)

	// CronJob under test: last scheduled just before the prior hour, with an
	// explicitly empty active list.
	cj := cronJob()
	cj.Spec.ConcurrencyPolicy = "Forbid"
	cj.Spec.Schedule = everyHour
	cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()}
	cj.Status.Active = []v1.ObjectReference{}
	cjCopy := cj.DeepCopy()

	// Build the job from the CronJob's template for the schedule slot just
	// after the prior hour; this is the job the fake job control hands back.
	job, err := getJobFromTemplate2(&cj, justAfterThePriorHour())
	if err != nil {
		t.Fatalf("Unexpected error creating a job from template: %v", err)
	}
	job.UID = "1234"
	job.Namespace = cj.Namespace

	client := fake.NewSimpleClientset(cjCopy, job)
	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
	// Seed the informer's indexer directly with the CronJob; the Add error is
	// deliberately ignored in this test setup.
	_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(cjCopy)

	jm, err := NewControllerV2(informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
	if err != nil {
		t.Fatalf("unexpected error %v", err)
	}

	// The fake job control is configured with an AlreadyExists create error and
	// the pre-built job, while the fake CronJob control records status updates.
	jobControl := &fakeJobControl{Job: job, CreateErr: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")}
	jm.jobControl = jobControl
	cronJobControl := &fakeCJControl{}
	jm.cronJobControl = cronJobControl
	jm.now = justBeforeTheHour

	jm.enqueueController(cjCopy)
	jm.processNextWorkItem(ctx)

	if len(cronJobControl.Updates) != 1 {
		t.Fatalf("Unexpected updates to cronjob, got: %d, expected 1", len(cronJobControl.Updates))
	}
	// Fatalf rather than Errorf: Active[0] is indexed below, so continuing with
	// an empty active list would panic instead of reporting a clean failure.
	if len(cronJobControl.Updates[0].Status.Active) != 1 {
		t.Fatalf("Unexpected active jobs count, got: %d, expected 1", len(cronJobControl.Updates[0].Status.Active))
	}

	expectedActiveRef, err := getRef(job)
	if err != nil {
		t.Fatalf("Error getting expected job ref: %v", err)
	}
	if !reflect.DeepEqual(cronJobControl.Updates[0].Status.Active[0], *expectedActiveRef) {
		// Print the dereferenced value so "got" and "expected" are rendered the same way.
		t.Errorf("Unexpected job reference in cronjob active list, got: %v, expected: %v", cronJobControl.Updates[0].Status.Active[0], *expectedActiveRef)
	}
}

// TestControllerV2JobAlreadyExistsButDifferentOwner checks that when job creation
// reports AlreadyExists and the existing job carries no owner references, the
// sync records no CronJob status updates.
func TestControllerV2JobAlreadyExistsButDifferentOwner(t *testing.T) {
	_, ctx := ktesting.NewTestContext(t)

	// CronJob under test, with an explicitly empty active list.
	cj := cronJob()
	cj.Spec.ConcurrencyPolicy = "Forbid"
	cj.Spec.Schedule = everyHour
	cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()}
	cj.Status.Active = []v1.ObjectReference{}
	storedCJ := cj.DeepCopy()

	unownedJob, err := getJobFromTemplate2(&cj, justAfterThePriorHour())
	if err != nil {
		t.Fatalf("Unexpected error creating a job from template: %v", err)
	}
	unownedJob.UID = "1234"
	unownedJob.Namespace = cj.Namespace
	// Strip every owner reference: this test is about jobs that do not belong
	// to the cronjob controller being left alone.
	unownedJob.OwnerReferences = []metav1.OwnerReference{}

	clientset := fake.NewSimpleClientset(storedCJ, unownedJob)
	factory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
	batchInformers := factory.Batch().V1()
	// Seed the CronJob informer's indexer directly; the Add error is ignored on purpose.
	_ = batchInformers.CronJobs().Informer().GetIndexer().Add(storedCJ)

	jm, err := NewControllerV2(batchInformers.Jobs(), batchInformers.CronJobs(), clientset)
	if err != nil {
		t.Fatalf("unexpected error %v", err)
	}

	// Fake controls: job creation is configured with an AlreadyExists error,
	// and CronJob status updates are recorded for inspection.
	alreadyExists := errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")
	jm.jobControl = &fakeJobControl{Job: unownedJob, CreateErr: alreadyExists}
	cjControl := &fakeCJControl{}
	jm.cronJobControl = cjControl
	jm.now = justBeforeTheHour

	jm.enqueueController(storedCJ)
	jm.processNextWorkItem(ctx)

	if updates := len(cjControl.Updates); updates != 0 {
		t.Fatalf("Unexpected updates to cronjob, got: %d, expected 0", updates)
	}
}

0 comments on commit 895a822

Please sign in to comment.