Skip to content

Commit

Permalink
SimpleAsyncTaskScheduler runs fixed-delay tasks on scheduler thread
Browse files Browse the repository at this point in the history
Closes gh-31334
  • Loading branch information
jhoeller committed Sep 28, 2023
1 parent 86b764d commit ef61b4e
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
* separate thread. This is an attractive choice with virtual threads on JDK 21,
* expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}.
*
* <p><b>NOTE: Scheduling with a fixed delay enforces execution on the single
* scheduler thread, in order to provide traditional fixed-delay semantics!</b>
* Prefer the use of fixed rates or cron triggers instead which are a better fit
* with this thread-per-task scheduler variant.
*
* <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
* at the expense of task tracking overhead per execution thread at runtime.
* Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
Expand Down Expand Up @@ -234,7 +239,8 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
Duration initialDelay = Duration.between(this.clock.instant(), startTime);
try {
return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task),
// Blocking task on scheduler thread for fixed delay semantics
return this.scheduledExecutor.scheduleWithFixedDelay(task,
NANO.convert(initialDelay), NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
Expand All @@ -245,7 +251,8 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTim
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
try {
return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task),
// Blocking task on scheduler thread for fixed delay semantics
return this.scheduledExecutor.scheduleWithFixedDelay(task,
0, NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,20 @@ public void withQualifiedSchedulerAndPlaceholder() throws InterruptedException {

Thread.sleep(110);
assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10);
assertThat(ctx.getBean(QualifiedExplicitSchedulerConfigWithPlaceholder.class).threadName).startsWith("explicitScheduler1");
assertThat(ctx.getBean(QualifiedExplicitSchedulerConfigWithPlaceholder.class).threadName)
.startsWith("explicitScheduler1").isNotEqualTo("explicitScheduler1-1");
}

@Test
@EnabledForTestGroups(LONG_RUNNING)
public void withQualifiedSchedulerWithFixedDelayTask() throws InterruptedException {
ctx = new AnnotationConfigApplicationContext(QualifiedExplicitSchedulerConfigWithFixedDelayTask.class);
assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1);

Thread.sleep(110);
assertThat(ctx.getBean(AtomicInteger.class).get()).isBetween(4, 5);
assertThat(ctx.getBean(QualifiedExplicitSchedulerConfigWithFixedDelayTask.class).threadName)
.isEqualTo("explicitScheduler1-1");
}

@Test
Expand Down Expand Up @@ -228,7 +241,20 @@ public void withInitiallyDelayedFixedRateTask() throws InterruptedException {

// The @Scheduled method should have been called several times
// but not more times than the delay allows.
assertThat(counter.get()).isBetween(2, 10);
assertThat(counter.get()).isBetween(6, 10);
}

@Test
@EnabledForTestGroups(LONG_RUNNING)
public void withInitiallyDelayedFixedDelayTask() throws InterruptedException {
ctx = new AnnotationConfigApplicationContext(FixedDelayTaskConfig_withInitialDelay.class);

Thread.sleep(1950);
AtomicInteger counter = ctx.getBean(AtomicInteger.class);

// The @Scheduled method should have been called several times
// but not more times than the delay allows.
assertThat(counter.get()).isBetween(1, 5);
}

@Test
Expand Down Expand Up @@ -333,14 +359,14 @@ static class AmbiguousExplicitSchedulerConfig {
@Bean
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler;
}

@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2");
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}

Expand All @@ -359,14 +385,14 @@ static class ExplicitScheduledTaskRegistrarConfig implements SchedulingConfigure
@Bean
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler;
}

@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2");
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}

Expand Down Expand Up @@ -397,14 +423,14 @@ static class QualifiedExplicitSchedulerConfig {
@Bean @Qualifier("myScheduler")
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler;
}

@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2");
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}

Expand All @@ -414,9 +440,10 @@ public AtomicInteger counter() {
}

@Scheduled(fixedRate = 10, scheduler = "myScheduler")
public void task() {
public void task() throws InterruptedException {
threadName = Thread.currentThread().getName();
counter().incrementAndGet();
Thread.sleep(10);
}
}

Expand All @@ -430,14 +457,14 @@ static class QualifiedExplicitSchedulerConfigWithPlaceholder {
@Bean @Qualifier("myScheduler")
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler;
}

@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2");
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}

Expand All @@ -447,9 +474,10 @@ public AtomicInteger counter() {
}

@Scheduled(fixedRate = 10, scheduler = "${scheduler}")
public void task() {
public void task() throws InterruptedException {
threadName = Thread.currentThread().getName();
counter().incrementAndGet();
Thread.sleep(10);
}

@Bean
Expand All @@ -463,21 +491,55 @@ public static PropertySourcesPlaceholderConfigurer placeholderConfigurer() {
}


@Configuration
@EnableScheduling
static class QualifiedExplicitSchedulerConfigWithFixedDelayTask {

String threadName;

@Bean @Qualifier("myScheduler")
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler;
}

@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}

@Bean
public AtomicInteger counter() {
return new AtomicInteger();
}

@Scheduled(fixedDelay = 10, scheduler = "myScheduler")
public void task() throws InterruptedException {
threadName = Thread.currentThread().getName();
counter().incrementAndGet();
Thread.sleep(10);
}
}


@Configuration
@EnableScheduling
static class SchedulingEnabled_withAmbiguousTaskSchedulers_butNoActualTasks {

@Bean
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setThreadNamePrefix("explicitScheduler1-");
return scheduler;
}

@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2");
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}
}
Expand All @@ -494,15 +556,15 @@ public void task() {
@Bean
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setThreadNamePrefix("explicitScheduler1-");
scheduler.setConcurrencyLimit(1);
return scheduler;
}

@Bean
public TaskScheduler taskScheduler2() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler2");
scheduler.setThreadNamePrefix("explicitScheduler2-");
return scheduler;
}
}
Expand Down Expand Up @@ -620,8 +682,26 @@ public AtomicInteger counter() {
}

@Scheduled(initialDelay = 1000, fixedRate = 100)
public void task() {
public void task() throws InterruptedException {
counter().incrementAndGet();
Thread.sleep(100);
}
}


@Configuration
@EnableScheduling
static class FixedDelayTaskConfig_withInitialDelay {

@Bean
public AtomicInteger counter() {
return new AtomicInteger();
}

@Scheduled(initialDelay = 1000, fixedDelay = 100)
public void task() throws InterruptedException {
counter().incrementAndGet();
Thread.sleep(100);
}
}

Expand Down

0 comments on commit ef61b4e

Please sign in to comment.