Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
  • 2 commits
  • 2 files changed
  • 0 commit comments
  • 1 contributor
Commits on Apr 08, 2012
@tsuna Keep the state of a Deferred in a plain int.
Change-Id: Ib87d90cb8d7a45dc438548ae0ae57479af39aa3d
174586d
@tsuna Allow up to 16383 callbacks on a Deferred.
Change-Id: I9f9a7fb5110f693db6813c0b8a62f0d03b896a1e
d67b649
Showing with 55 additions and 35 deletions.
  1. +9 −0 NEWS
  2. +46 −35 src/Deferred.java
View
9 NEWS
@@ -2,6 +2,15 @@ StumbleUpon's Async Library - User visible changes.
* Version 1.2.0 (2012-??-??) [???????]
+Noteworthy changes:
+ - Optimization: the internal state of a Deferred is now captured
+ in a plain `int' instead of using an enum.
+ - The maximum number of callbacks allowed on a Deferred has been
+ significantly increased, from 128 to 16383, to allow for some
+ use cases where a large number of operations are waiting on a
+ single asynchronous operation to complete. This implementation
+ cannot support more than this many callbacks without refactoring.
+
* Version 1.1.0 (2011-09-27) [cf62d8d]
View
81 src/Deferred.java
@@ -27,7 +27,7 @@
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -439,11 +439,14 @@
/**
* Maximum length of the callback chain we allow.
- * Set this to some aggressive limit to quickly detect problems in code
+ * Set this to some arbitrary limit to quickly detect problems in code
* that creates potentially infinite chains. I can't imagine a practical
* case that would require a chain with more callbacks than this.
*/
- private static final short MAX_CALLBACK_CHAIN_LENGTH = 128;
+ private static final short MAX_CALLBACK_CHAIN_LENGTH = Short.MAX_VALUE >> 1;
+ // NOTE: The current implementation cannot support more than this many
+ // callbacks because indexes used to access the `callbacks' array are
+ // of type `short' to save memory.
/**
* How many entries do we create in the callback+errback chain by default.
@@ -481,12 +484,10 @@
* execute them immediately (since we already have the deferred result
* available), which means we're going to return to the RUNNING state.
*/
- private static enum State {
- PENDING,
- RUNNING,
- PAUSED,
- DONE,
- }
+ private static final byte PENDING = 0;
+ private static final byte RUNNING = 1;
+ private static final byte PAUSED = 2;
+ private static final byte DONE = 3;
/**
* The current state of this Deferred.
@@ -495,7 +496,9 @@
* transition with a test first ("if state is A, move to state B") then you
* must use {@link #casState} to do an atomic CAS (Compare And Swap).
*/
- private volatile State state;
+ private volatile int state;
+ // Technically ^^^ this could be a byte, but unfortunately !@#$% Java only
+ // supports CAS on 3 things: references, long, and int. Even through Unsafe.
/**
* The current result. This reference is either of type T or Exception.
@@ -529,8 +532,8 @@
* - When not null, the length is between {@link #INIT_CALLBACK_CHAIN_SIZE}
* and {@link #MAX_CALLBACK_CHAIN_LENGTH} * 2 (both limits inclusive).
* - This array is only grown, never shrunk.
- * - If state is State.PENDING, this list may be null.
- * - If state is State.DONE, this list must be null.
+ * - If state is PENDING, this list may be null.
+ * - If state is DONE, this list must be null.
* - All accesses to this list must be done while synchronizing on `this'.
* Technically, this array could be used as a circular buffer to save RAM.
* However because the typical life of a Deferred is to accumulate callbacks
@@ -545,7 +548,7 @@
/**
* Index in {@link #callbacks} of the next callback to invoke.
* Invariants:
- * - When entering State.DONE, this value is reset to 0.
+ * - When entering DONE, this value is reset to 0.
* - All the callbacks prior to this index are null.
* - If `callbacks' isn't null, the callback at this index is not null
* unless {@code next_callback == last_callback}.
@@ -556,7 +559,7 @@
/**
* Index in {@link #callbacks} past the last callback to invoke.
* Invariants:
- * - When entering State.DONE, this value is reset to 0.
+ * - When entering DONE, this value is reset to 0.
* - All the callbacks at and after this index are null.
* - This value might be equal to {@code callbacks.length}.
* - All accesses to this value must be done while synchronizing on `this'.
@@ -564,10 +567,8 @@
private short last_callback;
/** Helper for atomic CAS on the state. */
- private static final
- AtomicReferenceFieldUpdater<Deferred, State> stateUpdater =
- AtomicReferenceFieldUpdater.newUpdater(Deferred.class, State.class,
- "state");
+ private static final AtomicIntegerFieldUpdater<Deferred> stateUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(Deferred.class, "state");
/**
* Atomically compares and swaps the state of this Deferred.
@@ -575,19 +576,19 @@
* @param val The new state to transition to if the comparison is successful.
* @return {@code true} if the CAS succeeded, {@code false} otherwise.
*/
- private boolean casState(final State cmp, final State val) {
+ private boolean casState(final int cmp, final int val) {
return stateUpdater.compareAndSet(this, cmp, val);
}
/** Constructor. */
public Deferred() {
- state = State.PENDING;
+ state = PENDING;
}
/** Private constructor used when the result is already available. */
private Deferred(final Object result) {
this.result = result;
- state = State.DONE;
+ state = DONE;
}
/**
@@ -649,10 +650,10 @@ private Deferred(final Object result) {
// before we add another callback.
synchronized (this) {
// If we're DONE, switch to RUNNING atomically.
- if (state == State.DONE) {
+ if (state == DONE) {
// This "check-then-act" sequence is safe as this is the only code
// path that transitions from DONE to RUNNING and it's synchronized.
- state = State.RUNNING;
+ state = RUNNING;
} else {
// We get here if weren't DONE (most common code path)
// -or-
@@ -699,7 +700,7 @@ else if (last_callback == callbacks.length) {
if (more) {
runCallbacks(); // Will put us back either in DONE or in PAUSED.
} else {
- state = State.DONE;
+ state = DONE;
}
}
return (Deferred<R>) ((Object) this);
@@ -960,7 +961,7 @@ public String toString() {
* @throws AssertionError if {@code initresult == this}.
*/
public void callback(final Object initresult) {
- if (!casState(State.PENDING, State.RUNNING)) {
+ if (!casState(PENDING, RUNNING)) {
throw new AssertionError("This Deferred was already called!"
+ " New result=" + initresult + ", this=" + this);
}
@@ -1097,7 +1098,7 @@ public T joinUninterruptibly(final long timeout) throws Exception {
@SuppressWarnings("unchecked")
private T doJoin(final boolean interruptible, final long timeout)
throws InterruptedException, Exception {
- if (state == State.DONE) { // Nothing to join, we're already DONE.
+ if (state == DONE) { // Nothing to join, we're already DONE.
if (result instanceof Exception) {
throw (Exception) result;
}
@@ -1227,7 +1228,7 @@ private void runCallbacks() {
// leave this method, and then addCallbacks would add callbacks that
// would never get called.
else {
- state = State.DONE;
+ state = DONE;
callbacks = null;
next_callback = last_callback = 0;
break;
@@ -1251,7 +1252,7 @@ private void runCallbacks() {
* Executes a single callback, handling continuation if it returns a Deferred.
* @param cb The callback to execute.
* @return {@code true} if the callback returned a Deferred and we switched to
- * State.PAUSED, {@code false} otherwise and we didn't change state.
+ * PAUSED, {@code false} otherwise and we didn't change state.
*/
@SuppressWarnings("unchecked")
private boolean doCall(final Callback cb) {
@@ -1289,9 +1290,9 @@ private void handleContinuation(final Deferred d, final Callback cb) {
// adding a callback on `d' to continue when `d' completes,
// we atomically read the result off of `d' immediately. To
// do this safely, we need to change `d's state momentarily.
- if (d.casState(State.DONE, State.RUNNING)) {
+ if (d.casState(DONE, RUNNING)) {
result = d.result; // No one will change `d.result' now.
- d.state = State.DONE;
+ d.state = DONE;
runCallbacks();
return;
}
@@ -1301,11 +1302,11 @@ private void handleContinuation(final Deferred d, final Callback cb) {
// resume when the result of that other Deferred is available.
// If it was DONE, well we're no longer DONE because we now need to wait
// on that other Deferred to complete, so we're PAUSED again.
- state = State.PAUSED;
+ state = PAUSED;
d.addBoth(new Continue(d, cb));
// If d is DONE and our callback chain is empty, we're now in state DONE.
// Otherwise we're still in state PAUSED.
- if (LOG.isDebugEnabled() && state == State.PAUSED) {
+ if (LOG.isDebugEnabled() && state == PAUSED) {
if (cb != null) {
LOG.debug("callback=" + cb + '@' + cb.hashCode() + " returned " + d
+ ", so the following Deferred is getting paused: " + this);
@@ -1337,7 +1338,7 @@ public Continue(final Deferred d, final Callback cb) {
public Object call(final Object arg) {
if (arg instanceof Deferred) {
handleContinuation((Deferred) arg, cb);
- } else if (!casState(State.PAUSED, State.RUNNING)) {
+ } else if (!casState(PAUSED, RUNNING)) {
final String cb2s = cb == null ? "null" : cb + "@" + cb.hashCode();
throw new AssertionError("Tried to resume the execution of "
+ Deferred.this + ") although it's not in state=PAUSED."
@@ -1372,7 +1373,7 @@ public String toString() {
* especially if this is in the fast-path of your application.
*/
public String toString() {
- final State state = this.state; // volatile access before reading result.
+ final int state = this.state; // volatile access before reading result.
String result;
if (this.result == null) {
result = "null";
@@ -1390,7 +1391,7 @@ public String toString() {
final StringBuilder buf = new StringBuilder((9 + 10 + 7 + 7
+ result.length()) * 2);
buf.append("Deferred@").append(super.hashCode())
- .append("(state=").append(state)
+ .append("(state=").append(stateString(state))
.append(", result=").append(result)
.append(", callback=");
synchronized (this) {
@@ -1412,4 +1413,14 @@ public String toString() {
return buf.toString();
}
+ private static String stateString(final int state) {
+ switch (state) {
+ case PENDING: return "PENDING";
+ case RUNNING: return "RUNNING";
+ case PAUSED: return "PAUSED";
+ case DONE: return "DONE";
+ }
+ throw new AssertionError("Should never be here. WTF: state=" + state);
+ }
+
}

No commit comments for this range

Something went wrong with that request. Please try again.