Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 47 additions & 30 deletions src/java.base/share/classes/java/util/concurrent/Joiners.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.StructuredTaskScope.Joiner;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.function.Predicate;
import java.util.stream.Stream;
import jdk.internal.invoke.MhUtil;

/**
Expand Down Expand Up @@ -64,42 +63,48 @@ private static Subtask.State ensureCompleted(Subtask<?> subtask) {
}

/**
* A joiner that returns a stream of all subtasks when all subtasks complete
* A joiner that returns a list of all results when all subtasks complete
* successfully. Cancels the scope if any subtask fails.
*/
static final class AllSuccessful<T> implements Joiner<T, Stream<Subtask<T>>> {
static final class AllSuccessful<T> implements Joiner<T, List<T>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to clear() the results onTimeout before throwing the TimeoutException.

private static final VarHandle FIRST_EXCEPTION =
MhUtil.findVarHandle(MethodHandles.lookup(), "firstException", Throwable.class);

// list of forked subtasks, only accessed by owner thread
private final List<Subtask<T>> subtasks = new ArrayList<>();
// list of forked subtasks, created lazily, only accessed by owner thread
private List<Subtask<T>> subtasks;

private volatile Throwable firstException;

@Override
public boolean onFork(Subtask<? extends T> subtask) {
public boolean onFork(Subtask<T> subtask) {
ensureUnavailable(subtask);
@SuppressWarnings("unchecked")
var s = (Subtask<T>) subtask;
subtasks.add(s);
if (subtasks == null) {
subtasks = new ArrayList<>();
}
subtasks.add(subtask);
return false;
}

@Override
public boolean onComplete(Subtask<? extends T> subtask) {
public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
return (state == Subtask.State.FAILED)
&& (firstException == null)
&& FIRST_EXCEPTION.compareAndSet(this, null, subtask.exception());
}

@Override
public Stream<Subtask<T>> result() throws Throwable {
public List<T> result() throws Throwable {
Throwable ex = firstException;
if (ex != null) {
throw ex;
} else {
return subtasks.stream();
try {
if (ex != null) {
throw ex;
}
return (subtasks != null)
? subtasks.stream().map(Subtask::get).toList()
: List.of();
} finally {
subtasks = null; // allow subtasks to be GC'ed
}
}
}
Expand Down Expand Up @@ -130,7 +135,7 @@ private static int stateToInt(Subtask.State s) {
}

@Override
public boolean onComplete(Subtask<? extends T> subtask) {
public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
Subtask<T> s;
while (((s = this.subtask) == null)
Expand Down Expand Up @@ -166,7 +171,7 @@ static final class AwaitSuccessful<T> implements Joiner<T, Void> {
private volatile Throwable firstException;

@Override
public boolean onComplete(Subtask<? extends T> subtask) {
public boolean onComplete(Subtask<T> subtask) {
Subtask.State state = ensureCompleted(subtask);
return (state == Subtask.State.FAILED)
&& (firstException == null)
Expand All @@ -185,36 +190,48 @@ public Void result() throws Throwable {
}

/**
* A joiner that returns a stream of all subtasks.
* A joiner that returns a list of all subtasks.
*/
static final class AllSubtasks<T> implements Joiner<T, Stream<Subtask<T>>> {
private final Predicate<Subtask<? extends T>> isDone;
static final class AllSubtasks<T> implements Joiner<T, List<Subtask<T>>> {
private final Predicate<Subtask<T>> isDone;

// list of forked subtasks, only accessed by owner thread
private final List<Subtask<T>> subtasks = new ArrayList<>();
// list of forked subtasks, created lazily, only accessed by owner thread
private List<Subtask<T>> subtasks;

AllSubtasks(Predicate<Subtask<? extends T>> isDone) {
AllSubtasks(Predicate<Subtask<T>> isDone) {
this.isDone = Objects.requireNonNull(isDone);
}

@Override
public boolean onFork(Subtask<? extends T> subtask) {
public boolean onFork(Subtask<T> subtask) {
ensureUnavailable(subtask);
@SuppressWarnings("unchecked")
var s = (Subtask<T>) subtask;
subtasks.add(s);
if (subtasks == null) {
subtasks = new ArrayList<>();
}
subtasks.add(subtask);
return false;
}

@Override
public boolean onComplete(Subtask<? extends T> subtask) {
public boolean onComplete(Subtask<T> subtask) {
ensureCompleted(subtask);
return isDone.test(subtask);
}

@Override
public Stream<Subtask<T>> result() {
return subtasks.stream();
public void onTimeout() {
// do nothing, this joiner does not throw TimeoutException
}

@Override
public List<Subtask<T>> result() {
if (subtasks != null) {
List<Subtask<T>> result = List.copyOf(subtasks);
subtasks = null; // allow subtasks to be GC'ed
return result;
} else {
return List.of();
}
}
}
}
Loading