Skip to content

Commit

Permalink
Refactored configurable trampoline handling into Continuations class.
Browse files Browse the repository at this point in the history
  • Loading branch information
jodzga committed Mar 8, 2017
1 parent 9270bd0 commit 0a0bf44
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 40 deletions.
38 changes: 6 additions & 32 deletions src/com/linkedin/parseq/FusionTask.java
Expand Up @@ -233,17 +233,9 @@ public void done(final T value) throws PromiseResolvedException {
}
settable.done(value);
traceContext.getParent().getTaskLogger().logTaskEnd(FusionTask.this, _traceValueProvider);
if (ParSeqGlobalConfiguration.isTrampolineEnabled()) {
CONTINUATIONS.submit(() -> dest.done(value));
} else {
dest.done(value);
}
CONTINUATIONS.submit(() -> dest.done(value));
} catch (Exception e) {
if (ParSeqGlobalConfiguration.isTrampolineEnabled()) {
CONTINUATIONS.submit(() -> dest.fail(e));
} else {
dest.fail(e);
}
CONTINUATIONS.submit(() -> dest.fail(e));
}
}

Expand All @@ -254,38 +246,20 @@ public void fail(final Throwable error) throws PromiseResolvedException {
traceFailure(error);
settable.fail(error);
traceContext.getParent().getTaskLogger().logTaskEnd(FusionTask.this, _traceValueProvider);
if (ParSeqGlobalConfiguration.isTrampolineEnabled()) {
CONTINUATIONS.submit(() -> dest.fail(error));
} else {
dest.fail(error);
}
CONTINUATIONS.submit(() -> dest.fail(error));
} catch (Exception e) {
if (ParSeqGlobalConfiguration.isTrampolineEnabled()) {
CONTINUATIONS.submit(() -> dest.fail(e));
} else {
dest.fail(e);
}
CONTINUATIONS.submit(() -> dest.fail(e));
}
}
};

if (ParSeqGlobalConfiguration.isTrampolineEnabled()) {
CONTINUATIONS.submit(() -> {
try {
propagator.accept(traceContext, src, next);
} catch (Exception e) {
/* This can only happen if there is an internal problem. Propagators should not throw any exceptions. */
LOGGER.error("ParSeq ingternal error. An exception was thrown by propagator.", e);
}
});
} else {
CONTINUATIONS.submit(() -> {
try {
propagator.accept(traceContext, src, next);
} catch (Exception e) {
/* This can only happen if there is an internal problem. Propagators should not throw any exceptions. */
LOGGER.error("ParSeq ingternal error. An exception was thrown by propagator.", e);
}
}
});
} else {
//non-parent tasks subsequent executions
addPotentialRelationships(traceContext, traceContext.getParent().getTraceBuilder());
Expand Down
8 changes: 7 additions & 1 deletion src/com/linkedin/parseq/internal/Continuations.java
Expand Up @@ -4,6 +4,8 @@
import java.util.ArrayDeque;
import java.util.Deque;

import com.linkedin.parseq.ParSeqGlobalConfiguration;

/**
* This class allows running the following code structure:
* <pre><code>
Expand Down Expand Up @@ -38,7 +40,11 @@ protected Continuation initialValue() {
};

public void submit(final Runnable action) {
CONTINUATION.get().submit(action);
if (ParSeqGlobalConfiguration.isTrampolineEnabled()) {
CONTINUATION.get().submit(action);
} else {
action.run();
}
}

private static final class Continuation {
Expand Down
9 changes: 2 additions & 7 deletions src/com/linkedin/parseq/promise/SettablePromiseImpl.java
Expand Up @@ -115,13 +115,8 @@ public boolean isFailed() {

private void doFinish(final T value, final Throwable error) throws PromiseResolvedException {
final List<PromiseListener<T>> listeners = finalizeResult(value, error);
if (ParSeqGlobalConfiguration.isTrampolineEnabled()) {
CONTINUATIONS.submit(() -> notifyListeners(listeners));
CONTINUATIONS.submit(_awaitLatch::countDown);
} else {
notifyListeners(listeners);
_awaitLatch.countDown();
}
CONTINUATIONS.submit(() -> notifyListeners(listeners));
CONTINUATIONS.submit(_awaitLatch::countDown);
}

private List<PromiseListener<T>> finalizeResult(T value, Throwable error) {
Expand Down

0 comments on commit 0a0bf44

Please sign in to comment.