Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move SemaphoreStep states into a single map to try to simplify the implementation #260

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import edu.umd.cs.findbugs.annotations.CheckForNull;
Expand Down Expand Up @@ -78,11 +77,7 @@ synchronized int allocateNumber(String id) {
iota.put(id, number);
return number;
}
/** map from {@link #k} to serial form of {@link StepContext} */
final Map<String,String> contexts = new HashMap<>();
final Map<String,Object> returnValues = new HashMap<>();
final Map<String,Throwable> errors = new HashMap<>();
final Set<String> started = new HashSet<>();
final Map<String,KeyState> keyStates = new HashMap<>();
}

private final String id;
Expand Down Expand Up @@ -112,12 +107,16 @@ public static void success(String k, Object returnValue) {
State s = State.get();
StepContext c;
synchronized (s) {
if (!s.contexts.containsKey(k)) {
KeyState keyState = s.keyStates.get(k);
if (keyState == null || keyState instanceof WillSucceedState || keyState instanceof WillFailState) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe introduce a common supertype WillFinishState.

LOGGER.info(() -> "Planning to unblock " + k + " as success");
s.returnValues.put(k, returnValue);
s.keyStates.put(k, new WillSucceedState(returnValue));
return;
} else if (keyState instanceof FinishedState) {
throw new IllegalStateException(k + " already finished");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks tests. In particular at least workflow-step-api / org.jenkinsci.plugins.workflow.steps.GeneralNonBlockingStepExecutionTest.stop and workflow-cps / org.jenkinsci.plugins.workflow.cps.CpsFlowExecutionTest.stepsAreStoppedWhenCpsVmExecutorServiceHandlesUncaughtException. I'll take a look.

Copy link
Member Author

@dwnusbaum dwnusbaum Mar 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GeneralNonBlockingStepExecutionTest.stop I think just has a redundant call to SemaphoreStep.success that could be removed, but for compatibility we could replace this exception with just a logged warning.

CpsFlowExecutionTest.stepsAreStoppedWhenCpsVmExecutorServiceHandlesUncaughtException is a little unique, basically the test was using semaphore as a way to call StepContext.onSuccess after a Pipeline dies. To preserve that test's behavior without changes, we'd have to make FinishedState keep the serialized context around. Probably clearer to update that test to use a custom step instead.

}
c = getContext(s, k);
s.keyStates.put(k, new FinishedState());
}
LOGGER.info(() -> "Unblocking " + k + " as success");
c.onSuccess(returnValue);
Expand All @@ -134,12 +133,16 @@ public static void failure(String k, Throwable error) {
State s = State.get();
StepContext c;
synchronized (s) {
if (!s.contexts.containsKey(k)) {
Object keyState = s.keyStates.get(k);
if (keyState == null || keyState instanceof WillSucceedState || keyState instanceof WillFailState) {
LOGGER.info(() -> "Planning to unblock " + k + " as failure");
s.errors.put(k, error);
s.keyStates.put(k, new WillFailState(error));
return;
} else if (keyState instanceof FinishedState) {
throw new IllegalStateException(k + " already finished");
}
c = getContext(s, k);
s.keyStates.put(k, new FinishedState());
}
LOGGER.info(() -> "Unblocking " + k + " as failure");
c.onFailure(error);
Expand All @@ -156,13 +159,14 @@ public StepContext getContext() {

private static StepContext getContext(State s, String k) {
assert Thread.holdsLock(s);
return (StepContext) Jenkins.XSTREAM.fromXML(s.contexts.get(k));
return (StepContext) Jenkins.XSTREAM.fromXML(((WaitingState) s.keyStates.get(k)).context);
}

public static void waitForStart(@NonNull String k, @CheckForNull Run<?,?> b) throws IOException, InterruptedException {
State s = State.get();
synchronized (s) {
while (!s.started.contains(k)) {
KeyState keyState;
while (!(((keyState = s.keyStates.get(k)) instanceof WaitingState) || keyState instanceof FinishedState)) {
if (b != null && !b.isBuilding()) {
throw new AssertionError(JenkinsRule.getLog(b));
}
Expand All @@ -189,13 +193,21 @@ public static class Execution extends AbstractStepExecutionImpl {
Object returnValue = null;
Throwable error = null;
boolean success = false, failure = false, sync = true;
String c = Jenkins.XSTREAM.toXML(getContext());
synchronized (s) {
if (s.returnValues.containsKey(k)) {
Object keyState = s.keyStates.get(k);
if (keyState instanceof WillSucceedState) {
success = true;
returnValue = s.returnValues.get(k);
} else if (s.errors.containsKey(k)) {
returnValue = ((WillSucceedState) keyState).returnValue;
s.keyStates.put(k, new FinishedState());
} else if (keyState instanceof WillFailState) {
failure = true;
error = s.errors.get(k);
error = ((WillFailState) keyState).error;
s.keyStates.put(k, new FinishedState());
} else if (keyState == null) {
s.keyStates.put(k, new WaitingState(c));
} else {
throw new IllegalStateException("Unable to start " + k + " in state " + keyState);
}
}
if (success) {
Expand All @@ -206,14 +218,9 @@ public static class Execution extends AbstractStepExecutionImpl {
getContext().onFailure(error);
} else {
LOGGER.info(() -> "Blocking " + k);
String c = Jenkins.XSTREAM.toXML(getContext());
synchronized (s) {
s.contexts.put(k, c);
}
sync = false;
}
synchronized (s) {
s.started.add(k);
s.notifyAll();
}
return sync;
Expand All @@ -222,7 +229,7 @@ public static class Execution extends AbstractStepExecutionImpl {
@Override public void stop(Throwable cause) throws Exception {
State s = State.get();
synchronized (s) {
s.contexts.remove(k);
s.keyStates.put(k, new FinishedState());
}
LOGGER.log(Level.INFO, cause, () -> "Stopping " + k);
super.stop(cause);
Expand All @@ -231,7 +238,16 @@ public static class Execution extends AbstractStepExecutionImpl {
@Override public String getStatus() {
State s = State.get();
synchronized (s) {
return s.contexts.containsKey(k) ? "waiting on " + k : "finished " + k;
KeyState keyState = s.keyStates.get(k);
if (keyState instanceof WillSucceedState) {
return k + " will immediately succeed";
} else if (keyState instanceof WillFailState) {
return k + " will immediately fail";
} else if (keyState instanceof FinishedState) {
return "finished " + k;
} else {
return "waiting on " + k;
}
}
}

Expand All @@ -255,5 +271,37 @@ public static class Execution extends AbstractStepExecutionImpl {

}

// Marker interface just for clarity.
private interface KeyState { }

/**
* The step has not yet started, and will succeed immediately when it does start.
*/
private static class WillSucceedState implements KeyState {
private final Object returnValue;
private WillSucceedState(Object returnValue) {
this.returnValue = returnValue;
}
}

/**
* The step has not yet started, and will fail immediately when it does start.
*/
private static class WillFailState implements KeyState {
private final Throwable error;
private WillFailState(Throwable error) {
this.error = error;
}
}

private static class WaitingState implements KeyState {
private final String context;
private WaitingState(String context) {
this.context = context;
}
}

private static class FinishedState implements KeyState { }

private static final long serialVersionUID = 1L;
}