Skip to content
Browse files

SI-7442 Update bundled Fork/Join pool (JSR166y)

- Updates ForkJoinPool and dependent classes to the latest jsr166y revisions:

    ForkJoinPool.java:
    Revision 1.185
    Sat Feb 16 20:50:29 2013 UTC (2 months, 2 weeks ago) by jsr166

    ForkJoinTask.java:
    Revision 1.100
    Tue Feb 5 17:09:54 2013 UTC (3 months ago) by jsr166

    ForkJoinWorkerThread.java:
    Revision 1.73
    Wed Nov 21 19:54:39 2012 UTC (5 months, 2 weeks ago) by dl

- Includes Akka-contributed `sun.misc.Unsafe` detection to support Android.
  See changeset 06d685c

- Adds private `CountedCompleter` class.
  This class is only visible and used in `ForkJoinPool.java`.

- Updates desired.sha1 for updated forkjoin.jar.

- Updates binary compatibility whitelists to exclude package-private methods
  in the `forkjoin` package.

- Also fixes SI-7438.
  • Loading branch information...
1 parent 082ca2e commit 77437ffa521a6d1b073283624a722848a8c0b33c @phaller phaller committed May 11, 2013
View
29 bincompat-backward.whitelist.conf
@@ -190,6 +190,35 @@ filter {
{
matchName="scala.reflect.internal.Types#TypeVar.setInst"
problemName=IncompatibleResultTypeProblem
+ },
+ # scala.concurrent.forkjoin (SI-7442)
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinTask.internalGetCompleter"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.registerWorker"
+ problemName=IncompatibleMethTypeProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.nextWorkerName"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.signalWork"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.idlePerActive"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.tryCompensate"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.helpJoinOnce"
+ problemName=IncompatibleResultTypeProblem
}
]
}
View
37 bincompat-forward.whitelist.conf
@@ -430,6 +430,43 @@ filter {
{
matchName="scala.reflect.internal.ModifierFlags.DEFAULTMETHOD"
problemName=MissingMethodProblem
+ },
+ # scala.concurrent.forkjoin (SI-7442)
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.registerWorker"
+ problemName=IncompatibleMethTypeProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.externalPush"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.this"
+ problemName=IncompatibleMethTypeProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.signalWork"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.awaitQuiescence"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.tryCompensate"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinTask.recordExceptionalCompletion"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinTask.internalPropagateException"
+ problemName=MissingMethodProblem
+ },
+ {
+ matchName="scala.concurrent.forkjoin.ForkJoinPool.helpJoinOnce"
+ problemName=IncompatibleResultTypeProblem
}
]
}
View
2 lib/forkjoin.jar.desired.sha1
@@ -1 +1 @@
-f93a2525b5616d3a4bee7848fabbb2856b56f653 *forkjoin.jar
+ddd7d5398733c4fbbb8355c049e258d47af636cf ?forkjoin.jar
View
2,607 src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
1,748 additions, 859 deletions not shown because the diff is too large. Please use a local Git client to view these changes.
View
356 src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java
@@ -5,6 +5,7 @@
*/
package scala.concurrent.forkjoin;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@@ -29,15 +30,18 @@
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
- * <p>A "main" {@code ForkJoinTask} begins execution when submitted
- * to a {@link ForkJoinPool}. Once started, it will usually in turn
- * start other subtasks. As indicated by the name of this class,
- * many programs using {@code ForkJoinTask} employ only methods
- * {@link #fork} and {@link #join}, or derivatives such as {@link
+ * <p>A "main" {@code ForkJoinTask} begins execution when it is
+ * explicitly submitted to a {@link ForkJoinPool}, or, if not already
+ * engaged in a ForkJoin computation, commenced in the {@link
+ * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
+ * related methods. Once started, it will usually in turn start other
+ * subtasks. As indicated by the name of this class, many programs
+ * using {@code ForkJoinTask} employ only methods {@link #fork} and
+ * {@link #join}, or derivatives such as {@link
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
- * advanced usages, as well as extension mechanics that allow
- * support of new forms of fork/join processing.
+ * advanced usages, as well as extension mechanics that allow support
+ * of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -51,7 +55,7 @@
* minimize other blocking synchronization apart from joining other
* tasks or using synchronizers such as Phasers that are advertised to
* cooperate with fork/join scheduling. Subdividable tasks should also
- * not perform blocking IO, and should ideally access variables that
+ * not perform blocking I/O, and should ideally access variables that
* are completely independent of those accessed by other running
* tasks. These guidelines are loosely enforced by not permitting
* checked exceptions such as {@code IOExceptions} to be
@@ -69,10 +73,11 @@
* <p>It is possible to define and use ForkJoinTasks that may block,
* but doing do requires three further considerations: (1) Completion
* of few if any <em>other</em> tasks should be dependent on a task
- * that blocks on external synchronization or IO. Event-style async
- * tasks that are never joined often fall into this category. (2) To
- * minimize resource impact, tasks should be small; ideally performing
- * only the (possibly) blocking action. (3) Unless the {@link
+ * that blocks on external synchronization or I/O. Event-style async
+ * tasks that are never joined (for example, those subclassing {@link
+ * CountedCompleter}) often fall into this category. (2) To minimize
+ * resource impact, tasks should be small; ideally performing only the
+ * (possibly) blocking action. (3) Unless the {@link
* ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
* blocked tasks is known to be less than the pool's {@link
* ForkJoinPool#getParallelism} level, the pool cannot guarantee that
@@ -121,13 +126,7 @@
* other actions. Normally, a concrete ForkJoinTask subclass declares
* fields comprising its parameters, established in a constructor, and
* then defines a {@code compute} method that somehow uses the control
- * methods supplied by this base class. While these methods have
- * {@code public} access (to allow instances of different task
- * subclasses to call each other's methods), some of them may only be
- * called from within other ForkJoinTasks (as may be determined using
- * method {@link #inForkJoinPool}). Attempts to invoke them in other
- * contexts result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * methods supplied by this base class.
*
* <p>Method {@link #join} and its variants are appropriate for use
* only when completion dependencies are acyclic; that is, the
@@ -138,17 +137,16 @@
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a
- * ForkJoinTask may be atomically <em>tagged</em> with a {@code
- * short} value using {@link #setForkJoinTaskTag} or {@link
+ * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
+ * value using {@link #setForkJoinTaskTag} or {@link
* #compareAndSetForkJoinTaskTag} and checked using {@link
- * #getForkJoinTaskTag}. The ForkJoinTask implementation does not
- * use these {@code protected} methods or tags for any purpose, but
- * they may be of use in the construction of specialized subclasses.
- * For example, parallel graph traversals can use the supplied methods
- * to avoid revisiting nodes/tasks that have already been processed.
- * Also, completion based designs can use them to record that subtasks
- * have completed. (Method names for tagging are bulky in part to
- * encourage definition of methods that reflect their usage patterns.)
+ * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
+ * these {@code protected} methods or tags for any purpose, but they
+ * may be of use in the construction of specialized subclasses. For
+ * example, parallel graph traversals can use the supplied methods to
+ * avoid revisiting nodes/tasks that have already been processed.
+ * (Method names for tagging are bulky in part to encourage definition
+ * of methods that reflect their usage patterns.)
*
* <p>Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -286,8 +284,9 @@ final boolean trySetSignal() {
* @return status upon completion
*/
private int externalAwaitDone() {
- boolean interrupted = false;
int s;
+ ForkJoinPool.externalHelpJoin(this);
+ boolean interrupted = false;
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
@@ -315,6 +314,7 @@ private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
+ ForkJoinPool.externalHelpJoin(this);
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
@@ -328,6 +328,7 @@ private int externalInterruptibleAwaitDone() throws InterruptedException {
return s;
}
+
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
@@ -337,16 +338,12 @@ private int externalInterruptibleAwaitDone() throws InterruptedException {
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
- if ((s = status) >= 0) {
- if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
- if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
- tryUnpush(this) || (s = doExec()) >= 0)
- s = wt.pool.awaitJoin(w, this);
- }
- else
- s = externalAwaitDone();
- }
- return s;
+ return (s = status) < 0 ? s :
+ ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (w = (wt = (ForkJoinWorkerThread)t).workQueue).
+ tryUnpush(this) && (s = doExec()) < 0 ? s :
+ wt.pool.awaitJoin(w, this) :
+ externalAwaitDone();
}
/**
@@ -356,14 +353,10 @@ private int doJoin() {
*/
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
- if ((s = doExec()) >= 0) {
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
- s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue,
- this);
- else
- s = externalAwaitDone();
- }
- return s;
+ return (s = doExec()) < 0 ? s :
+ ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
+ externalAwaitDone();
}
// Exception table support
@@ -411,11 +404,11 @@ private int doInvoke() {
}
/**
- * Records exception and sets exceptional completion.
+ * Records exception and sets status.
*
* @return status on exit
*/
- private int setExceptionalCompletion(Throwable ex) {
+ final int recordExceptionalCompletion(Throwable ex) {
int s;
if ((s = status) >= 0) {
int h = System.identityHashCode(this);
@@ -438,17 +431,25 @@ private int setExceptionalCompletion(Throwable ex) {
}
s = setCompletion(EXCEPTIONAL);
}
- ForkJoinTask<?> p = internalGetCompleter(); // propagate
- if (p != null && p.status >= 0)
- p.setExceptionalCompletion(ex);
return s;
}
/**
- * Exception propagation support for tasks with completers.
+ * Records exception and possibly propagates.
+ *
+ * @return status on exit
*/
- ForkJoinTask<?> internalGetCompleter() {
- return null;
+ private int setExceptionalCompletion(Throwable ex) {
+ int s = recordExceptionalCompletion(ex);
+ if ((s & DONE_MASK) == EXCEPTIONAL)
+ internalPropagateException(ex);
+ return s;
+ }
+
+ /**
+ * Hook for exception propagation support for tasks with completers.
+ */
+ void internalPropagateException(Throwable ex) {
}
/**
@@ -467,7 +468,7 @@ static final void cancelIgnoringExceptions(ForkJoinTask<?> t) {
}
/**
- * Removes exception node and clears status
+ * Removes exception node and clears status.
*/
private void clearExceptionalCompletion() {
int h = System.identityHashCode(this);
@@ -595,7 +596,7 @@ static final void helpExpungeStaleExceptions() {
}
}
}
-
+
/**
* A version of "sneaky throw" to relay exceptions
*/
@@ -624,35 +625,35 @@ void uncheckedThrow(Throwable t) throws T {
* Throws exception, if any, associated with the given status.
*/
private void reportException(int s) {
- Throwable ex = ((s == CANCELLED) ? new CancellationException() :
- (s == EXCEPTIONAL) ? getThrowableException() :
- null);
- if (ex != null)
- ForkJoinTask.rethrow(ex);
+ if (s == CANCELLED)
+ throw new CancellationException();
+ if (s == EXCEPTIONAL)
+ rethrow(getThrowableException());
}
// public methods
/**
- * Arranges to asynchronously execute this task. While it is not
- * necessarily enforced, it is a usage error to fork a task more
- * than once unless it has completed and been reinitialized.
- * Subsequent modifications to the state of this task or any data
- * it operates on are not necessarily consistently observable by
- * any thread other than the one executing it unless preceded by a
- * call to {@link #join} or related methods, or a call to {@link
- * #isDone} returning {@code true}.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * Arranges to asynchronously execute this task in the pool the
+ * current task is running in, if applicable, or using the {@link
+ * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
+ * it is not necessarily enforced, it is a usage error to fork a
+ * task more than once unless it has completed and been
+ * reinitialized. Subsequent modifications to the state of this
+ * task or any data it operates on are not necessarily
+ * consistently observable by any thread other than the one
+ * executing it unless preceded by a call to {@link #join} or
+ * related methods, or a call to {@link #isDone} returning {@code
+ * true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
- ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ ((ForkJoinWorkerThread)t).workQueue.push(this);
+ else
+ ForkJoinPool.common.externalPush(this);
return this;
}
@@ -702,12 +703,6 @@ public final V invoke() {
* cancelled, completed normally or exceptionally, or left
* unprocessed.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param t1 the first task
* @param t2 the second task
* @throws NullPointerException if any task is null
@@ -733,12 +728,6 @@ public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
* related methods to check if they have been cancelled, completed
* normally or exceptionally, or left unprocessed.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the tasks
* @throws NullPointerException if any task is null
*/
@@ -766,7 +755,7 @@ else if (t.doJoin() < NORMAL)
}
}
if (ex != null)
- ForkJoinTask.rethrow(ex);
+ rethrow(ex);
}
/**
@@ -782,12 +771,6 @@ else if (t.doJoin() < NORMAL)
* cancelled, completed normally or exceptionally, or left
* unprocessed.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the collection of tasks
* @return the tasks argument, to simplify usage
* @throws NullPointerException if tasks or any element are null
@@ -823,7 +806,7 @@ else if (t.doJoin() < NORMAL)
}
}
if (ex != null)
- ForkJoinTask.rethrow(ex);
+ rethrow(ex);
return tasks;
}
@@ -996,8 +979,9 @@ public final V get(long timeout, TimeUnit unit)
if (Thread.interrupted())
throw new InterruptedException();
// Messy in part because we measure in nanosecs, but wait in millisecs
- int s; long ns, ms;
- if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {
+ int s; long ms;
+ long ns = unit.toNanos(timeout);
+ if ((s = status) >= 0 && ns > 0L) {
long deadline = System.nanoTime() + ns;
ForkJoinPool p = null;
ForkJoinPool.WorkQueue w = null;
@@ -1006,16 +990,18 @@ public final V get(long timeout, TimeUnit unit)
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
p = wt.pool;
w = wt.workQueue;
- s = p.helpJoinOnce(w, this); // no retries on failure
+ p.helpJoinOnce(w, this); // no retries on failure
}
+ else
+ ForkJoinPool.externalHelpJoin(this);
boolean canBlock = false;
boolean interrupted = false;
try {
while ((s = status) >= 0) {
- if (w != null && w.runState < 0)
+ if (w != null && w.qlock < 0)
cancelIgnoringExceptions(this);
else if (!canBlock) {
- if (p == null || p.tryCompensate(this, null))
+ if (p == null || p.tryCompensate())
canBlock = true;
}
else {
@@ -1083,17 +1069,15 @@ public final void quietlyInvoke() {
* be of use in designs in which many tasks are forked, but none
* are explicitly joined, instead executing them until all are
* processed.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
*/
public static void helpQuiesce() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- wt.pool.helpQuiescePool(wt.workQueue);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ wt.pool.helpQuiescePool(wt.workQueue);
+ }
+ else
+ ForkJoinPool.quiesceCommonPool();
}
/**
@@ -1146,23 +1130,19 @@ public static boolean inForkJoinPool() {
/**
* Tries to unschedule this task for execution. This method will
- * typically succeed if this task is the most recently forked task
- * by the current thread, and has not commenced executing in
- * another thread. This method may be useful when arranging
- * alternative local processing of tasks that could have been, but
- * were not, stolen.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * typically (but is not guaranteed to) succeed if this task is
+ * the most recently forked task by the current thread, and has
+ * not commenced executing in another thread. This method may be
+ * useful when arranging alternative local processing of tasks
+ * that could have been, but were not, stolen.
*
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- return ((ForkJoinWorkerThread)Thread.currentThread())
- .workQueue.tryUnpush(this);
+ Thread t;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
+ ForkJoinPool.tryExternalUnpush(this));
}
/**
@@ -1171,84 +1151,32 @@ public boolean tryUnfork() {
* value may be useful for heuristic decisions about whether to
* fork other tasks.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the number of tasks
*/
public static int getQueuedTaskCount() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.queueSize();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? 0 : q.queueSize();
}
/**
* Returns an estimate of how many more locally queued tasks are
* held by the current worker thread than there are other worker
- * threads that might steal them. This value may be useful for
+ * threads that might steal them, or zero if this thread is not
+ * operating in a ForkJoinPool. This value may be useful for
* heuristic decisions about whether to fork other tasks. In many
* usages of ForkJoinTasks, at steady state, each worker should
* aim to maintain a small constant surplus (for example, 3) of
* tasks, and to process computations locally if this threshold is
* exceeded.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
- /*
- * The aim of this method is to return a cheap heuristic guide
- * for task partitioning when programmers, frameworks, tools,
- * or languages have little or no idea about task granularity.
- * In essence by offering this method, we ask users only about
- * tradeoffs in overhead vs expected throughput and its
- * variance, rather than how finely to partition tasks.
- *
- * In a steady state strict (tree-structured) computation,
- * each thread makes available for stealing enough tasks for
- * other threads to remain active. Inductively, if all threads
- * play by the same rules, each thread should make available
- * only a constant number of tasks.
- *
- * The minimum useful constant is just 1. But using a value of
- * 1 would require immediate replenishment upon each steal to
- * maintain enough tasks, which is infeasible. Further,
- * partitionings/granularities of offered tasks should
- * minimize steal rates, which in general means that threads
- * nearer the top of computation tree should generate more
- * than those nearer the bottom. In perfect steady state, each
- * thread is at approximately the same level of computation
- * tree. However, producing extra tasks amortizes the
- * uncertainty of progress and diffusion assumptions.
- *
- * So, users will want to use values larger, but not much
- * larger than 1 to both smooth over transient shortages and
- * hedge against uneven progress; as traded off against the
- * cost of extra task overhead. We leave the user to pick a
- * threshold value to compare with the results of this call to
- * guide decisions, but recommend values such as 3.
- *
- * When all threads are active, it is on average OK to
- * estimate surplus strictly locally. In steady-state, if one
- * thread is maintaining say 2 surplus tasks, then so are
- * others. So we can just use estimated queue length.
- * However, this strategy alone leads to serious mis-estimates
- * in some non-steady-state conditions (ramp-up, ramp-down,
- * other stalls). We can detect many of these by further
- * considering the number of "idle" threads, that are known to
- * have zero queued tasks, so compensate by a factor of
- * (#idle/#active) threads.
- */
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.workQueue.queueSize() - wt.pool.idlePerActive();
+ return ForkJoinPool.getSurplusQueuedTaskCount();
}
// Extension methods
@@ -1299,59 +1227,51 @@ public static int getSurplusQueuedTaskCount() {
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> peekNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread()).workQueue.peek();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? null : q.peek();
}
/**
* Unschedules and returns, without executing, the next task
- * queued by the current thread but not yet executed. This method
- * is designed primarily to support extensions, and is unlikely to
- * be useful otherwise.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * queued by the current thread but not yet executed, if the
+ * current thread is operating in a ForkJoinPool. This method is
+ * designed primarily to support extensions, and is unlikely to be
+ * useful otherwise.
*
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.nextLocalTask();
+ Thread t;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
+ null;
}
/**
- * Unschedules and returns, without executing, the next task
+ * If the current thread is operating in a ForkJoinPool,
+ * unschedules and returns, without executing, the next task
* queued by the current thread but not yet executed, if one is
* available, or if not available, a task that was forked by some
* other thread, if available. Availability may be transient, so a
- * {@code null} result does not necessarily imply quiescence
- * of the pool this task is operating in. This method is designed
+ * {@code null} result does not necessarily imply quiescence of
+ * the pool this task is operating in. This method is designed
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollTask() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.pool.nextTaskFor(wt.workQueue);
+ Thread t; ForkJoinWorkerThread wt;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
+ null;
}
// tag operations
@@ -1540,14 +1460,16 @@ private void readObject(java.io.ObjectInputStream s)
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long STATUS;
+
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
U = getUnsafe();
+ Class<?> k = ForkJoinTask.class;
STATUS = U.objectFieldOffset
- (ForkJoinTask.class.getDeclaredField("status"));
+ (k.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
View
20 src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java
@@ -25,10 +25,17 @@
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
+ *
+ * This class just maintains links to its pool and WorkQueue. The
+ * pool field is set immediately upon construction, but the
+ * workQueue field is not set until a call to registerWorker
+ * completes. This leads to a visibility race, that is tolerated
+ * by requiring that the workQueue field is only accessed by the
+ * owning thread.
*/
- final ForkJoinPool.WorkQueue workQueue; // Work-stealing mechanics
final ForkJoinPool pool; // the pool this thread works in
+ final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
@@ -37,14 +44,10 @@
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
- super(pool.nextWorkerName());
- setDaemon(true);
- Thread.UncaughtExceptionHandler ueh = pool.ueh;
- if (ueh != null)
- setUncaughtExceptionHandler(ueh);
+ // Use a placeholder until a useful name can be set in registerWorker
+ super("aForkJoinWorkerThread");
this.pool = pool;
- pool.registerWorker(this.workQueue = new ForkJoinPool.WorkQueue
- (pool, this, pool.localMode));
+ this.workQueue = pool.registerWorker(this);
}
/**
@@ -116,4 +119,3 @@ public void run() {
}
}
}
-

0 comments on commit 77437ff

Please sign in to comment.
Something went wrong with that request. Please try again.