Skip to content

Commit

Permalink
Add option for graceful shutdown (setTaskTerminationTimeout)
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoeller committed Jul 27, 2023
1 parent 78d0dbb commit ce80637
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 41 deletions.
34 changes: 21 additions & 13 deletions framework-docs/modules/ROOT/pages/integration/scheduling.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,27 @@ The variants that Spring provides are as follows:
`ConcurrentTaskExecutor` directly. However, if the `ThreadPoolTaskExecutor` is not
flexible enough for your needs, `ConcurrentTaskExecutor` is an alternative.
* `ThreadPoolTaskExecutor`:
This implementation is most commonly used. It exposes bean properties for
configuring a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`.
If you need to adapt to a different kind of `java.util.concurrent.Executor`, we
recommend that you use a `ConcurrentTaskExecutor` instead.
This implementation is most commonly used. It exposes bean properties for configuring
a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`.
If you need to adapt to a different kind of `java.util.concurrent.Executor`,
we recommend that you use a `ConcurrentTaskExecutor` instead.
* `DefaultManagedTaskExecutor`:
This implementation uses a JNDI-obtained `ManagedExecutorService` in a JSR-236
compatible runtime environment (such as a Jakarta EE application server),
replacing a CommonJ WorkManager for that purpose.

As of 6.1, `ThreadPoolTaskExecutor` provides a pause/resume capability and graceful
shutdown through Spring's lifecycle management. There is also a new "virtualThreads"
option on `SimpleAsyncTaskExecutor` which is aligned with JDK 21's Virtual Threads,
as well as a graceful shutdown capability for `SimpleAsyncTaskExecutor` as well.


[[scheduling-task-executor-usage]]
=== Using a `TaskExecutor`

Spring's `TaskExecutor` implementations are used as simple JavaBeans. In the following example,
we define a bean that uses the `ThreadPoolTaskExecutor` to asynchronously print
out a set of messages:
Spring's `TaskExecutor` implementations are commonly used with dependency injection.
In the following example, we define a bean that uses the `ThreadPoolTaskExecutor`
to asynchronously print out a set of messages:

[source,java,indent=0,subs="verbatim,quotes"]
----
Expand Down Expand Up @@ -227,8 +232,8 @@ fixed delay, those methods should be used directly whenever possible. The value
`PeriodicTrigger` implementation is that you can use it within components that rely on
the `Trigger` abstraction. For example, it may be convenient to allow periodic triggers,
cron-based triggers, and even custom trigger implementations to be used interchangeably.
Such a component could take advantage of dependency injection so that you can configure such `Triggers`
externally and, therefore, easily modify or extend them.
Such a component could take advantage of dependency injection so that you can configure
such `Triggers` externally and, therefore, easily modify or extend them.


[[scheduling-task-scheduler-implementations]]
Expand All @@ -238,10 +243,8 @@ As with Spring's `TaskExecutor` abstraction, the primary benefit of the `TaskSch
arrangement is that an application's scheduling needs are decoupled from the deployment
environment. This abstraction level is particularly relevant when deploying to an
application server environment where threads should not be created directly by the
application itself. For such scenarios, Spring provides a `TimerManagerTaskScheduler`
that delegates to a CommonJ `TimerManager` on WebLogic or WebSphere as well as a more recent
`DefaultManagedTaskScheduler` that delegates to a JSR-236 `ManagedScheduledExecutorService`
in a Jakarta EE environment. Both are typically configured with a JNDI lookup.
application itself. For such scenarios, Spring provides a `DefaultManagedTaskScheduler`
that delegates to a JSR-236 `ManagedScheduledExecutorService` in a Jakarta EE environment.

