Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #120649: cronjob controller: ensure already existing jobs are added to #120812

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 30 additions & 6 deletions pkg/controller/cronjob/cronjob_controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ func (jm *ControllerV2) syncCronJob(
}
}

jobAlreadyExists := false
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
if err != nil {
klog.ErrorS(err, "Unable to make Job from template", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
Expand All @@ -595,18 +596,41 @@ func (jm *ControllerV2) syncCronJob(
// anything because any creation will fail
return nil, updateStatus, err
case errors.IsAlreadyExists(err):
// If the job is created by other actor, assume it has updated the cronjob status accordingly
klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
return nil, updateStatus, err
// If the job is created by other actor, assume it has updated the cronjob status accordingly.
// However, if the job was created by cronjob controller, this means we've previously created the job
// but failed to update the active list in the status, in which case we should reattempt to add the job
// into the active list and update the status.
jobAlreadyExists = true
job, err := jm.jobControl.GetJob(jobReq.GetNamespace(), jobReq.GetName())
if err != nil {
return nil, updateStatus, err
}
jobResp = job

// check that this job is owned by cronjob controller, otherwise do nothing and assume external controller
// is updating the status.
if !metav1.IsControlledBy(job, cronJob) {
return nil, updateStatus, nil
}

// Recheck if the job is missing from the active list before attempting to update the status again.
found := inActiveList(*cronJob, job.ObjectMeta.UID)
if found {
return nil, updateStatus, nil
}
case err != nil:
// default error handling
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return nil, updateStatus, err
}

metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
if jobAlreadyExists {
klog.InfoS("Job already exists", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "job", klog.KRef(jobReq.GetNamespace(), jobReq.GetName()))
} else {
metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
klog.V(4).InfoS("Created Job", "job", klog.KRef(jobResp.GetNamespace(), jobResp.GetName()), "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
}

// ------------------------------------------------------------------ //

Expand Down
115 changes: 114 additions & 1 deletion pkg/controller/cronjob/cronjob_controllerv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"k8s.io/utils/pointer"

"fmt"

_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -475,10 +476,22 @@ func TestControllerV2SyncCronJob(t *testing.T) {
jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(),
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""),
expectErr: true,
expectErr: false,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: true,
},
"prev ran but done, is time, job not present in CJ active status, create job failed, A": {
concurrencyPolicy: "Allow",
schedule: onTheHour,
deadline: noDead,
ranPreviously: true,
jobCreationTime: justAfterThePriorHour(),
now: *justAfterTheHour(),
jobCreateError: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, ""),
expectErr: false,
expectUpdateStatus: true,
jobPresentInCJActiveStatus: false,
},
"prev ran but done, is time, F": {
concurrencyPolicy: "Forbid",
schedule: onTheHour,
Expand Down Expand Up @@ -1797,3 +1810,103 @@ func TestControllerV2CleanupFinishedJobs(t *testing.T) {
})
}
}

// TestControllerV2JobAlreadyExistsButNotInActiveStatus validates that an already created job that was not added to the status
// of a CronJob initially will be added back on the next sync. Previously, if we failed to update the status after creating a job,
// cronjob controller would retry continuously because it would attempt to create a job that already exists.
func TestControllerV2JobAlreadyExistsButNotInActiveStatus(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)

cj := cronJob()
cj.Spec.ConcurrencyPolicy = "Forbid"
cj.Spec.Schedule = everyHour
cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()}
cj.Status.Active = []v1.ObjectReference{}
cjCopy := cj.DeepCopy()

job, err := getJobFromTemplate2(&cj, justAfterThePriorHour())
if err != nil {
t.Fatalf("Unexpected error creating a job from template: %v", err)
}
job.UID = "1234"
job.Namespace = cj.Namespace

client := fake.NewSimpleClientset(cjCopy, job)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(cjCopy)

jm, err := NewControllerV2(informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
if err != nil {
t.Fatalf("unexpected error %v", err)
}

jobControl := &fakeJobControl{Job: job, CreateErr: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")}
jm.jobControl = jobControl
cronJobControl := &fakeCJControl{}
jm.cronJobControl = cronJobControl
jm.now = justBeforeTheHour

jm.enqueueController(cjCopy)
jm.processNextWorkItem(ctx)

if len(cronJobControl.Updates) != 1 {
t.Fatalf("Unexpected updates to cronjob, got: %d, expected 1", len(cronJobControl.Updates))
}
if len(cronJobControl.Updates[0].Status.Active) != 1 {
t.Errorf("Unexpected active jobs count, got: %d, expected 1", len(cronJobControl.Updates[0].Status.Active))
}

expectedActiveRef, err := getRef(job)
if err != nil {
t.Fatalf("Error getting expected job ref: %v", err)
}
if !reflect.DeepEqual(cronJobControl.Updates[0].Status.Active[0], *expectedActiveRef) {
t.Errorf("Unexpected job reference in cronjob active list, got: %v, expected: %v", cronJobControl.Updates[0].Status.Active[0], expectedActiveRef)
}
}

// TestControllerV2JobAlreadyExistsButDifferentOwnner validates that an already created job
// not owned by the cronjob controller is ignored.
func TestControllerV2JobAlreadyExistsButDifferentOwner(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)

cj := cronJob()
cj.Spec.ConcurrencyPolicy = "Forbid"
cj.Spec.Schedule = everyHour
cj.Status.LastScheduleTime = &metav1.Time{Time: justBeforeThePriorHour()}
cj.Status.Active = []v1.ObjectReference{}
cjCopy := cj.DeepCopy()

job, err := getJobFromTemplate2(&cj, justAfterThePriorHour())
if err != nil {
t.Fatalf("Unexpected error creating a job from template: %v", err)
}
job.UID = "1234"
job.Namespace = cj.Namespace

// remove owners for this test since we are testing that jobs not belonging to cronjob
// controller are safely ignored
job.OwnerReferences = []metav1.OwnerReference{}

client := fake.NewSimpleClientset(cjCopy, job)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(cjCopy)

jm, err := NewControllerV2(informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
if err != nil {
t.Fatalf("unexpected error %v", err)
}

jobControl := &fakeJobControl{Job: job, CreateErr: errors.NewAlreadyExists(schema.GroupResource{Resource: "job", Group: "batch"}, "")}
jm.jobControl = jobControl
cronJobControl := &fakeCJControl{}
jm.cronJobControl = cronJobControl
jm.now = justBeforeTheHour

jm.enqueueController(cjCopy)
jm.processNextWorkItem(ctx)

if len(cronJobControl.Updates) != 0 {
t.Fatalf("Unexpected updates to cronjob, got: %d, expected 0", len(cronJobControl.Updates))
}
}