Skip to content

Commit

Permalink
Support scheduler qualifier for reactive SubscribingRunnable as well
Browse files Browse the repository at this point in the history
  • Loading branch information
jhoeller committed Jul 8, 2023
1 parent a0c80ff commit 5243c22
Showing 1 changed file with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.support.DefaultScheduledTaskObservationConvention;
import org.springframework.scheduling.support.ScheduledTaskObservationContext;
import org.springframework.scheduling.support.ScheduledTaskObservationConvention;
Expand Down Expand Up @@ -120,8 +121,10 @@ public static Runnable createSubscriptionRunnable(Method method, Object targetBe

boolean shouldBlock = (scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString()));
Publisher<?> publisher = getPublisherFor(method, targetBean);
Supplier<ScheduledTaskObservationContext> contextSupplier = () -> new ScheduledTaskObservationContext(targetBean, method);
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier);
Supplier<ScheduledTaskObservationContext> contextSupplier =
() -> new ScheduledTaskObservationContext(targetBean, method);
return new SubscribingRunnable(publisher, shouldBlock, scheduled.scheduler(),
subscriptionTrackerRegistry, observationRegistrySupplier, contextSupplier);
}

/**
Expand Down Expand Up @@ -180,30 +183,43 @@ static Publisher<?> getPublisherFor(Method method, Object bean) {
* Utility implementation of {@code Runnable} that subscribes to a {@code Publisher}
* or subscribes-then-blocks if {@code shouldBlock} is set to {@code true}.
*/
static final class SubscribingRunnable implements Runnable {
static final class SubscribingRunnable implements SchedulingAwareRunnable {

private final Publisher<?> publisher;
private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION =
new DefaultScheduledTaskObservationConvention();

private static final ScheduledTaskObservationConvention DEFAULT_CONVENTION = new DefaultScheduledTaskObservationConvention();
private final Publisher<?> publisher;

final boolean shouldBlock;

@Nullable
private final String qualifier;

private final List<Runnable> subscriptionTrackerRegistry;

final Supplier<ObservationRegistry> observationRegistrySupplier;

final Supplier<ScheduledTaskObservationContext> contextSupplier;

SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry,
Supplier<ObservationRegistry> observationRegistrySupplier, Supplier<ScheduledTaskObservationContext> contextSupplier) {
SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock,
@Nullable String qualifier, List<Runnable> subscriptionTrackerRegistry,
Supplier<ObservationRegistry> observationRegistrySupplier,
Supplier<ScheduledTaskObservationContext> contextSupplier) {

this.publisher = publisher;
this.shouldBlock = shouldBlock;
this.qualifier = qualifier;
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
this.observationRegistrySupplier = observationRegistrySupplier;
this.contextSupplier = contextSupplier;
}

@Override
@Nullable
public String getQualifier() {
return this.qualifier;
}

@Override
public void run() {
Observation observation = TASKS_SCHEDULED_EXECUTION.observation(null, DEFAULT_CONVENTION,
Expand Down

0 comments on commit 5243c22

Please sign in to comment.