Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a job quota related deadlock #119776

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 30 additions & 38 deletions pkg/controller/cronjob/cronjob_controllerv2.go
Expand Up @@ -196,24 +196,19 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
return nil, err
}

cronJobCopy, requeueAfter, updateStatus, err := jm.syncCronJob(ctx, cronJob, jobsToBeReconciled)
// cronJobCopy is used to combine all the updates to a
// CronJob object and perform an actual update only once.
cronJobCopy := cronJob.DeepCopy()

updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)

requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled)
if err != nil {
logger.V(2).Info("Error reconciling cronjob", "cronjob", klog.KObj(cronJob), "err", err)
if updateStatus {
soltysh marked this conversation as resolved.
Show resolved Hide resolved
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err
}
}
return nil, err
}

if jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled) {
updateStatus = true
}

// Update the CronJob if needed
if updateStatus {
if updateStatusAfterCleanup || updateStatusAfterSync {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
logger.V(2).Info("Unable to update status for cronjob", "cronjob", klog.KObj(cronJob), "resourceVersion", cronJob.ResourceVersion, "err", err)
return nil, err
Expand All @@ -225,7 +220,7 @@ func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Dura
return requeueAfter, nil
}
// this marks the key done, currently only happens when the cronjob is suspended or spec has invalid schedule format
return nil, nil
return nil, syncErr
}

// resolveControllerRef returns the controller referenced by a ControllerRef,
Expand Down Expand Up @@ -416,15 +411,12 @@ func (jm *ControllerV2) updateCronJob(logger klog.Logger, old interface{}, curr
// syncCronJob reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "cronJob" should be included in "jobs".
// The current time is passed in to facilitate testing.
// It returns a copy of the CronJob that is to be used by other functions
// that mutates the object
// It also returns a bool to indicate an update to api-server is needed
// It returns a bool to indicate an update to api-server is needed
func (jm *ControllerV2) syncCronJob(
ctx context.Context,
cronJob *batchv1.CronJob,
jobs []*batchv1.Job) (*batchv1.CronJob, *time.Duration, bool, error) {
jobs []*batchv1.Job) (*time.Duration, bool, error) {

cronJob = cronJob.DeepCopy()
now := jm.now()
updateStatus := false

Expand All @@ -435,7 +427,7 @@ func (jm *ControllerV2) syncCronJob(
if !found && !IsJobFinished(j) {
cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
if err != nil {
return nil, nil, updateStatus, err
return nil, updateStatus, err
}
if inActiveList(cjCopy, j.ObjectMeta.UID) {
cronJob = cjCopy
Expand Down Expand Up @@ -483,15 +475,15 @@ func (jm *ControllerV2) syncCronJob(
deleteFromActiveList(cronJob, j.UID)
updateStatus = true
case err != nil:
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}
// the job is missing in the lister but found in api-server
}

if cronJob.DeletionTimestamp != nil {
// The CronJob is being deleted.
// Don't do anything other than updating status.
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}

logger := klog.FromContext(ctx)
Expand All @@ -500,13 +492,13 @@ func (jm *ControllerV2) syncCronJob(
if _, err := time.LoadLocation(timeZone); err != nil {
logger.V(4).Info("Not starting job because timeZone is invalid", "cronjob", klog.KObj(cronJob), "timeZone", timeZone, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnknownTimeZone", "invalid timeZone: %q: %s", timeZone, err)
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}
}

if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
logger.V(4).Info("Not starting job because the cron is suspended", "cronjob", klog.KObj(cronJob))
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}

sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder))
Expand All @@ -515,7 +507,7 @@ func (jm *ControllerV2) syncCronJob(
// we should log the error and not reconcile this cronjob until an update to spec
logger.V(2).Info("Unparseable schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", cronJob.Spec.Schedule, err)
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}

scheduledTime, err := nextScheduleTime(logger, cronJob, now, sched, jm.recorder)
Expand All @@ -524,7 +516,7 @@ func (jm *ControllerV2) syncCronJob(
// we should log the error and not reconcile this cronjob until an update to spec
logger.V(2).Info("Invalid schedule", "cronjob", klog.KObj(cronJob), "schedule", cronJob.Spec.Schedule, "err", err)
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "InvalidSchedule", "invalid schedule: %s : %s", cronJob.Spec.Schedule, err)
return cronJob, nil, updateStatus, nil
return nil, updateStatus, nil
}
if scheduledTime == nil {
// no unmet start time, return cj,.
Expand All @@ -533,7 +525,7 @@ func (jm *ControllerV2) syncCronJob(
// the scheduled time, that will give atleast 1 unmet time schedule
logger.V(4).Info("No unmet start times", "cronjob", klog.KObj(cronJob))
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}

tooLate := false
Expand All @@ -552,7 +544,7 @@ func (jm *ControllerV2) syncCronJob(
// and event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution.
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
if inActiveListByName(cronJob, &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -561,7 +553,7 @@ func (jm *ControllerV2) syncCronJob(
}}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
logger.V(4).Info("Not starting job because the scheduled time is already processed", "cronjob", klog.KObj(cronJob), "schedule", scheduledTime)
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
// Regardless which source of information we use for the set of active jobs,
Expand All @@ -576,18 +568,18 @@ func (jm *ControllerV2) syncCronJob(
logger.V(4).Info("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KObj(cronJob))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
for _, j := range cronJob.Status.Active {
logger.V(4).Info("Deleting job that was still running at next scheduled start time", "job", klog.KRef(j.Namespace, j.Name))
job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
if err != nil {
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}
if !deleteJob(logger, cronJob, job, jm.jobControl, jm.recorder) {
return cronJob, nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
return nil, updateStatus, fmt.Errorf("could not replace job %s/%s", job.Namespace, job.Name)
}
updateStatus = true
}
Expand All @@ -596,22 +588,22 @@ func (jm *ControllerV2) syncCronJob(
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
if err != nil {
logger.Error(err, "Unable to make Job from template", "cronjob", klog.KObj(cronJob))
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}
jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
switch {
case errors.HasStatusCause(err, corev1.NamespaceTerminatingCause):
// if the namespace is being terminated, we don't have to do
// anything because any creation will fail
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
case errors.IsAlreadyExists(err):
// If the job is created by other actor, assume it has updated the cronjob status accordingly
logger.Info("Job already exists", "cronjob", klog.KObj(cronJob), "job", klog.KObj(jobReq))
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
case err != nil:
// default error handling
jm.recorder.Eventf(cronJob, corev1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return cronJob, nil, updateStatus, err
return nil, updateStatus, err
}

metrics.CronJobCreationSkew.Observe(jobResp.ObjectMeta.GetCreationTimestamp().Sub(*scheduledTime).Seconds())
Expand All @@ -632,14 +624,14 @@ func (jm *ControllerV2) syncCronJob(
jobRef, err := getRef(jobResp)
if err != nil {
logger.V(2).Info("Unable to make object reference", "cronjob", klog.KObj(cronJob), "err", err)
return cronJob, nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
return nil, updateStatus, fmt.Errorf("unable to make object reference for job for %s", klog.KObj(cronJob))
}
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = true

t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
return t, updateStatus, nil
}

func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {
Expand Down
123 changes: 122 additions & 1 deletion pkg/controller/cronjob/cronjob_controllerv2_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"testing"
"time"
Expand All @@ -36,6 +37,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/pointer"

_ "k8s.io/kubernetes/pkg/apis/batch/install"
_ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -1258,7 +1261,8 @@ func TestControllerV2SyncCronJob(t *testing.T) {
return tc.now
},
}
cjCopy, requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), &cj, js)
cjCopy := cj.DeepCopy()
requeueAfter, updateStatus, err := jm.syncCronJob(context.TODO(), cjCopy, js)
if tc.expectErr && err == nil {
t.Errorf("%s: expected error got none with requeueAfter time: %#v", name, requeueAfter)
}
Expand Down Expand Up @@ -1691,3 +1695,120 @@ func TestControllerV2GetJobsToBeReconciled(t *testing.T) {
})
}
}

func TestControllerV2CleanupFinishedJobs(t *testing.T) {
tests := []struct {
name string
now time.Time
cronJob *batchv1.CronJob
finishedJobs []*batchv1.Job
jobCreateError error
expectedDeletedJobs []string
}{
{
name: "jobs are still deleted when a cronjob can't create jobs due to jobs quota being reached (avoiding a deadlock)",
now: *justAfterTheHour(),
cronJob: &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"},
Spec: batchv1.CronJobSpec{
Schedule: onTheHour,
SuccessfulJobsHistoryLimit: pointer.Int32(1),
JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}},
},
},
Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}},
},
finishedJobs: []*batchv1.Job{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo-ns",
Name: "finished-job-started-hour-ago",
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
},
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo-ns",
Name: "finished-job-started-minute-ago",
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
},
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeTheHour()}},
},
},
jobCreateError: errors.NewInternalError(fmt.Errorf("quota for # of jobs reached")),
expectedDeletedJobs: []string{"finished-job-started-hour-ago"},
},
{
name: "jobs are not deleted if history limit not reached",
now: justBeforeTheHour(),
cronJob: &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"},
Spec: batchv1.CronJobSpec{
Schedule: onTheHour,
SuccessfulJobsHistoryLimit: pointer.Int32(2),
JobTemplate: batchv1.JobTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"key": "value"}},
},
},
Status: batchv1.CronJobStatus{LastScheduleTime: &metav1.Time{Time: justAfterThePriorHour()}},
},
finishedJobs: []*batchv1.Job{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo-ns",
Name: "finished-job-started-hour-ago",
OwnerReferences: []metav1.OwnerReference{{Name: "fooer", Controller: pointer.Bool(true)}},
},
Status: batchv1.JobStatus{StartTime: &metav1.Time{Time: justBeforeThePriorHour()}},
},
},
jobCreateError: nil,
expectedDeletedJobs: []string{},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)

