Skip to content

Commit

Permalink
Re-use common parts between getNextScheduleTime and nextScheduledTime…
Browse files Browse the repository at this point in the history
…Duration

The two methods nextScheduledTimeDuration and getNextScheduleTime have a
lot of similarities, so this commit squashes the common parts together
along with getMostRecentScheduleTime to avoid code duplication.
  • Loading branch information
soltysh committed Jan 18, 2023
1 parent cb491a8 commit be44d67
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 151 deletions.
38 changes: 7 additions & 31 deletions pkg/controller/cronjob/cronjob_controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) {
return
}
now := jm.now()
t := nextScheduledTimeDuration(*newCJ, sched, now)
t := nextScheduleTimeDuration(newCJ, now, sched)

jm.enqueueControllerAfter(curr, *t)
return
Expand Down Expand Up @@ -517,7 +517,7 @@ func (jm *ControllerV2) syncCronJob(
return cronJob, nil, updateStatus, nil
}

scheduledTime, err := getNextScheduleTime(*cronJob, now, sched, jm.recorder)
scheduledTime, err := nextScheduleTime(cronJob, now, sched, jm.recorder)
if err != nil {
// this is likely a user error in defining the spec value
// we should log the error and not reconcile this cronjob until an update to spec
Expand All @@ -531,7 +531,7 @@ func (jm *ControllerV2) syncCronJob(
// Otherwise, the queue is always suppose to trigger sync function at the time of
// the scheduled time, that will give atleast 1 unmet time schedule
klog.V(4).InfoS("No unmet start times", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
t := nextScheduledTimeDuration(*cronJob, sched, now)
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
}

Expand All @@ -550,7 +550,7 @@ func (jm *ControllerV2) syncCronJob(
// Status.LastScheduleTime, Status.LastMissedTime), and then so we won't generate
// and event the next time we process it, and also so the user looking at the status
// can see easily that there was a missed execution.
t := nextScheduledTimeDuration(*cronJob, sched, now)
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
}
if inActiveListByName(cronJob, &batchv1.Job{
Expand All @@ -559,7 +559,7 @@ func (jm *ControllerV2) syncCronJob(
Namespace: cronJob.Namespace,
}}) || cronJob.Status.LastScheduleTime.Equal(&metav1.Time{Time: *scheduledTime}) {
klog.V(4).InfoS("Not starting job because the scheduled time is already processed", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "schedule", scheduledTime)
t := nextScheduledTimeDuration(*cronJob, sched, now)
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
}
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
Expand All @@ -574,7 +574,7 @@ func (jm *ControllerV2) syncCronJob(
// But that would mean that you could not inspect prior successes or failures of Forbid jobs.
klog.V(4).InfoS("Not starting job because prior execution is still running and concurrency policy is Forbid", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()))
jm.recorder.Eventf(cronJob, corev1.EventTypeNormal, "JobAlreadyActive", "Not starting job because prior execution is running and concurrency policy is Forbid")
t := nextScheduledTimeDuration(*cronJob, sched, now)
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
}
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
Expand Down Expand Up @@ -635,38 +635,14 @@ func (jm *ControllerV2) syncCronJob(
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = true

t := nextScheduledTimeDuration(*cronJob, sched, now)
t := nextScheduleTimeDuration(cronJob, now, sched)
return cronJob, t, updateStatus, nil
}

func getJobName(cj *batchv1.CronJob, scheduledTime time.Time) string {
return fmt.Sprintf("%s-%d", cj.Name, getTimeHashInMinutes(scheduledTime))
}

// nextScheduledTimeDuration returns the time duration to requeue based on
// the schedule and last schedule time. It adds a 100ms padding to the next requeue to account
// for Network Time Protocol(NTP) time skews. If the time drifts are adjusted which in most
// realistic cases would be around 100s, scheduled cron will still be executed without missing
// the schedule.
func nextScheduledTimeDuration(cj batchv1.CronJob, sched cron.Schedule, now time.Time) *time.Duration {
earliestTime := cj.ObjectMeta.CreationTimestamp.Time
if cj.Status.LastScheduleTime != nil {
earliestTime = cj.Status.LastScheduleTime.Time
}
mostRecentTime, _, err := getMostRecentScheduleTime(earliestTime, now, sched)
if err != nil {
// we still have to requeue at some point, so aim for the next scheduling slot from now
mostRecentTime = &now
} else if mostRecentTime == nil {
// no missed schedules since earliestTime
mostRecentTime = &earliestTime
}

t := sched.Next(*mostRecentTime).Add(nextScheduleDelta).Sub(now)

return &t
}

// cleanupFinishedJobs cleanups finished jobs created by a CronJob
// It returns a bool to indicate an update to api-server is needed
func (jm *ControllerV2) cleanupFinishedJobs(ctx context.Context, cj *batchv1.CronJob, js []*batchv1.Job) bool {
Expand Down
115 changes: 64 additions & 51 deletions pkg/controller/cronjob/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,39 +69,78 @@ func deleteFromActiveList(cj *batchv1.CronJob, uid types.UID) {
cj.Status.Active = newActive
}

// getNextScheduleTime gets the time of next schedule after last scheduled and before now
// it returns nil if no unmet schedule times.
//
// If there are too many (>100) unstarted times, it will raise a warning and but still return
// the list of missed times.
func getNextScheduleTime(cj batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
var (
earliestTime time.Time
)
// mostRecentScheduleTime returns:
// - the last schedule time or CronJob's creation time,
// - the most recent time a Job should be created or nil, if that's after now,
// - number of missed schedules
// - error in an edge case where the schedule specification is grammatically correct,
// but logically doesn't make sense (31st day for months with only 30 days, for example).
func mostRecentScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, includeStartingDeadlineSeconds bool) (time.Time, *time.Time, int64, error) {
earliestTime := cj.ObjectMeta.CreationTimestamp.Time
if cj.Status.LastScheduleTime != nil {
earliestTime = cj.Status.LastScheduleTime.Time
} else {
// If none found, then this is either a recently created cronJob,
// or the active/completed info was somehow lost (contract for status
// in kubernetes says it may need to be recreated), or that we have
// started a job, but have not noticed it yet (distributed systems can
// have arbitrary delays). In any case, use the creation time of the
// CronJob as last known start time.
earliestTime = cj.ObjectMeta.CreationTimestamp.Time
}
if cj.Spec.StartingDeadlineSeconds != nil {
// Controller is not going to schedule anything below this point
}
if includeStartingDeadlineSeconds && cj.Spec.StartingDeadlineSeconds != nil {
// controller is not going to schedule anything below this point
schedulingDeadline := now.Add(-time.Second * time.Duration(*cj.Spec.StartingDeadlineSeconds))

if schedulingDeadline.After(earliestTime) {
earliestTime = schedulingDeadline
}
}
if earliestTime.After(now) {
return nil, nil

t1 := schedule.Next(earliestTime)
t2 := schedule.Next(t1)

if now.Before(t1) {
return earliestTime, nil, 0, nil
}
if now.Before(t2) {
return earliestTime, &t1, 1, nil
}

// It is possible for cron.ParseStandard("59 23 31 2 *") to return an invalid schedule
// minute - 59, hour - 23, dom - 31, month - 2, and dow is optional, clearly 31 is invalid
// In this case the timeBetweenTwoSchedules will be 0, and we error out the invalid schedule
timeBetweenTwoSchedules := int64(t2.Sub(t1).Round(time.Second).Seconds())
if timeBetweenTwoSchedules < 1 {
return earliestTime, nil, 0, fmt.Errorf("time difference between two schedules is less than 1 second")
}
timeElapsed := int64(now.Sub(t1).Seconds())
numberOfMissedSchedules := (timeElapsed / timeBetweenTwoSchedules) + 1
mostRecentTime := time.Unix(t1.Unix()+((numberOfMissedSchedules-1)*timeBetweenTwoSchedules), 0).UTC()

return earliestTime, &mostRecentTime, numberOfMissedSchedules, nil
}

// nextScheduleTimeDuration returns the time duration to requeue based on
// the schedule and last schedule time. It adds a 100ms padding to the next requeue to account
// for Network Time Protocol(NTP) time skews. If the time drifts the adjustment, which in most
// realistic cases should be around 100s, the job will still be executed without missing
// the schedule.
func nextScheduleTimeDuration(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule) *time.Duration {
earliestTime, mostRecentTime, _, err := mostRecentScheduleTime(cj, now, schedule, false)
if err != nil {
// we still have to requeue at some point, so aim for the next scheduling slot from now
mostRecentTime = &now
} else if mostRecentTime == nil {
// no missed schedules since earliestTime
mostRecentTime = &earliestTime
}

t := schedule.Next(*mostRecentTime).Add(nextScheduleDelta).Sub(now)
return &t
}

t, numberOfMissedSchedules, err := getMostRecentScheduleTime(earliestTime, now, schedule)
// nextScheduleTime returns the time.Time of the next schedule after the last scheduled
// and before now, or nil if no unmet schedule times, and an error.
// If there are too many (>100) unstarted times, it will also record a warning.
func nextScheduleTime(cj *batchv1.CronJob, now time.Time, schedule cron.Schedule, recorder record.EventRecorder) (*time.Time, error) {
_, mostRecentTime, numberOfMissedSchedules, err := mostRecentScheduleTime(cj, now, schedule, true)

if mostRecentTime == nil || mostRecentTime.After(now) {
return nil, err
}

if numberOfMissedSchedules > 100 {
// An object might miss several starts. For example, if
Expand All @@ -121,36 +160,10 @@ func getNextScheduleTime(cj batchv1.CronJob, now time.Time, schedule cron.Schedu
//
// I've somewhat arbitrarily picked 100, as more than 80,
// but less than "lots".
recorder.Eventf(&cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", numberOfMissedSchedules)
recorder.Eventf(cj, corev1.EventTypeWarning, "TooManyMissedTimes", "too many missed start times: %d. Set or decrease .spec.startingDeadlineSeconds or check clock skew", numberOfMissedSchedules)
klog.InfoS("too many missed times", "cronjob", klog.KRef(cj.GetNamespace(), cj.GetName()), "missed times", numberOfMissedSchedules)
}
return t, err
}

// getMostRecentScheduleTime returns the latest schedule time between earliestTime and the count of number of
// schedules in between them
func getMostRecentScheduleTime(earliestTime time.Time, now time.Time, schedule cron.Schedule) (*time.Time, int64, error) {
t1 := schedule.Next(earliestTime)
t2 := schedule.Next(t1)

if now.Before(t1) {
return nil, 0, nil
}
if now.Before(t2) {
return &t1, 1, nil
}

// It is possible for cron.ParseStandard("59 23 31 2 *") to return an invalid schedule
// seconds - 59, minute - 23, hour - 31 (?!) dom - 2, and dow is optional, clearly 31 is invalid
// In this case the timeBetweenTwoSchedules will be 0, and we error out the invalid schedule
timeBetweenTwoSchedules := int64(t2.Sub(t1).Round(time.Second).Seconds())
if timeBetweenTwoSchedules < 1 {
return nil, 0, fmt.Errorf("time difference between two schedules less than 1 second")
}
timeElapsed := int64(now.Sub(t1).Seconds())
numberOfMissedSchedules := (timeElapsed / timeBetweenTwoSchedules) + 1
t := time.Unix(t1.Unix()+((numberOfMissedSchedules-1)*timeBetweenTwoSchedules), 0).UTC()
return &t, numberOfMissedSchedules, nil
return mostRecentTime, err
}

func copyLabels(template *batchv1.JobTemplateSpec) labels.Set {
Expand Down

0 comments on commit be44d67

Please sign in to comment.