Skip to content

Commit

Permalink
🔥 Improve unpark scheduling: mark PARKING fiber tasks for immedia…
Browse files Browse the repository at this point in the history
…te resume and return instead of spinwaiting
  • Loading branch information
Fabio Tudone committed May 1, 2016
1 parent d6fdf31 commit f37b8c8
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 23 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2016, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand Down Expand Up @@ -168,10 +168,32 @@ protected StackTraceElement[] getUnparkStackTrace() {
}

protected void doPark(boolean yield) {
if (yield)
if (yield) {
submit();
else
this.state = PARKED;
} else {
int newState;
int _state;
loop: do {
_state = getState();
switch (_state) {
case PARKING:
newState = PARKED;
break;
case RUNNABLE:
newState = RUNNABLE;
break loop;
case LEASED:
newState = RUNNABLE;
break;
default:
throw new AssertionError("Illegal task state (a fiber has no chance to enter `doPark` in anything else than `PARKED` or `RESTART`): " + _state);
}
} while (!compareAndSetState(_state, newState));

if (newState == RUNNABLE)
submit();
}

onParked(yield);
}

Expand Down Expand Up @@ -241,7 +263,8 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) {
newState = RUNNABLE;
break;
case PARKING:
continue; // spin and wait
newState = RUNNABLE; // Represents immediate resume for `doPark`
break;
case LEASED:
if (Debug.isDebug())
record("unpark", "current: %s - %s. return.", this, _state);
Expand All @@ -259,10 +282,12 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) {
this.unparker = unblocker;
if (CAPTURE_UNPARK_STACK)
this.unparkStackTrace = Thread.currentThread().getStackTrace();
if (fjPool != null)
submit(fjPool);
else
submit();
if (_state != PARKING) {
if (fjPool != null)
submit(fjPool);
else
submit();
}
}
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2013-2014, Parallel Universe Software Co. All rights reserved.
* Copyright (c) 2013-2016, Parallel Universe Software Co. All rights reserved.
*
* This program and the accompanying materials are dual-licensed under
* either the terms of the Eclipse Public License v1.0 as published by
Expand Down Expand Up @@ -168,10 +168,32 @@ protected StackTraceElement[] getUnparkStackTrace() {
}

protected void doPark(boolean yield) {
if (yield)
if (yield) {
submit();
else
this.state = PARKED;
} else {
int newState;
int _state;
loop: do {
_state = getState();
switch (_state) {
case PARKING:
newState = PARKED;
break;
case RUNNABLE:
newState = RUNNABLE;
break loop;
case LEASED:
newState = RUNNABLE;
break;
default:
throw new AssertionError("Illegal task state (a fiber has no chance to enter `doPark` in anything else than `PARKED` or `RESTART`): " + _state);
}
} while (!compareAndSetState(_state, newState));

if (newState == RUNNABLE)
submit();
}

onParked(yield);
}

Expand Down Expand Up @@ -241,7 +263,8 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) {
newState = RUNNABLE;
break;
case PARKING:
continue; // spin and wait
newState = RUNNABLE; // Represents immediate resume for `doPark`
break;
case LEASED:
if (Debug.isDebug())
record("unpark", "current: %s - %s. return.", this, _state);
Expand All @@ -259,10 +282,12 @@ public void unpark(ForkJoinPool fjPool, Object unblocker) {
this.unparker = unblocker;
if (CAPTURE_UNPARK_STACK)
this.unparkStackTrace = Thread.currentThread().getStackTrace();
if (fjPool != null)
submit(fjPool);
else
submit();
if (_state != PARKING) {
if (fjPool != null)
submit(fjPool);
else
submit();
}
}
}

Expand Down
Expand Up @@ -149,10 +149,32 @@ public StackTraceElement[] getUnparkStackTrace() {

@Override
public void doPark(boolean yield) {
if (yield)
if (yield) {
submit();
else
this.state = PARKED;
} else {
int newState;
int _state;
loop: do {
_state = getState();
switch (_state) {
case PARKING:
newState = PARKED;
break;
case RUNNABLE:
newState = RUNNABLE;
break loop;
case LEASED:
newState = RUNNABLE;
break;
default:
throw new AssertionError("Illegal task state (a fiber has no chance to enter `doPark` in anything else than `PARKED` or `RESTART`): " + _state);
}
} while (!compareAndSetState(_state, newState));

if (newState == RUNNABLE)
submit();
}

onParked(yield);
}

Expand Down Expand Up @@ -223,7 +245,8 @@ public void unpark(Object unblocker) {
newState = RUNNABLE;
break;
case PARKING:
continue; // spin and wait
newState = RUNNABLE; // Represents immediate resume for `doPark`
break;
case LEASED:
if (Debug.isDebug())
record("unpark", "current: %s - %s. return.", this, _state);
Expand All @@ -241,7 +264,8 @@ public void unpark(Object unblocker) {
this.unparker = unblocker;
if (CAPTURE_UNPARK_STACK)
this.unparkStackTrace = Thread.currentThread().getStackTrace();
submit();
if (_state != PARKING)
submit();
}
}

Expand Down

0 comments on commit f37b8c8

Please sign in to comment.