@@ -792,25 +792,25 @@ static final class DefaultForkJoinWorkerThreadFactory
792
792
private static final AccessControlContext ACC = contextWithPermissions (
793
793
new RuntimePermission ("getClassLoader" ),
794
794
new RuntimePermission ("setContextClassLoader" ));
795
-
796
795
public final ForkJoinWorkerThread newThread (ForkJoinPool pool ) {
797
796
return AccessController .doPrivileged (
798
797
new PrivilegedAction <>() {
799
798
public ForkJoinWorkerThread run () {
800
- return new ForkJoinWorkerThread (null , pool , true , true );
799
+ return new ForkJoinWorkerThread (null , pool , true , false );
801
800
}},
802
801
ACC );
803
802
}
804
803
}
805
804
806
805
/**
807
- * Factory for InnocuousForkJoinWorkerThread. Support requires
808
- * that we break quite a lot of encapsulation (some via helper
809
- * methods in ThreadLocalRandom) to access and set Thread fields.
806
+ * Factory for CommonPool unless overridded by System
807
+ * property. Creates InnocuousForkJoinWorkerThreads if a security
808
+ * manager is present at time of invocation. Support requires that
809
+ * we break quite a lot of encapsulation (some via helper methods
810
+ * in ThreadLocalRandom) to access and set Thread fields.
810
811
*/
811
- static final class InnocuousForkJoinWorkerThreadFactory
812
+ static final class DefaultCommonPoolForkJoinWorkerThreadFactory
812
813
implements ForkJoinWorkerThreadFactory {
813
- // ACC for access to the factory
814
814
private static final AccessControlContext ACC = contextWithPermissions (
815
815
modifyThreadPermission ,
816
816
new RuntimePermission ("enableContextClassLoaderOverride" ),
@@ -820,11 +820,13 @@ static final class InnocuousForkJoinWorkerThreadFactory
820
820
821
821
public final ForkJoinWorkerThread newThread (ForkJoinPool pool ) {
822
822
return AccessController .doPrivileged (
823
- new PrivilegedAction <>() {
824
- public ForkJoinWorkerThread run () {
825
- return new ForkJoinWorkerThread .
826
- InnocuousForkJoinWorkerThread (pool ); }},
827
- ACC );
823
+ new PrivilegedAction <>() {
824
+ public ForkJoinWorkerThread run () {
825
+ return System .getSecurityManager () == null ?
826
+ new ForkJoinWorkerThread (null , pool , true , true ):
827
+ new ForkJoinWorkerThread .
828
+ InnocuousForkJoinWorkerThread (pool ); }},
829
+ ACC );
828
830
}
829
831
}
830
832
@@ -1689,15 +1691,18 @@ else if (mode < 0)
1689
1691
return -1 ;
1690
1692
else if ((int )(ctl >> RC_SHIFT ) > ac )
1691
1693
Thread .onSpinWait (); // signal in progress
1692
- else if (!(alt = !alt )) { // check between park calls
1693
- if (!Thread .interrupted () && deadline != 0L &&
1694
- deadline - System .currentTimeMillis () <= TIMEOUT_SLOP &&
1695
- compareAndSetCtl (c , ((UC_MASK & (c - TC_UNIT )) |
1696
- (w .stackPred & SP_MASK )))) {
1694
+ else if (deadline != 0L &&
1695
+ deadline - System .currentTimeMillis () <= TIMEOUT_SLOP ) {
1696
+ if (c != (c = ctl )) // ensure consistent
1697
+ ac = (int )(c >> RC_SHIFT );
1698
+ else if (compareAndSetCtl (c , ((UC_MASK & (c - TC_UNIT )) |
1699
+ (w .stackPred & SP_MASK )))) {
1697
1700
w .phase = QUIET ;
1698
1701
return -1 ; // drop on timeout
1699
1702
}
1700
1703
}
1704
+ else if (!(alt = !alt )) // check between park calls
1705
+ Thread .interrupted ();
1701
1706
else if (deadline != 0L )
1702
1707
LockSupport .parkUntil (deadline );
1703
1708
else
@@ -1710,11 +1715,19 @@ else if (deadline != 0L)
1710
1715
// Utilities used by ForkJoinTask
1711
1716
1712
1717
/**
1713
- * Returns true if all workers are busy
1718
+ * Returns true if all workers are busy, possibly creating one if allowed
1714
1719
*/
1715
1720
final boolean isSaturated () {
1716
- long c ;
1717
- return (int )((c = ctl ) >> RC_SHIFT ) >= 0 && ((int )c & ~UNSIGNALLED ) == 0 ;
1721
+ int maxTotal = bounds >>> SWIDTH ;
1722
+ for (long c ;;) {
1723
+ if (((int )(c = ctl ) & ~UNSIGNALLED ) != 0 )
1724
+ return false ;
1725
+ if ((short )(c >>> TC_SHIFT ) >= maxTotal )
1726
+ return true ;
1727
+ long nc = ((c + TC_UNIT ) & TC_MASK ) | (c & ~TC_MASK );
1728
+ if (compareAndSetCtl (c , nc ))
1729
+ return !createWorker ();
1730
+ }
1718
1731
}
1719
1732
1720
1733
/**
@@ -2534,9 +2547,7 @@ private ForkJoinPool(byte forCommonPoolOnly) {
2534
2547
int p = this .mode = Math .min (Math .max (parallelism , 0 ), MAX_CAP );
2535
2548
int size = 1 << (33 - Integer .numberOfLeadingZeros (p > 0 ? p - 1 : 1 ));
2536
2549
this .factory = (fac != null ) ? fac :
2537
- (System .getSecurityManager () == null ?
2538
- defaultForkJoinWorkerThreadFactory :
2539
- new InnocuousForkJoinWorkerThreadFactory ());
2550
+ new DefaultCommonPoolForkJoinWorkerThreadFactory ();
2540
2551
this .ueh = handler ;
2541
2552
this .keepAlive = DEFAULT_KEEPALIVE ;
2542
2553
this .saturate = null ;
@@ -2676,7 +2687,10 @@ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2676
2687
ForkJoinTask <T > f =
2677
2688
new ForkJoinTask .AdaptedInterruptibleCallable <T >(t );
2678
2689
futures .add (f );
2679
- externalSubmit (f );
2690
+ if (isSaturated ())
2691
+ f .doExec ();
2692
+ else
2693
+ externalSubmit (f );
2680
2694
}
2681
2695
for (int i = futures .size () - 1 ; i >= 0 ; --i )
2682
2696
((ForkJoinTask <?>)futures .get (i )).quietlyJoin ();
@@ -2699,7 +2713,10 @@ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2699
2713
ForkJoinTask <T > f =
2700
2714
new ForkJoinTask .AdaptedInterruptibleCallable <T >(t );
2701
2715
futures .add (f );
2702
- externalSubmit (f );
2716
+ if (isSaturated ())
2717
+ f .doExec ();
2718
+ else
2719
+ externalSubmit (f );
2703
2720
}
2704
2721
long startTime = System .nanoTime (), ns = nanos ;
2705
2722
boolean timedOut = (ns < 0L );
@@ -2735,14 +2752,22 @@ static final class InvokeAnyRoot<E> extends ForkJoinTask<E> {
2735
2752
final AtomicInteger count ; // in case all throw
2736
2753
InvokeAnyRoot (int n ) { count = new AtomicInteger (n ); }
2737
2754
final void tryComplete (Callable <E > c ) { // called by InvokeAnyTasks
2738
- if (c != null && !isDone ()) { // raciness OK
2739
- try {
2740
- complete (c .call ());
2741
- } catch (Throwable ex ) {
2742
- if (count .getAndDecrement () <= 1 )
2743
- trySetThrown (ex );
2755
+ Throwable ex = null ;
2756
+ boolean failed = false ;
2757
+ if (c != null ) { // raciness OK
2758
+ if (isCancelled ())
2759
+ failed = true ;
2760
+ else if (!isDone ()) {
2761
+ try {
2762
+ complete (c .call ());
2763
+ } catch (Throwable tx ) {
2764
+ ex = tx ;
2765
+ failed = true ;
2766
+ }
2744
2767
}
2745
2768
}
2769
+ if (failed && count .getAndDecrement () <= 1 )
2770
+ trySetThrown (ex != null ? ex : new CancellationException ());
2746
2771
}
2747
2772
public final boolean exec () { return false ; } // never forked
2748
2773
public final E getRawResult () { return result ; }
0 commit comments