From f37b8c892a80bef52cfe31aa0d9005bf5eed6606 Mon Sep 17 00:00:00 2001 From: Fabio Tudone Date: Sun, 1 May 2016 21:50:47 +0300 Subject: [PATCH] :fire: Improve `unpark` scheduling: mark `PARKING` fiber tasks for immediate resume and return instead of spinwaiting --- .../forkjoin/ParkableForkJoinTask.java | 43 +++++++++++++++---- .../forkjoin/ParkableForkJoinTask.java | 43 +++++++++++++++---- .../fibers/RunnableFiberTask.java | 34 ++++++++++++--- 3 files changed, 97 insertions(+), 23 deletions(-) diff --git a/quasar-core/src/jdk7/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java b/quasar-core/src/jdk7/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java index d6b53d430c..0af9211273 100644 --- a/quasar-core/src/jdk7/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java +++ b/quasar-core/src/jdk7/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved. + * Copyright (c) 2013-2016, Parallel Universe Software Co. All rights reserved. * * This program and the accompanying materials are dual-licensed under * either the terms of the Eclipse Public License v1.0 as published by @@ -168,10 +168,32 @@ protected StackTraceElement[] getUnparkStackTrace() { } protected void doPark(boolean yield) { - if (yield) + if (yield) { submit(); - else - this.state = PARKED; + } else { + int newState; + int _state; + loop: do { + _state = getState(); + switch (_state) { + case PARKING: + newState = PARKED; + break; + case RUNNABLE: + newState = RUNNABLE; + break loop; + case LEASED: + newState = RUNNABLE; + break; + default: + throw new AssertionError("Illegal task state (a fiber has no chance to enter `doPark` in anything else than `PARKED` or `RESTART`): " + _state); + } + } while (!compareAndSetState(_state, newState)); + + if (newState == RUNNABLE) + submit(); + } + onParked(yield); } @@ -241,7 +263,8 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) { newState = RUNNABLE; break; case PARKING: - continue; // spin and wait + newState = RUNNABLE; // Represents immediate resume for `doPark` + break; case LEASED: if (Debug.isDebug()) record("unpark", "current: %s - %s. return.", this, _state); @@ -259,10 +282,12 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) { this.unparker = unblocker; if (CAPTURE_UNPARK_STACK) this.unparkStackTrace = Thread.currentThread().getStackTrace(); - if (fjPool != null) - submit(fjPool); - else - submit(); + if (_state != PARKING) { + if (fjPool != null) + submit(fjPool); + else + submit(); + } } } diff --git a/quasar-core/src/jdk8/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java b/quasar-core/src/jdk8/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java index 7a08f93462..0abfda0122 100644 --- a/quasar-core/src/jdk8/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java +++ b/quasar-core/src/jdk8/java/co/paralleluniverse/concurrent/forkjoin/ParkableForkJoinTask.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved. + * Copyright (c) 2013-2016, Parallel Universe Software Co. All rights reserved. * * This program and the accompanying materials are dual-licensed under * either the terms of the Eclipse Public License v1.0 as published by @@ -168,10 +168,32 @@ protected StackTraceElement[] getUnparkStackTrace() { } protected void doPark(boolean yield) { - if (yield) + if (yield) { submit(); - else - this.state = PARKED; + } else { + int newState; + int _state; + loop: do { + _state = getState(); + switch (_state) { + case PARKING: + newState = PARKED; + break; + case RUNNABLE: + newState = RUNNABLE; + break loop; + case LEASED: + newState = RUNNABLE; + break; + default: + throw new AssertionError("Illegal task state (a fiber has no chance to enter `doPark` in anything else than `PARKED` or `RESTART`): " + _state); + } + } while (!compareAndSetState(_state, newState)); + + if (newState == RUNNABLE) + submit(); + } + onParked(yield); } @@ -241,7 +263,8 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) { newState = RUNNABLE; break; case PARKING: - continue; // spin and wait + newState = RUNNABLE; // Represents immediate resume for `doPark` + break; case LEASED: if (Debug.isDebug()) record("unpark", "current: %s - %s. return.", this, _state); @@ -259,10 +282,12 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) { this.unparker = unblocker; if (CAPTURE_UNPARK_STACK) this.unparkStackTrace = Thread.currentThread().getStackTrace(); - if (fjPool != null) - submit(fjPool); - else - submit(); + if (_state != PARKING) { + if (fjPool != null) + submit(fjPool); + else + submit(); + } } } diff --git a/quasar-core/src/main/java/co/paralleluniverse/fibers/RunnableFiberTask.java b/quasar-core/src/main/java/co/paralleluniverse/fibers/RunnableFiberTask.java index 1aa4daeb94..18c8312d83 100644 --- a/quasar-core/src/main/java/co/paralleluniverse/fibers/RunnableFiberTask.java +++ b/quasar-core/src/main/java/co/paralleluniverse/fibers/RunnableFiberTask.java @@ -149,10 +149,32 @@ public StackTraceElement[] getUnparkStackTrace() { @Override public void doPark(boolean yield) { - if (yield) + if (yield) { submit(); - else - this.state = PARKED; + } else { + int newState; + int _state; + loop: do { + _state = getState(); + switch (_state) { + case PARKING: + newState = PARKED; + break; + case RUNNABLE: + newState = RUNNABLE; + break loop; + case LEASED: + newState = RUNNABLE; + break; + default: + throw new AssertionError("Illegal task state (a fiber has no chance to enter `doPark` in anything else than `PARKED` or `RESTART`): " + _state); + } + } while (!compareAndSetState(_state, newState)); + + if (newState == RUNNABLE) + submit(); + } + onParked(yield); } @@ -223,7 +245,8 @@ public void unpark(Object unblocker) { newState = RUNNABLE; break; case PARKING: - continue; // spin and wait + newState = RUNNABLE; // Represents immediate resume for `doPark` + break; case LEASED: if (Debug.isDebug()) record("unpark", "current: %s - %s. return.", this, _state); @@ -241,7 +264,8 @@ public void unpark(Object unblocker) { this.unparker = unblocker; if (CAPTURE_UNPARK_STACK) this.unparkStackTrace = Thread.currentThread().getStackTrace(); - submit(); + if (_state != PARKING) + submit(); } }