Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_FIXED_MAX_POOL_SIZE_PROPERTY_NAME;
import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_FIXED_PARALLELISM_PROPERTY_NAME;
import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_FIXED_SATURATE_PROPERTY_NAME;
import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME;
import static org.junit.platform.engine.support.hierarchical.DefaultParallelExecutionConfigurationStrategy.CONFIG_STRATEGY_PROPERTY_NAME;

import org.apiguardian.api.API;
Expand All @@ -39,6 +40,7 @@
import org.junit.jupiter.engine.config.JupiterConfiguration;
import org.junit.platform.commons.util.ClassNamePatternFilterUtils;
import org.junit.platform.engine.support.hierarchical.ParallelExecutionConfigurationStrategy;
import org.junit.platform.engine.support.hierarchical.ParallelExecutionInterceptor;

/**
* Collection of constants related to the {@link JupiterTestEngine}.
Expand Down Expand Up @@ -318,6 +320,17 @@ public final class Constants {
public static final String PARALLEL_CONFIG_CUSTOM_CLASS_PROPERTY_NAME = PARALLEL_CONFIG_PREFIX
+ CONFIG_CUSTOM_CLASS_PROPERTY_NAME;

/**
* Property name used to specify the fully qualified class name of the
* {@link ParallelExecutionInterceptor} to be used regardless of the
* configuration strategy: {@value}
*
* @since 6.1
*/
@API(status = EXPERIMENTAL, since = "6.1")
public static final String PARALLEL_CONFIG_CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME = PARALLEL_CONFIG_PREFIX
+ CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME;

/**
* Property name used to set the default timeout for all testable and
* lifecycle methods: {@value}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,10 @@
/**
* @since 1.3
*/
class DefaultParallelExecutionConfiguration implements ParallelExecutionConfiguration {

private final int parallelism;
private final int minimumRunnable;
private final int maxPoolSize;
private final int corePoolSize;
private final int keepAliveSeconds;
private final Predicate<? super ForkJoinPool> saturate;

DefaultParallelExecutionConfiguration(int parallelism, int minimumRunnable, int maxPoolSize, int corePoolSize,
int keepAliveSeconds, Predicate<? super ForkJoinPool> saturate) {
this.parallelism = parallelism;
this.minimumRunnable = minimumRunnable;
this.maxPoolSize = maxPoolSize;
this.corePoolSize = corePoolSize;
this.keepAliveSeconds = keepAliveSeconds;
this.saturate = saturate;
}
record DefaultParallelExecutionConfiguration(int parallelism, int minimumRunnable, int maxPoolSize, int corePoolSize,
int keepAliveSeconds, Predicate<? super ForkJoinPool> saturate,
Class<? extends ParallelExecutionInterceptor> executionInterceptorClass)
implements ParallelExecutionConfiguration {

@Override
public int getParallelism() {
Expand Down Expand Up @@ -64,4 +50,9 @@ public int getKeepAliveSeconds() {
public Predicate<? super ForkJoinPool> getSaturatePredicate() {
return saturate;
}

@Override
public Class<? extends ParallelExecutionInterceptor> getExecutionInterceptorClass() {
return executionInterceptorClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import java.math.BigDecimal;
import java.util.Locale;
import java.util.Optional;

import org.apiguardian.api.API;
import org.junit.platform.commons.JUnitException;
Expand Down Expand Up @@ -51,7 +52,7 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter
Boolean::valueOf).orElse(true);

return new DefaultParallelExecutionConfiguration(parallelism, parallelism, maxPoolSize, parallelism,
KEEP_ALIVE_SECONDS, __ -> saturate);
KEEP_ALIVE_SECONDS, __ -> saturate, getExecutionInterceptorClass(configurationParameters));
}
},

Expand Down Expand Up @@ -85,7 +86,7 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter
Boolean::valueOf).orElse(true);

return new DefaultParallelExecutionConfiguration(parallelism, parallelism, maxPoolSize, parallelism,
KEEP_ALIVE_SECONDS, __ -> saturate);
KEEP_ALIVE_SECONDS, __ -> saturate, getExecutionInterceptorClass(configurationParameters));
}
},

