From 00f9bb1edd9747769a8f3b7abe6d2a762f0cc13a Mon Sep 17 00:00:00 2001 From: Marc Philipp Date: Thu, 2 Oct 2025 16:32:06 +0200 Subject: [PATCH 1/2] Introduce `ParallelExecutionInterceptor` and sample implementation The new `junit.jupiter.execution.parallel.config.interceptor.class` configuration parameter allows configuring the fully qualified class name of a `ParallelExecutionInterceptor` implementation that gets called for each executed `TestTask`. The `FixedThreadPoolForTests` sample implementation uses a separate fixed thread pool to run all tasks of type `TEST` with execution mode `CONCURRENT` in. Resolves #3108. --- .../org/junit/jupiter/engine/Constants.java | 13 ++ ...DefaultParallelExecutionConfiguration.java | 27 +-- ...arallelExecutionConfigurationStrategy.java | 39 ++++- ...ltParallelExecutionInterceptorContext.java | 24 +++ ...inPoolHierarchicalTestExecutorService.java | 36 +++- .../ParallelExecutionConfiguration.java | 4 + .../ParallelExecutionInterceptor.java | 157 ++++++++++++++++++ ...elExecutionConfigurationStrategyTests.java | 3 +- ...lHierarchicalTestExecutorServiceTests.java | 9 +- .../ParallelExecutionIntegrationTests.java | 66 +++++++- 10 files changed, 347 insertions(+), 31 deletions(-) create mode 100644 junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionInterceptorContext.java create mode 100644 junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionInterceptor.java diff --git a/junit-jupiter-engine/src/main/java/org/junit/jupiter/engine/Constants.java b/junit-jupiter-engine/src/main/java/org/junit/jupiter/engine/Constants.java index fb08e6e57dee..e7f5470a82af 100644 --- a/junit-jupiter-engine/src/main/java/org/junit/jupiter/engine/Constants.java +++ b/junit-jupiter-engine/src/main/java/org/junit/jupiter/engine/Constants.java @@ -18,6 +18,7 @@ import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_FIXED_MAX_POOL_SIZE_PROPERTY_NAME; import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_FIXED_PARALLELISM_PROPERTY_NAME; import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_FIXED_SATURATE_PROPERTY_NAME; +import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME; import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_STRATEGY_PROPERTY_NAME; import org.apiguardian.api.API; @@ -39,6 +40,7 @@ import org.junit.jupiter.engine.config.JupiterConfiguration; import org.junit.platform.commons.util.ClassNamePatternFilterUtils; import org.junit.platform.engine.support.hierarchical.ParallelExecutionConfigurationStrategy; +import org.junit.platform.engine.support.hierarchical.ParallelExecutionInterceptor; /** * Collection of constants related to the {@link JupiterTestEngine}. @@ -318,6 +320,17 @@ public final class Constants { public static final String PARALLEL_CONFIG_CUSTOM_CLASS_PROPERTY_NAME = PARALLEL_CONFIG_PREFIX + CONFIG_CUSTOM_CLASS_PROPERTY_NAME; + /** + * Property name used to specify the fully qualified class name of the + * {@link ParallelExecutionInterceptor} to be used regardless of the + * configuration strategy: {@value} + * + * @since 6.1 + */ + @API(status = EXPERIMENTAL, since = "6.1") + public static final String PARALLEL_CONFIG_CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME = PARALLEL_CONFIG_PREFIX + + CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME; + /** * Property name used to set the default timeout for all testable and * lifecycle methods: {@value}. diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfiguration.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfiguration.java index c766436ccc98..8137162f98ae 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfiguration.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfiguration.java @@ -16,24 +16,10 @@ /** * @since 1.3 */ -class DefaultParallelExecutionConfiguration implements ParallelExecutionConfiguration { - - private final int parallelism; - private final int minimumRunnable; - private final int maxPoolSize; - private final int corePoolSize; - private final int keepAliveSeconds; - private final Predicate saturate; - - DefaultParallelExecutionConfiguration(int parallelism, int minimumRunnable, int maxPoolSize, int corePoolSize, - int keepAliveSeconds, Predicate saturate) { - this.parallelism = parallelism; - this.minimumRunnable = minimumRunnable; - this.maxPoolSize = maxPoolSize; - this.corePoolSize = corePoolSize; - this.keepAliveSeconds = keepAliveSeconds; - this.saturate = saturate; - } +record DefaultParallelExecutionConfiguration(int parallelism, int minimumRunnable, int maxPoolSize, int corePoolSize, + int keepAliveSeconds, Predicate saturate, + Class executionInterceptorClass) + implements ParallelExecutionConfiguration { @Override public int getParallelism() { @@ -64,4 +50,9 @@ public int getKeepAliveSeconds() { public Predicate getSaturatePredicate() { return saturate; } + + @Override + public Class getExecutionInterceptorClass() { + return executionInterceptorClass; + } } diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategy.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategy.java index 47e88a603deb..ecfb68dde6b8 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategy.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategy.java @@ -16,6 +16,7 @@ import java.math.BigDecimal; import java.util.Locale; +import java.util.Optional; import org.apiguardian.api.API; import org.junit.platform.commons.JUnitException; @@ -51,7 +52,7 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter Boolean::valueOf).orElse(true); return new DefaultParallelExecutionConfiguration(parallelism, parallelism, maxPoolSize, parallelism, - KEEP_ALIVE_SECONDS, __ -> saturate); + KEEP_ALIVE_SECONDS, __ -> saturate, getExecutionInterceptorClass(configurationParameters)); } }, @@ -85,7 +86,7 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter Boolean::valueOf).orElse(true); return new DefaultParallelExecutionConfiguration(parallelism, parallelism, maxPoolSize, parallelism, - KEEP_ALIVE_SECONDS, __ -> saturate); + KEEP_ALIVE_SECONDS, __ -> saturate, getExecutionInterceptorClass(configurationParameters)); } }, @@ -97,8 +98,8 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter CUSTOM { @Override public ParallelExecutionConfiguration createConfiguration(ConfigurationParameters configurationParameters) { - String className = configurationParameters.get(CONFIG_CUSTOM_CLASS_PROPERTY_NAME).orElseThrow( - () -> new JUnitException(CONFIG_CUSTOM_CLASS_PROPERTY_NAME + " must be set")); + String className = configurationParameters.get(CONFIG_CUSTOM_CLASS_PROPERTY_NAME) // + .orElseThrow(() -> new JUnitException(CONFIG_CUSTOM_CLASS_PROPERTY_NAME + " must be set")); return ReflectionSupport.tryToLoadClass(className) // .andThenTry(strategyClass -> { Preconditions.condition( @@ -113,6 +114,27 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter } }; + protected Class getExecutionInterceptorClass( + ConfigurationParameters configurationParameters) { + return getExecutionInterceptorClassFromConfig(configurationParameters) // + .orElse(ParallelExecutionInterceptor.Default.class); + } + + private static Optional> getExecutionInterceptorClassFromConfig( + ConfigurationParameters configurationParameters) { + return configurationParameters.get(CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME) // + .map(className -> { + Class interceptorClass = ReflectionSupport.tryToLoadClass(className) // + .getNonNullOrThrow(cause -> new JUnitException( + "Failed to load execution interceptor class: " + className, cause)); + Preconditions.condition( + ParallelExecutionInterceptor.class.isAssignableFrom(requireNonNull(interceptorClass)), + CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME + " does not implement " + + ParallelExecutionConfigurationStrategy.class); + return interceptorClass.asSubclass(ParallelExecutionInterceptor.class); + }); + } + private static final int KEEP_ALIVE_SECONDS = 30; /** @@ -216,6 +238,15 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter */ public static final String CONFIG_CUSTOM_CLASS_PROPERTY_NAME = "custom.class"; + /** + * Property name used to specify the fully qualified class name of the + * {@link ParallelExecutionInterceptor} to be used regardless of the + * configuration strategy: {@value} + * + * @since 6.1 + */ + public static final String CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME = "interceptor.class"; + static ParallelExecutionConfigurationStrategy getStrategy(ConfigurationParameters configurationParameters) { return valueOf( configurationParameters.get(CONFIG_STRATEGY_PROPERTY_NAME).orElse("dynamic").toUpperCase(Locale.ROOT)); diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionInterceptorContext.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionInterceptorContext.java new file mode 100644 index 000000000000..4dc8c881f7af --- /dev/null +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionInterceptorContext.java @@ -0,0 +1,24 @@ +/* + * Copyright 2015-2025 the original author or authors. + * + * All rights reserved. This program and the accompanying materials are + * made available under the terms of the Eclipse Public License v2.0 which + * accompanies this distribution and is available at + * + * https://www.eclipse.org/legal/epl-v20.html + */ + +package org.junit.platform.engine.support.hierarchical; + +/** + * @since 6.1 + */ +record DefaultParallelExecutionInterceptorContext(ParallelExecutionConfiguration configuration) + implements ParallelExecutionInterceptor.Context { + + @Override + public ParallelExecutionConfiguration getConfiguration() { + return configuration; + } + +} diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java index 52bd2e05cf5b..13702412078f 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java @@ -17,6 +17,7 @@ import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.SAME_THREAD; import java.io.Serial; +import java.lang.reflect.Constructor; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ -30,8 +31,10 @@ import org.apiguardian.api.API; import org.jspecify.annotations.Nullable; import org.junit.platform.commons.JUnitException; +import org.junit.platform.commons.PreconditionViolationException; import org.junit.platform.commons.logging.LoggerFactory; import org.junit.platform.commons.util.ExceptionUtils; +import org.junit.platform.commons.util.ReflectionUtils; import org.junit.platform.engine.ConfigurationParameters; /** @@ -52,6 +55,7 @@ public class ForkJoinPoolHierarchicalTestExecutorService implements Hierarchical private final TaskEventListener taskEventListener; private final int parallelism; private final ThreadLocal threadLocks = ThreadLocal.withInitial(ThreadLock::new); + private final ParallelExecutionInterceptor interceptor; /** * Create a new {@code ForkJoinPoolHierarchicalTestExecutorService} based on @@ -76,9 +80,10 @@ public ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguratio ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration, TaskEventListener taskEventListener) { - forkJoinPool = createForkJoinPool(configuration); + this.forkJoinPool = createForkJoinPool(configuration); + this.interceptor = instantiateInterceptor(configuration); this.taskEventListener = taskEventListener; - parallelism = forkJoinPool.getParallelism(); + this.parallelism = forkJoinPool.getParallelism(); LoggerFactory.getLogger(getClass()).config(() -> "Using ForkJoinPool with parallelism of " + parallelism); } @@ -99,6 +104,30 @@ private ForkJoinPool createForkJoinPool(ParallelExecutionConfiguration configura } } + private static ParallelExecutionInterceptor instantiateInterceptor(ParallelExecutionConfiguration configuration) { + + Class interceptorClass = configuration.getExecutionInterceptorClass(); + + if (interceptorClass.equals(ParallelExecutionInterceptor.Default.class)) { + return ParallelExecutionInterceptor.Default.INSTANCE; + } + + Constructor constructor = ReflectionUtils.getDeclaredConstructor( + interceptorClass); + var parameters = constructor.getParameters(); + var arguments = new Object[parameters.length]; + for (int i = 0; i < arguments.length; i++) { + if (ParallelExecutionInterceptor.Context.class.isAssignableFrom(parameters[i].getType())) { + arguments[i] = new DefaultParallelExecutionInterceptorContext(configuration); + } + else { + throw new PreconditionViolationException("Unable to resolve [%s] in constructor [%s].".formatted( + parameters[i], constructor.toGenericString())); + } + } + return ReflectionUtils.newInstance(constructor, arguments); + } + @Override public Future<@Nullable Void> submit(TestTask testTask) { ExclusiveTask exclusiveTask = new ExclusiveTask(testTask); @@ -189,6 +218,7 @@ private void resubmitDeferredTasks() { @Override public void close() { + interceptor.close(); forkJoinPool.shutdownNow(); } @@ -250,7 +280,7 @@ public boolean exec() { @SuppressWarnings("unused") ThreadLock.NestedResourceLock nested = threadLock.withNesting(lock) // ) { - testTask.execute(); + interceptor.execute(testTask); return true; } catch (InterruptedException e) { diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionConfiguration.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionConfiguration.java index b7cd9cb64ffe..00206b2ab09c 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionConfiguration.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionConfiguration.java @@ -74,4 +74,8 @@ public interface ParallelExecutionConfiguration { return null; } + default Class getExecutionInterceptorClass() { + return ParallelExecutionInterceptor.Default.class; + } + } diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionInterceptor.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionInterceptor.java new file mode 100644 index 000000000000..f8569797a55f --- /dev/null +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionInterceptor.java @@ -0,0 +1,157 @@ +/* + * Copyright 2015-2025 the original author or authors. + * + * All rights reserved. This program and the accompanying materials are + * made available under the terms of the Eclipse Public License v2.0 which + * accompanies this distribution and is available at + * + * https://www.eclipse.org/legal/epl-v20.html + */ + +package org.junit.platform.engine.support.hierarchical; + +import static org.apiguardian.api.API.Status.EXPERIMENTAL; +import static org.junit.platform.engine.TestDescriptor.Type.TEST; +import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.CONCURRENT; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apiguardian.api.API; +import org.junit.platform.commons.util.ExceptionUtils; +import org.junit.platform.engine.TestDescriptor; +import org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutorService.TestTask; +import org.junit.platform.engine.support.hierarchical.Node.ExecutionMode; + +/** + * Interceptor for the execution of {@link TestTask TestTasks}. + * + *