Whenever external thread management is not a requirement, a simpler alternative is
a local `ScheduledExecutorService` setup within the application, which can be adapted
Expand All @@ -251,6 +254,11 @@ to provide common bean-style configuration along the lines of `ThreadPoolTaskExe
These variants work perfectly fine for locally embedded thread pool setups in lenient
application server environments, as well -- in particular on Tomcat and Jetty.

As of 6.1, `ThreadPoolTaskScheduler` provides a pause/resume capability and graceful
shutdown through Spring's lifecycle management. There is also a new option called
`SimpleAsyncTaskScheduler` which is aligned with JDK 21's Virtual Threads, using a
single scheduler thread but firing up a new thread for every scheduled task execution.



[[scheduling-annotation-support]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,19 @@
* A simple implementation of Spring's {@link TaskScheduler} interface, using
* a single scheduler thread and executing every scheduled task in an individual
* separate thread. This is an attractive choice with virtual threads on JDK 21,
* so it is commonly used with {@link #setVirtualThreads setVirtualThreads(true)}.
* expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}.
*
* <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
* at the expense of task tracking overhead per execution thread at runtime.
* Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
* By default, the number of concurrent task executions is unlimited.
* This allows for dynamic concurrency of scheduled task executions, in contrast
* to {@link ThreadPoolTaskScheduler} which requires a fixed pool size.
*
* <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
* thread-pooling TaskScheduler implementation instead, in particular for
* scheduling a large number of short-lived tasks. Alternatively, on JDK 21,
* consider setting {@link #setVirtualThreads} to {@code true}.
*
* <p>Extends {@link SimpleAsyncTaskExecutor} and can serve as a fully capable
* replacement for it, e.g. as a single shared instance serving as a
Expand All @@ -64,13 +76,14 @@
* @author Juergen Hoeller
* @since 6.1
* @see #setVirtualThreads
* @see #setTargetTaskExecutor
* @see #setTaskTerminationTimeout
* @see #setConcurrencyLimit
* @see SimpleAsyncTaskExecutor
* @see ThreadPoolTaskScheduler
*/
@SuppressWarnings("serial")
public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler,
ApplicationContextAware, SmartLifecycle, ApplicationListener<ContextClosedEvent>, AutoCloseable {
ApplicationContextAware, SmartLifecycle, ApplicationListener<ContextClosedEvent> {

private static final TimeUnit NANO = TimeUnit.NANOSECONDS;

Expand Down Expand Up @@ -275,6 +288,7 @@ public void close() {
future.cancel(true);
}
}
super.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -33,6 +34,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.testfixture.EnabledForTestGroups;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
Expand Down Expand Up @@ -66,6 +68,10 @@ public void tearDown() {
}


/*
* Tests compatibility between default executor in TaskSchedulerRouter
* and explicit ThreadPoolTaskScheduler in configuration subclass.
*/
@ParameterizedTest
@ValueSource(classes = {FixedRateTaskConfig.class, FixedRateTaskConfigSubclass.class})
@EnabledForTestGroups(LONG_RUNNING)
Expand All @@ -77,8 +83,14 @@ public void withFixedRateTask(Class<?> configClass) throws InterruptedException
assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10);
}

/*
* Tests compatibility between SimpleAsyncTaskScheduler in regular configuration
* and explicit ThreadPoolTaskScheduler in configuration subclass. This includes
* pause/resume behavior and a controlled shutdown with a 1s termination timeout.
*/
@ParameterizedTest
@ValueSource(classes = {ExplicitSchedulerConfig.class, ExplicitSchedulerConfigSubclass.class})
@Timeout(2) // should actually complete within 1s
@EnabledForTestGroups(LONG_RUNNING)
public void withExplicitScheduler(Class<?> configClass) throws InterruptedException {
ctx = new AnnotationConfigApplicationContext(configClass);
Expand All @@ -96,9 +108,35 @@ public void withExplicitScheduler(Class<?> configClass) throws InterruptedExcept
int count3 = ctx.getBean(AtomicInteger.class).get();
assertThat(count3).isGreaterThanOrEqualTo(20);

TaskExecutor executor = ctx.getBean(TaskExecutor.class);
AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 2; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000); // try to break test timeout
}
catch (InterruptedException ex) {
// expected during executor shutdown
try {
Thread.sleep(500);
// should get here within task termination timeout (1000)
count.incrementAndGet();
}
catch (InterruptedException ex2) {
// not expected
}
}
});
}

assertThat(ctx.getBean(ExplicitSchedulerConfig.class).threadName).startsWith("explicitScheduler-");
assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler")).contains(
TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue();
assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler"))
.contains(TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue();

// Include executor shutdown in test timeout (2 seconds),
// expecting interruption of the sleeping thread...
ctx.close();
assertThat(count.intValue()).isEqualTo(2);
}

@Test
Expand Down Expand Up @@ -226,6 +264,11 @@ public void task() {

@Configuration
static class FixedRateTaskConfigSubclass extends FixedRateTaskConfig {

@Bean
public TaskScheduler taskScheduler() {
return new ThreadPoolTaskScheduler();
}
}


Expand All @@ -239,6 +282,7 @@ static class ExplicitSchedulerConfig {
public TaskScheduler myTaskScheduler() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler-");
scheduler.setTaskTerminationTimeout(1000);
return scheduler;
}

Expand All @@ -263,6 +307,8 @@ static class ExplicitSchedulerConfigSubclass extends ExplicitSchedulerConfig {
public TaskScheduler myTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler-");
scheduler.setAwaitTerminationMillis(1000);
scheduler.setPoolSize(2);
return scheduler;
}
}
Expand Down Expand Up @@ -437,6 +483,7 @@ public void task() {
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setConcurrencyLimit(1);
return scheduler;
}

Expand Down Expand Up @@ -478,6 +525,7 @@ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1-");
scheduler.setConcurrencyLimit(1);
return scheduler;
}

Expand Down Expand Up @@ -508,6 +556,7 @@ public ThreadAwareWorker worker() {
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1-");
scheduler.setConcurrencyLimit(1);
return scheduler;
}

Expand Down
Loading

0 comments on commit ce80637

Please sign in to comment.