Skip to content

Commit

Permalink
8297495: j.u.concurrent updates for JDK 20
Browse files Browse the repository at this point in the history
Reviewed-by: jpai
  • Loading branch information
Alan Bateman committed Dec 5, 2022
1 parent 3288459 commit 19d8498
Show file tree
Hide file tree
Showing 4 changed files with 276 additions and 4 deletions.
33 changes: 33 additions & 0 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2852,6 +2852,10 @@ public void execute(Runnable task) {
/**
* Submits a ForkJoinTask for execution.
*
* @implSpec
* This method is equivalent to {@link #externalSubmit(ForkJoinTask)}
* when called from a thread that is not in this pool.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return the task
Expand Down Expand Up @@ -2898,6 +2902,31 @@ public ForkJoinTask<?> submit(Runnable task) {

// Added mainly for possible use in Loom

/**
* Submits the given task as if submitted from a non-{@code ForkJoinTask}
* client. The task is added to a scheduling queue for submissions to the
* pool even when called from a thread in the pool.
*
* @implSpec
* This method is equivalent to {@link #submit(ForkJoinTask)} when called
* from a thread that is not in this pool.
*
* @return the task
* @param task the task to submit
* @param <T> the type of the task's result
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @since 20
*/
public <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
U.storeStoreFence(); // ensure safely publishable
task.markPoolSubmission();
WorkQueue q = submissionQueue(true);
q.push(task, this, true);
return task;
}

