Skip to content

Commit

Permalink
Allow CronTrigger to resume from specified timestamp
Browse files Browse the repository at this point in the history
Includes differentiation between lenient and fixed execution.
Includes default time zone resolution from scheduler-wide Clock.

Closes gh-19475
Closes gh-31948
  • Loading branch information
jhoeller committed Jan 5, 2024
1 parent b169dc5 commit fb4fbea
Show file tree
Hide file tree
Showing 4 changed files with 253 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -117,9 +117,9 @@

/**
* A time zone for which the cron expression will be resolved. By default, this
* attribute is the empty String (i.e. the server's local time zone will be used).
* attribute is the empty String (i.e. the scheduler's time zone will be used).
* @return a zone id accepted by {@link java.util.TimeZone#getTimeZone(String)},
* or an empty String to indicate the server's default time zone
* or an empty String to indicate the scheduler's default time zone
* @since 4.0
* @see org.springframework.scheduling.support.CronTrigger#CronTrigger(String, java.util.TimeZone)
* @see java.util.TimeZone
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -429,14 +428,14 @@ private void processScheduledTask(Scheduled scheduled, Runnable runnable, Method
Assert.isTrue(initialDelay.isNegative(), "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
CronTrigger trigger;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
trigger = new CronTrigger(cron, StringUtils.parseTimeZoneString(zone));
}
else {
timeZone = TimeZone.getDefault();
trigger = new CronTrigger(cron);
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, trigger)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,41 +39,54 @@ public class CronTrigger implements Trigger {

private final CronExpression expression;

@Nullable
private final ZoneId zoneId;


/**
* Build a {@code CronTrigger} from the pattern provided in the default time zone.
* <p>This is equivalent to the {@link CronTrigger#forLenientExecution} factory
* method. Original trigger firings may be skipped if the previous task is still
* running; if this is not desirable, consider {@link CronTrigger#forFixedExecution}.
* @param expression a space-separated list of time fields, following cron
* expression conventions
* @see CronTrigger#forLenientExecution
* @see CronTrigger#forFixedExecution
*/
public CronTrigger(String expression) {
this(expression, ZoneId.systemDefault());
this.expression = CronExpression.parse(expression);
this.zoneId = null;
}

/**
* Build a {@code CronTrigger} from the pattern provided in the given time zone.
* Build a {@code CronTrigger} from the pattern provided in the given time zone,
* with the same lenient execution as {@link CronTrigger#CronTrigger(String)}.
* <p>Note that such explicit time zone customization is usually not necessary,
* using {@link org.springframework.scheduling.TaskScheduler#getClock()} instead.
* @param expression a space-separated list of time fields, following cron
* expression conventions
* @param timeZone a time zone in which the trigger times will be generated
*/
public CronTrigger(String expression, TimeZone timeZone) {
this(expression, timeZone.toZoneId());
this.expression = CronExpression.parse(expression);
Assert.notNull(timeZone, "TimeZone must not be null");
this.zoneId = timeZone.toZoneId();
}

/**
* Build a {@code CronTrigger} from the pattern provided in the given time zone.
* Build a {@code CronTrigger} from the pattern provided in the given time zone,
* with the same lenient execution as {@link CronTrigger#CronTrigger(String)}.
* <p>Note that such explicit time zone customization is usually not necessary,
* using {@link org.springframework.scheduling.TaskScheduler#getClock()} instead.
* @param expression a space-separated list of time fields, following cron
* expression conventions
* @param zoneId a time zone in which the trigger times will be generated
* @since 5.3
* @see CronExpression#parse(String)
*/
public CronTrigger(String expression, ZoneId zoneId) {
Assert.hasLength(expression, "Expression must not be empty");
Assert.notNull(zoneId, "ZoneId must not be null");

this.expression = CronExpression.parse(expression);
Assert.notNull(zoneId, "ZoneId must not be null");
this.zoneId = zoneId;
}

