Skip to content

Commit

Permalink
Sophisticated lifecycle support for ThreadPoolTaskExecutor/Scheduler
Browse files Browse the repository at this point in the history
Pause/resume capability through SmartLifecycle implementation.
ContextClosedEvent triggers early executor/scheduler shutdown.
Programmatic initiateShutdown() option for custom early shutdown.

Closes gh-30831
Closes gh-27090
Closes gh-24497
  • Loading branch information
jhoeller committed Jul 7, 2023
1 parent 029a69f commit b12115b
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,20 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.lang.Nullable;

/**
Expand All @@ -50,7 +57,8 @@
*/
@SuppressWarnings("serial")
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
implements BeanNameAware, InitializingBean, DisposableBean {
implements BeanNameAware, ApplicationContextAware, InitializingBean, DisposableBean,
SmartLifecycle, ApplicationListener<ContextClosedEvent> {

protected final Log logger = LogFactory.getLog(getClass());

Expand All @@ -60,16 +68,34 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac

private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

private boolean acceptTasksAfterContextClose = false;

private boolean waitForTasksToCompleteOnShutdown = false;

private long awaitTerminationMillis = 0;

private int phase = DEFAULT_PHASE;

@Nullable
private String beanName;

@Nullable
private ApplicationContext applicationContext;

@Nullable
private ExecutorService executor;

private final ReentrantLock pauseLock = new ReentrantLock();

private final Condition unpaused = this.pauseLock.newCondition();

private volatile boolean paused;

private int executingTaskCount = 0;

@Nullable
private Runnable stopCallback;


/**
* Set the ThreadFactory to use for the ExecutorService's thread pool.
Expand Down Expand Up @@ -105,12 +131,32 @@ public void setRejectedExecutionHandler(@Nullable RejectedExecutionHandler rejec
(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
}

/**
* Set whether to accept further tasks after the application context close phase
* has begun.
* <p>Default is {@code false} as of 6.1, triggering an early soft shutdown of
* the executor and therefore rejecting any further task submissions. Switch this
* to {@code true} in order to let other components submit tasks even during their
* own destruction callbacks, at the expense of a longer shutdown phase.
* This will usually go along with
* {@link #setWaitForTasksToCompleteOnShutdown "waitForTasksToCompleteOnShutdown"}.
* <p>This flag will only have effect when the executor is running in a Spring
* application context and able to receive the {@link ContextClosedEvent}.
* @since 6.1
* @see org.springframework.context.ConfigurableApplicationContext#close()
* @see DisposableBean#destroy()
* @see #shutdown()
*/
public void setAcceptTasksAfterContextClose(boolean acceptTasksAfterContextClose) {
this.acceptTasksAfterContextClose = acceptTasksAfterContextClose;
}

/**
* Set whether to wait for scheduled tasks to complete on shutdown,
* not interrupting running tasks and executing all tasks in the queue.
* <p>Default is "false", shutting down immediately through interrupting
* ongoing tasks and clearing the queue. Switch this flag to "true" if you
* prefer fully completed tasks at the expense of a longer shutdown phase.
* <p>Default is {@code false}, shutting down immediately through interrupting
* ongoing tasks and clearing the queue. Switch this flag to {@code true} if
* you prefer fully completed tasks at the expense of a longer shutdown phase.
* <p>Note that Spring's container shutdown continues while ongoing tasks
* are being completed. If you want this executor to block and wait for the
* termination of tasks before the rest of the container continues to shut
Expand Down Expand Up @@ -161,11 +207,36 @@ public void setAwaitTerminationMillis(long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
}

/**
* Specify the lifecycle phase for pausing and resuming this executor.
* The default is {@link #DEFAULT_PHASE}.
* @since 6.1
* @see SmartLifecycle#getPhase()
*/
public void setPhase(int phase) {
this.phase = phase;
}

/**
* Return the lifecycle phase for pausing and resuming this executor.
* @since 6.1
* @see #setPhase
*/
@Override
public int getPhase() {
return this.phase;
}

@Override
public void setBeanName(String name) {
this.beanName = name;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}


/**
* Calls {@code initialize()} after the container applied all property values.
Expand Down Expand Up @@ -211,9 +282,33 @@ public void destroy() {
}

/**
* Perform a shutdown on the underlying ExecutorService.
* Initiate a shutdown on the underlying ExecutorService,
* rejecting further task submissions.
* <p>The executor will not accept further tasks and will prevent further
* scheduling of periodic tasks, letting existing tasks complete still.
* This step is non-blocking and can be applied as an early shutdown signal
* before following up with a full {@link #shutdown()} call later on.
* @since 6.1
* @see #shutdown()
* @see java.util.concurrent.ExecutorService#shutdown()
*/
public void initiateShutdown() {
if (this.executor != null) {
this.executor.shutdown();
}
}

/**
* Perform a full shutdown on the underlying ExecutorService,
* according to the corresponding configuration settings.
* <p>This step potentially blocks for the configured termination period,
* waiting for remaining tasks to complete. For an early shutdown signal
* to not accept further tasks, call {@link #initiateShutdown()} first.
* @see #setWaitForTasksToCompleteOnShutdown
* @see #setAwaitTerminationMillis
* @see java.util.concurrent.ExecutorService#shutdown()
* @see java.util.concurrent.ExecutorService#shutdownNow()
* @see java.util.concurrent.ExecutorService#awaitTermination
*/
public void shutdown() {
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -270,4 +365,136 @@ private void awaitTerminationIfNecessary(ExecutorService executor) {
}
}


/**
* Resume this executor if paused before (otherwise a no-op).
* @since 6.1
*/
@Override
public void start() {
this.pauseLock.lock();
try {
this.paused = false;
this.unpaused.signalAll();
}
finally {
this.pauseLock.unlock();
}
}

/**
* Pause this executor, not waiting for tasks to complete.
* @since 6.1
*/
@Override
public void stop() {
this.pauseLock.lock();
try {
this.paused = true;
this.stopCallback = null;
}
finally {
this.pauseLock.unlock();
}
}

/**
* Pause this executor, triggering the given callback
* once all currently executing tasks have completed.
* @since 6.1
*/
@Override
public void stop(Runnable callback) {
this.pauseLock.lock();
try {
this.paused = true;
if (this.executingTaskCount == 0) {
this.stopCallback = null;
callback.run();
}
else {
this.stopCallback = callback;
}
}
finally {
this.pauseLock.unlock();
}
}

/**
* Check whether this executor is not paused and has not been shut down either.
* @since 6.1
* @see #start()
* @see #stop()
*/
@Override
public boolean isRunning() {
return (this.executor != null && !this.executor.isShutdown() & !this.paused);
}

/**
* A before-execute callback for framework subclasses to delegate to
* (for start/stop handling), and possibly also for custom subclasses
* to extend (making sure to call this implementation as well).
* @param thread the thread to run the task
* @param task the task to be executed
* @since 6.1
* @see ThreadPoolExecutor#beforeExecute(Thread, Runnable)
*/
protected void beforeExecute(Thread thread, Runnable task) {
this.pauseLock.lock();
try {
while (this.paused && this.executor != null && !this.executor.isShutdown()) {
this.unpaused.await();
}
}
catch (InterruptedException ex) {
thread.interrupt();
}
finally {
this.executingTaskCount++;
this.pauseLock.unlock();
}
}

/**
* An after-execute callback for framework subclasses to delegate to
* (for start/stop handling), and possibly also for custom subclasses
* to extend (making sure to call this implementation as well).
* @param task the task that has been executed
* @param ex the exception thrown during execution, if any
* @since 6.1
* @see ThreadPoolExecutor#afterExecute(Runnable, Throwable)
*/
protected void afterExecute(Runnable task, @Nullable Throwable ex) {
this.pauseLock.lock();
try {
this.executingTaskCount--;
if (this.executingTaskCount == 0) {
Runnable callback = this.stopCallback;
if (callback != null) {
callback.run();
this.stopCallback = null;
}
}
}
finally {
this.pauseLock.unlock();
}
}

/**
* {@link ContextClosedEvent} handler for initiating an early shutdown.
* @since 6.1
* @see #initiateShutdown()
*/
@Override
public void onApplicationEvent(ContextClosedEvent event) {
if (event.getApplicationContext() == this.applicationContext && !this.acceptTasksAfterContextClose) {
// Early shutdown signal: accept no further tasks, let existing tasks complete
// before hitting the actual destruction step in the shutdown() method above.
initiateShutdown();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,16 @@ protected ExecutorService initializeExecutor(
protected ScheduledExecutorService createExecutor(
int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler) {
@Override
protected void beforeExecute(Thread thread, Runnable task) {
ScheduledExecutorFactoryBean.this.beforeExecute(thread, task);
}
@Override
protected void afterExecute(Runnable task, Throwable ex) {
ScheduledExecutorFactoryBean.this.afterExecute(task, ex);
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -192,7 +192,16 @@ protected ThreadPoolExecutor createExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
@Override
protected void beforeExecute(Thread thread, Runnable task) {
ThreadPoolExecutorFactoryBean.this.beforeExecute(thread, task);
}
@Override
protected void afterExecute(Runnable task, Throwable ex) {
ThreadPoolExecutorFactoryBean.this.afterExecute(task, ex);
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,27 +254,29 @@ protected ExecutorService initializeExecutor(

BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
@Override
public void execute(Runnable command) {
Runnable decorated = command;
if (taskDecorator != null) {
decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);

}
super.execute(decorated);
}
@Override
protected void beforeExecute(Thread thread, Runnable task) {
ThreadPoolTaskExecutor.this.beforeExecute(thread, task);
}
@Override
protected void afterExecute(Runnable task, Throwable ex) {
ThreadPoolTaskExecutor.this.afterExecute(task, ex);
}
};

if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
Expand Down
Loading

0 comments on commit b12115b

Please sign in to comment.