/**
* Submits the given task without guaranteeing that it will
* eventually execute in the absence of available active threads.
Expand All @@ -2909,6 +2938,9 @@ public ForkJoinTask<?> submit(Runnable task) {
* @param task the task
* @param <T> the type of the task's result
* @return the task
* @throws NullPointerException if the task is null
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @since 19
*/
public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
Expand Down Expand Up @@ -3267,6 +3299,7 @@ public long getStealCount() {
* granularities.
*
* @return the number of queued tasks
* @see ForkJoinWorkerThread#getQueuedTaskCount()
*/
public long getQueuedTaskCount() {
WorkQueue[] qs; WorkQueue q;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,17 @@ public int getPoolIndex() {
return workQueue.getPoolIndex();
}

/**
* {@return a (non-negative) estimate of the number of tasks in the
* thread's queue}
*
* @since 20
* @see ForkJoinPool#getQueuedTaskCount()
*/
public int getQueuedTaskCount() {
return workQueue.queueSize();
}

/**
* Initializes internal state after construction but before
* processing any tasks. If you override this method, you must
Expand Down
219 changes: 219 additions & 0 deletions test/jdk/java/util/concurrent/tck/ForkJoinPool20Test.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

import junit.framework.Test;
import junit.framework.TestSuite;

/**
* Tests for ForkJoinPool and ForkJoinWorkerThread additions in JDK 20.
*/
public class ForkJoinPool20Test extends JSR166TestCase {
public static void main(String[] args) {
main(suite(), args);
}

public static Test suite() {
return new TestSuite(ForkJoinPool20Test.class);
}

/**
* Test that tasks submitted with externalSubmit execute.
*/
public void testExternalSubmit1() throws Exception {
try (var pool = new ForkJoinPool()) {
// submit from external client
var task1 = ForkJoinTask.adapt(() -> "foo");
pool.externalSubmit(task1);
assertEquals(task1.get(), "foo");

// submit from worker thread
Future<Future<String>> task2 = pool.submit(() -> {
return pool.externalSubmit(ForkJoinTask.adapt(() -> "foo"));
});
assertEquals(task2.get().get(), "foo");
}
}

/**
* Test that tasks submitted with externalSubmit are pushed to a submission queue.
*/
public void testExternalSubmit2() throws Exception {
try (var pool = new ForkJoinPool(1)) {
pool.submit(() -> {
assertTrue(pool.getQueuedTaskCount() == 0);
assertTrue(pool.getQueuedSubmissionCount() == 0);

for (int count = 1; count <= 3; count++) {
var task = ForkJoinTask.adapt(() -> { });
pool.externalSubmit(task);

assertTrue(pool.getQueuedTaskCount() == 0);
assertTrue(pool.getQueuedSubmissionCount() == count);
}
}).get();
}
}

/**
* Test externalSubmit return value.
*/
public void testExternalSubmitReturnsTask() {
try (var pool = new ForkJoinPool()) {
var task = ForkJoinTask.adapt(() -> "foo");
assertTrue(pool.externalSubmit(task) == task);
}
}

/**
* Test externalSubmit(null) throws NullPointerException.
*/
public void testExternalSubmitWithNull() {
try (var pool = new ForkJoinPool()) {
assertThrows(NullPointerException.class, () -> pool.externalSubmit(null));
}
}

/**
* Test externalSubmit throws RejectedExecutionException when pool is shutdown.
*/
public void testExternalSubmitWhenShutdown() {
try (var pool = new ForkJoinPool()) {
pool.shutdown();
var task = ForkJoinTask.adapt(() -> { });
assertThrows(RejectedExecutionException.class, () -> pool.externalSubmit(task));
}
}

/**
* Test that tasks submitted with submit(ForkJoinTask) are pushed to a
* submission queue.
*/
public void testSubmit() throws Exception {
try (var pool = new ForkJoinPool(1)) {
ForkJoinWorkerThread worker = submitBusyTask(pool);
try {
assertTrue(worker.getQueuedTaskCount() == 0);
assertTrue(pool.getQueuedTaskCount() == 0);
assertTrue(pool.getQueuedSubmissionCount() == 0);

for (int count = 1; count <= 3; count++) {
var task = ForkJoinTask.adapt(() -> { });
pool.submit(task);

// task should be in submission queue
assertTrue(worker.getQueuedTaskCount() == 0);
assertTrue(pool.getQueuedTaskCount() == 0);
assertTrue(pool.getQueuedSubmissionCount() == count);
}
} finally {
LockSupport.unpark(worker);
}
}
}

/**
* Test ForkJoinWorkerThread::getQueuedTaskCount returns the number of tasks in the
* current thread's queue. This test runs with parallelism of 1 to ensure that tasks
* aren't stolen.
*/
public void testGetQueuedTaskCount1() throws Exception {
try (var pool = new ForkJoinPool(1)) {
pool.submit(() -> {
var worker = (ForkJoinWorkerThread) Thread.currentThread();
assertTrue(worker.getQueuedTaskCount() == 0);

for (int count = 1; count <= 3; count++) {
pool.submit(() -> { });

// task should be in this thread's task queue
assertTrue(worker.getQueuedTaskCount() == count);
assertTrue(pool.getQueuedTaskCount() == count);
assertTrue(pool.getQueuedSubmissionCount() == 0);
}
}).get();
}
}

/**
* Test ForkJoinWorkerThread::getQueuedTaskCount returns the number of tasks in the
* thread's queue. This test runs with parallelism of 2 and one worker active running
* a task. This gives the test two task queues to sample.
*/
public void testGetQueuedTaskCount2() throws Exception {
try (var pool = new ForkJoinPool(2)) {
// keep one worker thread active
ForkJoinWorkerThread worker1 = submitBusyTask(pool);
try {
pool.submit(() -> {
var worker2 = (ForkJoinWorkerThread) Thread.currentThread();
for (int count = 1; count <= 3; count++) {
pool.submit(() -> { });

// task should be in this thread's task queue
assertTrue(worker1.getQueuedTaskCount() == 0);
assertTrue(worker2.getQueuedTaskCount() == count);
assertTrue(pool.getQueuedTaskCount() == count);
assertTrue(pool.getQueuedSubmissionCount() == 0);
}
}).get();
} finally {
LockSupport.unpark(worker1); // release worker1
}
}
}

/**
* Submits a task to the pool, returning the worker thread that runs the
* task. The task runs until the thread is unparked.
*/
static ForkJoinWorkerThread submitBusyTask(ForkJoinPool pool) throws Exception {
var ref = new AtomicReference<ForkJoinWorkerThread>();
pool.submit(() -> {
ref.set((ForkJoinWorkerThread) Thread.currentThread());
LockSupport.park();
});
ForkJoinWorkerThread worker;
while ((worker = ref.get()) == null) {
Thread.sleep(20);
}
return worker;
}
}
17 changes: 13 additions & 4 deletions test/jdk/java/util/concurrent/tck/JSR166TestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ public String run() {
public static boolean atLeastJava14() { return JAVA_CLASS_VERSION >= 58.0; }
public static boolean atLeastJava15() { return JAVA_CLASS_VERSION >= 59.0; }
public static boolean atLeastJava16() { return JAVA_CLASS_VERSION >= 60.0; }
public static boolean atLeastJava17() { return JAVA_CLASS_VERSION >= 61.0; }
public static boolean atLeastJava19() { return JAVA_CLASS_VERSION >= 63.0; }
public static boolean atLeastJava20() { return JAVA_CLASS_VERSION >= 64.0; }

/**
* Collects all JSR166 unit tests as one suite.
Expand Down Expand Up @@ -633,12 +634,20 @@ public static Test suite() {
addNamedTestClasses(suite, java9TestClassNames);
}

if (atLeastJava17()) {
String[] java17TestClassNames = {
if (atLeastJava19()) {
String[] java19TestClassNames = {
"ForkJoinPool19Test",
};
addNamedTestClasses(suite, java17TestClassNames);
addNamedTestClasses(suite, java19TestClassNames);
}

if (atLeastJava20()) {
String[] java20TestClassNames = {
"ForkJoinPool20Test",
};
addNamedTestClasses(suite, java20TestClassNames);
}

return suite;
}

Expand Down

1 comment on commit 19d8498

@openjdk-notifier
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.