Expand All @@ -94,22 +107,32 @@ public String getExpression() {
*/
@Override
public Instant nextExecution(TriggerContext triggerContext) {
Instant instant = triggerContext.lastCompletion();
if (instant != null) {
Instant timestamp = determineLatestTimestamp(triggerContext);
ZoneId zone = (this.zoneId != null ? this.zoneId : triggerContext.getClock().getZone());
ZonedDateTime zonedTimestamp = ZonedDateTime.ofInstant(timestamp, zone);
ZonedDateTime nextTimestamp = this.expression.next(zonedTimestamp);
return (nextTimestamp != null ? nextTimestamp.toInstant() : null);
}

Instant determineLatestTimestamp(TriggerContext triggerContext) {
Instant timestamp = triggerContext.lastCompletion();
if (timestamp != null) {
Instant scheduled = triggerContext.lastScheduledExecution();
if (scheduled != null && instant.isBefore(scheduled)) {
if (scheduled != null && timestamp.isBefore(scheduled)) {
// Previous task apparently executed too early...
// Let's simply use the last calculated execution time then,
// in order to prevent accidental re-fires in the same second.
instant = scheduled;
timestamp = scheduled;
}
}
else {
instant = triggerContext.getClock().instant();
timestamp = determineInitialTimestamp(triggerContext);
}
ZonedDateTime dateTime = ZonedDateTime.ofInstant(instant, this.zoneId);
ZonedDateTime next = this.expression.next(dateTime);
return (next != null ? next.toInstant() : null);
return timestamp;
}

Instant determineInitialTimestamp(TriggerContext triggerContext) {
return triggerContext.getClock().instant();
}


Expand All @@ -129,4 +152,99 @@ public String toString() {
return this.expression.toString();
}


/**
* Create a {@link CronTrigger} for lenient execution, to be rescheduled
* after every task based on the completion time.
* <p>This variant does not make up for missed trigger firings if the
* associated task has taken too long. As a consequence, original trigger
* firings may be skipped if the previous task is still running.
* <p>This is equivalent to the regular {@link CronTrigger} constructor.
* Note that lenient execution is scheduler-dependent: it may skip trigger
* firings with long-running tasks on a thread pool while executing at
* {@link #forFixedExecution}-like precision with new threads per task.
* @param expression a space-separated list of time fields, following cron
* expression conventions
* @since 6.1.3
* @see #resumeLenientExecution
*/
public static CronTrigger forLenientExecution(String expression) {
return new CronTrigger(expression);
}

/**
* Create a {@link CronTrigger} for lenient execution, to be rescheduled
* after every task based on the completion time.
* <p>This variant does not make up for missed trigger firings if the
* associated task has taken too long. As a consequence, original trigger
* firings may be skipped if the previous task is still running.
* @param expression a space-separated list of time fields, following cron
* expression conventions
* @param resumptionTimestamp the timestamp to resume from (the last-known
* completion timestamp), with the new trigger calculated from there and
* possibly immediately firing (but only once, every subsequent calculation
* will start from the completion time of that first resumed trigger)
* @since 6.1.3
* @see #forLenientExecution
*/
public static CronTrigger resumeLenientExecution(String expression, Instant resumptionTimestamp) {
return new CronTrigger(expression) {
@Override
Instant determineInitialTimestamp(TriggerContext triggerContext) {
return resumptionTimestamp;
}
};
}

/**
* Create a {@link CronTrigger} for fixed execution, to be rescheduled
* after every task based on the last scheduled time.
* <p>This variant makes up for missed trigger firings if the associated task
* has taken too long, scheduling a task for every original trigger firing.
* Such follow-up tasks may execute late but will never be skipped.
* <p>Immediate versus late execution in case of long-running tasks may
* be scheduler-dependent but the guarantee to never skip a task is portable.
* @param expression a space-separated list of time fields, following cron
* expression conventions
* @since 6.1.3
* @see #resumeFixedExecution
*/
public static CronTrigger forFixedExecution(String expression) {
return new CronTrigger(expression) {
@Override
protected Instant determineLatestTimestamp(TriggerContext triggerContext) {
Instant scheduled = triggerContext.lastScheduledExecution();
return (scheduled != null ? scheduled : super.determineInitialTimestamp(triggerContext));
}
};
}

/**
* Create a {@link CronTrigger} for fixed execution, to be rescheduled
* after every task based on the last scheduled time.
* <p>This variant makes up for missed trigger firings if the associated task
* has taken too long, scheduling a task for every original trigger firing.
* Such follow-up tasks may execute late but will never be skipped.
* @param expression a space-separated list of time fields, following cron
* expression conventions
* @param resumptionTimestamp the timestamp to resume from (the last-known
* scheduled timestamp), with every trigger in-between immediately firing
* to make up for every execution that would have happened in the meantime
* @since 6.1.3
* @see #forFixedExecution
*/
public static CronTrigger resumeFixedExecution(String expression, Instant resumptionTimestamp) {
return new CronTrigger(expression) {
@Override
protected Instant determineLatestTimestamp(TriggerContext triggerContext) {
Instant scheduled = triggerContext.lastScheduledExecution();
return (scheduled != null ? scheduled : super.determineLatestTimestamp(triggerContext));
}
@Override
Instant determineInitialTimestamp(TriggerContext triggerContext) {
return resumptionTimestamp;
}
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.scheduling.concurrent;

import java.time.Clock;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.core.testfixture.EnabledForTestGroups;
import org.springframework.scheduling.support.CronTrigger;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.core.testfixture.TestGroup.LONG_RUNNING;

/**
* @author Juergen Hoeller
* @since 6.1.3
*/
@EnabledForTestGroups(LONG_RUNNING)
class CronTriggerExecutionTests {

ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();

AtomicInteger count = new AtomicInteger();

Runnable quick = count::incrementAndGet;

Runnable slow = () -> {
count.incrementAndGet();
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
};


@BeforeEach
void initialize() {
scheduler.initialize();
}

@AfterEach
void shutdown() {
scheduler.shutdown();
}


@Test
void forLenientExecutionQuick() throws Exception {
scheduler.schedule(quick, CronTrigger.forLenientExecution("*/1 * * * * *"));
Thread.sleep(2000);
assertThat(count.get()).isEqualTo(2);
}

@Test
void forLenientExecutionSlow() throws Exception {
scheduler.schedule(slow, CronTrigger.forLenientExecution("*/1 * * * * *"));
Thread.sleep(2000);
assertThat(count.get()).isEqualTo(1);
}

@Test
void forFixedExecutionQuick() throws Exception {
scheduler.schedule(quick, CronTrigger.forFixedExecution("*/1 * * * * *"));
Thread.sleep(2000);
assertThat(count.get()).isEqualTo(2);
}

@Test
void forFixedExecutionSlow() throws Exception {
scheduler.schedule(slow, CronTrigger.forFixedExecution("*/1 * * * * *"));
Thread.sleep(2000);
assertThat(count.get()).isEqualTo(2);
}

@Test
void resumeLenientExecution() throws Exception {
scheduler.schedule(quick, CronTrigger.resumeLenientExecution("*/1 * * * * *",
Clock.systemDefaultZone().instant().minusSeconds(2)));
Thread.sleep(1000);
assertThat(count.get()).isEqualTo(2);
}

@Test
void resumeFixedExecution() throws Exception {
scheduler.schedule(quick, CronTrigger.resumeFixedExecution("*/1 * * * * *",
Clock.systemDefaultZone().instant().minusSeconds(2)));
Thread.sleep(1000);
assertThat(count.get()).isEqualTo(3);
}

}

0 comments on commit fb4fbea

Please sign in to comment.