Expand All @@ -97,8 +98,8 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter
CUSTOM {
@Override
public ParallelExecutionConfiguration createConfiguration(ConfigurationParameters configurationParameters) {
String className = configurationParameters.get(CONFIG_CUSTOM_CLASS_PROPERTY_NAME).orElseThrow(
() -> new JUnitException(CONFIG_CUSTOM_CLASS_PROPERTY_NAME + " must be set"));
String className = configurationParameters.get(CONFIG_CUSTOM_CLASS_PROPERTY_NAME) //
.orElseThrow(() -> new JUnitException(CONFIG_CUSTOM_CLASS_PROPERTY_NAME + " must be set"));
return ReflectionSupport.tryToLoadClass(className) //
.andThenTry(strategyClass -> {
Preconditions.condition(
Expand All @@ -113,6 +114,27 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter
}
};

protected Class<? extends ParallelExecutionInterceptor> getExecutionInterceptorClass(
ConfigurationParameters configurationParameters) {
return getExecutionInterceptorClassFromConfig(configurationParameters) //
.orElse(ParallelExecutionInterceptor.Default.class);
}

private static Optional<Class<? extends ParallelExecutionInterceptor>> getExecutionInterceptorClassFromConfig(
ConfigurationParameters configurationParameters) {
return configurationParameters.get(CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME) //
.map(className -> {
Class<?> interceptorClass = ReflectionSupport.tryToLoadClass(className) //
.getNonNullOrThrow(cause -> new JUnitException(
"Failed to load execution interceptor class: " + className, cause));
Preconditions.condition(
ParallelExecutionInterceptor.class.isAssignableFrom(requireNonNull(interceptorClass)),
CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME + " does not implement "
+ ParallelExecutionConfigurationStrategy.class);
return interceptorClass.asSubclass(ParallelExecutionInterceptor.class);
});
}

private static final int KEEP_ALIVE_SECONDS = 30;

/**
Expand Down Expand Up @@ -216,6 +238,15 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter
*/
public static final String CONFIG_CUSTOM_CLASS_PROPERTY_NAME = "custom.class";

/**
* Property name used to specify the fully qualified class name of the
* {@link ParallelExecutionInterceptor} to be used regardless of the
* configuration strategy: {@value}
*
* @since 6.1
*/
public static final String CONFIG_INTERCEPTOR_CLASS_PROPERTY_NAME = "interceptor.class";

static ParallelExecutionConfigurationStrategy getStrategy(ConfigurationParameters configurationParameters) {
return valueOf(
configurationParameters.get(CONFIG_STRATEGY_PROPERTY_NAME).orElse("dynamic").toUpperCase(Locale.ROOT));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2015-2025 the original author or authors.
*
* All rights reserved. This program and the accompanying materials are
* made available under the terms of the Eclipse Public License v2.0 which
* accompanies this distribution and is available at
*
* https://www.eclipse.org/legal/epl-v20.html
*/

package org.junit.platform.engine.support.hierarchical;

/**
* @since 6.1
*/
record DefaultParallelExecutionInterceptorContext(ParallelExecutionConfiguration configuration)
implements ParallelExecutionInterceptor.Context {

@Override
public ParallelExecutionConfiguration getConfiguration() {
return configuration;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.SAME_THREAD;

import java.io.Serial;
import java.lang.reflect.Constructor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
Expand All @@ -30,8 +31,11 @@
import org.apiguardian.api.API;
import org.jspecify.annotations.Nullable;
import org.junit.platform.commons.JUnitException;
import org.junit.platform.commons.PreconditionViolationException;
import org.junit.platform.commons.logging.LoggerFactory;
import org.junit.platform.commons.util.ExceptionUtils;
import org.junit.platform.commons.util.Preconditions;
import org.junit.platform.commons.util.ReflectionUtils;
import org.junit.platform.engine.ConfigurationParameters;

/**
Expand All @@ -52,6 +56,7 @@ public class ForkJoinPoolHierarchicalTestExecutorService implements Hierarchical
private final TaskEventListener taskEventListener;
private final int parallelism;
private final ThreadLocal<ThreadLock> threadLocks = ThreadLocal.withInitial(ThreadLock::new);
private final ParallelExecutionInterceptor interceptor;

/**
* Create a new {@code ForkJoinPoolHierarchicalTestExecutorService} based on
Expand All @@ -76,9 +81,10 @@ public ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguratio

ForkJoinPoolHierarchicalTestExecutorService(ParallelExecutionConfiguration configuration,
TaskEventListener taskEventListener) {
forkJoinPool = createForkJoinPool(configuration);
this.forkJoinPool = createForkJoinPool(configuration);
this.interceptor = instantiateInterceptor(configuration);
this.taskEventListener = taskEventListener;
parallelism = forkJoinPool.getParallelism();
this.parallelism = forkJoinPool.getParallelism();
LoggerFactory.getLogger(getClass()).config(() -> "Using ForkJoinPool with parallelism of " + parallelism);
}

Expand All @@ -99,6 +105,30 @@ private ForkJoinPool createForkJoinPool(ParallelExecutionConfiguration configura
}
}

private static ParallelExecutionInterceptor instantiateInterceptor(ParallelExecutionConfiguration configuration) {

Class<? extends ParallelExecutionInterceptor> interceptorClass = configuration.getExecutionInterceptorClass();

if (interceptorClass.equals(ParallelExecutionInterceptor.Default.class)) {
return ParallelExecutionInterceptor.Default.INSTANCE;
}

Constructor<? extends ParallelExecutionInterceptor> constructor = ReflectionUtils.getDeclaredConstructor(
interceptorClass);
var parameters = constructor.getParameters();
var arguments = new Object[parameters.length];
for (int i = 0; i < arguments.length; i++) {
if (ParallelExecutionInterceptor.Context.class.isAssignableFrom(parameters[i].getType())) {
arguments[i] = new DefaultParallelExecutionInterceptorContext(configuration);
}
else {
throw new PreconditionViolationException("Unable to resolve [%s] in constructor [%s].".formatted(
parameters[i], constructor.toGenericString()));
}
}
return ReflectionUtils.newInstance(constructor, arguments);
}

@Override
public Future<@Nullable Void> submit(TestTask testTask) {
ExclusiveTask exclusiveTask = new ExclusiveTask(testTask);
Expand Down Expand Up @@ -129,10 +159,17 @@ private boolean isAlreadyRunningInForkJoinPool() {

@Override
public void invokeAll(List<? extends TestTask> tasks) {

if (tasks.size() == 1) {
new ExclusiveTask(tasks.get(0)).execSync();
return;
}

// If this method is called from outside the used ForkJoinPool,
// calls to fork() will schedule tasks in the commonPool
Preconditions.condition(isAlreadyRunningInForkJoinPool(),
"invokeAll() must be called from a thread in the ForkJoinPool");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to create a dummy task that calls invokeAll and submit that task instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. No then SAME_THREAD won't work.

Copy link
Contributor

@mpkorstanje mpkorstanje Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what if only the call to joinConcurrentTasksInReverseOrderToEnableWorkStealing(concurrentTasksInReverseOrder); is submitted?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sketched out a way to make this work in #5038


Deque<ExclusiveTask> isolatedTasks = new ArrayDeque<>();
Deque<ExclusiveTask> sameThreadTasks = new ArrayDeque<>();
Deque<ExclusiveTask> concurrentTasksInReverseOrder = new ArrayDeque<>();
Expand Down Expand Up @@ -189,6 +226,7 @@ private void resubmitDeferredTasks() {

@Override
public void close() {
interceptor.close();
forkJoinPool.shutdownNow();
}

Expand Down Expand Up @@ -250,7 +288,7 @@ public boolean exec() {
@SuppressWarnings("unused")
ThreadLock.NestedResourceLock nested = threadLock.withNesting(lock) //
) {
testTask.execute();
interceptor.execute(testTask);
return true;
}
catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,8 @@ public interface ParallelExecutionConfiguration {
return null;
}

default Class<? extends ParallelExecutionInterceptor> getExecutionInterceptorClass() {
return ParallelExecutionInterceptor.Default.class;
}

}
Loading