diff --git a/doc/persistence.md b/doc/persistence.md
new file mode 100644
index 000000000..6ff4b405d
--- /dev/null
+++ b/doc/persistence.md
@@ -0,0 +1,40 @@
+# The Pipeline Persistence Model
+
+# Data Model
+Running pipelines persist in 3 pieces:
+
+1. The `FlowNode`s - stored by a `FlowNodeStorage` - this holds the FlowNodes created to map to `Step`s, and for block scoped Steps, start and end of blocks
+2. The `CpsFlowExecution` - this is currently stored in the WorkflowRun, and the primary pieces of interest are:
+ * heads - the current "tips" of the Flow Graph, i.e. the FlowNodes that represent running steps and are appended to
+ - A head maps to a `CpsThread` in the Pipeline program, within the `CpsThreadGroup`
+ * starts - the `BlockStartNode`s marking the start(s) of the currently executing blocks
+ * scripts - the loaded Pipeline script files (text)
+ * persistedClean
+ - If true, Pipeline saved its execution cleanly to disk and we *might* be able to resume it
+ - If false, something went wrong saving the execution, so we cannot resume even if we'd otherwise be able to
+ - If null, probably the build dates back to before this field was added - we check to see if this is running in a highly persistent DurabilityMode (Max_survivability generally)
+ * done - if true, this execution completed, if false or un-set, the pipeline is a candidate to resume unless its only head is a FlowEndNode
+ - The handling of false is for legacy reasons, since it was only recently made persistent.
+ *
+ * various other boolean flags & settings for the execution (durability setting, user that started the build, is it sandboxed, etc)
+3. The Program -- this is the current execution state of the Pipeline
+ * This holds the Groovy state - the `CpsThreadGroup` - with runtime calls transformed by CPS so they can persist
+ * The `CpsThread`s map to the running branches of the Pipeline
+ * The program depends on the FlowNodes from the FlowNodeStorage, since it reads them by ID rather than storing them in the program
+ * This also depends on the heads in the CpsFlowExecution, because its FlowHeads are loaded from the heads of the CpsFlowExecution
+ * Also holds the CpsStepContext, i.e. the variables such as EnvVars, Executor and Workspace uses (the latter stored as Pickles)
+ - The pickles will be specially restored when the Pipeline resumes since they don't serialize/deserialize normally
+
+## Persistence Issues And Logic
+
+Some basic rules:
+
+1. If the FlowNodeStorage is corrupt, incomplete, or un-persisted, all manner of heck will break loose
+ - In terms of Pipeline execution, the impact is like the Resonance Cascade from the Half-Life games
+ - The pipeline can never be resumed (the key piece is missing)
+ - Usually we fake up some placeholder FlowNodes to cover this situation and save them
+2. Whenever persisting data, the Pipeline *must* have the FlowNodes persisted on disk (via `storage.flush()` generally)
+in order to be able to load the heads and restore the program.
+3. Once we've set persistedClean as false and saved the FlowExecution, then it doesn't matter what we do -- the Pipeline will assume
+ it already has incomplete persistence data (as with 1) when trying to resume. This is how we handle the low-durability modes, to
+ avoid resuming a stale state of the Pipeline simply because we have old data persisted and are not updating it.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index baa9f2e76..3fc2e6615 100644
--- a/pom.xml
+++ b/pom.xml
@@ -141,7 +141,7 @@
org.jenkins-ci.plugins.workflow
workflow-job
- 2.20
+ 2.21
test
diff --git a/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsBodyExecution.java b/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsBodyExecution.java
index 6a61e0f29..bd564ca30 100644
--- a/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsBodyExecution.java
+++ b/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsBodyExecution.java
@@ -120,7 +120,6 @@ class CpsBodyExecution extends BodyExecution {
}
head.setNewHead(sn);
- CpsFlowExecution.maybeAutoPersistNode(sn);
StepContext sc = new CpsBodySubContext(context, sn);
for (BodyExecutionCallback c : callbacks) {
@@ -337,7 +336,6 @@ public Next receive(Object o) {
FlowHead h = CpsThread.current().head;
StepStartNode ssn = addBodyStartFlowNode(h);
h.setNewHead(ssn);
- CpsFlowExecution.maybeAutoPersistNode(ssn);
}
StepEndNode en = addBodyEndFlowNode();
@@ -367,7 +365,6 @@ public Next receive(Object o) {
for (BodyExecutionCallback c : callbacks) {
c.onSuccess(sc, o);
}
-
return Next.terminate(null);
}
diff --git a/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecution.java b/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecution.java
index 65fd00423..5b9e2a59d 100644
--- a/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecution.java
+++ b/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsFlowExecution.java
@@ -98,6 +98,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -134,7 +136,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collector;
import java.util.stream.Collectors;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
@@ -609,95 +610,90 @@ private synchronized String getHeadsAsString() {
} else if (myHeads.size() == 0) {
return "empty-heads";
} else {
- return myHeads.entrySet().stream().map(h->h.getKey()+"::"+h.getValue()).collect(Collectors.joining(","));
+ return myHeads.entrySet().stream().map(h -> h.getKey() + "::" + h.getValue()).collect(Collectors.joining(","));
}
-
}
- /** Handle failures where we can't load heads. */
- private void rebuildEmptyGraph() {
- synchronized (this) {
- // something went catastrophically wrong and there's no live head. fake one
- LOGGER.log(Level.WARNING, "Failed to load pipeline heads, so faking some up for execution " + this.toString());
- if (this.startNodes == null) {
- this.startNodes = new Stack();
+ /**
+ * In the event we're missing FlowNodes, fail-fast and create some mockup FlowNodes so we can continue.
+ * This avoids nulling out all of the execution's data
+ * Bypasses {@link #croak(Throwable)} and {@link #onProgramEnd(Outcome)} to guarantee a clean path.
+ */
+ @GuardedBy("this")
+ synchronized void createPlaceholderNodes(Throwable failureReason) throws Exception {
+ this.done = true;
+
+ if (this.owner != null) {
+ // Ensure that the Run is marked as completed (failed) if it isn't already so it won't show as running
+ Queue.Executable ex = owner.getExecutable();
+ if (ex instanceof Run) {
+ Result res = ((Run)ex).getResult();
+ setResult(res != null ? res : Result.FAILURE);
}
+ }
- if (this.heads != null && this.heads.size() > 0) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "Resetting heads to rebuild the Pipeline structure, tossing existing heads: "+getHeadsAsString());
+ try {
+ programPromise = Futures.immediateFailedFuture(new IllegalStateException("Failed loading heads", failureReason));
+ LOGGER.log(Level.INFO, "Creating placeholder flownodes for execution: "+this);
+ if (this.owner != null) {
+ try {
+ owner.getListener().getLogger().println("Creating placeholder flownodes because failed loading originals.");
+ } catch (Exception ex) {
+ // It's okay to fail to log
}
- this.heads.clear();
}
- this.startNodes.clear();
+ // Switch to fallback storage so we don't delete original node data
+ this.storageDir = (this.storageDir != null) ? this.storageDir+"-fallback" : "workflow-fallback";
+ this.storage = createStorage(); // Empty storage
+
+ // Clear out old start nodes and heads
+ this.startNodes = new Stack();
FlowHead head = new FlowHead(this);
+ this.heads = new TreeMap();
heads.put(head.getId(), head);
- try {
- FlowStartNode start = new FlowStartNode(this, iotaStr());
- startNodes.push(start);
- head.newStartNode(start);
- } catch (IOException e) {
- LOGGER.log(Level.WARNING, "Failed to persist", e);
- }
- persistedClean = false;
- startNodesSerial = null;
- headsSerial = null;
+ FlowStartNode start = new FlowStartNode(this, iotaStr());
+ head.newStartNode(start);
+
+ // Create end
+ FlowNode end = new FlowEndNode(this, iotaStr(), (FlowStartNode)startNodes.pop(), result, getCurrentHeads().toArray(new FlowNode[0]));
+ end.addAction(new ErrorAction(failureReason));
+ head.setNewHead(end);
+ saveOwner();
+
+ } catch (Exception ex) {
+ throw ex;
}
}
@SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification="Storage does not actually NEED to be synchronized but the rest does.")
protected synchronized void initializeStorage() throws IOException {
- boolean storageErrors = false; // Maybe storage didn't get to persist properly or files were deleted.
- try {
- storage = createStorage();
-
- heads = new TreeMap();
- for (Map.Entry entry : headsSerial.entrySet()) {
- FlowHead h = new FlowHead(this, entry.getKey());
-
- FlowNode n = storage.getNode(entry.getValue());
- if (n != null) {
- h.setForDeserialize(storage.getNode(entry.getValue()));
- heads.put(h.getId(), h);
- } else {
- LOGGER.log(Level.WARNING, "Tried to load head FlowNodes for execution "+this.owner+" but FlowNode was not found in storage for head id:FlowNodeId "+entry.getKey()+":"+entry.getValue());
- storageErrors = true;
- break;
- }
- }
-
- headsSerial = null;
-
- if (!storageErrors) {
- // Same for startNodes:
- storageErrors = false;
- startNodes = new Stack();
- for (String id : startNodesSerial) {
- FlowNode node = storage.getNode(id);
- if (node != null) {
- startNodes.add((BlockStartNode) storage.getNode(id));
- } else {
- // TODO if possible, consider trying to close out unterminated blocks using heads, to keep existing graph history
- LOGGER.log(Level.WARNING, "Tried to load startNode FlowNodes for execution "+this.owner+" but FlowNode was not found in storage for FlowNode Id "+id);
- storageErrors = true;
- break;
- }
- }
+ storage = createStorage();
+ heads = new TreeMap();
+ for (Map.Entry entry : headsSerial.entrySet()) {
+ FlowHead h = new FlowHead(this, entry.getKey());
+
+ FlowNode n = storage.getNode(entry.getValue());
+ if (n != null) {
+ h.setForDeserialize(storage.getNode(entry.getValue()));
+ heads.put(h.getId(), h);
+ } else {
+ throw new IOException("Tried to load head FlowNodes for execution "+this.owner+" but FlowNode was not found in storage for head id:FlowNodeId "+entry.getKey()+":"+entry.getValue());
}
- startNodesSerial = null;
-
- } catch (IOException ioe) {
- LOGGER.log(Level.WARNING, "Error initializing storage and loading nodes", ioe);
- storageErrors = true;
}
+ headsSerial = null;
- if (storageErrors) { //
- this.storageDir = (this.storageDir != null) ? this.storageDir+"-fallback" : "workflow-fallback"; // Avoid overwriting data
- this.storage = createStorage(); // Empty storage
- // Need to find a way to mimic up the heads and fail cleanly, far enough to let the canResume do its thing
- rebuildEmptyGraph();
+ startNodes = new Stack();
+ for (String id : startNodesSerial) {
+ FlowNode node = storage.getNode(id);
+ if (node != null) {
+ startNodes.add((BlockStartNode) storage.getNode(id));
+ } else {
+ // TODO if possible, consider trying to close out unterminated blocks using heads, to keep existing graph history
+ throw new IOException( "Tried to load startNode FlowNodes for execution "+this.owner+" but FlowNode was not found in storage for FlowNode Id "+id);
+ }
}
+ startNodesSerial = null;
}
/** If true, we are allowed to resume the build because resume is enabled AND we shut down cleanly. */
@@ -716,26 +712,45 @@ public boolean canResume() {
@SuppressFBWarnings(value = "RC_REF_COMPARISON_BAD_PRACTICE_BOOLEAN", justification = "We want to explicitly check for boolean not-null and true")
public void onLoad(FlowExecutionOwner owner) throws IOException {
this.owner = owner;
+
try {
- initializeStorage();
try {
- if (!isComplete()) {
- if (canResume()) {
- loadProgramAsync(getProgramDataFile());
- } else {
- // TODO if possible, consider trying to close out unterminated blocks to keep existing graph history
- // That way we can visualize the graph in some error cases.
- LOGGER.log(Level.WARNING, "Pipeline state not properly persisted, cannot resume "+owner.getUrl());
- throw new IOException("Cannot resume build -- was not cleanly saved when Jenkins shut down.");
- }
- } else if (done && !super.isComplete()) {
- LOGGER.log(Level.WARNING, "Completed flow without FlowEndNode: "+this+" heads:"+getHeadsAsString());
+ initializeStorage(); // Throws exception and bombs out if we can't load FlowNodes
+ } catch (Exception ex) {
+ LOGGER.log(Level.WARNING, "Error initializing storage and loading nodes, will try to create placeholders for: "+this, ex);
+ createPlaceholderNodes(ex);
+ return;
+ }
+ } catch (Exception ex) {
+ done = true;
+ programPromise = Futures.immediateFailedFuture(ex);
+ throw new IOException("Failed to even create placeholder nodes for execution", ex);
+ }
+
+ try {
+ if (isComplete()) {
+ if (done == Boolean.TRUE && !super.isComplete()) {
+ LOGGER.log(Level.INFO, "Completed flow without FlowEndNode: "+this+" heads:"+getHeadsAsString());
+ }
+ if (super.isComplete() && done != Boolean.TRUE) {
+ LOGGER.log(Level.FINE, "Flow has FlowEndNode, but is not marked as done, fixing this for"+this);
+ done = true;
+ saveOwner();
+ }
+ } else { // See if we can/should resume build
+ if (canResume()) {
+ loadProgramAsync(getProgramDataFile());
+ } else {
+ // TODO if possible, consider trying to close out unterminated blocks to keep existing graph history
+ // That way we can visualize the graph in some error cases.
+ LOGGER.log(Level.WARNING, "Pipeline state not properly persisted, cannot resume "+owner.getUrl());
+ throw new IOException("Cannot resume build -- was not cleanly saved when Jenkins shut down.");
}
- } catch (Exception e) { // Multicatch ensures that failure to load does not nuke the master
- SettableFuture p = SettableFuture.create();
- programPromise = p;
- loadProgramFailed(e, p);
}
+ } catch (Exception e) { // Broad catch ensures that failure to load do NOT nuke the master
+ SettableFuture p = SettableFuture.create();
+ programPromise = p;
+ loadProgramFailed(e, p);
} finally {
if (programPromise == null) {
programPromise = Futures.immediateFailedFuture(new IllegalStateException("completed or broken execution"));
@@ -906,15 +921,19 @@ public void onFailure(Throwable t) {
});
}
+ /** See JENKINS-22941 for why this exists. */
@Override public boolean blocksRestart() {
if (programPromise == null || !programPromise.isDone()) {
+ // Can't restart cleanly while trying to set up the build
return true;
}
CpsThreadGroup g;
try {
g = programPromise.get();
} catch (Exception x) {
- return true;
+ // TODO Check this won't cause issues due to depickling delays etc
+ LOGGER.log(Level.FINE, "Not blocking restart due to exception in ProgramPromise: "+this, x);
+ return false;
}
return g.busy;
}
@@ -1055,10 +1074,12 @@ public synchronized boolean isCurrentHead(FlowNode n) {
//
synchronized void addHead(FlowHead h) {
heads.put(h.getId(), h);
+ saveExecutionIfDurable(); // We need to save the mutated heads for the run
}
synchronized void removeHead(FlowHead h) {
heads.remove(h.getId());
+ saveExecutionIfDurable(); // We need to save the mutated heads for the run
}
/**
@@ -1074,6 +1095,7 @@ void subsumeHead(FlowNode n) {
for (FlowHead h : _heads) {
if (h.get()==n) {
h.remove();
+ saveExecutionIfDurable(); // We need to save the mutated heads for the run
return;
}
}
@@ -1209,7 +1231,7 @@ synchronized void onProgramEnd(Outcome outcome) {
if (heads != null) {
FlowHead first = getFirstHead();
first.setNewHead(head);
- done = Boolean.TRUE; // After setting the final head
+ done = true; // After setting the final head
heads.clear();
heads.put(first.getId(), first);
@@ -1221,8 +1243,8 @@ synchronized void onProgramEnd(Outcome outcome) {
}
}
} catch (Exception ex) {
- done = Boolean.TRUE;
- throw ex;
+ done = true;
+ LOGGER.log(Level.WARNING, "Error trying to end execution "+this, ex);
}
try {
@@ -1230,6 +1252,9 @@ synchronized void onProgramEnd(Outcome outcome) {
} catch (IOException ioe) {
LOGGER.log(Level.WARNING, "Error flushing FlowNodeStorage to disk at end of run", ioe);
}
+
+ this.persistedClean = Boolean.TRUE;
+ saveOwner();
}
void cleanUpHeap() {
@@ -1450,6 +1475,13 @@ public String getNextScriptName(String path) {
return shell.generateScriptName().replaceFirst("[.]groovy$", "");
}
+ /** Has the execution been marked done - note that legacy builds may not have that flag persisted, in which case
+ * we look for a single FlowEndNode head (see: {@link #isComplete()} and {@link FlowExecution#isComplete()})
+ */
+ public boolean isDoneFlagSet() {
+ return done;
+ }
+
public boolean isPaused() {
if (programPromise.isDone()) {
try {
@@ -1477,12 +1509,12 @@ public void pause(final boolean v) throws IOException {
if (executable instanceof AccessControlled) {
((AccessControlled) executable).checkPermission(Item.CANCEL);
}
- done = Boolean.FALSE;
+ done = false;
Futures.addCallback(programPromise, new FutureCallback() {
@Override public void onSuccess(CpsThreadGroup g) {
if (v) {
g.pause();
- checkAndAbortNonresumableBuild();
+ checkAndAbortNonresumableBuild(); // TODO Verify if we can rely on just killing paused builds at shutdown via checkAndAbortNonresumableBuild()
checkpoint();
} else {
g.unpause();
@@ -1509,13 +1541,27 @@ public void pause(final boolean v) throws IOException {
try (Timeout t = Timeout.limit(3, TimeUnit.MINUTES)) { // TODO some complicated sequence of calls to Futures could allow all of them to run in parallel
LOGGER.fine("starting to suspend all executions");
for (FlowExecution execution : FlowExecutionList.get()) {
- if (execution instanceof CpsFlowExecution) {
- LOGGER.log(Level.FINE, "waiting to suspend {0}", execution);
- exec = (CpsFlowExecution) execution;
- // Like waitForSuspension but with a timeout:
- if (exec.programPromise != null) {
- exec.programPromise.get(1, TimeUnit.MINUTES).scheduleRun().get(1, TimeUnit.MINUTES);
+ try {
+ if (execution instanceof CpsFlowExecution) {
+ CpsFlowExecution cpsExec = (CpsFlowExecution)execution;
+ cpsExec.checkAndAbortNonresumableBuild();
+
+ LOGGER.log(Level.FINE, "waiting to suspend {0}", execution);
+ exec = (CpsFlowExecution) execution;
+ // Like waitForSuspension but with a timeout:
+ if (exec.programPromise != null) {
+ LOGGER.log(Level.FINER, "Waiting for Pipeline to go to sleep for shutdown: "+execution);
+ try {
+ exec.programPromise.get(1, TimeUnit.MINUTES).scheduleRun().get(1, TimeUnit.MINUTES);
+ LOGGER.log(Level.FINER, " Pipeline went to sleep OK: "+execution);
+ } catch (InterruptedException | TimeoutException ex) {
+ LOGGER.log(Level.WARNING, "Error waiting for Pipeline to suspend: "+exec, ex);
+ }
+ }
+ cpsExec.checkpoint();
}
+ } catch (Exception ex) {
+ LOGGER.log(Level.WARNING, "Error persisting Pipeline execution at shutdown: "+((CpsFlowExecution) execution).owner, ex);
}
}
LOGGER.fine("finished suspending all executions");
@@ -1696,60 +1742,86 @@ private T readChild(HierarchicalStreamReader r, UnmarshallingContext context
class TimingFlowNodeStorage extends FlowNodeStorage {
private final FlowNodeStorage delegate;
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+
TimingFlowNodeStorage(FlowNodeStorage delegate) {
this.delegate = delegate;
}
@Override
public FlowNode getNode(String string) throws IOException {
+ readWriteLock.readLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
return delegate.getNode(string);
+ } finally {
+ readWriteLock.readLock().unlock();
}
}
@Override
public void storeNode(@Nonnull FlowNode n) throws IOException {
+ readWriteLock.writeLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
delegate.storeNode(n);
+ } finally {
+ readWriteLock.writeLock().unlock();
}
}
@Override
public void storeNode(@Nonnull FlowNode n, boolean delayWritingActions) throws IOException {
+ readWriteLock.writeLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
delegate.storeNode(n, delayWritingActions);
+ } finally {
+ readWriteLock.writeLock().unlock();
}
}
@Override
public void flush() throws IOException {
+ readWriteLock.writeLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
delegate.flush();
+ } finally {
+ readWriteLock.writeLock().unlock();
}
}
@Override
public void flushNode(FlowNode node) throws IOException {
+ readWriteLock.writeLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
delegate.flushNode(node);
+ } finally {
+ readWriteLock.writeLock().unlock();
}
}
@Override
public void autopersist(@Nonnull FlowNode n) throws IOException {
+ readWriteLock.writeLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
delegate.autopersist(n);
+ } finally {
+ readWriteLock.writeLock().unlock();
}
}
@Override public List loadActions(FlowNode node) throws IOException {
+ readWriteLock.readLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
return delegate.loadActions(node);
+ } finally {
+ readWriteLock.readLock().unlock();
}
}
@Override public void saveActions(FlowNode node, List actions) throws IOException {
+ readWriteLock.writeLock().lock();
try (Timing t = time(TimingKind.flowNode)) {
delegate.saveActions(node, actions);
+ } finally {
+ readWriteLock.writeLock().unlock();
}
}
}
@@ -1798,15 +1870,32 @@ public void autopersist(@Nonnull FlowNode n) throws IOException {
}
+ /** Persist the execution if we are set up to save the execution with every step. */
+ void saveExecutionIfDurable() {
+ if (this.getDurabilityHint().isPersistWithEveryStep()) {
+ saveOwner();
+ }
+ }
+
/** Save the owner that holds this execution. */
void saveOwner() {
try {
if (this.owner != null && this.owner.getExecutable() instanceof Saveable) { // Null-check covers some anomalous cases we've seen
Saveable saveable = (Saveable)(this.owner.getExecutable());
+ persistedClean = true;
+ if (storage != null && storage.delegate != null) {
+ // Defensively flush FlowNodes to storage
+ try {
+ storage.flush();
+ } catch (Exception ex) {
+ LOGGER.log(Level.WARNING, "Error persisting FlowNodes for execution "+owner, ex);
+ persistedClean = false;
+ }
+ }
saveable.save();
}
} catch (IOException ex) {
- LOGGER.log(Level.WARNING, "Error persisting Run before shutdown", ex);
+ LOGGER.log(Level.WARNING, "Error persisting Run "+owner, ex);
persistedClean = false;
}
}
@@ -1830,12 +1919,15 @@ private void checkpoint() {
// Try to ensure we've saved the appropriate things -- the program is the last stumbling block.
try {
final SettableFuture myOutcome = SettableFuture.create();
+ LOGGER.log(Level.INFO, "About to try to checkpoint the program for build"+this);
if (programPromise != null && programPromise.isDone()) {
runInCpsVmThread(new FutureCallback() {
@Override
public void onSuccess(CpsThreadGroup result) {
try {
+ LOGGER.log(Level.INFO, "Trying to save program before shutdown "+this);
result.saveProgramIfPossible(true);
+ LOGGER.log(Level.INFO, "Finished saving program before shutdown "+this);
myOutcome.set(null);
} catch (Exception ex) {
LOGGER.log(Level.WARNING, "Error persisting program: "+ex);
@@ -1845,10 +1937,12 @@ public void onSuccess(CpsThreadGroup result) {
@Override
public void onFailure(Throwable t) {
+ LOGGER.log(Level.WARNING, "Failed trying to save program before shutdown "+this);
myOutcome.setException(t);
}
});
myOutcome.get(30, TimeUnit.SECONDS);
+ LOGGER.log(Level.FINE, "Successfully saved program for "+this);
}
} catch (TimeoutException te) {
@@ -1858,19 +1952,29 @@ public void onFailure(Throwable t) {
persistOk = false;
LOGGER.log(Level.FINE, "Error saving program, that should be handled elsewhere.", ex);
}
+ try { // Flush node storage just in case the Program mutated it, just to be sure
+ storage.flush();
+ LOGGER.log(Level.FINE, "Successfully did final flush of storage for "+this);
+ } catch (IOException ioe) {
+ persistOk=false;
+ LOGGER.log(Level.WARNING, "Error persisting FlowNode storage before shutdown", ioe);
+ }
persistedClean = persistOk;
- saveOwner();
+ try {
+ saveOwner();
+ } catch (Exception ex) {
+ LOGGER.log(Level.WARNING, "Error saving build for "+this, ex);
+ }
+
}
}
- /** Clean shutdown of build. */
+ /** Abort any running builds at Jenkins shutdown if they don't support resuming at next startup. */
private void checkAndAbortNonresumableBuild() {
if (isComplete() || this.getDurabilityHint().isPersistWithEveryStep() || !isResumeBlocked()) {
return;
}
try {
- // FIXME we need to actually kill the darn build
-
owner.getListener().getLogger().println("Failing build: shutting down master and build is marked to not resume");
final Throwable x = new FlowInterruptedException(Result.ABORTED);
Futures.addCallback(this.getCurrentExecutions(/* cf. JENKINS-26148 */true), new FutureCallback>() {
@@ -1901,8 +2005,7 @@ private void checkAndAbortNonresumableBuild() {
/** Ensures that even if we're limiting persistence of data for performance, we still write out data for shutdown. */
@Override
protected void notifyShutdown() {
- checkAndAbortNonresumableBuild();
- checkpoint();
+ // No-op, handled in the suspendAll terminator
}
}
diff --git a/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsStepContext.java b/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsStepContext.java
index d1195c358..2b90bec1b 100644
--- a/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsStepContext.java
+++ b/src/main/java/org/jenkinsci/plugins/workflow/cps/CpsStepContext.java
@@ -436,7 +436,6 @@ public void onSuccess(CpsThreadGroup g) {
g.getExecution().subsumeHead(parents.get(i));
StepEndNode en = new StepEndNode(flow, (StepStartNode) n, parents);
thread.head.setNewHead(en);
- CpsFlowExecution.maybeAutoPersistNode(en);
}
thread.head.markIfFail(getOutcome());
thread.setStep(null);
diff --git a/src/main/java/org/jenkinsci/plugins/workflow/cps/DSL.java b/src/main/java/org/jenkinsci/plugins/workflow/cps/DSL.java
index d94df50b7..8603934e6 100644
--- a/src/main/java/org/jenkinsci/plugins/workflow/cps/DSL.java
+++ b/src/main/java/org/jenkinsci/plugins/workflow/cps/DSL.java
@@ -577,6 +577,9 @@ private void invokeBody(CpsThread cur) {
// the first one can reuse the current thread, but other ones need to create new heads
// we want to do this first before starting body so that the order of heads preserve
// natural ordering.
+
+ // TODO give this javadocs worth a darn, because this is how we create parallel branches and the docs are cryptic as can be!
+ // Also we need to double-check this logic because this might cause a failure of persistence
FlowHead[] heads = new FlowHead[context.bodyInvokers.size()];
for (int i = 0; i < heads.length; i++) {
heads[i] = i==0 ? cur.head : cur.head.fork();
diff --git a/src/main/java/org/jenkinsci/plugins/workflow/cps/FlowHead.java b/src/main/java/org/jenkinsci/plugins/workflow/cps/FlowHead.java
index b9f7f7e57..c525c5ca2 100644
--- a/src/main/java/org/jenkinsci/plugins/workflow/cps/FlowHead.java
+++ b/src/main/java/org/jenkinsci/plugins/workflow/cps/FlowHead.java
@@ -25,6 +25,7 @@
package org.jenkinsci.plugins.workflow.cps;
import com.cloudbees.groovy.cps.Outcome;
+import com.google.common.annotations.VisibleForTesting;
import hudson.model.Action;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -45,6 +46,7 @@
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.graph.FlowStartNode;
+import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
/**
@@ -68,7 +70,8 @@ final class FlowHead implements Serializable {
private /*almost final except for serialization*/ int id;
private /*almost final except for serialization*/ transient CpsFlowExecution execution;
- private FlowNode head; // TODO: rename to node
+ @VisibleForTesting
+ FlowNode head; // TODO: rename to node
FlowHead(CpsFlowExecution execution, int id) {
this.id = id;
@@ -116,6 +119,7 @@ void newStartNode(FlowStartNode n) throws IOException {
execution.storage.storeNode(head, false);
}
+ /** Could be better described as "append to Flow graph" except for parallel cases. */
void setNewHead(@Nonnull FlowNode v) {
if (v == null) {
// Because Findbugs isn't 100% at catching cases where this can happen and we really need to fail hard-and-fast
@@ -124,11 +128,16 @@ void setNewHead(@Nonnull FlowNode v) {
try {
if (this.head != null) {
CpsFlowExecution.maybeAutoPersistNode(head);
+ assert execution.storage.getNode(this.head.getId()) != null;
}
execution.storage.storeNode(v, true);
+ assert execution.storage.getNode(v.getId()) != null;
v.addAction(new TimingAction());
- } catch (IOException e) {
- LOGGER.log(Level.FINE, "Failed to record new head: " + v, e);
+ CpsFlowExecution.maybeAutoPersistNode(v); // Persist node before changing head, otherwise Program can have unpersisted nodes and will fail to deserialize
+ // NOTE: we may also need to persist the FlowExecution by persisting its owner (which will be a WorkflowRun)
+ // But this will be handled by the WorkflowRun GraphListener momentarily
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Failed to record new head or persist old: " + v, e);
}
this.head = v;
CpsThreadGroup c = CpsThreadGroup.current();
@@ -177,12 +186,20 @@ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFound
// we'll replace this with one of execution.heads()
}
+ @Nonnull
private Object readResolve() {
execution = CpsFlowExecution.PROGRAM_STATE_SERIALIZATION.get();
- if (execution!=null)
- return execution.getFlowHead(id);
- else
+ if (execution!=null) {
+ // See if parallel loading?
+ FlowHead myHead = execution.getFlowHead(id);
+ if (myHead == null) {
+ throw new IllegalStateException("FlowHead loading problem at deserialize: Null FlowHead with id "+id+" in execution "+execution);
+ }
+ return myHead;
+ } else {
+ LOGGER.log(Level.WARNING, "Tried to load a FlowHead from program with no Execution in PROGRAM_STATE_SERIALIZATION");
return this;
+ }
}
private static final Logger LOGGER = Logger.getLogger(FlowHead.class.getName());
diff --git a/src/test/java/org/jenkinsci/plugins/workflow/cps/FlowDurabilityTest.java b/src/test/java/org/jenkinsci/plugins/workflow/cps/FlowDurabilityTest.java
index dd5e95cd2..854f71c7f 100644
--- a/src/test/java/org/jenkinsci/plugins/workflow/cps/FlowDurabilityTest.java
+++ b/src/test/java/org/jenkinsci/plugins/workflow/cps/FlowDurabilityTest.java
@@ -47,6 +47,7 @@
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.RestartableJenkinsRule;
+import javax.annotation.Nonnull;
import java.io.File;
import java.io.FileOutputStream;
import java.lang.annotation.ElementType;
@@ -84,7 +85,7 @@ public class FlowDurabilityTest {
@Rule
public TimedRepeatRule repeater = new TimedRepeatRule();
- // Used in testing
+ // Used in Race-condition/persistence fuzzing where we need to run repeatedly
static class TimedRepeatRule implements TestRule {
@Target({ElementType.METHOD})
@@ -604,7 +605,8 @@ public void evaluate() throws Throwable {
/** Verify that if we bomb out because we cannot resume, we at least try to finish the flow graph if we have something to work with. */
@Test
- @Ignore // Can be fleshed out later if we have a valid need for it.
+ @Ignore
+ // Can be fleshed out later if we have a valid need for it.
public void testPipelineFinishesFlowGraph() throws Exception {
final String[] logStart = new String[1];
final List nodesOut = new ArrayList();
@@ -790,7 +792,13 @@ public void evaluate() throws Throwable {
});
}
- /** Launches the job used for fuzzing in {@link #fuzzTimingDurable()} and {@link #fuzzTimingNonDurable()} -- including the timeout. */
+ private static void assertBuildNotHung(@Nonnull RestartableJenkinsRule story, @Nonnull WorkflowRun run, int timeOutMillis) throws Exception {
+ if (run.isBuilding()) {
+ story.j.waitUntilNoActivityUpTo(timeOutMillis);
+ }
+ }
+
+ /** Launches the job used for fuzzing in the various timed fuzzing tests to catch timing-sensitive issues -- including the timeout. */
private WorkflowRun runFuzzerJob(JenkinsRule jrule, String jobName, FlowDurabilityHint hint) throws Exception {
Jenkins jenkins = jrule.jenkins;
WorkflowJob job = jenkins.getItemByFullName(jobName, WorkflowJob.class);
@@ -865,19 +873,17 @@ public void evaluate() throws Throwable {
@Override
public void evaluate() throws Throwable {
WorkflowRun run = story.j.jenkins.getItemByFullName(jobName, WorkflowJob.class).getLastBuild();
+ if (run == null) { // Build killed so early the Run did not get to persist
+ return;
+ }
if (run.getExecution() != null) {
Assert.assertEquals(FlowDurabilityHint.MAX_SURVIVABILITY, run.getExecution().getDurabilityHint());
}
if (run.isBuilding()) {
- try {
- story.j.waitUntilNoActivityUpTo(30_000);
- } catch (AssertionError ase) {
- throw new AssertionError("Build hung: "+run, ase);
- }
+ assertBuildNotHung(story, run, 30_000);
Assert.assertEquals(Result.SUCCESS, run.getResult());
- } else {
- verifyCompletedCleanly(story.j.jenkins, run);
}
+ verifyCompletedCleanly(story.j.jenkins, run);
assertIncludesNodes(nodesOut, run);
story.j.assertLogContains(logStart[0], run);
}
@@ -891,7 +897,7 @@ public void evaluate() throws Throwable {
@Test
@Ignore //Too long to run as part of main suite
@TimedRepeatRule.RepeatForTime(repeatMillis = 150_000)
- public void fuzzTimingNonDurable() throws Exception {
+ public void fuzzTimingNonDurableWithDirtyRestart() throws Exception {
final String jobName = "NestedParallelDurableJob";
final String[] logStart = new String[1];
@@ -904,15 +910,86 @@ public void evaluate() throws Throwable {
if (run.getExecution() != null) {
Assert.assertEquals(FlowDurabilityHint.PERFORMANCE_OPTIMIZED, run.getExecution().getDurabilityHint());
}
+ if (run.isBuilding()) {
+ Assert.assertNotEquals(Boolean.TRUE, ((CpsFlowExecution)run.getExecution()).persistedClean);
+ } else {
+ Assert.assertEquals(Boolean.TRUE, ((CpsFlowExecution)run.getExecution()).persistedClean);
+ }
}
});
story.addStep(new Statement() {
@Override
public void evaluate() throws Throwable {
WorkflowRun run = story.j.jenkins.getItemByFullName(jobName, WorkflowJob.class).getLastBuild();
+ if (run == null) { // Build killed so early the Run did not get to persist
+ return;
+ }
+ if (run.getExecution() != null) {
+ Assert.assertEquals(FlowDurabilityHint.PERFORMANCE_OPTIMIZED, run.getExecution().getDurabilityHint());
+ }
+ assertBuildNotHung(story, run, 30_000);
+ verifyCompletedCleanly(story.j.jenkins, run);
+ story.j.assertLogContains(logStart[0], run);
+ }
+ });
+ story.addStep(new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ // Verify build doesn't resume at next restart, see JENKINS-50199
+ Assert.assertFalse(FlowExecutionList.get().iterator().hasNext());
+ WorkflowRun run = story.j.jenkins.getItemByFullName(jobName, WorkflowJob.class).getLastBuild();
+ if (run == null) {
+ return;
+ }
+ Assert.assertFalse(run.isBuilding());
+ Assert.assertTrue(run.getExecution().isComplete());
+ if (run.getExecution() instanceof CpsFlowExecution) {
+ assert ((CpsFlowExecution)(run.getExecution())).done;
+ }
+ }
+ });
+
+ }
+
+ /** Test interrupting build by randomly restarting *cleanly* at unpredictable times and verify we stick pick up and resume. */
+ @Test
+ @Ignore //Too long to run as part of main suite
+ @TimedRepeatRule.RepeatForTime(repeatMillis = 150_000)
+ public void fuzzTimingNonDurableWithCleanRestart() throws Exception {
+
+ final String jobName = "NestedParallelDurableJob";
+ final String[] logStart = new String[1];
+ final List nodesOut = new ArrayList();
+
+ // Create thread that eventually interrupts Jenkins with a hard shutdown at a random time interval
+ story.addStep(new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ WorkflowRun run = runFuzzerJob(story.j, jobName, FlowDurabilityHint.PERFORMANCE_OPTIMIZED);
+ logStart[0] = JenkinsRule.getLog(run);
+ if (run.getExecution() != null) {
+ Assert.assertEquals(FlowDurabilityHint.PERFORMANCE_OPTIMIZED, run.getExecution().getDurabilityHint());
+ }
+ nodesOut.clear();
+ nodesOut.addAll(new DepthFirstScanner().allNodes(run.getExecution()));
+ nodesOut.sort(FlowScanningUtils.ID_ORDER_COMPARATOR);
+ }
+ });
+ story.addStep(new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ WorkflowRun run = story.j.jenkins.getItemByFullName(jobName, WorkflowJob.class).getLastBuild();
+ if (run == null) { // Build killed so early the Run did not get to persist
+ return;
+ }
if (run.getExecution() != null) {
Assert.assertEquals(FlowDurabilityHint.PERFORMANCE_OPTIMIZED, run.getExecution().getDurabilityHint());
}
+ if (run.isBuilding()) {
+ assertBuildNotHung(story, run, 30_000);
+ }
+ verifyCompletedCleanly(story.j.jenkins, run);
+ story.j.assertLogContains(logStart[0], run);
if (run.isBuilding()) {
try {
story.j.waitUntilNoActivityUpTo(30_000);
@@ -920,10 +997,9 @@ public void evaluate() throws Throwable {
throw new AssertionError("Build hung: "+run, ase);
}
}
- verifyCompletedCleanly(story.j.jenkins, run);
- story.j.assertLogContains(logStart[0], run);
+ Assert.assertEquals(Result.SUCCESS, run.getResult());
+ assertIncludesNodes(nodesOut, run);
}
});
-
}
}
diff --git a/src/test/java/org/jenkinsci/plugins/workflow/cps/PersistenceProblemsTest.java b/src/test/java/org/jenkinsci/plugins/workflow/cps/PersistenceProblemsTest.java
new file mode 100644
index 000000000..4e81de31d
--- /dev/null
+++ b/src/test/java/org/jenkinsci/plugins/workflow/cps/PersistenceProblemsTest.java
@@ -0,0 +1,340 @@
+package org.jenkinsci.plugins.workflow.cps;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import hudson.model.Queue;
+import hudson.model.Result;
+import org.apache.commons.io.FileUtils;
+import org.jenkinsci.plugins.workflow.flow.FlowDurabilityHint;
+import org.jenkinsci.plugins.workflow.flow.FlowExecution;
+import org.jenkinsci.plugins.workflow.flow.FlowExecutionList;
+import org.jenkinsci.plugins.workflow.graph.FlowEndNode;
+import org.jenkinsci.plugins.workflow.graph.FlowStartNode;
+import org.jenkinsci.plugins.workflow.job.WorkflowJob;
+import org.jenkinsci.plugins.workflow.job.WorkflowRun;
+import org.jenkinsci.plugins.workflow.job.properties.DurabilityHintJobProperty;
+import org.jenkinsci.plugins.workflow.support.steps.input.InputAction;
+import org.jenkinsci.plugins.workflow.support.steps.input.InputStepExecution;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.jvnet.hudson.test.BuildWatcher;
+import org.jvnet.hudson.test.JenkinsRule;
+import org.jvnet.hudson.test.RestartableJenkinsRule;
+
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Verifies we can cope with all the bizarre quirks that occur when persistence fails or something unexpected happens.
+ */
+public class PersistenceProblemsTest {
+ @ClassRule
+ public static BuildWatcher buildWatcher = new BuildWatcher();
+
+ @Rule
+ public RestartableJenkinsRule story = new RestartableJenkinsRule();
+
+ /** Execution bombed out due to some sort of irrecoverable persistence issue. */
+ static void assertNulledExecution(WorkflowRun run) throws Exception {
+ if (run.isBuilding()) {
+ System.out.println("Run initially building, going to wait a second to see if it finishes, run="+run);
+ Thread.sleep(1000);
+ }
+ Assert.assertFalse(run.isBuilding());
+ Assert.assertNotNull(run.getResult());
+ FlowExecution fe = run.getExecution();
+ Assert.assertNull(fe);
+ }
+
+ /** Verifies all the assumptions about a cleanly finished build. */
+ static void assertCompletedCleanly(WorkflowRun run) throws Exception {
+ if (run.isBuilding()) {
+ System.out.println("Run initially building, going to wait a second to see if it finishes, run="+run);
+ Thread.sleep(1000);
+ }
+ Assert.assertFalse(run.isBuilding());
+ Assert.assertNotNull(run.getResult());
+ FlowExecution fe = run.getExecution();
+ FlowExecutionList.get().forEach(f -> {
+ if (fe != null && f == fe) {
+ Assert.fail("FlowExecution still in FlowExecutionList!");
+ }
+ });
+ Assert.assertTrue("Queue not empty after completion!", Queue.getInstance().isEmpty());
+
+ if (fe instanceof CpsFlowExecution) {
+ CpsFlowExecution cpsExec = (CpsFlowExecution)fe;
+ Assert.assertTrue(cpsExec.isComplete());
+ Assert.assertEquals(Boolean.TRUE, cpsExec.done);
+ Assert.assertEquals(1, cpsExec.getCurrentHeads().size());
+ Assert.assertTrue(cpsExec.isComplete());
+ Assert.assertTrue(cpsExec.getCurrentHeads().get(0) instanceof FlowEndNode);
+ Assert.assertTrue(cpsExec.startNodes == null || cpsExec.startNodes.isEmpty());
+ Assert.assertFalse(cpsExec.blocksRestart());
+ } else {
+ System.out.println("WARNING: no FlowExecutionForBuild");
+ }
+ }
+
+ static void assertCleanInProgress(WorkflowRun run) throws Exception {
+ Assert.assertTrue(run.isBuilding());
+ Assert.assertNull(run.getResult());
+ FlowExecution fe = run.getExecution();
+ AtomicBoolean hasExecutionInList = new AtomicBoolean(false);
+ FlowExecutionList.get().forEach(f -> {
+ if (fe != null && f == fe) {
+ hasExecutionInList.set(true);
+ }
+ });
+ if (!hasExecutionInList.get()) {
+ Assert.fail("Build completed but should still show in FlowExecutionList");
+ }
+ CpsFlowExecution cpsExec = (CpsFlowExecution)fe;
+ Assert.assertFalse(cpsExec.isComplete());
+ Assert.assertEquals(Boolean.FALSE, cpsExec.done);
+ Assert.assertFalse(cpsExec.getCurrentHeads().get(0) instanceof FlowEndNode);
+ Assert.assertTrue(cpsExec.startNodes != null && !cpsExec.startNodes.isEmpty());
+ }
+
+ static void assertResultMatchExecutionAndRun(WorkflowRun run, Result[] executionAndBuildResult) throws Exception {
+ Assert.assertEquals(executionAndBuildResult[0], ((CpsFlowExecution)(run.getExecution())).getResult());
+ Assert.assertEquals(executionAndBuildResult[1], run.getResult());
+ }
+
+ /** Create and run a basic build before we mangle its persisted contents. Stores job number to jobIdNumber index 0. */
+ private static WorkflowRun runBasicBuild(JenkinsRule j, String jobName, int[] jobIdNumber, FlowDurabilityHint hint) throws Exception {
+ WorkflowJob job = j.jenkins.createProject(WorkflowJob.class, jobName);
+ job.setDefinition(new CpsFlowDefinition("echo 'doSomething'", true));
+ job.addProperty(new DurabilityHintJobProperty(hint));
+ WorkflowRun run = j.buildAndAssertSuccess(job);
+ jobIdNumber[0] = run.getNumber();
+ assertCompletedCleanly(run);
+ return run;
+ }
+
+ /** Create and run a basic build before we mangle its persisted contents. Stores job number to jobIdNumber index 0. */
+ private static WorkflowRun runBasicBuild(JenkinsRule j, String jobName, int[] jobIdNumber) throws Exception {
+ return runBasicBuild(j, jobName, jobIdNumber, FlowDurabilityHint.MAX_SURVIVABILITY);
+ }
+
+ /** Sets up a running build that is waiting on input. */
+ private static WorkflowRun runBasicPauseOnInput(JenkinsRule j, String jobName, int[] jobIdNumber, FlowDurabilityHint durabilityHint) throws Exception {
+ WorkflowJob job = j.jenkins.createProject(WorkflowJob.class, jobName);
+ job.setDefinition(new CpsFlowDefinition("input 'pause'", true));
+ job.addProperty(new DurabilityHintJobProperty(durabilityHint));
+
+ WorkflowRun run = job.scheduleBuild2(0).getStartCondition().get();
+ ListenableFuture listener = run.getExecutionPromise();
+ FlowExecution exec = listener.get();
+ while(exec.getCurrentHeads().isEmpty() || (exec.getCurrentHeads().get(0) instanceof FlowStartNode)) { // Wait until input step starts
+ System.out.println("Waiting for input step to begin");
+ Thread.sleep(50);
+ }
+ while(run.getAction(InputAction.class) == null) { // Wait until input step starts
+ System.out.println("Waiting for input action to get attached to run");
+ Thread.sleep(50);
+ }
+ Thread.sleep(100L); // A little extra buffer for persistence etc
+ jobIdNumber[0] = run.getNumber();
+ return run;
+ }
+
+ private static WorkflowRun runBasicPauseOnInput(JenkinsRule j, String jobName, int[] jobIdNumber) throws Exception {
+ return runBasicPauseOnInput(j, jobName, jobIdNumber, FlowDurabilityHint.MAX_SURVIVABILITY);
+ }
+
+ private static InputStepExecution getInputStepExecution(WorkflowRun run, String inputMessage) throws Exception {
+ InputAction ia = run.getAction(InputAction.class);
+ List execList = ia.getExecutions();
+ return execList.stream().filter(e -> inputMessage.equals(e.getInput().getMessage())).findFirst().orElse(null);
+ }
+
+ final static String DEFAULT_JOBNAME = "testJob";
+
+ /** Simulates something happening badly during final shutdown, which may cause build to not appear done. */
+ @Test
+ public void completedFinalFlowNodeNotPersisted() throws Exception {
+ final int[] build = new int[1];
+ final Result[] executionAndBuildResult = new Result[2];
+ story.thenWithHardShutdown( j -> {
+ WorkflowRun run = runBasicBuild(j, DEFAULT_JOBNAME, build);
+ String finalId = run.getExecution().getCurrentHeads().get(0).getId();
+
+ // Hack but deletes the file from disk
+ CpsFlowExecution cpsExec = (CpsFlowExecution)(run.getExecution());
+ File f = new File(cpsExec.getStorageDir(), finalId+".xml");
+ f.delete();
+ executionAndBuildResult[0] = ((CpsFlowExecution)(run.getExecution())).getResult();
+ executionAndBuildResult[1] = run.getResult();
+ });
+ story.then(j-> {
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCompletedCleanly(run);
+ // assertNulledExecution(run);
+ Assert.assertEquals(Result.SUCCESS, run.getResult());
+ assertResultMatchExecutionAndRun(run, executionAndBuildResult);
+ });
+ }
+ /** Perhaps there was a serialization error breaking the FlowGraph persistence for non-durable mode. */
+ @Test
+ public void completedNoNodesPersisted() throws Exception {
+ final int[] build = new int[1];
+ final Result[] executionAndBuildResult = new Result[2];
+ story.thenWithHardShutdown( j -> {
+ WorkflowRun run = runBasicBuild(j, DEFAULT_JOBNAME, build);
+ FileUtils.deleteDirectory(((CpsFlowExecution)(run.getExecution())).getStorageDir());
+ executionAndBuildResult[0] = ((CpsFlowExecution)(run.getExecution())).getResult();
+ executionAndBuildResult[1] = run.getResult();
+ });
+ story.then(j-> {
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCompletedCleanly(run);
+ // assertNulledExecution(run);
+ Assert.assertEquals(Result.SUCCESS, run.getResult());
+ assertResultMatchExecutionAndRun(run, executionAndBuildResult);
+ });
+ }
+
+ /** Simulates case where done flag was not persisted. */
+ @Test
+ public void completedButWrongDoneStatus() throws Exception {
+ final int[] build = new int[1];
+ final Result[] executionAndBuildResult = new Result[2];
+ story.thenWithHardShutdown( j -> {
+ WorkflowRun run = runBasicBuild(j, DEFAULT_JOBNAME, build);
+ String finalId = run.getExecution().getCurrentHeads().get(0).getId();
+
+ // Hack but deletes the FlowNodeStorage from disk
+ CpsFlowExecution cpsExec = (CpsFlowExecution)(run.getExecution());
+ cpsExec.done = false;
+ cpsExec.saveOwner();
+ executionAndBuildResult[0] = ((CpsFlowExecution)(run.getExecution())).getResult();
+ executionAndBuildResult[1] = run.getResult();
+ });
+ story.then(j-> {
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCompletedCleanly(run);
+ Assert.assertEquals(Result.SUCCESS, run.getResult());
+ assertResultMatchExecutionAndRun(run, executionAndBuildResult);
+ });
+ }
+
+ @Test
+ public void inProgressNormal() throws Exception {
+ final int[] build = new int[1];
+ story.then( j -> {
+ WorkflowRun run = runBasicPauseOnInput(j, DEFAULT_JOBNAME, build);
+ });
+ story.then( j->{
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCleanInProgress(run);
+ InputStepExecution exec = getInputStepExecution(run, "pause");
+ exec.doProceedEmpty();
+ j.waitForCompletion(run);
+ assertCompletedCleanly(run);
+ Assert.assertEquals(Result.SUCCESS, run.getResult());
+ });
+ }
+
+ @Test
+ @Ignore
+ public void inProgressMaxPerfCleanShutdown() throws Exception {
+ final int[] build = new int[1];
+ story.then( j -> {
+ WorkflowRun run = runBasicPauseOnInput(j, DEFAULT_JOBNAME, build, FlowDurabilityHint.PERFORMANCE_OPTIMIZED);
+ // SHOULD still save at end via persist-at-shutdown hooks
+ });
+ story.then( j->{
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCleanInProgress(run);
+ InputStepExecution exec = getInputStepExecution(run, "pause");
+ exec.doProceedEmpty();
+ j.waitForCompletion(run);
+ assertCompletedCleanly(run);
+ Assert.assertEquals(Result.SUCCESS, run.getResult());
+ });
+ }
+
+ @Test
+ @Ignore
+ public void inProgressMaxPerfDirtyShutdown() throws Exception {
+ final int[] build = new int[1];
+ final String[] finalNodeId = new String[1];
+ story.thenWithHardShutdown( j -> {
+ runBasicPauseOnInput(j, DEFAULT_JOBNAME, build, FlowDurabilityHint.PERFORMANCE_OPTIMIZED);
+ // SHOULD still save at end via persist-at-shutdown hooks
+ });
+ story.then( j->{
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ Thread.sleep(1000);
+ j.waitForCompletion(run);
+ assertCompletedCleanly(run);
+ Assert.assertEquals(Result.FAILURE, run.getResult());
+ finalNodeId[0] = run.getExecution().getCurrentHeads().get(0).getId();
+ });
+ story.then(j-> {
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCompletedCleanly(run);
+ Assert.assertEquals(finalNodeId[0], run.getExecution().getCurrentHeads().get(0).getId());
+ // JENKINS-50199, verify it doesn't try to resume again
+ });
+ }
+
+ @Test
+ public void inProgressButFlowNodesLost() throws Exception {
+ final int[] build = new int[1];
+ story.thenWithHardShutdown( j -> {
+ WorkflowRun run = runBasicPauseOnInput(j, DEFAULT_JOBNAME, build);
+ CpsFlowExecution cpsExec = (CpsFlowExecution)(run.getExecution());
+ FileUtils.deleteDirectory(((CpsFlowExecution)(run.getExecution())).getStorageDir());
+ });
+ story.then( j->{
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCompletedCleanly(run);
+ });
+ }
+
+ @Test
+ /** Build okay but program fails to load */
+ public void inProgressButProgramLoadFailure() throws Exception {
+ final int[] build = new int[1];
+ story.thenWithHardShutdown( j -> {
+ WorkflowRun run = runBasicPauseOnInput(j, DEFAULT_JOBNAME, build);
+ CpsFlowExecution cpsExec = (CpsFlowExecution)(run.getExecution());
+ cpsExec.getProgramDataFile().delete();
+ });
+ story.then( j->{
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCompletedCleanly(run);
+ });
+ }
+
+ @Test
+ /** Build okay but then the start nodes get screwed up */
+ public void inProgressButStartBlocksLost() throws Exception {
+ final int[] build = new int[1];
+ story.thenWithHardShutdown( j -> {
+ WorkflowRun run = runBasicPauseOnInput(j, DEFAULT_JOBNAME, build);
+ CpsFlowExecution cpsExec = (CpsFlowExecution)(run.getExecution());
+ cpsExec.startNodes.push(new FlowStartNode(cpsExec, cpsExec.iotaStr()));
+ run.save();
+ });
+ story.then( j->{
+ WorkflowJob r = j.jenkins.getItemByFullName(DEFAULT_JOBNAME, WorkflowJob.class);
+ WorkflowRun run = r.getBuildByNumber(build[0]);
+ assertCompletedCleanly(run);
+ });
+ }
+}