@@ -96,37 +96,37 @@ final class VirtualThread extends BaseVirtualThread {
96
96
* RUNNING -> PARKING // Thread parking with LockSupport.park
97
97
* PARKING -> PARKED // cont.yield successful, parked indefinitely
98
98
* PARKING -> PINNED // cont.yield failed, parked indefinitely on carrier
99
- * PARKED -> RUNNABLE // unparked, schedule to continue
99
+ * PARKED -> UNPARKED // unparked, may be scheduled to continue
100
100
* PINNED -> RUNNING // unparked, continue execution on same carrier
101
+ * UNPARKED -> RUNNING // continue execution after park
101
102
*
102
103
* RUNNING -> TIMED_PARKING // Thread parking with LockSupport.parkNanos
103
104
* TIMED_PARKING -> TIMED_PARKED // cont.yield successful, timed-parked
104
105
* TIMED_PARKING -> TIMED_PINNED // cont.yield failed, timed-parked on carrier
105
- * TIMED_PARKED -> RUNNABLE // unparked, schedule to continue
106
+ * TIMED_PARKED -> UNPARKED // unparked, may be scheduled to continue
106
107
* TIMED_PINNED -> RUNNING // unparked, continue execution on same carrier
107
108
*
108
- * RUNNABLE -> RUNNING // continue execution
109
- *
110
109
* RUNNING -> YIELDING // Thread.yield
111
- * YIELDING -> RUNNABLE // yield successful
112
- * YIELDING -> RUNNING // yield failed
110
+ * YIELDING -> YIELDED // cont.yield successful, may be scheduled to continue
111
+ * YIELDING -> RUNNING // cont.yield failed
112
+ * YIELDED -> RUNNING // continue execution after Thread.yield
113
113
*/
114
114
private static final int NEW = 0 ;
115
115
private static final int STARTED = 1 ;
116
- private static final int RUNNABLE = 2 ; // runnable-unmounted
117
- private static final int RUNNING = 3 ; // runnable-mounted
118
-
119
- // untimed parking
120
- private static final int PARKING = 4 ;
121
- private static final int PARKED = 5 ; // unmounted
122
- private static final int PINNED = 6 ; // mounted
116
+ private static final int RUNNING = 2 ; // runnable-mounted
123
117
124
- // timed parking
125
- private static final int TIMED_PARKING = 7 ;
126
- private static final int TIMED_PARKED = 8 ;
127
- private static final int TIMED_PINNED = 9 ;
118
+ // untimed and timed parking
119
+ private static final int PARKING = 3 ;
120
+ private static final int PARKED = 4 ; // unmounted
121
+ private static final int PINNED = 5 ; // mounted
122
+ private static final int TIMED_PARKING = 6 ;
123
+ private static final int TIMED_PARKED = 7 ; // unmounted
124
+ private static final int TIMED_PINNED = 8 ; // mounted
125
+ private static final int UNPARKED = 9 ; // unmounted but runnable
128
126
129
- private static final int YIELDING = 10 ; // Thread.yield
127
+ // Thread.yield
128
+ private static final int YIELDING = 10 ;
129
+ private static final int YIELDED = 11 ; // unmounted but runnable
130
130
131
131
private static final int TERMINATED = 99 ; // final state
132
132
@@ -218,11 +218,15 @@ private void runContinuation() {
218
218
219
219
// set state to RUNNING
220
220
int initialState = state ();
221
- if (initialState == STARTED && compareAndSetState (STARTED , RUNNING )) {
222
- // first run
223
- } else if (initialState == RUNNABLE && compareAndSetState (RUNNABLE , RUNNING )) {
224
- // consume parking permit
225
- setParkPermit (false );
221
+ if (initialState == STARTED || initialState == UNPARKED || initialState == YIELDED ) {
222
+ // newly started or continue after parking/blocking/Thread.yield
223
+ if (!compareAndSetState (initialState , RUNNING )) {
224
+ return ;
225
+ }
226
+ // consume parking permit when continuing after parking
227
+ if (initialState == UNPARKED ) {
228
+ setParkPermit (false );
229
+ }
226
230
} else {
227
231
// not runnable
228
232
return ;
@@ -244,8 +248,7 @@ private void runContinuation() {
244
248
/**
245
249
* Submits the runContinuation task to the scheduler. For the default scheduler,
246
250
* and calling it on a worker thread, the task will be pushed to the local queue,
247
- * otherwise it will be pushed to a submission queue.
248
- *
251
+ * otherwise it will be pushed to an external submission queue.
249
252
* @throws RejectedExecutionException
250
253
*/
251
254
private void submitRunContinuation () {
@@ -258,7 +261,7 @@ private void submitRunContinuation() {
258
261
}
259
262
260
263
/**
261
- * Submits the runContinuation task to the scheduler with a lazy submit.
264
+ * Submits the runContinuation task to given scheduler with a lazy submit.
262
265
* @throws RejectedExecutionException
263
266
* @see ForkJoinPool#lazySubmit(ForkJoinTask)
264
267
*/
@@ -272,7 +275,7 @@ private void lazySubmitRunContinuation(ForkJoinPool pool) {
272
275
}
273
276
274
277
/**
275
- * Submits the runContinuation task to the scheduler as an external submit.
278
+ * Submits the runContinuation task to the given scheduler as an external submit.
276
279
* @throws RejectedExecutionException
277
280
* @see ForkJoinPool#externalSubmit(ForkJoinTask)
278
281
*/
@@ -457,7 +460,7 @@ private void afterYield() {
457
460
setState (newState );
458
461
459
462
// may have been unparked while parking
460
- if (parkPermit && compareAndSetState (newState , RUNNABLE )) {
463
+ if (parkPermit && compareAndSetState (newState , UNPARKED )) {
461
464
// lazy submit to continue on the current thread as carrier if possible
462
465
if (currentThread () instanceof CarrierThread ct ) {
463
466
lazySubmitRunContinuation (ct .getPool ());
@@ -471,7 +474,7 @@ private void afterYield() {
471
474
472
475
// Thread.yield
473
476
if (s == YIELDING ) {
474
- setState (RUNNABLE );
477
+ setState (YIELDED );
475
478
476
479
// external submit if there are no tasks in the local task queue
477
480
if (currentThread () instanceof CarrierThread ct && ct .getQueuedTaskCount () == 0 ) {
@@ -618,7 +621,7 @@ void parkNanos(long nanos) {
618
621
long startTime = System .nanoTime ();
619
622
620
623
boolean yielded = false ;
621
- Future <?> unparker = scheduleUnpark (this :: unpark , nanos );
624
+ Future <?> unparker = scheduleUnpark (nanos ); // may throw OOME
622
625
setState (TIMED_PARKING );
623
626
try {
624
627
yielded = yieldContinuation (); // may throw
@@ -683,14 +686,15 @@ private void parkOnCarrierThread(boolean timed, long nanos) {
683
686
}
684
687
685
688
/**
686
- * Schedule an unpark task to run after a given delay.
689
+ * Schedule this virtual thread to be unparked after a given delay.
687
690
*/
688
691
@ ChangesCurrentThread
689
- private Future <?> scheduleUnpark (Runnable unparker , long nanos ) {
692
+ private Future <?> scheduleUnpark (long nanos ) {
693
+ assert Thread .currentThread () == this ;
690
694
// need to switch to current carrier thread to avoid nested parking
691
695
switchToCarrierThread ();
692
696
try {
693
- return UNPARKER .schedule (unparker , nanos , NANOSECONDS );
697
+ return UNPARKER .schedule (this :: unpark , nanos , NANOSECONDS );
694
698
} finally {
695
699
switchToVirtualThread (this );
696
700
}
@@ -726,7 +730,7 @@ void unpark() {
726
730
if (!getAndSetParkPermit (true ) && currentThread != this ) {
727
731
int s = state ();
728
732
boolean parked = (s == PARKED ) || (s == TIMED_PARKED );
729
- if (parked && compareAndSetState (s , RUNNABLE )) {
733
+ if (parked && compareAndSetState (s , UNPARKED )) {
730
734
if (currentThread instanceof VirtualThread vthread ) {
731
735
vthread .switchToCarrierThread ();
732
736
try {
@@ -738,7 +742,7 @@ void unpark() {
738
742
submitRunContinuation ();
739
743
}
740
744
} else if ((s == PINNED ) || (s == TIMED_PINNED )) {
741
- // unpark carrier thread when pinned.
745
+ // unpark carrier thread when pinned
742
746
synchronized (carrierThreadAccessLock ()) {
743
747
Thread carrier = carrierThread ;
744
748
if (carrier != null && ((s = state ()) == PINNED || s == TIMED_PINNED )) {
@@ -889,7 +893,8 @@ Thread.State threadState() {
889
893
} else {
890
894
return Thread .State .RUNNABLE ;
891
895
}
892
- case RUNNABLE :
896
+ case UNPARKED :
897
+ case YIELDED :
893
898
// runnable, not mounted
894
899
return Thread .State .RUNNABLE ;
895
900
case RUNNING :
@@ -905,7 +910,7 @@ Thread.State threadState() {
905
910
case PARKING :
906
911
case TIMED_PARKING :
907
912
case YIELDING :
908
- // runnable, mounted, not yet waiting
913
+ // runnable, in transition
909
914
return Thread .State .RUNNABLE ;
910
915
case PARKED :
911
916
case PINNED :
@@ -947,35 +952,58 @@ StackTraceElement[] asyncGetStackTrace() {
947
952
948
953
/**
949
954
* Returns the stack trace for this virtual thread if it is unmounted.
950
- * Returns null if the thread is in another state .
955
+ * Returns null if the thread is mounted or in transition .
951
956
*/
952
957
private StackTraceElement [] tryGetStackTrace () {
953
958
int initialState = state ();
954
- return switch (initialState ) {
955
- case RUNNABLE , PARKED , TIMED_PARKED -> {
956
- int suspendedState = initialState | SUSPENDED ;
957
- if (compareAndSetState (initialState , suspendedState )) {
958
- try {
959
- yield cont .getStackTrace ();
960
- } finally {
961
- assert state == suspendedState ;
962
- setState (initialState );
963
-
964
- // re-submit if runnable
965
- // re-submit if unparked while suspended
966
- if (initialState == RUNNABLE
967
- || (parkPermit && compareAndSetState (initialState , RUNNABLE ))) {
968
- try {
969
- submitRunContinuation ();
970
- } catch (RejectedExecutionException ignore ) { }
971
- }
972
- }
973
- }
974
- yield null ;
959
+ switch (initialState ) {
960
+ case NEW , STARTED , TERMINATED -> {
961
+ return new StackTraceElement [0 ]; // unmounted, empty stack
962
+ }
963
+ case RUNNING , PINNED -> {
964
+ return null ; // mounted
965
+ }
966
+ case PARKED , TIMED_PARKED -> {
967
+ // unmounted, not runnable
968
+ }
969
+ case UNPARKED , YIELDED -> {
970
+ // unmounted, runnable
975
971
}
976
- case NEW , STARTED , TERMINATED -> new StackTraceElement [0 ]; // empty stack
977
- default -> null ;
972
+ case PARKING , TIMED_PARKING , YIELDING -> {
973
+ return null ; // in transition
974
+ }
975
+ default -> throw new InternalError ();
976
+ }
977
+
978
+ // thread is unmounted, prevent it from continuing
979
+ int suspendedState = initialState | SUSPENDED ;
980
+ if (!compareAndSetState (initialState , suspendedState )) {
981
+ return null ;
982
+ }
983
+
984
+ // get stack trace and restore state
985
+ StackTraceElement [] stack ;
986
+ try {
987
+ stack = cont .getStackTrace ();
988
+ } finally {
989
+ assert state == suspendedState ;
990
+ setState (initialState );
991
+ }
992
+ boolean resubmit = switch (initialState ) {
993
+ case UNPARKED , YIELDED -> {
994
+ // resubmit as task may have run while suspended
995
+ yield true ;
996
+ }
997
+ case PARKED , TIMED_PARKED -> {
998
+ // resubmit if unparked while suspended
999
+ yield parkPermit && compareAndSetState (initialState , UNPARKED );
1000
+ }
1001
+ default -> throw new InternalError ();
978
1002
};
1003
+ if (resubmit ) {
1004
+ submitRunContinuation ();
1005
+ }
1006
+ return stack ;
979
1007
}
980
1008
981
1009
@ Override
0 commit comments