From 59cf96166ed29a01d0c325603cbce6cc8c2be358 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Wed, 17 Sep 2025 15:38:42 +0100 Subject: [PATCH 1/9] Initial commit --- .../classes/java/util/concurrent/Joiners.java | 5 + .../util/concurrent/StructuredTaskScope.java | 263 ++++++++++------- .../concurrent/StructuredTaskScopeImpl.java | 40 ++- .../StructuredTaskScopeTest.java | 272 ++++++++++++++++-- 4 files changed, 445 insertions(+), 135 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/Joiners.java b/src/java.base/share/classes/java/util/concurrent/Joiners.java index 768e260803ec3..8b9f71f8c8f5e 100644 --- a/src/java.base/share/classes/java/util/concurrent/Joiners.java +++ b/src/java.base/share/classes/java/util/concurrent/Joiners.java @@ -212,6 +212,11 @@ public boolean onComplete(Subtask subtask) { return isDone.test(subtask); } + @Override + public void onTimeout() { + // do nothing, this joiner does not throw TimeoutException + } + @Override public Stream> result() { return subtasks.stream(); diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index 0088e486d4a91..5feb804f804d6 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -25,9 +25,9 @@ package java.util.concurrent; import java.time.Duration; -import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import jdk.internal.javac.PreviewFeature; @@ -51,9 +51,9 @@ * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may * only be invoked by the owner thread (the thread that opened the {@code * StructuredTaskScope}), the {@code fork} method may not be called after {@code join}, - * the {@code join} method may only be invoked once, and the {@code close} method throws - * an exception after closing if the owner did not invoke the {@code join} method after - * forking subtasks. + * the {@code join} method may only be invoked once to get outcome, and the {@code close} + * method throws an exception after closing if the owner did not invoke the {@code join} + * method after forking subtasks. * *

As a first example, consider a task that splits into two subtasks to concurrently * fetch resources from two URL locations "left" and "right". Both subtasks may complete @@ -198,22 +198,21 @@ * *

Configuration

* - * * A {@code StructuredTaskScope} is opened with {@linkplain Configuration configuration} * that consists of a {@link ThreadFactory} to create threads, an optional name for * monitoring and management purposes, and an optional timeout. * *

The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope} * with the default configuration. The default - * configuration has a {@code ThreadFactory} that creates unnamed - * virtual threads, - * is unnamed for monitoring and management purposes, and has no timeout. + * configuration has a {@code ThreadFactory} that creates unnamed {@linkplain + * Thread##virtual-threads virtual threads}, is unnamed for monitoring and management + * purposes, and has no timeout. * - *

The 2-arg {@link #open(Joiner, Function) open} method can be used to create a + *

The 2-arg {@link #open(Joiner, UnaryOperator) open} method can be used to create a * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for * the purposes of monitoring and management, or has a timeout that cancels the scope if * the timeout expires before or while waiting for subtasks to complete. The {@code open} - * method is called with a {@linkplain Function function} that is applied to the default + * method is called with a {@linkplain UnaryOperator operator} that is applied to the default * configuration and returns a {@link Configuration Configuration} for the * {@code StructuredTaskScope} under construction. * @@ -237,9 +236,10 @@ * *

A second example sets a timeout, represented by a {@link Duration}. The timeout * starts when the new scope is opened. If the timeout expires before the {@code join} - * method has completed then the scope is cancelled. This - * interrupts the threads executing the two subtasks and causes the {@link #join() join} - * method to throw {@link TimeoutException}. + * method has completed then the scope is {@linkplain ##Cancallation cancelled} (this + * interrupts the threads executing the two subtasks) and the {@code Joiner}'s {@link + * Joiner#onTimeout() onTimeout} method is invoked to throw {@link TimeoutException + * TimeoutException}. * {@snippet lang=java : * Duration timeout = Duration.ofSeconds(10); * @@ -314,11 +314,10 @@ * }); * } * - *

A scoped value inherited into a subtask may be - * rebound to a new - * value in the subtask for the bounded execution of some method executed in the subtask. - * When the method completes, the value of the {@code ScopedValue} reverts to its previous - * value, the value inherited from the thread executing the task. + *

A scoped value inherited into a subtask may be {@linkplain ScopedValue##rebind + * rebound} to a new value in the subtask for the bounded execution of some method executed + * in the subtask. When the method completes, the value of the {@code ScopedValue} reverts + * to its previous value, the value inherited from the thread executing the task. * *

A subtask may execute code that itself opens a new {@code StructuredTaskScope}. * A task executing in thread T1 opens a {@code StructuredTaskScope} and forks a @@ -331,10 +330,9 @@ * *

Memory consistency effects

* - *

Actions in the owner thread of a {@code StructuredTaskScope} prior to - * {@linkplain #fork forking} of a subtask - * - * happen-before any actions taken by that subtask, which in turn + *

Actions in the owner thread of a {@code StructuredTaskScope} prior to {@linkplain + * #fork forking} of a subtask {@linkplain java.util.concurrent##MemoryVisibility + * happen-before} any actions taken by that subtask, which in turn * happen-before the subtask result is {@linkplain Subtask#get() retrieved}. * *

General exceptions

@@ -459,7 +457,7 @@ enum State { * {@code Joiner} that waits for all successful subtasks. It cancels the scope and * causes {@code join} to throw if any subtask fails. *
  • {@link #awaitAll() awaitAll()} creates a {@code Joiner} that waits for all - * subtasks. It does not cancel the scope or cause {@code join} to throw. + * subtasks to complete. It does not cancel the scope or cause {@code join} to throw. * * *

    In addition to the methods to create {@code Joiner} objects for common cases, @@ -472,22 +470,33 @@ enum State { *

    More advanced policies can be developed by implementing the {@code Joiner} * interface. The {@link #onFork(Subtask)} method is invoked when subtasks are forked. * The {@link #onComplete(Subtask)} method is invoked when subtasks complete with a - * result or exception. These methods return a {@code boolean} to indicate if scope + * result or exception. These methods return a {@code boolean} to indicate if the scope * should be cancelled. These methods can be used to collect subtasks, results, or * exceptions, and control when to cancel the scope. The {@link #result()} method * must be implemented to produce the result (or exception) for the {@code join} * method. * + *

    If a {@code StructuredTaskScope} is opened with a {@linkplain + * Configuration#withTimeout(Duration) timeout}, and the timeout expires before or + * while waiting in {@link StructuredTaskScope#join() join()}, then the scope is + * {@linkplain StructuredTaskScope##Cancallation cancelled}, and the {@code Joiners}'s + * {@link #onTimeout()} method is invoked to notify the {@code Joiner} and optionally + * throw {@link TimeoutException TimeoutException}. If the {@code onTimeout()} method + * does not throw then the {@code join()} method will invoke the {@link #result()} + * method to produce a result. This result may be based on the outcome of subtasks + * that completed before the timeout expired. + * *

    Unless otherwise specified, passing a {@code null} argument to a method * in this class will cause a {@link NullPointerException} to be thrown. * * @implSpec Implementations of this interface must be thread safe. The {@link * #onComplete(Subtask)} method defined by this interface may be invoked by several - * threads concurrently. + * threads concurrently. The {@link #onTimeout()} method may be invoked at around + * the same time that subtasks complete. * * @apiNote It is very important that a new {@code Joiner} object is created for each * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with - * different scopes or re-used after a task is closed. + * different scopes or re-used after a scope is closed. * *

    Designing a {@code Joiner} should take into account the code at the use-site * where the results from the {@link StructuredTaskScope#join() join} method are @@ -502,13 +511,11 @@ enum State { * @see #open(Joiner) */ @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) - @FunctionalInterface interface Joiner { /** * Invoked by {@link #fork(Callable) fork(Callable)} and {@link #fork(Runnable) - * fork(Runnable)} when forking a subtask. The method is invoked from the task - * owner thread. The method is invoked before a thread is created to run the - * subtask. + * fork(Runnable)} when forking a subtask. The method is invoked before a thread + * is created to run the subtask. * * @implSpec The default implementation throws {@code NullPointerException} if the * subtask is {@code null}. It throws {@code IllegalArgumentException} if the @@ -551,6 +558,27 @@ default boolean onComplete(Subtask subtask) { return false; } + /** + * Invoked by the {@link #join() join()} method if the scope was opened with a + * timeout, and the timeout expires before or while waiting in the {@code join} + * method. + * + * @implSpec The default implementation throws {@link TimeoutException TimeoutException}. + * + * @apiNote This method is intended for {@code Joiner} implementations that do not + * throw {@link TimeoutException TimeoutException}, or require a notification when + * the timeout expires before or while waiting in {@code join}. + * + *

    This method is invoked by the {@code join} method. It should not be + * invoked directly. + * + * @throws TimeoutException for {@code join} to throw + * @since 26 + */ + default void onTimeout() { + throw new TimeoutException(); + } + /** * Invoked by the {@link #join() join()} method to produce the result (or exception) * after waiting for all subtasks to complete or the scope cancelled. The result @@ -575,13 +603,15 @@ default boolean onComplete(Subtask subtask) { /** * {@return a new Joiner object that yields a stream of all subtasks when all * subtasks complete successfully} - * The {@code Joiner} cancels + * The {@code Joiner} {@linkplain StructuredTaskScope##Cancallation cancels} * the scope and causes {@code join} to throw if any subtask fails. * - *

    If all subtasks complete successfully, the joiner's {@link Joiner#result()} - * method returns a stream of all subtasks in the order that they were forked. - * If any subtask failed then the {@code result} method throws the exception from - * the first subtask to fail. + *

    If all subtasks complete successfully then the joiner's {@link + * Joiner#result()} method returns a stream of all subtasks, in the order that they + * were forked, for the {@link StructuredTaskScope#join() join()} to return. If + * the scope was opened with a {@linkplain Configuration#withTimeout(Duration) + * timeout}, and the timeout expires before or while waiting for all subtasks to + * complete, then the {@code join} method throws {@code TimeoutException}. * * @apiNote Joiners returned by this method are suited to cases where all subtasks * return a result of the same type. Joiners returned by {@link @@ -599,10 +629,14 @@ static Joiner>> allSuccessfulOrThrow() { * completed successfully} * The {@code Joiner} causes {@code join} to throw if all subtasks fail. * - *

    The joiner's {@link Joiner#result()} method returns the result of a subtask - * that completed successfully. If all subtasks fail then the {@code result} method - * throws the exception from one of the failed subtasks. The {@code result} method - * throws {@code NoSuchElementException} if no subtasks were forked. + *

    The joiner's {@link Joiner#result()} method returns the result of a subtask, + * that completed successfully, for the {@link StructuredTaskScope#join() join()} + * to return. If all subtasks fail then the {@code result} method throws the + * exception from one of the failed subtasks. The {@code result} method throws + * {@code NoSuchElementException} if no subtasks were forked. If the scope was + * opened with a {@linkplain Configuration#withTimeout(Duration) timeout}, and + * the timeout expires before or while waiting for any subtask to complete + * successfully, then the {@code join} method throws {@code TimeoutException}. * * @param the result type of subtasks */ @@ -612,12 +646,15 @@ static Joiner anySuccessfulResultOrThrow() { /** * {@return a new Joiner object that waits for subtasks to complete successfully} - * The {@code Joiner} cancels + * The {@code Joiner} {@linkplain StructuredTaskScope##Cancallation cancels} * the scope and causes {@code join} to throw if any subtask fails. * *

    The joiner's {@link Joiner#result() result} method returns {@code null} * if all subtasks complete successfully, or throws the exception from the first - * subtask to fail. + * subtask to fail. If the scope was opened with a {@linkplain + * Configuration#withTimeout(Duration) timeout}, and the timeout expires before or + * while waiting for all subtasks to complete, then the {@code join} method throws + * {@code TimeoutException}. * * @apiNote Joiners returned by this method are suited to cases where subtasks * return results of different types. Joiners returned by {@link #allSuccessfulOrThrow()} @@ -634,6 +671,9 @@ static Joiner awaitAllSuccessfulOrThrow() { * The {@code Joiner} does not cancel the scope if a subtask fails. * *

    The joiner's {@link Joiner#result() result} method returns {@code null}. + * If the scope was opened with a {@linkplain Configuration#withTimeout(Duration) + * timeout}, and the timeout expires before or while waiting for all subtasks to + * complete, then the {@code join} method throws {@code TimeoutException}. * * @apiNote This Joiner is useful for cases where subtasks make use of * side-effects rather than return results or fail with exceptions. @@ -671,11 +711,11 @@ public Void result() { * {@return a new Joiner object that yields a stream of all subtasks when all * subtasks complete or a predicate returns {@code true} to cancel the scope} * - *

    The joiner's {@link Joiner#onComplete(Subtask)} method invokes the - * predicate's {@link Predicate#test(Object) test} method with the subtask that - * completed successfully or failed with an exception. If the {@code test} method - * returns {@code true} then - * the scope is cancelled. The {@code test} method must be thread safe as it + *

    The joiner's {@link #onComplete(Subtask)} method invokes the predicate's + * {@link Predicate#test(Object) test} method with the subtask that completed + * successfully or failed with an exception. If the {@code test} method + * returns {@code true} then {@linkplain StructuredTaskScope##Cancallation + * the scope is cancelled}. The {@code test} method must be thread safe as it * may be invoked concurrently from several threads. If the {@code test} method * completes with an exception or error, then the thread that executed the subtask * invokes the {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler} @@ -687,9 +727,16 @@ public Void result() { * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state * if the scope was cancelled before all subtasks were forked or completed. * + *

    The joiner's {@link #onTimeout()} method does nothing. If configured with + * a {@linkplain Configuration#withTimeout(Duration) timeout}, and the timeout + * expires before or while waiting in {@link StructuredTaskScope#join() join}, + * then the {@link #result()} method returns the stream of all subtasks. + * Subtasks that did not complete before the timeout expired will be in the + * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state. + * *

    The following example uses this method to create a {@code Joiner} that - * cancels the scope when - * two or more subtasks fail. + * {@linkplain StructuredTaskScope##Cancallation cancels} the scope when two or + * more subtasks fail. * {@snippet lang=java : * class CancelAfterTwoFailures implements Predicate> { * private final AtomicInteger failedCount = new AtomicInteger(); @@ -715,6 +762,20 @@ public Void result() { * } * } * + *

    The following example uses {@code allUntil} to get the results of all + * subtasks that complete successfully within a timeout period. + * {@snippet lang=java : + * List invokeAll(Collection> tasks, Duration timeout) throws InterruptedException { + * try (var scope = StructuredTaskScope.open(Joiner.allUntil(_ -> false), cf -> cf.withTimeout(timeout))) { + * tasks.forEach(scope::fork); + * return scope.join() + * .filter(s -> s.state() == Subtask.State.SUCCESS) + * .map(Subtask::get) + * .toList(); + * } + * } + * } + * * @param isDone the predicate to evaluate completed subtasks * @param the result type of subtasks */ @@ -731,15 +792,15 @@ static Joiner>> allUntil(Predicate * and management, and an optional timeout. * *

    Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)} - * uses the default - * configuration. The default configuration consists of a thread factory that - * creates unnamed - * virtual threads, no name for monitoring and management purposes, and no timeout. + * uses the {@linkplain StructuredTaskScope##DefaultConfiguration default configuration}. + * The default configuration consists of a thread factory that creates unnamed + * {@linkplain Thread##virtual-threads virtual threads}, no name for monitoring and + * management purposes, and no timeout. * - *

    Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, Function) - * open} method allows a different configuration to be used. The function specified + *

    Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, UnaryOperator) + * open} method allows a different configuration to be used. The operator specified * to the {@code open} method is applied to the default configuration and returns the - * configuration for the {@code StructuredTaskScope} under construction. The function + * configuration for the {@code StructuredTaskScope} under construction. The operator * can use the {@code with-} prefixed methods defined here to specify the components * of the configuration to use. * @@ -756,10 +817,10 @@ sealed interface Configuration permits StructuredTaskScopeImpl.ConfigImpl { * a scope to create threads when {@linkplain #fork(Callable) forking} subtasks. * @param threadFactory the thread factory * - * @apiNote The thread factory will typically create - * virtual threads, - * maybe with names for monitoring purposes, an {@linkplain Thread.UncaughtExceptionHandler - * uncaught exception handler}, or other properties configured. + * @apiNote The thread factory will typically create {@linkplain Thread##virtual-threads + * virtual threads}, maybe with names for monitoring purposes, an {@linkplain + * Thread.UncaughtExceptionHandler uncaught exception handler}, or other properties + * configured. * * @see #fork(Callable) */ @@ -783,6 +844,7 @@ sealed interface Configuration permits StructuredTaskScopeImpl.ConfigImpl { * compute the timeout for this method. * * @see #join() + * @see Joiner#onTimeout() */ Configuration withTimeout(Duration timeout); } @@ -809,11 +871,13 @@ final class FailedException extends RuntimeException { } /** - * Exception thrown by {@link #join()} if the scope was created with a timeout and - * the timeout expired before or while waiting in {@code join}. + * Exception thrown by {@link #join()} if the scope was opened with a timeout, + * the timeout expired before or while waiting in {@code join}, and the {@link + * Joiner#onTimeout() Joiner.onTimeout} method throws this exception. * * @since 25 * @see Configuration#withTimeout(Duration) + * @see Joiner#onTimeout() */ @PreviewFeature(feature = PreviewFeature.Feature.STRUCTURED_CONCURRENCY) final class TimeoutException extends RuntimeException { @@ -828,14 +892,14 @@ final class TimeoutException extends RuntimeException { /** * Opens a new {@code StructuredTaskScope} to use the given {@code Joiner} object and - * with configuration that is the result of applying the given function to the - * default configuration. + * with configuration that is the result of applying the given operator to the + * {@linkplain ##DefaultConfiguration default configuration}. * - *

    The {@code configFunction} is called with the default configuration and returns - * the configuration for the new scope. The function may, for example, set the + *

    The {@code configOperator} is called with the default configuration and returns + * the configuration for the new scope. The operator may, for example, set the * {@linkplain Configuration#withThreadFactory(ThreadFactory) ThreadFactory} or set a - * {@linkplain Configuration#withTimeout(Duration) timeout}. If the function completes - * with an exception or error then it is propagated by this method. If the function + * {@linkplain Configuration#withTimeout(Duration) timeout}. If the operator completes + * with an exception or error then it is propagated by this method. If the operator * returns {@code null} then {@code NullPointerException} is thrown. * *

    If a {@code ThreadFactory} is set then its {@link ThreadFactory#newThread(Runnable) @@ -845,8 +909,9 @@ final class TimeoutException extends RuntimeException { * *

    If a {@linkplain Configuration#withTimeout(Duration) timeout} is set then it * starts when the scope is opened. If the timeout expires before the scope has - * {@linkplain #join() joined} then the scope is cancelled - * and the {@code join} method throws {@link TimeoutException}. + * {@linkplain #join() joined} then the scope is {@linkplain ##Cancallation cancelled} + * and the {@code Joiner}'s {@link Joiner#onTimeout()} method is invoked to throw + * optionally throw {@link TimeoutException TimeoutException}. * *

    The new scope is owned by the current thread. Only code executing in this * thread can {@linkplain #fork(Callable) fork}, {@linkplain #join() join}, or @@ -856,27 +921,27 @@ final class TimeoutException extends RuntimeException { * value} bindings for inheritance by threads started in the scope. * * @param joiner the joiner - * @param configFunction a function to produce the configuration + * @param configOperator the operator to produce the configuration * @return a new scope * @param the result type of subtasks executed in the scope * @param the result type of the scope - * @since 25 + * @since 26 */ static StructuredTaskScope open(Joiner joiner, - Function configFunction) { - return StructuredTaskScopeImpl.open(joiner, configFunction); + UnaryOperator configOperator) { + return StructuredTaskScopeImpl.open(joiner, configOperator); } /** * Opens a new {@code StructuredTaskScope}to use the given {@code Joiner} object. The - * scope is created with the default configuration. + * scope is created with the {@linkplain ##DefaultConfiguration default configuration}. * The default configuration has a {@code ThreadFactory} that creates unnamed - * virtual threads, - * is unnamed for monitoring and management purposes, and has no timeout. + * {@linkplain Thread##irtual-threads virtual threads}, is unnamed for monitoring and + * management purposes, and has no timeout. * * @implSpec * This factory method is equivalent to invoking the 2-arg open method with the given - * joiner and the {@linkplain Function#identity() identity function}. + * joiner and the {@linkplain UnaryOperator#identity() identity operator}. * * @param joiner the joiner * @return a new scope @@ -885,7 +950,7 @@ static StructuredTaskScope open(Joiner join * @since 25 */ static StructuredTaskScope open(Joiner joiner) { - return open(joiner, Function.identity()); + return open(joiner, UnaryOperator.identity()); } /** @@ -897,22 +962,22 @@ static StructuredTaskScope open(Joiner join * It throws {@link FailedException} if any subtask fails, with the exception from * the first subtask to fail as the cause. * - *

    The scope is created with the default - * configuration. The default configuration has a {@code ThreadFactory} that creates - * unnamed virtual - * threads, is unnamed for monitoring and management purposes, and has no timeout. + *

    The scope is created with the {@linkplain ##DefaultConfiguration default + * configuration}. The default configuration has a {@code ThreadFactory} that creates + * unnamed {@linkplain Thread##virtual-threads virtual threads}, is unnamed for + * monitoring and management purposes, and has no timeout. * * @implSpec * This factory method is equivalent to invoking the 2-arg open method with a joiner * created with {@link Joiner#awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} - * and the {@linkplain Function#identity() identity function}. + * and the {@linkplain UnaryOperator#identity() identity operator}. * * @param the result type of subtasks * @return a new scope * @since 25 */ static StructuredTaskScope open() { - return open(Joiner.awaitAllSuccessfulOrThrow(), Function.identity()); + return open(Joiner.awaitAllSuccessfulOrThrow(), UnaryOperator.identity()); } /** @@ -926,7 +991,7 @@ static StructuredTaskScope open() { * method with the subtask in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state. * If the {@code onFork} completes with an exception or error then it is propagated by * the {@code fork} method without creating a thread. If the scope is already - * cancelled, or {@code onFork} returns {@code true} to + * {@linkplain ##Cancallation cancelled}, or {@code onFork} returns {@code true} to * cancel the scope, then this method returns the {@code Subtask}, in the * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, without creating a thread to * execute the subtask. @@ -993,32 +1058,38 @@ static StructuredTaskScope open() { /** * Returns the result, or throws, after waiting for all subtasks to complete or - * the scope to be cancelled. + * the scope to be {@linkplain ##Cancallation cancelled}. * *

    This method waits for all subtasks started in this scope to complete or the * scope to be cancelled. If a {@linkplain Configuration#withTimeout(Duration) timeout} - * is configured and the timeout expires before or while waiting, then the scope is - * cancelled and {@link TimeoutException TimeoutException} is thrown. Once finished - * waiting, the {@code Joiner}'s {@link Joiner#result() result()} method is invoked - * to get the result or throw an exception. If the {@code result()} method throws - * then this method throws {@code FailedException} with the exception as the cause. + * is configured, and the timeout expires before or while waiting, then the scope is + * cancelled and the {@code Joiner}'s {@link Joiner#onTimeout() onTimeout} method is + * invoked to optionally throw {@link TimeoutException TimeoutException}. If the + * {@code onTimeout} method throws another exception or error then it is propagated + * by this method. Once finished waiting, and {@code onTimeout} does not throw, the + * {@code Joiner}'s {@link Joiner#result() result()} method is invoked to get the result + * or throw an exception. If the {@code result()} method throws then this method throws + * {@code FailedException} with the exception from the {@code Joiner} as the cause. * - *

    This method may only be invoked by the scope owner, and only once. + *

    This method may only be invoked by the scope owner. Once the result or + * exception outcome is obtained, this method may not be invoked again. The only + * case where the method may be called again is where {@code InterruptedException} + * is thrown while waiting. * * @return the result * @throws WrongThreadException if the current thread is not the scope owner * @throws IllegalStateException if already joined or this scope is closed * @throws FailedException if the outcome is an exception, thrown with the * exception from {@link Joiner#result() Joiner.result()} as the cause - * @throws TimeoutException if a timeout is set and the timeout expires before or - * while waiting + * @throws TimeoutException if a timeout is set, the timeout expires before or while + * waiting, and {@link Joiner#onTimeout() Joiner.onTimeout()} throws this exception * @throws InterruptedException if interrupted while waiting * @since 25 */ R join() throws InterruptedException; /** - * {@return {@code true} if this scope is cancelled or in + * {@return {@code true} if this scope is {@linkplain ##Cancallation cancelled} or in * the process of being cancelled, otherwise {@code false}} * *

    Cancelling the scope prevents new threads from starting in the scope and @@ -1038,7 +1109,7 @@ static StructuredTaskScope open() { /** * Closes this scope. * - *

    This method first cancels the scope, if not + *

    This method first {@linkplain ##Cancallation cancels} the scope, if not * already cancelled. This interrupts the threads executing unfinished subtasks. This * method then waits for all threads to finish. If interrupted while waiting then it * will continue to wait until the threads finish, before completing with the interrupt diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java index f65ea18e22679..8da457f14de0b 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java @@ -28,7 +28,7 @@ import java.lang.invoke.VarHandle; import java.time.Duration; import java.util.Objects; -import java.util.function.Function; +import java.util.function.UnaryOperator; import jdk.internal.misc.ThreadFlock; import jdk.internal.invoke.MhUtil; @@ -76,10 +76,10 @@ private StructuredTaskScopeImpl(Joiner joiner, * default configuration. */ static StructuredTaskScope open(Joiner joiner, - Function configFunction) { + UnaryOperator configOperator) { Objects.requireNonNull(joiner); - var config = (ConfigImpl) configFunction.apply(ConfigImpl.defaultConfig()); + var config = (ConfigImpl) configOperator.apply(ConfigImpl.defaultConfig()); var scope = new StructuredTaskScopeImpl(joiner, config.threadFactory(), config.name()); // schedule timeout @@ -108,22 +108,12 @@ private void ensureOwner() { } } - /** - * Throws IllegalStateException if already joined or scope is closed. - */ - private void ensureNotJoined() { - assert Thread.currentThread() == flock.owner(); - if (state > ST_FORKED) { - throw new IllegalStateException("Already joined or scope is closed"); - } - } - /** * Throws IllegalStateException if invoked by the owner thread and the owner thread * has not joined. */ private void ensureJoinedIfOwner() { - if (Thread.currentThread() == flock.owner() && state <= ST_JOIN_STARTED) { + if (Thread.currentThread() == flock.owner() && state < ST_JOIN_STARTED) { throw new IllegalStateException("join not called"); } } @@ -195,7 +185,9 @@ private void onComplete(SubtaskImpl subtask) { public Subtask fork(Callable task) { Objects.requireNonNull(task); ensureOwner(); - ensureNotJoined(); + if (state > ST_FORKED) { + throw new IllegalStateException("join already called or scope is closed"); + } var subtask = new SubtaskImpl(this, task); @@ -234,7 +226,9 @@ public Subtask fork(Runnable task) { @Override public R join() throws InterruptedException { ensureOwner(); - ensureNotJoined(); + if (state >= ST_JOIN_COMPLETED) { + throw new IllegalStateException("Already joined or scope is closed"); + } // join started state = ST_JOIN_STARTED; @@ -242,14 +236,16 @@ public R join() throws InterruptedException { // wait for all subtasks, the scope to be cancelled, or interrupt flock.awaitAll(); - // throw if timeout expired + // all subtasks completed or scope cancelled + state = ST_JOIN_COMPLETED; + + // invoke joiner onTimeout if timeout expired if (timeoutExpired) { - throw new TimeoutException(); + cancel(); // ensure cancelled before calling onTimeout + joiner.onTimeout(); + } else { + cancelTimeout(); } - cancelTimeout(); - - // all subtasks completed or cancelled - state = ST_JOIN_COMPLETED; // invoke joiner to get result try { diff --git a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java index 79f4a9351955d..42490a5491acd 100644 --- a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java +++ b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java @@ -61,8 +61,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import static java.lang.Thread.State.*; @@ -209,7 +209,7 @@ void testForkConfined(ThreadFactory factory) throws Exception { */ @ParameterizedTest @MethodSource("factories") - void testForkAfterJoin1(ThreadFactory factory) throws Exception { + void testForkAfterJoinCompleted1(ThreadFactory factory) throws Exception { try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(factory))) { scope.join(); @@ -222,7 +222,7 @@ void testForkAfterJoin1(ThreadFactory factory) throws Exception { */ @ParameterizedTest @MethodSource("factories") - void testForkAfterJoin2(ThreadFactory factory) throws Exception { + void testForkAfterJoinCompleted2(ThreadFactory factory) throws Exception { try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(factory))) { scope.fork(() -> "foo"); @@ -232,16 +232,15 @@ void testForkAfterJoin2(ThreadFactory factory) throws Exception { } /** - * Test fork after join throws. + * Test fork after join interrupted. */ @ParameterizedTest @MethodSource("factories") - void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { + void testForkAfterJoinInterrupted(ThreadFactory factory) throws Exception { try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(factory))) { - var latch = new CountDownLatch(1); var subtask1 = scope.fork(() -> { - latch.await(); + Thread.sleep(Duration.ofDays(1)); return "foo"; }); @@ -254,6 +253,25 @@ void testForkAfterJoinThrows(ThreadFactory factory) throws Exception { } } + /** + * Test fork after join timeout. + */ + @ParameterizedTest + @MethodSource("factories") + void testForkAfterJoinTimeout(ThreadFactory factory) throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), + cf -> cf.withThreadFactory(factory) + .withTimeout(Duration.ofMillis(100)))) { + awaitCancelled(scope); + + // join throws + assertThrows(TimeoutException.class, scope::join); + + // fork should throw + assertThrows(IllegalStateException.class, () -> scope.fork(() -> "bar")); + } + } + /** * Test fork after task scope is cancelled. This test uses a custom Joiner to * cancel execution. @@ -296,9 +314,11 @@ void testForkAfterCancel2(ThreadFactory factory) throws Exception { /** * Test fork after task scope is closed. */ - @Test - void testForkAfterClose() { - try (var scope = StructuredTaskScope.open(Joiner.awaitAll())) { + @ParameterizedTest + @MethodSource("factories") + void testForkAfterClose(ThreadFactory factory) { + try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), + cf -> cf.withThreadFactory(factory))) { scope.close(); assertThrows(IllegalStateException.class, () -> scope.fork(() -> null)); } @@ -382,7 +402,34 @@ void testJoinAfterJoin2() throws Exception { * Test join after join completed with a timeout. */ @Test - void testJoinAfterJoin3() throws Exception { + void testJoinAfterJoinInterrupted() throws Exception { + try (var scope = StructuredTaskScope.open()) { + var latch = new CountDownLatch(1); + var subtask = scope.fork(() -> { + latch.await(); + return "foo"; + }); + + // join throws InterruptedException + Thread.currentThread().interrupt(); + assertThrows(InterruptedException.class, scope::join); + + latch.countDown(); + + // retry join to get result + scope.join(); + assertEquals("foo", subtask.get()); + + // retry after otbaining result + assertThrows(IllegalStateException.class, scope::join); + } + } + + /** + * Test join after join completed with a timeout. + */ + @Test + void testJoinAfterJoinTimeout() throws Exception { try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), cf -> cf.withTimeout(Duration.ofMillis(100)))) { // wait for scope to be cancelled by timeout @@ -396,6 +443,35 @@ void testJoinAfterJoin3() throws Exception { } } + /** + * Test join invoked from Joiner.onTimeout. + */ + @Test + void testJoinInOnTimeout() throws Exception { + Thread owner = Thread.currentThread(); + var scopeRef = new AtomicReference>(); + + var joiner = new Joiner() { + @Override + public void onTimeout() { + assertTrue(Thread.currentThread() == owner); + var scope = scopeRef.get(); + assertThrows(IllegalStateException.class, scope::join); + } + @Override + public Void result() { + return null; + } + }; + + try (var scope = StructuredTaskScope.open(joiner, + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + awaitCancelled(scope); + scopeRef.set(scope); + scope.join(); // invokes onTimeout + } + } + /** * Test join method is owner confined. */ @@ -434,7 +510,7 @@ void testInterruptJoin1(ThreadFactory factory) throws Exception { cf -> cf.withThreadFactory(factory))) { Subtask subtask = scope.fork(() -> { - Thread.sleep(60_000); + Thread.sleep(Duration.ofDays(1)); return "foo"; }); @@ -457,10 +533,8 @@ void testInterruptJoin1(ThreadFactory factory) throws Exception { void testInterruptJoin2(ThreadFactory factory) throws Exception { try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(factory))) { - - var latch = new CountDownLatch(1); Subtask subtask = scope.fork(() -> { - Thread.sleep(60_000); + Thread.sleep(Duration.ofDays(1)); return "foo"; }); @@ -965,6 +1039,66 @@ public Void result() { } } + /** + * Test Joiner.onTimeout invoked by owner thread when timeout expires. + */ + @Test + void testOnTimeoutInvoked() throws Exception { + var scopeRef = new AtomicReference>(); + Thread owner = Thread.currentThread(); + var invokeCount = new AtomicInteger(); + var joiner = new Joiner() { + @Override + public void onTimeout() { + assertTrue(Thread.currentThread() == owner); + assertTrue(scopeRef.get().isCancelled()); + invokeCount.incrementAndGet(); + } + @Override + public Void result() { + return null; + } + }; + try (var scope = StructuredTaskScope.open(joiner, + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + scopeRef.set(scope); + scope.fork(() -> { + Thread.sleep(Duration.ofDays(1)); + return null; + }); + scope.join(); + assertEquals(1, invokeCount.get()); + } + } + + /** + * Test Joiner.onTimeout throwing an excepiton. + */ + @Test + void testOnTimeoutThrows() throws Exception { + var joiner = new Joiner() { + @Override + public void onTimeout() { + throw new FooException(); + } + @Override + public Void result() { + return null; + } + }; + try (var scope = StructuredTaskScope.open(joiner, + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + // wait for scope to be cancelled by timeout + awaitCancelled(scope); + + // join should throw FooException on first usage + assertThrows(FooException.class, scope::join); + + // retry after onTimeout fails + assertThrows(IllegalStateException.class, scope::join); + } + } + /** * Test toString. */ @@ -1156,6 +1290,25 @@ void testAllSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { } } + /** + * Test Joiner.allSuccessfulOrThrow() with a timeout. + */ + @Test + void testAllSuccessfulOrThrow4() throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow(), + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + scope.fork(() -> "foo"); + scope.fork(() -> { + Thread.sleep(Duration.ofDays(1)); + return "bar"; + }); + assertThrows(TimeoutException.class, scope::join); + + // retry after join throws TimeoutException + assertThrows(IllegalStateException.class, scope::join); + } + } + /** * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. */ @@ -1229,6 +1382,25 @@ void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { } } + /** + * Test Joiner.allSuccessfulOrThrow() with a timeout. + */ + @Test + void anySuccessfulResultOrThrow6() throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + scope.fork(() -> { throw new FooException(); }); + scope.fork(() -> { + Thread.sleep(Duration.ofDays(1)); + return "bar"; + }); + assertThrows(TimeoutException.class, scope::join); + + // retry after join throws TimeoutException + assertThrows(IllegalStateException.class, scope::join); + } + } + /** * Test Joiner.awaitAllSuccessfulOrThrow() with no subtasks. */ @@ -1276,6 +1448,25 @@ void testAwaitSuccessfulOrThrow3(ThreadFactory factory) throws Throwable { } } + /** + * Test Joiner.awaitAllSuccessfulOrThrow() with a timeout. + */ + @Test + void testAwaitSuccessfulOrThrow4() throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.awaitAllSuccessfulOrThrow(), + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + scope.fork(() -> "foo"); + scope.fork(() -> { + Thread.sleep(Duration.ofDays(1)); + return "bar"; + }); + assertThrows(TimeoutException.class, scope::join); + + // retry after join throws TimeoutException + assertThrows(IllegalStateException.class, scope::join); + } + } + /** * Test Joiner.awaitAll() with no subtasks. */ @@ -1322,6 +1513,25 @@ void testAwaitAll3(ThreadFactory factory) throws Throwable { } } + /** + * Test Joiner.awaitAll() with a timeout. + */ + @Test + void testAwaitAll4() throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + scope.fork(() -> "foo"); + scope.fork(() -> { + Thread.sleep(Duration.ofDays(1)); + return "bar"; + }); + assertThrows(TimeoutException.class, scope::join); + + // retry after join throws TimeoutException + assertThrows(IllegalStateException.class, scope::join); + } + } + /** * Test Joiner.allUntil(Predicate) with no subtasks. */ @@ -1437,6 +1647,33 @@ void testAllUntil5() throws Exception { } } + /** + * Test Joiner.allUntil(Predicate) with a timeout. + */ + @Test + void testAllUntil6() throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false), + cf -> cf.withTimeout(Duration.ofMillis(100)))) { + var subtask1 = scope.fork(() -> "foo"); + var subtask2 = scope.fork(() -> { + Thread.sleep(Duration.ofDays(1)); + return "bar"; + }); + + // TimeoutException should not be thrown + var subtasks = scope.join().toList(); + + // stream should have two elements, subtask1 may or may not have completed + assertEquals(2, subtasks.size()); + assertSame(subtask1, subtasks.get(0)); + assertSame(subtask2, subtasks.get(1)); + assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); + + // retry after join throws TimeoutException + assertThrows(IllegalStateException.class, scope::join); + } + } + /** * Test Joiner default methods. */ @@ -1461,6 +1698,7 @@ void testJoinerDefaultMethods() throws Exception { assertFalse(joiner.onFork(subtask2)); assertFalse(joiner.onComplete(subtask1)); assertThrows(IllegalArgumentException.class, () -> joiner.onComplete(subtask2)); + assertThrows(TimeoutException.class, joiner::onTimeout); } } @@ -1523,7 +1761,7 @@ void testConfigFunctionThrows() throws Exception { */ @Test void testConfigMethods() throws Exception { - Function testConfig = cf -> { + UnaryOperator configOperator = cf -> { var name = "duke"; var threadFactory = Thread.ofPlatform().factory(); var timeout = Duration.ofSeconds(10); @@ -1548,7 +1786,7 @@ void testConfigMethods() throws Exception { return cf; }; - try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), testConfig)) { + try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), configOperator)) { // do nothing } } From dd7cf8920252b3f87921bb97ea750228c3e6686f Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Fri, 19 Sep 2025 15:48:21 +0100 Subject: [PATCH 2/9] Update --- .../classes/java/util/concurrent/StructuredTaskScope.java | 6 +++--- .../share/classes/jdk/internal/javac/PreviewFeature.java | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index 5feb804f804d6..5d5ff7fc3a9d1 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -51,9 +51,9 @@ * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may * only be invoked by the owner thread (the thread that opened the {@code * StructuredTaskScope}), the {@code fork} method may not be called after {@code join}, - * the {@code join} method may only be invoked once to get outcome, and the {@code close} - * method throws an exception after closing if the owner did not invoke the {@code join} - * method after forking subtasks. + * the {@code join} method may only be invoked once to get the outcome, and the + * {@code close} method throws an exception after closing if the owner did not invoke the + * {@code join} method after forking subtasks. * *

    As a first example, consider a task that splits into two subtasks to concurrently * fetch resources from two URL locations "left" and "right". Both subtasks may complete diff --git a/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java b/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java index e6c994a12b1b6..64c2846745fad 100644 --- a/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java +++ b/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java @@ -70,7 +70,7 @@ public enum Feature { //--- IMPLICIT_CLASSES, //to be removed when boot JDK is 25 SCOPED_VALUES, - @JEP(number=505, title="Structured Concurrency", status="Fifth Preview") + @JEP(number=505, title="Structured Concurrency", status="Sixth Preview") STRUCTURED_CONCURRENCY, CLASSFILE_API, STREAM_GATHERERS, From 157a0b2d0744caff934ccd4b4e93512bfa17d538 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Tue, 23 Sep 2025 07:49:56 +0100 Subject: [PATCH 3/9] Add JEP number --- .../share/classes/jdk/internal/javac/PreviewFeature.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java b/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java index 64c2846745fad..2a6fb7548794d 100644 --- a/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java +++ b/src/java.base/share/classes/jdk/internal/javac/PreviewFeature.java @@ -70,7 +70,7 @@ public enum Feature { //--- IMPLICIT_CLASSES, //to be removed when boot JDK is 25 SCOPED_VALUES, - @JEP(number=505, title="Structured Concurrency", status="Sixth Preview") + @JEP(number=525, title="Structured Concurrency", status="Sixth Preview") STRUCTURED_CONCURRENCY, CLASSFILE_API, STREAM_GATHERERS, From 66aca21ef32b4225211ea074c8a686fe42c154a8 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Tue, 23 Sep 2025 19:12:58 +0100 Subject: [PATCH 4/9] Improve docs and review feedback --- .../util/concurrent/StructuredTaskScope.java | 65 +++++++++---------- .../StructuredTaskScopeTest.java | 12 +--- 2 files changed, 34 insertions(+), 43 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index 5d5ff7fc3a9d1..729bcc12c2143 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -199,21 +199,21 @@ *

    Configuration

    * * A {@code StructuredTaskScope} is opened with {@linkplain Configuration configuration} - * that consists of a {@link ThreadFactory} to create threads, an optional name for - * monitoring and management purposes, and an optional timeout. + * that consists of a {@link ThreadFactory} to create threads, an optional name for the + * scope, and an optional timeout. The name is intended for monitoring and management + * purposes. * *

    The {@link #open()} and {@link #open(Joiner)} methods create a {@code StructuredTaskScope} * with the default configuration. The default * configuration has a {@code ThreadFactory} that creates unnamed {@linkplain - * Thread##virtual-threads virtual threads}, is unnamed for monitoring and management - * purposes, and has no timeout. + * Thread##virtual-threads virtual threads}, does not name the scope, and has no timeout. * *

    The 2-arg {@link #open(Joiner, UnaryOperator) open} method can be used to create a - * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, has a name for - * the purposes of monitoring and management, or has a timeout that cancels the scope if - * the timeout expires before or while waiting for subtasks to complete. The {@code open} - * method is called with a {@linkplain UnaryOperator operator} that is applied to the default - * configuration and returns a {@link Configuration Configuration} for the + * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, is named for + * monitoring and management purposes, or has a timeout that cancels the scope if the + * timeout expires before or while waiting for subtasks to complete. The {@code open} + * method is called with a {@linkplain UnaryOperator operator} that is applied to the + * default configuration and returns a {@link Configuration Configuration} for the * {@code StructuredTaskScope} under construction. * *

    The following example opens a new {@code StructuredTaskScope} with a {@code @@ -237,9 +237,8 @@ *

    A second example sets a timeout, represented by a {@link Duration}. The timeout * starts when the new scope is opened. If the timeout expires before the {@code join} * method has completed then the scope is {@linkplain ##Cancallation cancelled} (this - * interrupts the threads executing the two subtasks) and the {@code Joiner}'s {@link - * Joiner#onTimeout() onTimeout} method is invoked to throw {@link TimeoutException - * TimeoutException}. + * interrupts the threads executing the two subtasks), and the {@code join} method + * throws {@link TimeoutException TimeoutException}. * {@snippet lang=java : * Duration timeout = Duration.ofSeconds(10); * @@ -788,14 +787,8 @@ static Joiner>> allUntil(Predicate * Represents the configuration for a {@code StructuredTaskScope}. * *

    The configuration for a {@code StructuredTaskScope} consists of a {@link - * ThreadFactory} to create threads, an optional name for the purposes of monitoring - * and management, and an optional timeout. - * - *

    Creating a {@code StructuredTaskScope} with {@link #open()} or {@link #open(Joiner)} - * uses the {@linkplain StructuredTaskScope##DefaultConfiguration default configuration}. - * The default configuration consists of a thread factory that creates unnamed - * {@linkplain Thread##virtual-threads virtual threads}, no name for monitoring and - * management purposes, and no timeout. + * ThreadFactory} to create threads, an optional name for the scope, and an optional + * timeout. The name is intended for monitoring and management purposes. * *

    Creating a {@code StructuredTaskScope} with its 2-arg {@link #open(Joiner, UnaryOperator) * open} method allows a different configuration to be used. The operator specified @@ -827,7 +820,7 @@ sealed interface Configuration permits StructuredTaskScopeImpl.ConfigImpl { Configuration withThreadFactory(ThreadFactory threadFactory); /** - * {@return a new {@code Configuration} object with the given name} + * {@return a new {@code Configuration} object with the given scope name} * The other components are the same as this object. A scope is optionally * named for the purposes of monitoring and management. * @param name the name @@ -936,8 +929,8 @@ static StructuredTaskScope open(Joiner join * Opens a new {@code StructuredTaskScope}to use the given {@code Joiner} object. The * scope is created with the {@linkplain ##DefaultConfiguration default configuration}. * The default configuration has a {@code ThreadFactory} that creates unnamed - * {@linkplain Thread##irtual-threads virtual threads}, is unnamed for monitoring and - * management purposes, and has no timeout. + * {@linkplain Thread##irtual-threads virtual threads}, does not name the scope, and + * has no timeout. * * @implSpec * This factory method is equivalent to invoking the 2-arg open method with the given @@ -964,8 +957,8 @@ static StructuredTaskScope open(Joiner join * *

    The scope is created with the {@linkplain ##DefaultConfiguration default * configuration}. The default configuration has a {@code ThreadFactory} that creates - * unnamed {@linkplain Thread##virtual-threads virtual threads}, is unnamed for - * monitoring and management purposes, and has no timeout. + * unnamed {@linkplain Thread##virtual-threads virtual threads}, does not name the + * scope, and has no timeout. * * @implSpec * This factory method is equivalent to invoking the 2-arg open method with a joiner @@ -1061,16 +1054,20 @@ static StructuredTaskScope open() { * the scope to be {@linkplain ##Cancallation cancelled}. * *

    This method waits for all subtasks started in this scope to complete or the - * scope to be cancelled. If a {@linkplain Configuration#withTimeout(Duration) timeout} - * is configured, and the timeout expires before or while waiting, then the scope is - * cancelled and the {@code Joiner}'s {@link Joiner#onTimeout() onTimeout} method is - * invoked to optionally throw {@link TimeoutException TimeoutException}. If the - * {@code onTimeout} method throws another exception or error then it is propagated - * by this method. Once finished waiting, and {@code onTimeout} does not throw, the - * {@code Joiner}'s {@link Joiner#result() result()} method is invoked to get the result - * or throw an exception. If the {@code result()} method throws then this method throws + * scope to be cancelled. Once finished waiting, the {@code Joiner}'s {@link + * Joiner#result() result()} method is invoked to get the result or throw an exception. + * If the {@code result()} method throws then {@code join()} throws * {@code FailedException} with the exception from the {@code Joiner} as the cause. * + *

    If a {@linkplain Configuration#withTimeout(Duration) timeout} is configured, + * and the timeout expires before or while waiting, then the scope is cancelled and + * the {@code Joiner}'s {@link Joiner#onTimeout() onTimeout()} method is invoked + * before calling the {@code Joiner}'s {@code result()} method. If the {@code onTimeout()} + * method throws {@link TimeoutException TimeoutException} (or throws any exception + * or error), then it is propagated by this method. If the {@code onTimeout()} method + * does not throw then the {@code Joiner}'s {@code result()} method is invoked to + * get the result or throw. + * *

    This method may only be invoked by the scope owner. Once the result or * exception outcome is obtained, this method may not be invoked again. The only * case where the method may be called again is where {@code InterruptedException} @@ -1140,4 +1137,4 @@ static StructuredTaskScope open() { */ @Override void close(); -} \ No newline at end of file +} diff --git a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java index 42490a5491acd..b2576a958e879 100644 --- a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java +++ b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java @@ -1556,10 +1556,8 @@ void testAllUntil2(ThreadFactory factory) throws Exception { var subtask2 = scope.fork(() -> { throw new FooException(); }); var subtasks = scope.join().toList(); - assertEquals(2, subtasks.size()); + assertEquals(List.of(subtask1, subtask2), subtasks); - assertSame(subtask1, subtasks.get(0)); - assertSame(subtask2, subtasks.get(1)); assertEquals("foo", subtask1.get()); assertTrue(subtask2.exception() instanceof FooException); } @@ -1581,10 +1579,8 @@ void testAllUntil3(ThreadFactory factory) throws Exception { }); var subtasks = scope.join().toList(); + assertEquals(List.of(subtask1, subtask2), subtasks); - assertEquals(2, subtasks.size()); - assertSame(subtask1, subtasks.get(0)); - assertSame(subtask2, subtasks.get(1)); assertEquals("foo", subtask1.get()); assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); } @@ -1664,9 +1660,7 @@ void testAllUntil6() throws Exception { var subtasks = scope.join().toList(); // stream should have two elements, subtask1 may or may not have completed - assertEquals(2, subtasks.size()); - assertSame(subtask1, subtasks.get(0)); - assertSame(subtask2, subtasks.get(1)); + assertEquals(List.of(subtask1, subtask2), subtasks); assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); // retry after join throws TimeoutException From c487fd60f414b2fff8a9cabd4e41bd4d41c47911 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Fri, 3 Oct 2025 18:01:30 +0100 Subject: [PATCH 5/9] Sync up from loom repo --- .../util/concurrent/StructuredTaskScope.java | 24 +++-- .../concurrent/StructuredTaskScopeImpl.java | 69 +++++++++----- .../StructuredTaskScopeTest.java | 90 ++++++++++++++++--- 3 files changed, 142 insertions(+), 41 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index 729bcc12c2143..96379580e6a68 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -402,16 +402,20 @@ enum State { * {@link #fork(Runnable) fork(Runnable)} then {@code null} is returned. * *

    Code executing in the scope owner thread can use this method to get the - * result of a successful subtask only after it has {@linkplain #join() joined}. + * result of a successful subtask after it has {@linkplain #join() joined}. * *

    Code executing in the {@code Joiner} {@link Joiner#onComplete(Subtask) * onComplete} method should test that the {@linkplain #state() subtask state} is * {@link State#SUCCESS SUCCESS} before using this method to get the result. * + *

    This method may be invoked by any thread after the scope owner has joined. + * The only case where this method can be used to get the result before the scope + * owner has joined is when called from the {@code onComplete(Subtask)} method. + * * @return the possibly-null result - * @throws IllegalStateException if the subtask has not completed, did not complete - * successfully, or the current thread is the scope owner invoking this - * method before {@linkplain #join() joining} + * @throws IllegalStateException if the subtask has not completed or did not + * complete successfully, or this method if invoked outside the context of the + * {@code onComplete(Subtask)} method before the owner thread has joined * @see State#SUCCESS */ T get(); @@ -424,15 +428,19 @@ enum State { * exception or error thrown by the {@link Runnable#run() run} method is returned. * *

    Code executing in the scope owner thread can use this method to get the - * exception thrown by a failed subtask only after it has {@linkplain #join() joined}. + * exception thrown by a failed subtask after it has {@linkplain #join() joined}. * *

    Code executing in a {@code Joiner} {@link Joiner#onComplete(Subtask) * onComplete} method should test that the {@linkplain #state() subtask state} is * {@link State#FAILED FAILED} before using this method to get the exception. * - * @throws IllegalStateException if the subtask has not completed, completed with - * a result, or the current thread is the scope owner invoking this method - * before {@linkplain #join() joining} + *

    This method may be invoked by any thread after the scope owner has joined. + * The only case where this method can be used to get the exception before the scope + * owner has joined is when called from the {@code onComplete(Subtask)} method. + * + * @throws IllegalStateException if the subtask has not completed or completed + * with a result, or this method if invoked outside the context of the {@code + * onComplete(Subtask)} method before the owner thread has joined * @see State#FAILED */ Throwable exception(); diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java index 8da457f14de0b..fc6f823e063b3 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java @@ -31,6 +31,7 @@ import java.util.function.UnaryOperator; import jdk.internal.misc.ThreadFlock; import jdk.internal.invoke.MhUtil; +import jdk.internal.vm.annotation.Stable; /** * StructuredTaskScope implementation. @@ -43,20 +44,19 @@ final class StructuredTaskScopeImpl implements StructuredTaskScope { private final ThreadFactory threadFactory; private final ThreadFlock flock; - // state, only accessed by owner thread - private static final int ST_NEW = 0, - ST_FORKED = 1, // subtasks forked, need to join + // scope state, set by owner thread, read by any thread + private static final int ST_FORKED = 1, // subtasks forked, need to join ST_JOIN_STARTED = 2, // join started, can no longer fork ST_JOIN_COMPLETED = 3, // join completed ST_CLOSED = 4; // closed - private int state; - - // timer task, only accessed by owner thread - private Future timerTask; + private volatile int state; // set or read by any thread private volatile boolean cancelled; + // timer task, only accessed by owner thread + private Future timerTask; + // set by the timer thread, read by the owner thread private volatile boolean timeoutExpired; @@ -67,7 +67,6 @@ private StructuredTaskScopeImpl(Joiner joiner, this.joiner = joiner; this.threadFactory = threadFactory; this.flock = ThreadFlock.open((name != null) ? name : Objects.toIdentityString(this)); - this.state = ST_NEW; } /** @@ -109,13 +108,10 @@ private void ensureOwner() { } /** - * Throws IllegalStateException if invoked by the owner thread and the owner thread - * has not joined. + * Returns true if join has been invoked and there is an outcome. */ - private void ensureJoinedIfOwner() { - if (Thread.currentThread() == flock.owner() && state < ST_JOIN_STARTED) { - throw new IllegalStateException("join not called"); - } + private boolean isJoinCompleted() { + return state >= ST_JOIN_COMPLETED; } /** @@ -185,7 +181,8 @@ private void onComplete(SubtaskImpl subtask) { public Subtask fork(Callable task) { Objects.requireNonNull(task); ensureOwner(); - if (state > ST_FORKED) { + int s = state; + if (s > ST_FORKED) { throw new IllegalStateException("join already called or scope is closed"); } @@ -204,6 +201,7 @@ public Subtask fork(Callable task) { } // attempt to start the thread + subtask.setThread(thread); try { flock.start(thread); } catch (IllegalStateException e) { @@ -213,7 +211,9 @@ public Subtask fork(Callable task) { } // force owner to join - state = ST_FORKED; + if (s < ST_FORKED) { + state = ST_FORKED; + } return subtask; } @@ -230,11 +230,13 @@ public R join() throws InterruptedException { throw new IllegalStateException("Already joined or scope is closed"); } - // join started - state = ST_JOIN_STARTED; - // wait for all subtasks, the scope to be cancelled, or interrupt - flock.awaitAll(); + try { + flock.awaitAll(); + } catch (InterruptedException e) { + state = ST_JOIN_STARTED; // joining not completed, prevent new forks + throw e; + } // all subtasks completed or scope cancelled state = ST_JOIN_COMPLETED; @@ -307,14 +309,37 @@ private record AltResult(Subtask.State state, Throwable exception) { private final StructuredTaskScopeImpl scope; private final Callable task; private volatile Object result; + @Stable private Thread thread; SubtaskImpl(StructuredTaskScopeImpl scope, Callable task) { this.scope = scope; this.task = task; } + /** + * Sets the thread for this subtask. + */ + void setThread(Thread thread) { + assert thread.getState() == Thread.State.NEW; + this.thread = thread; + } + + /** + * Throws IllegalStateException if the caller thread is not the subtask and + * the scope owner has not joined. + */ + private void ensureJoinedIfNotSubtask() { + if (Thread.currentThread() != thread && !scope.isJoinCompleted()) { + throw new IllegalStateException(); + } + } + @Override public void run() { + if (Thread.currentThread() != thread) { + throw new WrongThreadException(); + } + T result = null; Throwable ex = null; try { @@ -351,7 +376,7 @@ public Subtask.State state() { @Override public T get() { - scope.ensureJoinedIfOwner(); + ensureJoinedIfNotSubtask(); Object result = this.result; if (result instanceof AltResult) { if (result == RESULT_NULL) return null; @@ -366,7 +391,7 @@ public T get() { @Override public Throwable exception() { - scope.ensureJoinedIfOwner(); + ensureJoinedIfNotSubtask(); Object result = this.result; if (result instanceof AltResult alt && alt.state() == State.FAILED) { return alt.exception(); diff --git a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java index b2576a958e879..fc121a32bfa84 100644 --- a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java +++ b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java @@ -1124,19 +1124,27 @@ void testToString() throws Exception { void testSubtaskWhenSuccess(ThreadFactory factory) throws Exception { try (var scope = StructuredTaskScope.open(Joiner.awaitAll(), cf -> cf.withThreadFactory(factory))) { - Subtask subtask = scope.fork(() -> "foo"); - // before join + // before join, owner thread assertThrows(IllegalStateException.class, subtask::get); assertThrows(IllegalStateException.class, subtask::exception); + // before join, another thread + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); + scope.join(); - // after join assertEquals(Subtask.State.SUCCESS, subtask.state()); + + // after join, owner thread assertEquals("foo", subtask.get()); assertThrows(IllegalStateException.class, subtask::exception); + + // after join, another thread + assertEquals("foo", callInOtherThread(subtask::get)); + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); } } @@ -1151,16 +1159,25 @@ void testSubtaskWhenFailed(ThreadFactory factory) throws Exception { Subtask subtask = scope.fork(() -> { throw new FooException(); }); - // before join + // before join, owner thread assertThrows(IllegalStateException.class, subtask::get); assertThrows(IllegalStateException.class, subtask::exception); + // before join, another thread + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); + scope.join(); - // after join assertEquals(Subtask.State.FAILED, subtask.state()); + + // after join, owner thread assertThrows(IllegalStateException.class, subtask::get); assertTrue(subtask.exception() instanceof FooException); + + // after join, another thread + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); + assertTrue(callInOtherThread(subtask::exception) instanceof FooException); } } @@ -1176,20 +1193,29 @@ void testSubtaskWhenNotCompleted(ThreadFactory factory) throws Exception { Thread.sleep(Duration.ofDays(1)); return null; }); - - // before join assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); + + // before join, owner thread assertThrows(IllegalStateException.class, subtask::get); assertThrows(IllegalStateException.class, subtask::exception); + // before join, another thread + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); + // attempt join, join throws Thread.currentThread().interrupt(); assertThrows(InterruptedException.class, scope::join); - // after join assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); + + // after join, owner thread assertThrows(IllegalStateException.class, subtask::get); assertThrows(IllegalStateException.class, subtask::exception); + + // before join, another thread + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); } } @@ -1205,17 +1231,25 @@ void testSubtaskWhenCancelled(ThreadFactory factory) throws Exception { var subtask = scope.fork(() -> "foo"); - // before join - assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); + // before join, owner thread assertThrows(IllegalStateException.class, subtask::get); assertThrows(IllegalStateException.class, subtask::exception); + // before join, another thread + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); + scope.join(); - // after join assertEquals(Subtask.State.UNAVAILABLE, subtask.state()); + + // after join, owner thread assertThrows(IllegalStateException.class, subtask::get); assertThrows(IllegalStateException.class, subtask::exception); + + // before join, another thread + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::get)); + assertThrows(IllegalStateException.class, () -> callInOtherThread(subtask::exception)); } } @@ -1987,6 +2021,40 @@ private void scheduleInterruptAt(String location) { }); } + /** + * Calls a result returning task from another thread. + */ + private V callInOtherThread(Callable task) throws Exception { + var result = new AtomicReference(); + var exc = new AtomicReference(); + Thread thread = Thread.ofVirtual().start(() -> { + try { + result.set(task.call()); + } catch (Exception e) { + exc.set(e); + } + }); + boolean interrupted = false; + boolean terminated = false; + while (!terminated) { + try { + thread.join(); + terminated = true; + } catch (InterruptedException e) { + interrupted = true; + } + } + if (interrupted) { + Thread.currentThread().interrupt(); + } + Exception e = exc.get(); + if (e != null) { + throw e; + } else { + return result.get(); + } + } + /** * Returns true if the given stack trace contains an element for the given class * and method name. From 336e708afbc991bdddd42d92feeea5d7b4d2adb0 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Mon, 20 Oct 2025 15:16:03 +0100 Subject: [PATCH 6/9] Sync up from loom repo --- .../classes/java/util/concurrent/Joiners.java | 41 +++-- .../util/concurrent/StructuredTaskScope.java | 77 ++++----- .../concurrent/StructuredTaskScopeImpl.java | 10 +- .../StressCancellation.java | 2 +- .../StructuredTaskScopeTest.java | 146 ++++++++++++------ 5 files changed, 162 insertions(+), 114 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/Joiners.java b/src/java.base/share/classes/java/util/concurrent/Joiners.java index 8b9f71f8c8f5e..0b07e89336f83 100644 --- a/src/java.base/share/classes/java/util/concurrent/Joiners.java +++ b/src/java.base/share/classes/java/util/concurrent/Joiners.java @@ -34,7 +34,6 @@ import java.util.concurrent.StructuredTaskScope.Joiner; import java.util.concurrent.StructuredTaskScope.Subtask; import java.util.function.Predicate; -import java.util.stream.Stream; import jdk.internal.invoke.MhUtil; /** @@ -64,10 +63,10 @@ private static Subtask.State ensureCompleted(Subtask subtask) { } /** - * A joiner that returns a stream of all subtasks when all subtasks complete + * A joiner that returns a list of all results when all subtasks complete * successfully. Cancels the scope if any subtask fails. */ - static final class AllSuccessful implements Joiner>> { + static final class AllSuccessful implements Joiner> { private static final VarHandle FIRST_EXCEPTION = MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); @@ -77,16 +76,14 @@ static final class AllSuccessful implements Joiner>> { private volatile Throwable firstException; @Override - public boolean onFork(Subtask subtask) { + public boolean onFork(Subtask subtask) { ensureUnavailable(subtask); - @SuppressWarnings("unchecked") - var s = (Subtask) subtask; - subtasks.add(s); + subtasks.add(subtask); return false; } @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { Subtask.State state = ensureCompleted(subtask); return (state == Subtask.State.FAILED) && (firstException == null) @@ -94,12 +91,12 @@ public boolean onComplete(Subtask subtask) { } @Override - public Stream> result() throws Throwable { + public List result() throws Throwable { Throwable ex = firstException; if (ex != null) { throw ex; } else { - return subtasks.stream(); + return subtasks.stream().map(Subtask::get).toList(); } } } @@ -130,7 +127,7 @@ private static int stateToInt(Subtask.State s) { } @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { Subtask.State state = ensureCompleted(subtask); Subtask s; while (((s = this.subtask) == null) @@ -166,7 +163,7 @@ static final class AwaitSuccessful implements Joiner { private volatile Throwable firstException; @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { Subtask.State state = ensureCompleted(subtask); return (state == Subtask.State.FAILED) && (firstException == null) @@ -185,29 +182,27 @@ public Void result() throws Throwable { } /** - * A joiner that returns a stream of all subtasks. + * A joiner that returns a list of all subtasks. */ - static final class AllSubtasks implements Joiner>> { - private final Predicate> isDone; + static final class AllSubtasks implements Joiner>> { + private final Predicate> isDone; // list of forked subtasks, only accessed by owner thread private final List> subtasks = new ArrayList<>(); - AllSubtasks(Predicate> isDone) { + AllSubtasks(Predicate> isDone) { this.isDone = Objects.requireNonNull(isDone); } @Override - public boolean onFork(Subtask subtask) { + public boolean onFork(Subtask subtask) { ensureUnavailable(subtask); - @SuppressWarnings("unchecked") - var s = (Subtask) subtask; - subtasks.add(s); + subtasks.add(subtask); return false; } @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { ensureCompleted(subtask); return isDone.test(subtask); } @@ -218,8 +213,8 @@ public void onTimeout() { } @Override - public Stream> result() { - return subtasks.stream(); + public List> result() { + return List.copyOf(subtasks); } } } diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index 96379580e6a68..b2d1e7b01308e 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -25,10 +25,10 @@ package java.util.concurrent; import java.time.Duration; +import java.util.List; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.function.UnaryOperator; -import java.util.stream.Stream; import jdk.internal.javac.PreviewFeature; /** @@ -51,9 +51,9 @@ * To ensure correct usage, the {@code fork}, {@code join} and {@code close} methods may * only be invoked by the owner thread (the thread that opened the {@code * StructuredTaskScope}), the {@code fork} method may not be called after {@code join}, - * the {@code join} method may only be invoked once to get the outcome, and the - * {@code close} method throws an exception after closing if the owner did not invoke the - * {@code join} method after forking subtasks. + * the {@code join} method must be invoked to get the outcome after forking subtasks, and + * the {@code close} method throws an exception after closing if the owner did not invoke + * the {@code join} method after forking subtasks. * *

    As a first example, consider a task that splits into two subtasks to concurrently * fetch resources from two URL locations "left" and "right". Both subtasks may complete @@ -107,7 +107,7 @@ * implements the desired policy. A {@code Joiner} handles subtask completion and produces * the outcome for the {@link #join() join} method. In the example above, {@code join} * returns {@code null}. Depending on the {@code Joiner}, {@code join} may return a - * result, a stream of elements, or some other object. The {@code Joiner} interface defines + * result, a list of elements, or some other object. The {@code Joiner} interface defines * factory methods to create {@code Joiner}s for some common cases. * *

    A {@code Joiner} may cancel the scope (sometimes called @@ -124,13 +124,13 @@ *

    Now consider another example that splits into two subtasks. In this example, * each subtask produces a {@code String} result and the task is only interested in * the result from the first subtask to complete successfully. The example uses {@link - * Joiner#anySuccessfulResultOrThrow() Joiner.anySuccessfulResultOrThrow()} to - * create a {@code Joiner} that makes available the result of the first subtask to - * complete successfully. The type parameter in the example is "{@code String}" so that - * only subtasks that return a {@code String} can be forked. + * Joiner#anySuccessfulOrThrow() Joiner.anySuccessfulOrThrow()} to create a {@code Joiner} + * that makes available the result of the first subtask to complete successfully. The type + * parameter in the example is "{@code String}" so that only subtasks that return a + * {@code String} can be forked. * {@snippet lang=java : * // @link substring="open" target="#open(Joiner)" : - * try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { + * try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) { * * scope.fork(callable1); * scope.fork(callable2); @@ -154,8 +154,9 @@ * the {@code Joiner} and usage. Some {@code Joiner} implementations are suited to subtasks * that return results of the same type and where the {@code join} method returns a result * for the task to use. Code that forks subtasks that return results of different - * types, and uses a {@code Joiner} such as {@code Joiner.awaitAllSuccessfulOrThrow()} that - * does not return a result, will use {@link Subtask#get() Subtask.get()} after joining. + * types, and uses a {@code Joiner} such as {@link Joiner#awaitAllSuccessfulOrThrow() + * awaitAllSuccessfulOrThrow} that does not return a result, will use {@link Subtask#get() + * Subtask.get()} after joining. * *

    Exception handling

    * @@ -250,9 +251,7 @@ * scope.fork(callable1); * scope.fork(callable2); * - * List result = scope.join() - * .map(Subtask::get) - * .toList(); + * List results = scope.join(); * * } * } @@ -454,12 +453,12 @@ enum State { *

    Joiner defines static methods to create {@code Joiner} objects for common cases: *

      *
    • {@link #allSuccessfulOrThrow() allSuccessfulOrThrow()} creates a {@code Joiner} - * that yields a stream of the completed subtasks for {@code join} to return when - * all subtasks complete successfully. It cancels the scope and causes {@code join} - * to throw if any subtask fails. - *
    • {@link #anySuccessfulResultOrThrow() anySuccessfulResultOrThrow()} creates a - * {@code Joiner} that yields the result of the first subtask to succeed for {@code - * join} to return. It causes {@code join} to throw if all subtasks fail. + * that yields a list of all results for {@code join} to return when all subtasks + * complete successfully. It cancels the scope and causes {@code join} to throw if + * any subtask fails. + *
    • {@link #anySuccessfulOrThrow() anySuccessfulOrThrow()} creates a {@code Joiner} + * that yields the result of the first subtask to succeed for {@code join} to return. + * It causes {@code join} to throw if all subtasks fail. *
    • {@link #awaitAllSuccessfulOrThrow() awaitAllSuccessfulOrThrow()} creates a * {@code Joiner} that waits for all successful subtasks. It cancels the scope and * causes {@code join} to throw if any subtask fails. @@ -469,7 +468,7 @@ enum State { * *

      In addition to the methods to create {@code Joiner} objects for common cases, * the {@link #allUntil(Predicate) allUntil(Predicate)} method is defined to create a - * {@code Joiner} that yields a stream of all subtasks. It is created with a {@link + * {@code Joiner} that yields a list of all subtasks. It is created with a {@link * Predicate Predicate} that determines if the scope should continue or be cancelled. * This {@code Joiner} can be built upon to create custom policies that cancel the * scope based on some condition. @@ -535,7 +534,7 @@ interface Joiner { * @param subtask the subtask * @return {@code true} to cancel the scope, otherwise {@code false} */ - default boolean onFork(Subtask subtask) { + default boolean onFork(Subtask subtask) { if (subtask.state() != Subtask.State.UNAVAILABLE) { throw new IllegalArgumentException("Subtask not in UNAVAILABLE state"); } @@ -558,7 +557,7 @@ default boolean onFork(Subtask subtask) { * @param subtask the subtask * @return {@code true} to cancel the scope, otherwise {@code false} */ - default boolean onComplete(Subtask subtask) { + default boolean onComplete(Subtask subtask) { if (subtask.state() == Subtask.State.UNAVAILABLE) { throw new IllegalArgumentException("Subtask has not completed"); } @@ -608,15 +607,15 @@ default void onTimeout() { R result() throws Throwable; /** - * {@return a new Joiner object that yields a stream of all subtasks when all + * {@return a new Joiner object that yields a list of all results when all * subtasks complete successfully} * The {@code Joiner} {@linkplain StructuredTaskScope##Cancallation cancels} * the scope and causes {@code join} to throw if any subtask fails. * *

      If all subtasks complete successfully then the joiner's {@link - * Joiner#result()} method returns a stream of all subtasks, in the order that they - * were forked, for the {@link StructuredTaskScope#join() join()} to return. If - * the scope was opened with a {@linkplain Configuration#withTimeout(Duration) + * Joiner#result()} method returns a list of all results, in the order that the + * subtasks were forked, for the {@link StructuredTaskScope#join() join()} to return. + * If the scope was opened with a {@linkplain Configuration#withTimeout(Duration) * timeout}, and the timeout expires before or while waiting for all subtasks to * complete, then the {@code join} method throws {@code TimeoutException}. * @@ -627,7 +626,7 @@ default void onTimeout() { * * @param the result type of subtasks */ - static Joiner>> allSuccessfulOrThrow() { + static Joiner> allSuccessfulOrThrow() { return new Joiners.AllSuccessful<>(); } @@ -646,8 +645,9 @@ static Joiner>> allSuccessfulOrThrow() { * successfully, then the {@code join} method throws {@code TimeoutException}. * * @param the result type of subtasks + * @since 26 */ - static Joiner anySuccessfulResultOrThrow() { + static Joiner anySuccessfulOrThrow() { return new Joiners.AnySuccessful<>(); } @@ -715,7 +715,7 @@ public Void result() { } /** - * {@return a new Joiner object that yields a stream of all subtasks when all + * {@return a new Joiner object that yields a list of all subtasks when all * subtasks complete or a predicate returns {@code true} to cancel the scope} * *

      The joiner's {@link #onComplete(Subtask)} method invokes the predicate's @@ -728,8 +728,8 @@ public Void result() { * invokes the {@linkplain Thread.UncaughtExceptionHandler uncaught exception handler} * with the exception or error before the thread terminates. * - *

      The joiner's {@link #result()} method returns the stream of all subtasks, - * in fork order. The stream may contain subtasks that have completed + *

      The joiner's {@link #result()} method returns the list of all subtasks, + * in fork order. The list may contain subtasks that have completed * (in {@link Subtask.State#SUCCESS SUCCESS} or {@link Subtask.State#FAILED FAILED} * state) or subtasks in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state * if the scope was cancelled before all subtasks were forked or completed. @@ -737,7 +737,7 @@ public Void result() { *

      The joiner's {@link #onTimeout()} method does nothing. If configured with * a {@linkplain Configuration#withTimeout(Duration) timeout}, and the timeout * expires before or while waiting in {@link StructuredTaskScope#join() join}, - * then the {@link #result()} method returns the stream of all subtasks. + * then the {@link #result()} method returns the list of all subtasks. * Subtasks that did not complete before the timeout expired will be in the * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state. * @@ -745,10 +745,10 @@ public Void result() { * {@linkplain StructuredTaskScope##Cancallation cancels} the scope when two or * more subtasks fail. * {@snippet lang=java : - * class CancelAfterTwoFailures implements Predicate> { + * class CancelAfterTwoFailures implements Predicate> { * private final AtomicInteger failedCount = new AtomicInteger(); * @Override - * public boolean test(Subtask subtask) { + * public boolean test(Subtask subtask) { * return subtask.state() == Subtask.State.FAILED * && failedCount.incrementAndGet() >= 2; * } @@ -764,7 +764,7 @@ public Void result() { * List> invokeAll(Collection> tasks) throws InterruptedException { * try (var scope = StructuredTaskScope.open(Joiner.allUntil(_ -> false))) { * tasks.forEach(scope::fork); - * return scope.join().toList(); + * return scope.join(); * } * } * } @@ -776,6 +776,7 @@ public Void result() { * try (var scope = StructuredTaskScope.open(Joiner.allUntil(_ -> false), cf -> cf.withTimeout(timeout))) { * tasks.forEach(scope::fork); * return scope.join() + * .stream() * .filter(s -> s.state() == Subtask.State.SUCCESS) * .map(Subtask::get) * .toList(); @@ -786,7 +787,7 @@ public Void result() { * @param isDone the predicate to evaluate completed subtasks * @param the result type of subtasks */ - static Joiner>> allUntil(Predicate> isDone) { + static Joiner>> allUntil(Predicate> isDone) { return new Joiners.AllSubtasks<>(isDone); } } diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java index fc6f823e063b3..1051f9244f3e3 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScopeImpl.java @@ -170,9 +170,11 @@ private void cancelTimeout() { /** * Invoked by the thread for a subtask when the subtask completes before scope is cancelled. */ - private void onComplete(SubtaskImpl subtask) { + private void onComplete(SubtaskImpl subtask) { assert subtask.state() != Subtask.State.UNAVAILABLE; - if (joiner.onComplete(subtask)) { + @SuppressWarnings("unchecked") + var j = (Joiner) joiner; + if (j.onComplete(subtask)) { cancel(); } } @@ -189,7 +191,9 @@ public Subtask fork(Callable task) { var subtask = new SubtaskImpl(this, task); // notify joiner, even if cancelled - if (joiner.onFork(subtask)) { + @SuppressWarnings("unchecked") + var j = (Joiner) joiner; + if (j.onFork(subtask)) { cancel(); } diff --git a/test/jdk/java/util/concurrent/StructuredTaskScope/StressCancellation.java b/test/jdk/java/util/concurrent/StructuredTaskScope/StressCancellation.java index e88943535191f..83adc930bb3c5 100644 --- a/test/jdk/java/util/concurrent/StructuredTaskScope/StressCancellation.java +++ b/test/jdk/java/util/concurrent/StructuredTaskScope/StressCancellation.java @@ -67,7 +67,7 @@ static Stream testCases() { void test(ThreadFactory factory, int beforeCancel, int afterCancel) throws Exception { var joiner = new Joiner() { @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { boolean cancel = subtask.get(); return cancel; } diff --git a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java index fc121a32bfa84..3690178310f3c 100644 --- a/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java +++ b/test/jdk/java/util/concurrent/StructuredTaskScope/StructuredTaskScopeTest.java @@ -386,7 +386,7 @@ void testJoinAfterJoin1() throws Exception { */ @Test void testJoinAfterJoin2() throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) { scope.fork(() -> { throw new FooException(); }); Throwable ex = assertThrows(FailedException.class, scope::join); assertTrue(ex.getCause() instanceof FooException); @@ -399,7 +399,7 @@ void testJoinAfterJoin2() throws Exception { } /** - * Test join after join completed with a timeout. + * Test join after join interrupted. */ @Test void testJoinAfterJoinInterrupted() throws Exception { @@ -430,7 +430,7 @@ void testJoinAfterJoinInterrupted() throws Exception { */ @Test void testJoinAfterJoinTimeout() throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(), cf -> cf.withTimeout(Duration.ofMillis(100)))) { // wait for scope to be cancelled by timeout awaitCancelled(scope); @@ -953,7 +953,7 @@ void testIsCancelledAfterClose() throws Exception { void testOnForkThrows() throws Exception { var joiner = new Joiner() { @Override - public boolean onFork(Subtask subtask) { + public boolean onFork(Subtask subtask) { throw new FooException(); } @Override @@ -973,7 +973,7 @@ public Void result() { void testOnForkCancelsExecution() throws Exception { var joiner = new Joiner() { @Override - public boolean onFork(Subtask subtask) { + public boolean onFork(Subtask subtask) { return true; } @Override @@ -996,7 +996,7 @@ public Void result() { void testOnCompleteThrows() throws Exception { var joiner = new Joiner() { @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { throw new FooException(); } @Override @@ -1023,7 +1023,7 @@ public Void result() { void testOnCompleteCancelsExecution() throws Exception { var joiner = new Joiner() { @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { return true; } @Override @@ -1283,8 +1283,8 @@ void testSubtaskToString() throws Exception { @Test void testAllSuccessfulOrThrow1() throws Throwable { try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { - var subtasks = scope.join().toList(); - assertTrue(subtasks.isEmpty()); + var results = scope.join(); + assertTrue(results.isEmpty()); } } @@ -1296,12 +1296,10 @@ void testAllSuccessfulOrThrow1() throws Throwable { void testAllSuccessfulOrThrow2(ThreadFactory factory) throws Throwable { try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow(), cf -> cf.withThreadFactory(factory))) { - var subtask1 = scope.fork(() -> "foo"); - var subtask2 = scope.fork(() -> "bar"); - var subtasks = scope.join().toList(); - assertEquals(List.of(subtask1, subtask2), subtasks); - assertEquals("foo", subtask1.get()); - assertEquals("bar", subtask2.get()); + scope.fork(() -> "foo"); + scope.fork(() -> "bar"); + var results = scope.join(); + assertEquals(List.of("foo", "bar"), results); } } @@ -1344,11 +1342,33 @@ void testAllSuccessfulOrThrow4() throws Exception { } /** - * Test Joiner.anySuccessfulResultOrThrow() with no subtasks. + * Test Joiner.allSuccessfulOrThrow() yields an unmodifiable list. + */ + @Test + void testAllSuccessfulOrThrow5() throws Exception { + // empty list + try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { + var results = scope.join(); + assertEquals(0, results.size()); + assertThrows(UnsupportedOperationException.class, () -> results.add("foo")); + } + + // non-empty list + try (var scope = StructuredTaskScope.open(Joiner.allSuccessfulOrThrow())) { + scope.fork(() -> "foo"); + var results = scope.join(); + assertEquals(1, results.size()); + assertThrows(UnsupportedOperationException.class, () -> results.add("foo")); + assertThrows(UnsupportedOperationException.class, () -> results.add("bar")); + } + } + + /** + * Test Joiner.anySuccessfulOrThrow() with no subtasks. */ @Test - void testAnySuccessfulResultOrThrow1() throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow())) { + void testAnySuccessfulOrThrow1() throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow())) { try { scope.join(); } catch (FailedException e) { @@ -1358,12 +1378,12 @@ void testAnySuccessfulResultOrThrow1() throws Exception { } /** - * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully. + * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully. */ @ParameterizedTest @MethodSource("factories") - void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), + void testAnySuccessfulOrThrow2(ThreadFactory factory) throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(), cf -> cf.withThreadFactory(factory))) { scope.fork(() -> "foo"); String result = scope.join(); @@ -1372,13 +1392,13 @@ void testAnySuccessfulResultOrThrow2(ThreadFactory factory) throws Exception { } /** - * Test Joiner.anySuccessfulResultOrThrow() with a subtask that completes successfully + * Test Joiner.anySuccessfulOrThrow() with a subtask that completes successfully * with a null result. */ @ParameterizedTest @MethodSource("factories") - void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), + void testAnySuccessfulOrThrow3(ThreadFactory factory) throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(), cf -> cf.withThreadFactory(factory))) { scope.fork(() -> null); String result = scope.join(); @@ -1387,13 +1407,13 @@ void testAnySuccessfulResultOrThrow3(ThreadFactory factory) throws Exception { } /** - * Test Joiner.anySuccessfulResultOrThrow() with a subtask that complete succcessfully + * Test Joiner.anySuccessfulOrThrow() with a subtask that complete succcessfully * and a subtask that fails. */ @ParameterizedTest @MethodSource("factories") - void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), + void testAnySuccessfulOrThrow4(ThreadFactory factory) throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(), cf -> cf.withThreadFactory(factory))) { scope.fork(() -> "foo"); scope.fork(() -> { throw new FooException(); }); @@ -1403,12 +1423,12 @@ void testAnySuccessfulResultOrThrow4(ThreadFactory factory) throws Exception { } /** - * Test Joiner.anySuccessfulResultOrThrow() with a subtask that fails. + * Test Joiner.anySuccessfulOrThrow() with a subtask that fails. */ @ParameterizedTest @MethodSource("factories") - void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), + void testAnySuccessfulOrThrow5(ThreadFactory factory) throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(), cf -> cf.withThreadFactory(factory))) { scope.fork(() -> { throw new FooException(); }); Throwable ex = assertThrows(FailedException.class, scope::join); @@ -1417,11 +1437,11 @@ void testAnySuccessfulResultOrThrow5(ThreadFactory factory) throws Exception { } /** - * Test Joiner.allSuccessfulOrThrow() with a timeout. + * Test Joiner.anySuccessfulOrThrow() with a timeout. */ @Test - void anySuccessfulResultOrThrow6() throws Exception { - try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulResultOrThrow(), + void anySuccessfulOrThrow6() throws Exception { + try (var scope = StructuredTaskScope.open(Joiner.anySuccessfulOrThrow(), cf -> cf.withTimeout(Duration.ofMillis(100)))) { scope.fork(() -> { throw new FooException(); }); scope.fork(() -> { @@ -1573,7 +1593,7 @@ void testAwaitAll4() throws Exception { void testAllUntil1() throws Throwable { try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { var subtasks = scope.join(); - assertEquals(0, subtasks.count()); + assertEquals(0, subtasks.size()); } } @@ -1589,7 +1609,7 @@ void testAllUntil2(ThreadFactory factory) throws Exception { var subtask1 = scope.fork(() -> "foo"); var subtask2 = scope.fork(() -> { throw new FooException(); }); - var subtasks = scope.join().toList(); + var subtasks = scope.join(); assertEquals(List.of(subtask1, subtask2), subtasks); assertEquals("foo", subtask1.get()); @@ -1612,7 +1632,7 @@ void testAllUntil3(ThreadFactory factory) throws Exception { return "bar"; }); - var subtasks = scope.join().toList(); + var subtasks = scope.join(); assertEquals(List.of(subtask1, subtask2), subtasks); assertEquals("foo", subtask1.get()); @@ -1628,10 +1648,10 @@ void testAllUntil3(ThreadFactory factory) throws Exception { void testAllUntil4(ThreadFactory factory) throws Exception { // cancel execution after two or more failures - class CancelAfterTwoFailures implements Predicate> { + class CancelAfterTwoFailures implements Predicate> { final AtomicInteger failedCount = new AtomicInteger(); @Override - public boolean test(Subtask subtask) { + public boolean test(Subtask subtask) { return subtask.state() == Subtask.State.FAILED && failedCount.incrementAndGet() >= 2; } @@ -1649,7 +1669,7 @@ public boolean test(Subtask subtask) { Thread.sleep(Duration.ofMillis(20)); } - var subtasks = scope.join().toList(); + var subtasks = scope.join(); assertEquals(forkCount, subtasks.size()); long failedCount = subtasks.stream() @@ -1691,7 +1711,7 @@ void testAllUntil6() throws Exception { }); // TimeoutException should not be thrown - var subtasks = scope.join().toList(); + var subtasks = scope.join(); // stream should have two elements, subtask1 may or may not have completed assertEquals(List.of(subtask1, subtask2), subtasks); @@ -1702,6 +1722,34 @@ void testAllUntil6() throws Exception { } } + /** + * Test Joiner.allUntil(Predicate) yields an unmodifiable list. + */ + @Test + void testAllUntil7() throws Exception { + Subtask subtask1; + try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { + subtask1 = scope.fork(() -> "?"); + scope.join(); + } + + // empty list + try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { + var subtasks = scope.join(); + assertEquals(0, subtasks.size()); + assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1)); + } + + // non-empty list + try (var scope = StructuredTaskScope.open(Joiner.allUntil(s -> false))) { + var subtask2 = scope.fork(() -> "foo"); + var subtasks = scope.join(); + assertEquals(1, subtasks.size()); + assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask1)); + assertThrows(UnsupportedOperationException.class, () -> subtasks.add(subtask2)); + } + } + /** * Test Joiner default methods. */ @@ -1719,7 +1767,7 @@ void testJoinerDefaultMethods() throws Exception { assertEquals(Subtask.State.UNAVAILABLE, subtask2.state()); // Joiner that does not override default methods - Joiner joiner = () -> null; + Joiner joiner = () -> null; assertThrows(NullPointerException.class, () -> joiner.onFork(null)); assertThrows(NullPointerException.class, () -> joiner.onComplete(null)); assertThrows(IllegalArgumentException.class, () -> joiner.onFork(subtask1)); @@ -1747,7 +1795,7 @@ void testJoinersWithUnavailableResult() throws Exception { assertThrows(IllegalArgumentException.class, () -> Joiner.allSuccessfulOrThrow().onComplete(subtask)); assertThrows(IllegalArgumentException.class, - () -> Joiner.anySuccessfulResultOrThrow().onComplete(subtask)); + () -> Joiner.anySuccessfulOrThrow().onComplete(subtask)); assertThrows(IllegalArgumentException.class, () -> Joiner.awaitAllSuccessfulOrThrow().onComplete(subtask)); assertThrows(IllegalArgumentException.class, @@ -1763,7 +1811,7 @@ void testJoinersWithUnavailableResult() throws Exception { assertThrows(IllegalArgumentException.class, () -> Joiner.allSuccessfulOrThrow().onFork(subtask)); assertThrows(IllegalArgumentException.class, - () -> Joiner.anySuccessfulResultOrThrow().onFork(subtask)); + () -> Joiner.anySuccessfulOrThrow().onFork(subtask)); assertThrows(IllegalArgumentException.class, () -> Joiner.awaitAllSuccessfulOrThrow().onFork(subtask)); assertThrows(IllegalArgumentException.class, @@ -1863,9 +1911,9 @@ void testNulls() throws Exception { assertThrows(NullPointerException.class, () -> Joiner.allSuccessfulOrThrow().onComplete(null)); assertThrows(NullPointerException.class, - () -> Joiner.anySuccessfulResultOrThrow().onFork(null)); + () -> Joiner.anySuccessfulOrThrow().onFork(null)); assertThrows(NullPointerException.class, - () -> Joiner.anySuccessfulResultOrThrow().onComplete(null)); + () -> Joiner.anySuccessfulOrThrow().onComplete(null)); } /** @@ -1895,12 +1943,12 @@ private static class CountingJoiner implements Joiner { final AtomicInteger onForkCount = new AtomicInteger(); final AtomicInteger onCompleteCount = new AtomicInteger(); @Override - public boolean onFork(Subtask subtask) { + public boolean onFork(Subtask subtask) { onForkCount.incrementAndGet(); return false; } @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { onCompleteCount.incrementAndGet(); return false; } @@ -1924,12 +1972,12 @@ private static class CancelAfterOneJoiner implements Joiner { final AtomicInteger onForkCount = new AtomicInteger(); final AtomicInteger onCompleteCount = new AtomicInteger(); @Override - public boolean onFork(Subtask subtask) { + public boolean onFork(Subtask subtask) { onForkCount.incrementAndGet(); return false; } @Override - public boolean onComplete(Subtask subtask) { + public boolean onComplete(Subtask subtask) { onCompleteCount.incrementAndGet(); return true; } From 28617cffcd3c845a012c44a923f42fb7a58f5f0d Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Thu, 23 Oct 2025 14:33:45 +0100 Subject: [PATCH 7/9] Sync up from loom repo --- .../util/concurrent/StructuredTaskScope.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index b2d1e7b01308e..b7ef226927b3b 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -110,7 +110,7 @@ * result, a list of elements, or some other object. The {@code Joiner} interface defines * factory methods to create {@code Joiner}s for some common cases. * - *

      A {@code Joiner} may cancel the scope (sometimes called + *

      A {@code Joiner} may cancel the scope (sometimes called * "short-circuiting") when some condition is reached that does not require the result of * subtasks that are still executing. Cancelling the scope prevents new threads from being * started to execute further subtasks, {@linkplain Thread#interrupt() interrupts} the @@ -213,7 +213,7 @@ * {@code StructuredTaskScope} that uses a different {@code ThreadFactory}, is named for * monitoring and management purposes, or has a timeout that cancels the scope if the * timeout expires before or while waiting for subtasks to complete. The {@code open} - * method is called with a {@linkplain UnaryOperator operator} that is applied to the + * method is called with an {@linkplain UnaryOperator operator} that is applied to the * default configuration and returns a {@link Configuration Configuration} for the * {@code StructuredTaskScope} under construction. * @@ -237,7 +237,7 @@ * *

      A second example sets a timeout, represented by a {@link Duration}. The timeout * starts when the new scope is opened. If the timeout expires before the {@code join} - * method has completed then the scope is {@linkplain ##Cancallation cancelled} (this + * method has completed then the scope is {@linkplain ##Cancellation cancelled} (this * interrupts the threads executing the two subtasks), and the {@code join} method * throws {@link TimeoutException TimeoutException}. * {@snippet lang=java : @@ -485,7 +485,7 @@ enum State { *

      If a {@code StructuredTaskScope} is opened with a {@linkplain * Configuration#withTimeout(Duration) timeout}, and the timeout expires before or * while waiting in {@link StructuredTaskScope#join() join()}, then the scope is - * {@linkplain StructuredTaskScope##Cancallation cancelled}, and the {@code Joiners}'s + * {@linkplain StructuredTaskScope##Cancellation cancelled}, and the {@code Joiners}'s * {@link #onTimeout()} method is invoked to notify the {@code Joiner} and optionally * throw {@link TimeoutException TimeoutException}. If the {@code onTimeout()} method * does not throw then the {@code join()} method will invoke the {@link #result()} @@ -609,7 +609,7 @@ default void onTimeout() { /** * {@return a new Joiner object that yields a list of all results when all * subtasks complete successfully} - * The {@code Joiner} {@linkplain StructuredTaskScope##Cancallation cancels} + * The {@code Joiner} {@linkplain StructuredTaskScope##Cancellation cancels} * the scope and causes {@code join} to throw if any subtask fails. * *

      If all subtasks complete successfully then the joiner's {@link @@ -653,7 +653,7 @@ static Joiner anySuccessfulOrThrow() { /** * {@return a new Joiner object that waits for subtasks to complete successfully} - * The {@code Joiner} {@linkplain StructuredTaskScope##Cancallation cancels} + * The {@code Joiner} {@linkplain StructuredTaskScope##Cancellation cancels} * the scope and causes {@code join} to throw if any subtask fails. * *

      The joiner's {@link Joiner#result() result} method returns {@code null} @@ -721,7 +721,7 @@ public Void result() { *

      The joiner's {@link #onComplete(Subtask)} method invokes the predicate's * {@link Predicate#test(Object) test} method with the subtask that completed * successfully or failed with an exception. If the {@code test} method - * returns {@code true} then {@linkplain StructuredTaskScope##Cancallation + * returns {@code true} then {@linkplain StructuredTaskScope##Cancellation * the scope is cancelled}. The {@code test} method must be thread safe as it * may be invoked concurrently from several threads. If the {@code test} method * completes with an exception or error, then the thread that executed the subtask @@ -742,7 +742,7 @@ public Void result() { * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state. * *

      The following example uses this method to create a {@code Joiner} that - * {@linkplain StructuredTaskScope##Cancallation cancels} the scope when two or + * {@linkplain StructuredTaskScope##Cancellation cancels} the scope when two or * more subtasks fail. * {@snippet lang=java : * class CancelAfterTwoFailures implements Predicate> { @@ -911,7 +911,7 @@ final class TimeoutException extends RuntimeException { * *

      If a {@linkplain Configuration#withTimeout(Duration) timeout} is set then it * starts when the scope is opened. If the timeout expires before the scope has - * {@linkplain #join() joined} then the scope is {@linkplain ##Cancallation cancelled} + * {@linkplain #join() joined} then the scope is {@linkplain ##Cancellation cancelled} * and the {@code Joiner}'s {@link Joiner#onTimeout()} method is invoked to throw * optionally throw {@link TimeoutException TimeoutException}. * @@ -993,7 +993,7 @@ static StructuredTaskScope open() { * method with the subtask in the {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state. * If the {@code onFork} completes with an exception or error then it is propagated by * the {@code fork} method without creating a thread. If the scope is already - * {@linkplain ##Cancallation cancelled}, or {@code onFork} returns {@code true} to + * {@linkplain ##Cancellation cancelled}, or {@code onFork} returns {@code true} to * cancel the scope, then this method returns the {@code Subtask}, in the * {@link Subtask.State#UNAVAILABLE UNAVAILABLE} state, without creating a thread to * execute the subtask. @@ -1060,7 +1060,7 @@ static StructuredTaskScope open() { /** * Returns the result, or throws, after waiting for all subtasks to complete or - * the scope to be {@linkplain ##Cancallation cancelled}. + * the scope to be {@linkplain ##Cancellation cancelled}. * *

      This method waits for all subtasks started in this scope to complete or the * scope to be cancelled. Once finished waiting, the {@code Joiner}'s {@link @@ -1095,7 +1095,7 @@ static StructuredTaskScope open() { R join() throws InterruptedException; /** - * {@return {@code true} if this scope is {@linkplain ##Cancallation cancelled} or in + * {@return {@code true} if this scope is {@linkplain ##Cancellation cancelled} or in * the process of being cancelled, otherwise {@code false}} * *

      Cancelling the scope prevents new threads from starting in the scope and @@ -1115,7 +1115,7 @@ static StructuredTaskScope open() { /** * Closes this scope. * - *

      This method first {@linkplain ##Cancallation cancels} the scope, if not + *

      This method first {@linkplain ##Cancellation cancels} the scope, if not * already cancelled. This interrupts the threads executing unfinished subtasks. This * method then waits for all threads to finish. If interrupted while waiting then it * will continue to wait until the threads finish, before completing with the interrupt From 5be33593efe04534ef7c4df53a9ea268bd5a5ff5 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Fri, 24 Oct 2025 12:41:34 +0100 Subject: [PATCH 8/9] Reviewer feedback, sync up from loom repo --- .../classes/java/util/concurrent/Joiners.java | 35 ++++++++++++++----- .../util/concurrent/StructuredTaskScope.java | 22 +++++++----- 2 files changed, 39 insertions(+), 18 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/Joiners.java b/src/java.base/share/classes/java/util/concurrent/Joiners.java index 0b07e89336f83..5d2a0f23ad70f 100644 --- a/src/java.base/share/classes/java/util/concurrent/Joiners.java +++ b/src/java.base/share/classes/java/util/concurrent/Joiners.java @@ -70,14 +70,17 @@ static final class AllSuccessful implements Joiner> { private static final VarHandle FIRST_EXCEPTION = MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class); - // list of forked subtasks, only accessed by owner thread - private final List> subtasks = new ArrayList<>(); + // list of forked subtasks, created lazily, only accessed by owner thread + private List> subtasks; private volatile Throwable firstException; @Override public boolean onFork(Subtask subtask) { ensureUnavailable(subtask); + if (subtasks == null) { + subtasks = new ArrayList<>(); + } subtasks.add(subtask); return false; } @@ -93,10 +96,15 @@ public boolean onComplete(Subtask subtask) { @Override public List result() throws Throwable { Throwable ex = firstException; - if (ex != null) { - throw ex; - } else { - return subtasks.stream().map(Subtask::get).toList(); + try { + if (ex != null) { + throw ex; + } + return (subtasks != null) + ? subtasks.stream().map(Subtask::get).toList() + : List.of(); + } finally { + subtasks = null; // allow subtasks to be GC'ed } } } @@ -187,8 +195,8 @@ public Void result() throws Throwable { static final class AllSubtasks implements Joiner>> { private final Predicate> isDone; - // list of forked subtasks, only accessed by owner thread - private final List> subtasks = new ArrayList<>(); + // list of forked subtasks, created lazily, only accessed by owner thread + private List> subtasks; AllSubtasks(Predicate> isDone) { this.isDone = Objects.requireNonNull(isDone); @@ -197,6 +205,9 @@ static final class AllSubtasks implements Joiner>> { @Override public boolean onFork(Subtask subtask) { ensureUnavailable(subtask); + if (subtasks == null) { + subtasks = new ArrayList<>(); + } subtasks.add(subtask); return false; } @@ -214,7 +225,13 @@ public void onTimeout() { @Override public List> result() { - return List.copyOf(subtasks); + if (subtasks != null) { + List> result = List.copyOf(subtasks); + subtasks = null; // allow subtasks to be GC'ed + return result; + } else { + return List.of(); + } } } } diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index b7ef226927b3b..c1751ef364a33 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -330,8 +330,13 @@ * *

      Actions in the owner thread of a {@code StructuredTaskScope} prior to {@linkplain * #fork forking} of a subtask {@linkplain java.util.concurrent##MemoryVisibility - * happen-before} any actions taken by that subtask, which in turn - * happen-before the subtask result is {@linkplain Subtask#get() retrieved}. + * happen-before} any actions taken by the thread that executes the subtask, which + * in turn happen-before actions in any thread that successfully obtains the + * subtask outcome with {@link Subtask#get() Subtask.get()} or {@link Subtask#exception() + * Subtask.exception()}. If a subtask's outcome contributes to the result or exception + * from {@link #join()}, then any actions taken by the thread executing that subtask + * happen-before the owner thread returns from {@code join} with a result or + * {@link FailedException FailedException}. * *

      General exceptions

      * @@ -497,8 +502,9 @@ enum State { * * @implSpec Implementations of this interface must be thread safe. The {@link * #onComplete(Subtask)} method defined by this interface may be invoked by several - * threads concurrently. The {@link #onTimeout()} method may be invoked at around - * the same time that subtasks complete. + * threads concurrently, concurrently with the owner thread invoking the {@link + * #onFork(Subtask)} method, or if a timeout is configured, concurrently with the owner + * thread invoking the {@link #onTimeout()} method. * * @apiNote It is very important that a new {@code Joiner} object is created for each * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with @@ -594,9 +600,7 @@ default void onTimeout() { * *

      In normal usage, this method will be called at most once by the {@code join} * method to produce the result (or exception). The behavior of this method when - * invoked directly, and invoked more than once, is undefined. Where possible, an - * implementation should return an equal result (or throw the same exception) on - * second or subsequent calls to produce the outcome. + * invoked directly is undefined. * * @apiNote This method is invoked by the {@code join} method. It should not be * invoked directly. @@ -1118,8 +1122,8 @@ static StructuredTaskScope open() { *

      This method first {@linkplain ##Cancellation cancels} the scope, if not * already cancelled. This interrupts the threads executing unfinished subtasks. This * method then waits for all threads to finish. If interrupted while waiting then it - * will continue to wait until the threads finish, before completing with the interrupt - * status set. + * will continue to wait until the threads finish, before completing with the + * {@linkplain Thread#isInterrupted() interrupted status} set. * *

      This method may only be invoked by the scope owner. If the scope * is already closed then the scope owner invoking this method has no effect. From 824cc5d274ca98a3f18995c8d1b8d7775d43c956 Mon Sep 17 00:00:00 2001 From: Alan Bateman Date: Fri, 24 Oct 2025 14:58:25 +0100 Subject: [PATCH 9/9] Make implSpec clearer --- .../java/util/concurrent/StructuredTaskScope.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java index c1751ef364a33..a64a9e967fe7e 100644 --- a/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java +++ b/src/java.base/share/classes/java/util/concurrent/StructuredTaskScope.java @@ -500,11 +500,11 @@ enum State { *

      Unless otherwise specified, passing a {@code null} argument to a method * in this class will cause a {@link NullPointerException} to be thrown. * - * @implSpec Implementations of this interface must be thread safe. The {@link - * #onComplete(Subtask)} method defined by this interface may be invoked by several - * threads concurrently, concurrently with the owner thread invoking the {@link - * #onFork(Subtask)} method, or if a timeout is configured, concurrently with the owner - * thread invoking the {@link #onTimeout()} method. + * @implSpec Implementations of this interface must be thread-safe. The {@link + * #onComplete(Subtask)} method may be invoked concurrently, as multiple subtasks can + * complete at the same time. Additionally, the {@code onComplete} method may be + * called concurrently with the scope owner thread invoking the {@link #onFork(Subtask)} + * or {@link #onTimeout()} methods. * * @apiNote It is very important that a new {@code Joiner} object is created for each * {@code StructuredTaskScope}. {@code Joiner} objects should never be shared with