Skip to content

Commit

Permalink
6801020: Concurrent Semaphore release may cause some require thread n…
Browse files Browse the repository at this point in the history
…ot signaled

Introduce PROPAGATE waitStatus

Reviewed-by: martin
  • Loading branch information
Doug Lea committed Mar 26, 2009
1 parent 5d6fffa commit b63d6d6
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 78 deletions.
Expand Up @@ -166,6 +166,11 @@ static final class Node {
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
Expand All @@ -180,10 +185,16 @@ static final class Node {
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node until
* transferred. (Use of this value here
* has nothing to do with the other uses
* of the field, but simplifies mechanics.)
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
Expand Down Expand Up @@ -403,10 +414,13 @@ private void setHead(Node node) {
*/
private void unparkSuccessor(Node node) {
/*
* Try to clear status in anticipation of signalling. It is
* OK if this fails or if status is changed by waiting thread.
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
compareAndSetWaitStatus(node, Node.SIGNAL, 0);
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
Expand All @@ -425,24 +439,71 @@ private void unparkSuccessor(Node node) {
LockSupport.unpark(s.thread);
}

/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

/**
* Sets head of queue, and checks if successor may be waiting
* in shared mode, if so propagating if propagate > 0.
* in shared mode, if so propagating if either propagate > 0 or
* PROPAGATE status was set.
*
* @param pred the node holding waitStatus for node
* @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, long propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
/*
* Don't bother fully figuring out successor. If it
* looks null, call unparkSuccessor anyway to be safe.
*/
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
doReleaseShared();
}
}

Expand All @@ -465,23 +526,27 @@ private void cancelAcquire(Node node) {
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// Getting this before setting waitStatus ensures staleness
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If "active" predecessor found...
if (pred != head
&& (pred.waitStatus == Node.SIGNAL
|| compareAndSetWaitStatus(pred, 0, Node.SIGNAL))
&& pred.thread != null) {

// If successor is active, set predecessor's next link
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
Expand All @@ -503,14 +568,14 @@ private void cancelAcquire(Node node) {
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int s = pred.waitStatus;
if (s < 0)
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (s > 0) {
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
Expand All @@ -519,14 +584,14 @@ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
else
} else {
/*
* Indicate that we need a signal, but don't park yet. Caller
* will need to retry to make sure it cannot acquire before
* parking.
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

Expand Down Expand Up @@ -1046,9 +1111,7 @@ public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws I
*/
public final boolean releaseShared(long arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
doReleaseShared();
return true;
}
return false;
Expand Down Expand Up @@ -1390,8 +1453,8 @@ final boolean transferForSignal(Node node) {
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int c = p.waitStatus;
if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Expand Down

0 comments on commit b63d6d6

Please sign in to comment.