for _, job := range tt.finishedJobs {
job.Status.Conditions = []batchv1.JobCondition{{Type: batchv1.JobComplete, Status: v1.ConditionTrue}}
}

client := fake.NewSimpleClientset()

informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
_ = informerFactory.Batch().V1().CronJobs().Informer().GetIndexer().Add(tt.cronJob)
for _, job := range tt.finishedJobs {
_ = informerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
}

jm, err := NewControllerV2(ctx, informerFactory.Batch().V1().Jobs(), informerFactory.Batch().V1().CronJobs(), client)
if err != nil {
t.Errorf("unexpected error %v", err)
return
}
jobControl := &fakeJobControl{CreateErr: tt.jobCreateError}
jm.jobControl = jobControl
jm.now = func() time.Time {
return tt.now
}

jm.enqueueController(tt.cronJob)
jm.processNextWorkItem(ctx)

if len(tt.expectedDeletedJobs) != len(jobControl.DeleteJobName) {
t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName)
}
sort.Strings(jobControl.DeleteJobName)
sort.Strings(tt.expectedDeletedJobs)
for i, deletedJob := range jobControl.DeleteJobName {
if deletedJob != tt.expectedDeletedJobs[i] {
t.Fatalf("expected '%v' jobs to be deleted, instead deleted '%s'", tt.expectedDeletedJobs, jobControl.DeleteJobName)
}
}
})
}
}