Constructor Requirements

+ * + *

Implementation of this interface must either declare a single constructor + * or rely on the default constructor. If they declare a constructor, it may + * optionally declare a single parameter of type {@link Context}. + * + * @since 6.1 + * @see Context + */ +@API(status = EXPERIMENTAL, since = "6.1") +public interface ParallelExecutionInterceptor extends AutoCloseable { + + /** + * Execute the supplied {@link TestTask}. + * + *

Implementations must ensure the supplied task was executed exactly + * once before this method returns unless execution is interrupted. + * + * @param testTask the task to executed + * @throws InterruptedException in case the current thread was interrupted + * while waiting for the test task to be executed in a different thread + */ + void execute(TestTask testTask) throws InterruptedException; + + /** + * Release any resources held by this interceptor. + */ + @Override + void close(); + + /** + * Context of a {@link ParallelExecutionInterceptor}. + * + * @since 6.1 + */ + @API(status = EXPERIMENTAL, since = "6.1") + sealed interface Context permits DefaultParallelExecutionInterceptorContext { + + /** + * {@return the configuration to use for parallel test execution} + */ + ParallelExecutionConfiguration getConfiguration(); + + } + + /** + * Default interceptor implementation that executes any passed + * {@link TestTask} directly. + * + * @since 6.1 + */ + @API(status = EXPERIMENTAL, since = "6.1") + final class Default implements ParallelExecutionInterceptor { + + public static final Default INSTANCE = new Default(); + + private Default() { + } + + @Override + public void execute(TestTask testTask) { + testTask.execute(); + } + + @Override + public void close() { + // do nothing + } + } + + /** + * Interceptor that runs {@link TestTask TestTasks} with + * {@link ExecutionMode#CONCURRENT ExecutionMode.CONCURRENT} for descriptors + * of type {@link TestDescriptor.Type#TEST TEST} in a separate thread pool + * with a fixed number of threads. + * + * @since 6.1 + */ + @API(status = EXPERIMENTAL, since = "6.1") + final class FixedThreadPoolForTests implements ParallelExecutionInterceptor { + + private final ExecutorService executorService; + + FixedThreadPoolForTests(Context context) { + this.executorService = Executors.newFixedThreadPool(context.getConfiguration().getParallelism()); + } + + @Override + public void execute(TestTask testTask) throws InterruptedException { + if (shouldRunInSeparateThread(testTask)) { + executeInThreadPool(testTask); + } + else { + testTask.execute(); + } + } + + private boolean shouldRunInSeparateThread(TestTask task) { + return task.getExecutionMode() == CONCURRENT // + && task.getTestDescriptor().getType() == TEST; + } + + private void executeInThreadPool(TestTask testTask) throws InterruptedException { + var newContextClassLoader = Thread.currentThread().getContextClassLoader(); + Runnable runnable = () -> { + Thread.currentThread().setContextClassLoader(newContextClassLoader); + testTask.execute(); + }; + try { + CompletableFuture.runAsync(runnable, executorService).get(); + } + catch (ExecutionException ex) { + if (ex.getCause() != null) { + throw ExceptionUtils.throwAsUncheckedException(ex.getCause()); + } + throw ExceptionUtils.throwAsUncheckedException(ex); + } + } + + @Override + public void close() { + executorService.shutdownNow(); + } + + } + +} diff --git a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategyTests.java b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategyTests.java index d4683aad6d7b..831427336f19 100644 --- a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategyTests.java +++ b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/DefaultParallelExecutionConfigurationStrategyTests.java @@ -216,7 +216,8 @@ void customStrategyThrowsExceptionWhenClassDoesNotExist() { static class CustomParallelExecutionConfigurationStrategy implements ParallelExecutionConfigurationStrategy { @Override public ParallelExecutionConfiguration createConfiguration(ConfigurationParameters configurationParameters) { - return new DefaultParallelExecutionConfiguration(1, 2, 3, 4, 5, __ -> true); + return new DefaultParallelExecutionConfiguration(1, 2, 3, 4, 5, __ -> true, + ParallelExecutionInterceptor.Default.class); } } diff --git a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorServiceTests.java b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorServiceTests.java index 5845bd164892..44cbd621fe96 100644 --- a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorServiceTests.java +++ b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorServiceTests.java @@ -55,7 +55,8 @@ class ForkJoinPoolHierarchicalTestExecutorServiceTests { @Test void exceptionsFromInvalidConfigurationAreNotSwallowed() { - var configuration = new DefaultParallelExecutionConfiguration(2, 1, 1, 1, 0, __ -> true); + var configuration = new DefaultParallelExecutionConfiguration(2, 1, 1, 1, 0, __ -> true, + ParallelExecutionInterceptor.Default.class); JUnitException exception = assertThrows(JUnitException.class, () -> { try (var pool = new ForkJoinPoolHierarchicalTestExecutorService(configuration)) { @@ -214,7 +215,8 @@ void defersTasksWithIncompatibleLocksOnMultipleLevels() throws Throwable { var incompatibleTask2 = taskFactory.create("incompatibleTask2", incompatibleLock2); deferred.put(incompatibleTask2, new CountDownLatch(1)); - var configuration = new DefaultParallelExecutionConfiguration(2, 2, 2, 2, 1, __1 -> true); + var configuration = new DefaultParallelExecutionConfiguration(2, 2, 2, 2, 1, __1 -> true, + ParallelExecutionInterceptor.Default.class); withForkJoinPoolHierarchicalTestExecutorService(configuration, taskEventListener, service -> { @@ -246,7 +248,8 @@ void defersTasksWithIncompatibleLocksOnMultipleLevels() throws Throwable { private Map runWithAttemptedWorkStealing(TaskEventListener taskEventListener, DummyTestTask taskToBeStolen, ResourceLock initialLock, Runnable waitAction) throws Throwable { - var configuration = new DefaultParallelExecutionConfiguration(2, 2, 2, 2, 1, __ -> true); + var configuration = new DefaultParallelExecutionConfiguration(2, 2, 2, 2, 1, __ -> true, + ParallelExecutionInterceptor.Default.class); withForkJoinPoolHierarchicalTestExecutorService(configuration, taskEventListener, service -> { diff --git a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionIntegrationTests.java b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionIntegrationTests.java index 624bd58d4d67..8887a9df8678 100644 --- a/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionIntegrationTests.java +++ b/platform-tests/src/test/java/org/junit/platform/engine/support/hierarchical/ParallelExecutionIntegrationTests.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.parallel.ResourceAccessMode.READ_WRITE; import static org.junit.jupiter.engine.Constants.DEFAULT_CLASSES_EXECUTION_MODE_PROPERTY_NAME; import static org.junit.jupiter.engine.Constants.DEFAULT_PARALLEL_EXECUTION_MODE; +import static org.junit.jupiter.engine.Constants.PARALLEL_CONFIG_CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME; import static org.junit.jupiter.engine.Constants.PARALLEL_CONFIG_FIXED_MAX_POOL_SIZE_PROPERTY_NAME; import static org.junit.jupiter.engine.Constants.PARALLEL_CONFIG_FIXED_PARALLELISM_PROPERTY_NAME; import static org.junit.jupiter.engine.Constants.PARALLEL_CONFIG_STRATEGY_PROPERTY_NAME; @@ -41,16 +42,21 @@ import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.IntStream; import java.util.stream.Stream; import org.assertj.core.api.Condition; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AutoClose; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.MethodOrderer.MethodName; @@ -66,6 +72,8 @@ import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.Isolated; import org.junit.jupiter.api.parallel.ResourceLock; +import org.junit.jupiter.params.Parameter; +import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.junit.platform.engine.TestDescriptor; @@ -80,8 +88,22 @@ * @since 1.3 */ @SuppressWarnings({ "JUnitMalformedDeclaration", "NewClassNamingConvention" }) +@ParameterizedClass +@ValueSource(classes = { ParallelExecutionInterceptor.Default.class, + ParallelExecutionInterceptor.FixedThreadPoolForTests.class }) class ParallelExecutionIntegrationTests { + @Parameter + Class interceptorClass; + + @Test + void forkJoinPoolCompensatesWhenUserCodeBlocks() { + var events = executeConcurrentlySuccessfully(1, BlockingTestCase.class).list(); + + assertThat(ThreadReporter.getThreadNames(events)) // + .hasSize(interceptorClass == ParallelExecutionInterceptor.Default.class ? 2 : 1); + } + @Test void successfulParallelTest(TestReporter reporter) { var events = executeConcurrentlySuccessfully(3, SuccessfulParallelTestCase.class).list(); @@ -95,7 +117,16 @@ void successfulParallelTest(TestReporter reporter) { assertThat(finishedTimestamps).hasSize(3); assertThat(startedTimestamps).allMatch(startTimestamp -> finishedTimestamps.stream().noneMatch( finishedTimestamp -> finishedTimestamp.isBefore(startTimestamp))); - assertThat(ThreadReporter.getThreadNames(events)).hasSize(3); + + var threadNames = ThreadReporter.getThreadNames(events).toList(); + assertThat(threadNames).hasSize(3); + + if (interceptorClass == ParallelExecutionInterceptor.Default.class) { + assertThat(threadNames).allSatisfy(it -> assertThat(it).startsWith("ForkJoinPool")); + } + else { + assertThat(threadNames).noneSatisfy(it -> assertThat(it).startsWith("ForkJoinPool")); + } } @Test @@ -188,7 +219,8 @@ void executesTestTemplatesWithResourceLocksInSameThread() { var events = executeConcurrentlySuccessfully(2, ConcurrentTemplateTestCase.class).list(); assertThat(events.stream().filter(event(test(), finishedSuccessfully())::matches)).hasSize(10); - assertThat(ThreadReporter.getThreadNames(events)).hasSize(1); + assertThat(ThreadReporter.getThreadNames(events)).hasSize(1) // + .first(InstanceOfAssertFactories.STRING).startsWith("ForkJoinPool"); } @Test @@ -559,6 +591,8 @@ private EngineExecutionResults executeWithFixedParallelism(int parallelism, Map< .configurationParameter(PARALLEL_EXECUTION_ENABLED_PROPERTY_NAME, String.valueOf(true)) // .configurationParameter(PARALLEL_CONFIG_STRATEGY_PROPERTY_NAME, "fixed") // .configurationParameter(PARALLEL_CONFIG_FIXED_PARALLELISM_PROPERTY_NAME, String.valueOf(parallelism)) // + .configurationParameter(PARALLEL_CONFIG_CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME, + interceptorClass.getName()) // .configurationParameters(configParams) // .execute(); } @@ -1009,4 +1043,32 @@ public void afterTestExecution(ExtensionContext context) { } } + @ExtendWith(ThreadReporter.class) + static class BlockingTestCase { + + @AutoClose + private static ExecutorService executorService; + + final CountDownLatch latch = new CountDownLatch(2); + + @BeforeAll + static void createForkJoinPool() { + executorService = Executors.newWorkStealingPool(1); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @RepeatedTest(2) + void test() throws Exception { + CompletableFuture.runAsync(() -> { + try { + latch.countDown(); + latch.await(100, MILLISECONDS); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, executorService).get(); + } + } + } From 687d661ec777e681dc0a735f113abeabf4ad7af3 Mon Sep 17 00:00:00 2001 From: Marc Philipp Date: Mon, 6 Oct 2025 15:22:49 +0200 Subject: [PATCH 2/2] Check precondition of running in correct ForkJoinPool --- .../ForkJoinPoolHierarchicalTestExecutorService.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java index 13702412078f..c0db94c2ff1b 100644 --- a/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java +++ b/junit-platform-engine/src/main/java/org/junit/platform/engine/support/hierarchical/ForkJoinPoolHierarchicalTestExecutorService.java @@ -34,6 +34,7 @@ import org.junit.platform.commons.PreconditionViolationException; import org.junit.platform.commons.logging.LoggerFactory; import org.junit.platform.commons.util.ExceptionUtils; +import org.junit.platform.commons.util.Preconditions; import org.junit.platform.commons.util.ReflectionUtils; import org.junit.platform.engine.ConfigurationParameters; @@ -158,10 +159,17 @@ private boolean isAlreadyRunningInForkJoinPool() { @Override public void invokeAll(List tasks) { + if (tasks.size() == 1) { new ExclusiveTask(tasks.get(0)).execSync(); return; } + + // If this method is called from outside the used ForkJoinPool, + // calls to fork() will schedule tasks in the commonPool + Preconditions.condition(isAlreadyRunningInForkJoinPool(), + "invokeAll() must be called from a thread in the ForkJoinPool"); + Deque isolatedTasks = new ArrayDeque<>(); Deque sameThreadTasks = new ArrayDeque<>(); Deque concurrentTasksInReverseOrder = new ArrayDeque<>();