Skip to content

Commit

Permalink
Support for determining a target scheduler for a specific task
Browse files Browse the repository at this point in the history
Introduces "scheduler" attribute on @scheduled annotation.
TaskSchedulerRouter delegates to qualified/default scheduler.
ScheduledMethodRunnable exposes qualifier through SchedulingAwareRunnable.

Closes gh-20818
  • Loading branch information
jhoeller committed Jul 8, 2023
1 parent f0fe58f commit a861453
Show file tree
Hide file tree
Showing 6 changed files with 460 additions and 109 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.scheduling;

import org.springframework.lang.Nullable;

/**
* Extension of the {@link Runnable} interface, adding special callbacks
* for long-running operations.
Expand All @@ -38,7 +40,27 @@ public interface SchedulingAwareRunnable extends Runnable {
* pool (if any) but rather be considered as long-running background thread.
* <p>This should be considered a hint. Of course TaskExecutor implementations
* are free to ignore this flag and the SchedulingAwareRunnable interface overall.
* <p>The default implementation returns {@code false}, as of 6.1.
*/
default boolean isLongLived() {
return false;
}

/**
* Return a qualifier associated with this Runnable.
* <p>The default implementation returns {@code null}.
* <p>May be used for custom purposes depending on the scheduler implementation.
* {@link org.springframework.scheduling.config.TaskSchedulerRouter} introspects
* this qualifier in order to determine the target scheduler to be used
* for a given Runnable, matching the qualifier value (or the bean name)
* of a specific {@link org.springframework.scheduling.TaskScheduler} or
* {@link java.util.concurrent.ScheduledExecutorService} bean definition.
* @since 6.1
* @see org.springframework.scheduling.annotation.Scheduled#scheduler()
*/
boolean isLongLived();
@Nullable
default String getQualifier() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,16 @@
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;

/**
* A qualifier for determining a scheduler to run this scheduled method on.
* <p>Defaults to an empty String, suggesting the default scheduler.
* <p>May be used to determine the target scheduler to be used,
* matching the qualifier value (or the bean name) of a specific
* {@link org.springframework.scheduling.TaskScheduler} or
* {@link java.util.concurrent.ScheduledExecutorService} bean definition.
* @since 6.1
* @see org.springframework.scheduling.SchedulingAwareRunnable#getQualifier()
*/
String scheduler() default "";

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,8 @@
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.NoUniqueBeanDefinitionException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor;
import org.springframework.beans.factory.config.NamedBeanHolder;
import org.springframework.beans.factory.support.MergedBeanDefinitionPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ApplicationContext;
Expand All @@ -71,6 +66,7 @@
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskHolder;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TaskSchedulerRouter;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -120,7 +116,7 @@ public class ScheduledAnnotationBeanPostProcessor
* in case of multiple scheduler beans found in the context.
* @since 4.2
*/
public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = TaskSchedulerRouter.DEFAULT_TASK_SCHEDULER_BEAN_NAME;


/**
Expand Down Expand Up @@ -254,6 +250,12 @@ private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
else {
TaskSchedulerRouter router = new TaskSchedulerRouter();
router.setBeanName(this.beanName);
router.setBeanFactory(this.beanFactory);
this.registrar.setTaskScheduler(router);
}

if (this.beanFactory instanceof ListableBeanFactory lbf) {
Map<String, SchedulingConfigurer> beans = lbf.getBeansOfType(SchedulingConfigurer.class);
Expand All @@ -264,91 +266,9 @@ private void finishRegistration() {
}
}

if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for TaskScheduler bean...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
}
catch (NoUniqueBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
ex.getMessage());
}
try {
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
ex.getMessage());
}
// Search for ScheduledExecutorService bean next...
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
}
catch (NoUniqueBeanDefinitionException ex2) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
ex2.getMessage());
}
try {
this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
}
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
ex2.getBeanNamesFound());
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isTraceEnabled()) {
logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
ex2.getMessage());
}
// Giving up -> falling back to default scheduler within the registrar...
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}

this.registrar.afterPropertiesSet();
}

private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) {
if (byName) {
T scheduler = beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType);
if (this.beanName != null && this.beanFactory instanceof ConfigurableBeanFactory cbf) {
cbf.registerDependentBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName);
}
return scheduler;
}
else if (beanFactory instanceof AutowireCapableBeanFactory acbf) {
NamedBeanHolder<T> holder = acbf.resolveNamedBean(schedulerType);
if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory cbf) {
cbf.registerDependentBean(holder.getBeanName(), this.beanName);
}
return holder.getBeanInstance();
}
else {
return beanFactory.getBean(schedulerType);
}
}


@Override
public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
Expand Down Expand Up @@ -424,12 +344,11 @@ protected void processScheduled(Scheduled scheduled, Method method, Object bean)
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
private void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = createRunnable(bean, method);
task = createRunnable(bean, method, scheduled.scheduler());
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" +
Expand Down Expand Up @@ -606,13 +525,31 @@ private void processScheduledTask(Scheduled scheduled, Runnable runnable, Method
* <p>The default implementation creates a {@link ScheduledMethodRunnable}.
* @param target the target bean instance
* @param method the scheduled method to call
* @since 5.1
* @see ScheduledMethodRunnable#ScheduledMethodRunnable(Object, Method)
* @since 6.1
*/
protected Runnable createRunnable(Object target, Method method) {
@SuppressWarnings("deprecation")
protected Runnable createRunnable(Object target, Method method, @Nullable String qualifier) {
Runnable runnable = createRunnable(target, method);
if (runnable != null) {
return runnable;
}
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
return new ScheduledMethodRunnable(target, invocableMethod, this.registrar::getObservationRegistry);
return new ScheduledMethodRunnable(target, invocableMethod, qualifier, this.registrar::getObservationRegistry);
}

/**
* Create a {@link Runnable} for the given bean instance,
* calling the specified scheduled method.
* @param target the target bean instance
* @param method the scheduled method to call
* @since 5.1
* @deprecated in favor of {@link #createRunnable(Object, Method, String)}
*/
@Deprecated(since = "6.1")
@Nullable
protected Runnable createRunnable(Object target, Method method) {
return null;
}

private static Duration toDuration(long value, TimeUnit timeUnit) {
Expand Down

0 comments on commit a861453

Please sign in to comment.